Skip to content

Commit

Permalink
0004034: Create tables based on catalog, schema, table and column nam…
Browse files Browse the repository at this point in the history
…es set by routers and transforms
  • Loading branch information
evan-miller-jumpmind committed Jun 27, 2023
1 parent 8b63682 commit 6516c42
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 14 deletions.
Expand Up @@ -20,50 +20,67 @@
*/
package org.jumpmind.symmetric.extract;

import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.transform.ColumnPolicy;
import org.jumpmind.symmetric.io.data.transform.RemoveColumnTransform;
import org.jumpmind.symmetric.io.data.transform.TransformColumn;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
import org.jumpmind.symmetric.util.SymmetricUtils;

public class ColumnsAccordingToTriggerHistory {
private Map<CacheKey, Table> cache = new HashMap<CacheKey, Table>();
private Node sourceNode;
private Node targetNode;
private ITriggerRouterService triggerRouterService;
private ITransformService transformService;
private ISymmetricDialect symmetricDialect;
private String tablePrefix;

public ColumnsAccordingToTriggerHistory(ISymmetricEngine engine, Node sourceNode, Node targetNode) {
triggerRouterService = engine.getTriggerRouterService();
transformService = engine.getTransformService();
symmetricDialect = engine.getSymmetricDialect();
tablePrefix = engine.getTablePrefix().toLowerCase();
this.sourceNode = sourceNode;
this.targetNode = targetNode;
}

public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition) {
CacheKey key = new CacheKey(routerId, triggerHistory.getTriggerHistoryId(), setTargetTableName, useDatabaseDefinition);
public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setTargetTableName,
boolean useDatabaseDefinition, boolean useTransforms) {
CacheKey key = new CacheKey(routerId, triggerHistory.getTriggerHistoryId(), setTargetTableName,
useDatabaseDefinition, useTransforms);
Table table = cache.get(key);
if (table == null) {
table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, setTargetTableName, useDatabaseDefinition);
table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, setTargetTableName,
useDatabaseDefinition, useTransforms);
cache.put(key, table);
}
return table;
}

protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition) {
protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, TriggerHistory triggerHistory,
boolean setTargetTableName, boolean useDatabaseDefinition, boolean useTransforms) {
String catalogName = triggerHistory.getSourceCatalogName();
String schemaName = triggerHistory.getSourceSchemaName();
String tableName = triggerHistory.getSourceTableName();
Expand Down Expand Up @@ -101,34 +118,108 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
table.setCatalog(null);
} else if (StringUtils.isNotBlank(router.getTargetCatalogName())) {
table.setCatalog(SymmetricUtils.replaceNodeVariables(sourceNode, targetNode, router.getTargetCatalogName()));
table.setCatalog(SymmetricUtils.replaceCatalogSchemaVariables(catalogName, schemaName, router.getTargetCatalogName()));
}
if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetSchemaName())) {
table.setSchema(null);
} else if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
table.setSchema(SymmetricUtils.replaceNodeVariables(sourceNode, targetNode, router.getTargetSchemaName()));
table.setSchema(SymmetricUtils.replaceCatalogSchemaVariables(catalogName, schemaName, router.getTargetSchemaName()));
}
if (StringUtils.isNotBlank(router.getTargetTableName())) {
table.setName(router.getTargetTableName());
}
}
if (useTransforms) {
TransformTable transform = getTransform(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), table,
TransformPoint.EXTRACT, Integer.MIN_VALUE);
while (transform != null) {
applyTransform(table, transform);
transform = getTransform(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), table,
TransformPoint.EXTRACT, transform.getTransformOrder() + 1);
}
transform = getTransform(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), table,
TransformPoint.LOAD, Integer.MIN_VALUE);
while (transform != null) {
applyTransform(table, transform);
transform = getTransform(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), table,
TransformPoint.LOAD, transform.getTransformOrder() + 1);
}
}
return table;
}

protected IDatabasePlatform getTargetPlatform(String tableName) {
return tableName.startsWith(tablePrefix) ? symmetricDialect.getPlatform() : symmetricDialect.getTargetDialect().getPlatform();
}

protected TransformTable getTransform(String sourceNodeGroupId, String targetNodeGroupId, Table table,
TransformPoint transformPoint, int order) {
List<TransformTableNodeGroupLink> transforms = transformService.findTransformsFor(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), table.getName());
if (transforms != null) {
for (TransformTableNodeGroupLink transform : transforms) {
if (StringUtils.equals(transform.getSourceCatalogName(), table.getCatalog())
&& StringUtils.equals(transform.getSourceSchemaName(), table.getSchema())
&& transform.getTransformPoint().equals(transformPoint) && transform.getTransformOrder() >= order) {
return transform;
}
}
}
return null;
}

protected void applyTransform(Table table, TransformTable transform) {
List<String> columnNamesToRemoveList = new ArrayList<String>();
if (transform.getColumnPolicy().equals(ColumnPolicy.SPECIFIED)) {
columnNamesToRemoveList.addAll(Arrays.asList(table.getColumnNames()));
}
for (TransformColumn transformColumn : transform.getTransformColumns()) {
if (StringUtils.isNotBlank(transformColumn.getSourceColumnName())) {
Column column = table.getColumnWithName(transformColumn.getSourceColumnName());
if (column != null) {
columnNamesToRemoveList.remove(column.getName());
column.setName(transformColumn.getTargetColumnName());
if (RemoveColumnTransform.NAME.equals(transformColumn.getTransformType())) {
columnNamesToRemoveList.add(column.getName());
} else {
columnNamesToRemoveList.remove(column.getName());
}
column.setPrimaryKey(transformColumn.isPk());
}
} else {
Column column = new Column(transformColumn.getTargetColumnName());
column.setPrimaryKey(transformColumn.isPk());
column.setTypeCode(Types.VARCHAR);
column.setJdbcTypeCode(Types.VARCHAR);
column.setJdbcTypeName("VARCHAR");
column.setSize("100");
table.addColumn(column);
columnNamesToRemoveList.remove(column.getName());
}
}
for (String columnName : columnNamesToRemoveList) {
table.removeColumn(table.getColumnIndex(columnName));
}
table.setCatalog(transform.getTargetCatalogName());
table.setSchema(transform.getTargetSchemaName());
table.setName(transform.getTargetTableName());
}

