Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
maxwellpettit committed Mar 17, 2017
2 parents c1714ff + 17e3e7d commit 7020446
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 21 deletions.
Expand Up @@ -239,7 +239,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
sql = replaceDefaultSchemaAndCatalog(sql);
sql = FormatUtils.replace("prefixName", symmetricDialect.getTablePrefix(), sql);
sql = FormatUtils.replace("oracleToClob",
triggerRouter.getTrigger().isUseCaptureLobs() ? "to_clob('')||" : "", sql);
triggerRouter.getTrigger().isUseCaptureLobs() ? toClobExpression(table) : "", sql);

return sql;
}
Expand Down Expand Up @@ -301,7 +301,7 @@ public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, T
false, channel, trigger).columnString;
sql = FormatUtils.replace("columns", columnsText, sql);
sql = FormatUtils.replace("oracleToClob",
trigger.isUseCaptureLobs() ? "to_clob('')||" : "", sql);
trigger.isUseCaptureLobs() ? toClobExpression(table) : "", sql);

sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
Expand Down Expand Up @@ -330,7 +330,7 @@ public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHist
false, channel, trigger).toString();
sql = FormatUtils.replace("columns", columnsText, sql);
sql = FormatUtils.replace("oracleToClob",
trigger.isUseCaptureLobs() ? "to_clob('')||" : "", sql);
trigger.isUseCaptureLobs() ? toClobExpression(table) : "", sql);
sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
triggerHistory == null ? getSourceTablePrefix(table)
Expand Down Expand Up @@ -531,7 +531,7 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
ddl = replaceDefaultSchemaAndCatalog(ddl);

ddl = FormatUtils.replace("oracleToClob",
trigger.isUseCaptureLobs() ? "to_clob('')||" : "", ddl);
trigger.isUseCaptureLobs() ? toClobExpression(table) : "", ddl);

switch (dml) {
case DELETE:
Expand All @@ -548,6 +548,14 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
return ddl;
}

protected String toClobExpression(Table table) {
if (table.hasNTypeColumns()) {
return "to_nclob('')||";
} else {
return "to_clob('')||";
}
}

protected String getChannelExpression(Trigger trigger) {
if (trigger.getChannelId().equals(Constants.CHANNEL_DYNAMIC)) {
if (StringUtils.isNotBlank(trigger.getChannelExpression())) {
Expand Down
Expand Up @@ -125,7 +125,7 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordJobManagerRestartNeeded(context, table, data);
recordConflictFlushNeeded(context, table);
recordNodeSecurityFlushNeeded(context, table);
recordNodeFlushNeeded(context, table);
recordNodeFlushNeeded(context, table, data);
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
Expand Down Expand Up @@ -206,8 +206,9 @@ private void recordNodeSecurityFlushNeeded(DataContext context, Table table) {
}
}

private void recordNodeFlushNeeded(DataContext context, Table table) {
if (matchesTable(table, TableConstants.SYM_NODE)) {
private void recordNodeFlushNeeded(DataContext context, Table table, CsvData data) {
if (data.getDataEventType() == DataEventType.INSERT
&& matchesTable(table, TableConstants.SYM_NODE)) {
context.put(CTX_KEY_FLUSH_NODE_NEEDED, true);
}
}
Expand Down Expand Up @@ -302,6 +303,16 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
context.remove(CTX_KEY_RESYNC_NEEDED);
}

if (context.get(CTX_KEY_RESYNC_TABLE_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
@SuppressWarnings("unchecked")
Set<Table> tables = (Set<Table>)context.get(CTX_KEY_RESYNC_TABLE_NEEDED);
for (Table table : tables) {
engine.getTriggerRouterService().syncTriggers(table, false);
}
context.remove(CTX_KEY_RESYNC_TABLE_NEEDED);
}

}

@Override
Expand Down Expand Up @@ -333,8 +344,7 @@ public void batchCommitted(DataContext context) {
engine.getLoadFilterService().clearCache();
context.remove(CTX_KEY_FLUSH_LOADFILTERS_NEEDED);
}



if (context.get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) {
log.info("Channels flushed because new channels came through the data loader");
engine.getConfigurationService().clearCache();
Expand Down Expand Up @@ -363,17 +373,7 @@ public void batchCommitted(DataContext context) {
log.info("About to refresh the cache of nodes because new configuration came through the data loader");
nodeService.flushNodeCache();
context.remove(CTX_KEY_FLUSH_NODE_NEEDED);
}

if (context.get(CTX_KEY_RESYNC_TABLE_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
@SuppressWarnings("unchecked")
Set<Table> tables = (Set<Table>)context.get(CTX_KEY_RESYNC_TABLE_NEEDED);
for (Table table : tables) {
engine.getTriggerRouterService().syncTriggers(table, false);
}
context.remove(CTX_KEY_RESYNC_TABLE_NEEDED);
}
}

}
}
Expand Up @@ -20,13 +20,16 @@
*/
package org.jumpmind.symmetric.service.impl;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.UnknownHostException;
import java.sql.Types;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

Expand All @@ -43,6 +46,7 @@
import org.jumpmind.symmetric.ext.INodeRegistrationListener;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeHost;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.RegistrationRequest;
import org.jumpmind.symmetric.model.RegistrationRequest.RegistrationStatus;
Expand Down Expand Up @@ -623,10 +627,19 @@ protected String openRegistration(Node node, String remoteHost, String remoteAdd
password = filterPasswordOnSaveIfNeeded(password);
sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] {
nodeId, password, masterToMasterOnly ? null : me.getNodeId() });

if (isNotBlank(remoteHost)) {
NodeHost nodeHost = new NodeHost(node.getNodeId());
nodeHost.setHeartbeatTime(new Date());
nodeHost.setIpAddress(remoteAddress);
nodeHost.setHostName(remoteHost);
nodeService.updateNodeHost(nodeHost);
}
nodeService.flushNodeAuthorizedCache();
nodeService.flushNodeCache();
nodeService.insertNodeGroup(node.getNodeGroupId(), null);
nodeService.flushNodeGroupCache();

log.info(
"Just opened registration for external id of {} and a node group of {} and a node id of {}",
new Object[] { node.getExternalId(), node.getNodeGroupId(), nodeId });
Expand Down
Expand Up @@ -1340,7 +1340,7 @@ hsqldb.initialize.db=true
# This is the precision that is used in the number template for oracle triggers
# DatabaseOverridable: true
# Tags: other
oracle.template.precision=30,10
oracle.template.precision=*,38

# Requires access to gv$transaction
# Tags: other
Expand Down
12 changes: 12 additions & 0 deletions symmetric-db/src/main/java/org/jumpmind/db/model/Table.java
Expand Up @@ -607,6 +607,18 @@ public boolean hasPrimaryKey() {
}
return false;
}

public boolean hasNTypeColumns() {
for (Iterator< Column>it = columns.iterator(); it.hasNext();) {
Column column = (Column) it.next();
if (column.getJdbcTypeCode() == ColumnTypes.NCHAR || column.getJdbcTypeCode() == ColumnTypes.NVARCHAR
|| column.getJdbcTypeCode() == ColumnTypes.LONGNVARCHAR || column.getJdbcTypeCode() == ColumnTypes.NCLOB
|| column.getJdbcTypeName().startsWith("NVARCHAR") || column.getJdbcTypeName().startsWith("NCHAR")) {
return true;
}
}
return false;
}

/**
* Finds the column with the specified name, using case insensitive
Expand Down

0 comments on commit 7020446

Please sign in to comment.