diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java index f9ce5d15d0..c2c2c90fd5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -240,9 +241,16 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH @Override public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms, String deleteSql) { - String sql = null; + List sqlStatements = createPurgeSqlForMultipleTables(node, triggerRouter, triggerHistory, transforms, deleteSql); + return sqlStatements.size() == 1 ? sqlStatements.get(0) : ""; + } + + @Override + public List createPurgeSqlForMultipleTables(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, + List transforms, String deleteSql) { + List sqlStatements = new ArrayList(); if (StringUtils.isEmpty(triggerRouter.getInitialLoadDeleteStmt())) { - List tableNames = new ArrayList(); + Set tableNames = new HashSet(); if (transforms != null) { for (TransformTableNodeGroupLink transform : transforms) { tableNames.add(transform.getFullyQualifiedTargetTableName()); @@ -251,7 +259,6 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH tableNames.add(triggerRouter.qualifiedTargetTableName(triggerHistory)); } - StringBuilder statements = new StringBuilder(128); for (String tableName : tableNames) { if (deleteSql == null) { @@ -261,15 +268,12 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL); } } - statements.append(String.format(deleteSql, tableName)).append(";"); + sqlStatements.add(String.format(deleteSql, tableName)); } - - statements.setLength(statements.length()-1); // Lose the last ; - sql = statements.toString(); } else { - sql = triggerRouter.getInitialLoadDeleteStmt(); + sqlStatements.add(triggerRouter.getInitialLoadDeleteStmt()); } - return sql; + return sqlStatements; } public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java index 7d440fbba6..ff3b641167 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java @@ -104,6 +104,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms, String deleteSql); + public List createPurgeSqlForMultipleTables(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms, String deleteSql); + public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause); public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index c0a0ceb24b..58e4c412be 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -1042,8 +1042,32 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, this.engine.getTransformService().findTransformsFor( sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerRouter.getTargetTable(triggerHistory)); - String sql = StringUtils.isNotBlank(overrideDeleteStatement) ? overrideDeleteStatement - : symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory, transforms); + if (StringUtils.isNotBlank(overrideDeleteStatement)) { + createPurgeEvent(transaction, overrideDeleteStatement, targetNode, sourceNode, + triggerRouter, triggerHistory, isLoad, loadId, createBy); + + } else if (transforms != null && transforms.size() > 0) { + List sqlStatements = symmetricDialect.createPurgeSqlForMultipleTables(targetNode, triggerRouter, + triggerHistory, transforms, null); + for (String sql : sqlStatements) { + createPurgeEvent(transaction, + sql, + targetNode, sourceNode, + triggerRouter, triggerHistory, isLoad, loadId, createBy); + } + } else { + createPurgeEvent(transaction, + symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory, transforms), + targetNode, sourceNode, + triggerRouter, triggerHistory, isLoad, loadId, createBy); + } + + } + + protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node targetNode, Node sourceNode, + TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad, + long loadId, String createBy) { + sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql); sql = FormatUtils.replace("externalId", targetNode.getExternalId(), sql); sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql); @@ -1065,7 +1089,7 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, insertData(transaction, data); } } - + public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { TriggerHistory history = engine.getTriggerRouterService()