static class CacheKey {
private String routerId;
private int triggerHistoryId;
private boolean setTargetTableName;
private boolean useDatabaseDefinition;
private boolean useTransforms;

public CacheKey(String routerId, int triggerHistoryId, boolean setTargetTableName, boolean useDatabaseDefinition) {
public CacheKey(String routerId, int triggerHistoryId, boolean setTargetTableName,
boolean useDatabaseDefinition, boolean useTransforms) {
this.routerId = routerId;
this.triggerHistoryId = triggerHistoryId;
this.setTargetTableName = setTargetTableName;
this.useDatabaseDefinition = useDatabaseDefinition;
this.useTransforms = useTransforms;
}

@Override
Expand All @@ -139,6 +230,7 @@ public int hashCode() {
result = prime * result + (setTargetTableName ? 1231 : 1237);
result = prime * result + triggerHistoryId;
result = prime * result + (useDatabaseDefinition ? 1231 : 1237);
result = prime * result + (useTransforms ? 1231 : 1237);
return result;
}

Expand Down Expand Up @@ -170,6 +262,9 @@ public boolean equals(Object obj) {
if (useDatabaseDefinition != other.useDatabaseDefinition) {
return false;
}
if (useTransforms != other.useTransforms) {
return false;
}
return true;
}
}
Expand Down
Expand Up @@ -144,8 +144,8 @@ public CsvData next() {
boolean isFileParserRouter = triggerHistory.getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER);
if (lastTriggerHistory == null || lastTriggerHistory.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() ||
lastRouterId == null || !lastRouterId.equals(routerId)) {
sourceTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, false, !isFileParserRouter);
targetTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, true, false);
sourceTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, false, !isFileParserRouter, false);
targetTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, true, false, false);
if (trigger != null && trigger.isUseStreamLobs() || (data.getRowData() != null && hasLobsThatNeedExtract(sourceTable, data))) {
requiresLobSelectedFromSource = true;
} else {
Expand Down Expand Up @@ -189,7 +189,7 @@ protected Data processReloadEvent(TriggerHistory triggerHistory, TriggerRouter t
processInfo.setCurrentTableName(triggerHistory.getSourceTableName());
String initialLoadSelect = data.getRowData();
if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) {
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), triggerHistory, false, true);
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), triggerHistory, false, true, false);
Column[] columns = sourceTable.getPrimaryKeyColumns();
String[] pkData = data.getParsedData(CsvData.PK_DATA);
boolean[] nullKeyValues = new boolean[columns.length];
Expand Down Expand Up @@ -261,7 +261,7 @@ protected boolean processCreateEvent(TriggerHistory triggerHistory, String route
*/
sourceTable = symmetricDialect.getTargetDialect().getPlatform().getTableFromCache(sourceTable.getCatalog(),
sourceTable.getSchema(), sourceTable.getName(), true);
targetTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, true, true);
targetTable = columnsAccordingToTriggerHistory.lookup(routerId, triggerHistory, true, true, true);
Table copyTargetTable = targetTable.copy();
Database db = new Database();
db.setName("dataextractor");
Expand Down
Expand Up @@ -142,8 +142,8 @@ protected CsvData selectNext() {
isFirstRow = true;
if (currentInitialLoadEvent.containsData()) {
data = currentInitialLoadEvent.getData();
sourceTable = columnsAccordingToTriggerHistory.lookup(currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, false, true);
targetTable = columnsAccordingToTriggerHistory.lookup(currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, true, false);
sourceTable = columnsAccordingToTriggerHistory.lookup(currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, false, true, false);
targetTable = columnsAccordingToTriggerHistory.lookup(currentInitialLoadEvent.getTriggerRouter().getRouterId(), history, true, false, false);
currentInitialLoadEvent = null;
} else {
triggerRouter = currentInitialLoadEvent.getTriggerRouter();
Expand All @@ -162,8 +162,8 @@ protected CsvData selectNext() {
: new NodeChannel(triggerRouter.getTrigger().getChannelId());
routingContext = new SimpleRouterContext(batch == null ? null : batch.getTargetNodeId(), channel);
}
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), history, false, true);
targetTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), history, true, false);
sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), history, false, true, false);
targetTable = columnsAccordingToTriggerHistory.lookup(triggerRouter.getRouter().getRouterId(), history, true, false, false);
overrideSelectSql = currentInitialLoadEvent.getInitialLoadSelect();
if (overrideSelectSql != null && overrideSelectSql.trim().toUpperCase().startsWith("WHERE")) {
overrideSelectSql = overrideSelectSql.trim().substring(5);
Expand Down
Expand Up @@ -116,6 +116,16 @@ public static String replaceNodeVariables(Node sourceNode, Node targetNode, Stri
return str;
}

public static String replaceCatalogSchemaVariables(String catalogName, String schemaName, String str) {
if (catalogName != null) {
str = FormatUtils.replace("sourceCatalogName", catalogName, str);
}
if (schemaName != null) {
str = FormatUtils.replace("sourceSchemaName", schemaName, str);
}
return str;
}

public static void logNotices() {
synchronized (SymmetricUtils.class) {
if (isNoticeLogged) {
Expand Down

0 comments on commit 6516c42

Please sign in to comment.