Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
chenson42 committed Jul 6, 2017
2 parents 4a6fbcd + a8f7b3c commit 07157d1
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -240,9 +241,16 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH
@Override
public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory,
List<TransformTableNodeGroupLink> transforms, String deleteSql) {
String sql = null;
List<String> sqlStatements = createPurgeSqlForMultipleTables(node, triggerRouter, triggerHistory, transforms, deleteSql);
return sqlStatements.size() == 1 ? sqlStatements.get(0) : "";
}

@Override
public List<String> createPurgeSqlForMultipleTables(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory,
List<TransformTableNodeGroupLink> transforms, String deleteSql) {
List<String> sqlStatements = new ArrayList<String>();
if (StringUtils.isEmpty(triggerRouter.getInitialLoadDeleteStmt())) {
List<String> tableNames = new ArrayList<String>();
Set<String> tableNames = new HashSet<String>();
if (transforms != null) {
for (TransformTableNodeGroupLink transform : transforms) {
tableNames.add(transform.getFullyQualifiedTargetTableName());
Expand All @@ -251,7 +259,6 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH
tableNames.add(triggerRouter.qualifiedTargetTableName(triggerHistory));
}

StringBuilder statements = new StringBuilder(128);

for (String tableName : tableNames) {
if (deleteSql == null) {
Expand All @@ -261,15 +268,12 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH
deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL);
}
}
statements.append(String.format(deleteSql, tableName)).append(";");
sqlStatements.add(String.format(deleteSql, tableName));
}

statements.setLength(statements.length()-1); // Lose the last ;
sql = statements.toString();
} else {
sql = triggerRouter.getInitialLoadDeleteStmt();
sqlStatements.add(triggerRouter.getInitialLoadDeleteStmt());
}
return sql;
return sqlStatements;
}

public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel,
Expand Down
Expand Up @@ -104,6 +104,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List<TransformTableNodeGroupLink> transforms, String deleteSql);

public List<String> createPurgeSqlForMultipleTables(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List<TransformTableNodeGroupLink> transforms, String deleteSql);

public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause);

public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause);
Expand Down
Expand Up @@ -1042,8 +1042,32 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode,
this.engine.getTransformService().findTransformsFor(
sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerRouter.getTargetTable(triggerHistory));

String sql = StringUtils.isNotBlank(overrideDeleteStatement) ? overrideDeleteStatement
: symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory, transforms);
if (StringUtils.isNotBlank(overrideDeleteStatement)) {
createPurgeEvent(transaction, overrideDeleteStatement, targetNode, sourceNode,
triggerRouter, triggerHistory, isLoad, loadId, createBy);

} else if (transforms != null && transforms.size() > 0) {
List<String> sqlStatements = symmetricDialect.createPurgeSqlForMultipleTables(targetNode, triggerRouter,
triggerHistory, transforms, null);
for (String sql : sqlStatements) {
createPurgeEvent(transaction,
sql,
targetNode, sourceNode,
triggerRouter, triggerHistory, isLoad, loadId, createBy);
}
} else {
createPurgeEvent(transaction,
symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory, transforms),
targetNode, sourceNode,
triggerRouter, triggerHistory, isLoad, loadId, createBy);
}

}

protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node targetNode, Node sourceNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad,
long loadId, String createBy) {

sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql);
sql = FormatUtils.replace("externalId", targetNode.getExternalId(), sql);
sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql);
Expand All @@ -1065,7 +1089,7 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode,
insertData(transaction, data);
}
}

public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId,
String createBy) {
TriggerHistory history = engine.getTriggerRouterService()
Expand Down

0 comments on commit 07157d1

Please sign in to comment.