Skip to content

Commit

Permalink
0003294: Loads that use a custom sql (truncate table %s) to not take
Browse files Browse the repository at this point in the history
into account transforms
  • Loading branch information
jumpmind-josh committed Oct 28, 2017
1 parent 0d82752 commit a23fd54
Showing 1 changed file with 45 additions and 8 deletions.
Expand Up @@ -870,13 +870,15 @@ private void insertSQLBatchesForReload(Node targetNode, long loadId, String crea
&& engine.getGroupletService().isTargetEnabled(triggerRouter,
targetNode)) {

String tableName = triggerRouter.qualifiedTargetTableName(triggerHistory);
String formattedBeforeSql = String.format(currentRequest.getBeforeCustomSql(), tableName) + ";";
List<String> sqlStatements = resolveTargetTables(currentRequest.getBeforeCustomSql(),
triggerRouter, triggerHistory, targetNode);

insertSqlEvent(transaction, triggerHistory, triggerRouter.getTrigger().getChannelId(),
targetNode, formattedBeforeSql,
true, loadId, createBy);
sqlEventsSent++;
for (String sql : sqlStatements) {
insertSqlEvent(transaction, triggerHistory, triggerRouter.getTrigger().getChannelId(),
targetNode, sql,
true, loadId, createBy);
sqlEventsSent++;
}
if (!transactional) {
transaction.commit();
}
Expand Down Expand Up @@ -1076,8 +1078,11 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode,
sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerRouter.getTargetTable(triggerHistory));

if (StringUtils.isNotBlank(overrideDeleteStatement)) {
createPurgeEvent(transaction, overrideDeleteStatement, targetNode, sourceNode,
triggerRouter, triggerHistory, isLoad, loadId, createBy);
List<String> sqlStatements = resolveTargetTables(overrideDeleteStatement, triggerRouter, triggerHistory, targetNode);
for (String sql : sqlStatements) {
createPurgeEvent(transaction, sql, targetNode, sourceNode,
triggerRouter, triggerHistory, isLoad, loadId, createBy);
}

} else if (transforms != null && transforms.size() > 0) {
List<String> sqlStatements = symmetricDialect.createPurgeSqlForMultipleTables(targetNode, triggerRouter,
Expand All @@ -1096,6 +1101,38 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode,
}

}

public List<String> resolveTargetTables(String sql, TriggerRouter triggerRouter, TriggerHistory triggerHistory, Node targetNode) {
if (sql == null) { return null; }

List<String> sqlStatements = new ArrayList<String>();
if (sql != null && sql.contains("%s")) {
Set<String> tableNames = new HashSet<String>();
Node sourceNode = engine.getNodeService().findIdentity();
String sourceTableName = triggerRouter.qualifiedTargetTableName(triggerHistory);

List<TransformTableNodeGroupLink> transforms =
this.engine.getTransformService().findTransformsFor(
sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerRouter.getTargetTable(triggerHistory));

if (transforms != null) {
for (TransformTableNodeGroupLink transform : transforms) {
tableNames.add(transform.getFullyQualifiedTargetTableName());
}
} else {
tableNames.add(sourceTableName);
}

for (String tableName : tableNames) {
sqlStatements.add(String.format(sql, tableName));
}
}
else {
sqlStatements.add(sql);
}
return sqlStatements;

}

protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node targetNode, Node sourceNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad,
Expand Down

0 comments on commit a23fd54

Please sign in to comment.