Skip to content

Commit

Permalink
0004831: Initial Load events for config tables need to use correct
Browse files Browse the repository at this point in the history
dialect when running load only, extract only, or log miner
  • Loading branch information
Philip Marzullo committed Feb 16, 2021
1 parent fbe4c0c commit a85760c
Showing 1 changed file with 20 additions and 11 deletions.
Expand Up @@ -2903,6 +2903,7 @@ public boolean shouldDataBeRouted(CsvData data) {

protected CsvData selectNext() {
CsvData data = null;
ISymmetricDialect symmetricDialectToUse = getSymmetricDialect();
if (this.currentInitialLoadEvent == null && selectFromTableEventsToSend.size() > 0) {
this.currentInitialLoadEvent = selectFromTableEventsToSend.remove(0);
TriggerHistory history = this.currentInitialLoadEvent.getTriggerHistory();
Expand Down Expand Up @@ -2962,7 +2963,7 @@ protected CsvData selectNext() {
}
}

if (this.routingContext.getChannel().isReloadFlag() && getSymmetricDialect().isInitialLoadTwoPassLob(this.sourceTable)) {
if (this.routingContext.getChannel().isReloadFlag() && symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable)) {
this.isLobFirstPass = true;
}

Expand All @@ -2978,7 +2979,7 @@ protected CsvData selectNext() {
this.selfRefLevel++;
this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter);
this.isFirstRow = true;
} else if (getSymmetricDialect().isInitialLoadTwoPassLob(this.sourceTable) && this.isLobFirstPass) {
} else if (symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable) && this.isLobFirstPass) {
this.isLobFirstPass = false;
this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter);
} else {
Expand All @@ -3001,11 +3002,19 @@ protected void closeCursor() {
}

public ISymmetricDialect getSymmetricDialect() {
return this.isConfiguration ? symmetricDialect : symmetricDialect.getTargetDialect();
ISymmetricDialect dialect = null;
if (this.isConfiguration || sourceTable.getName().toLowerCase().startsWith(parameterService.getTablePrefix().toLowerCase() + "_")) {
dialect = symmetricDialect;
} else {
dialect = symmetricDialect.getTargetDialect();
}
return dialect;
}

protected void startNewCursor(final TriggerHistory triggerHistory,
final TriggerRouter triggerRouter) {

ISymmetricDialect symmetricDialectToUse = getSymmetricDialect();

String selectSql = overrideSelectSql;
if (isSelfReferencingFk) {
Expand All @@ -3018,7 +3027,7 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
if (selfRefLevel == 0) {
selectSql += "(" + selfRefParentColumnName + " is null or " + selfRefParentColumnName + " = " + selfRefChildColumnName + ") ";
} else {
DatabaseInfo info = getSymmetricDialect().getPlatform().getDatabaseInfo();
DatabaseInfo info = symmetricDialectToUse.getPlatform().getDatabaseInfo();
String tableName = Table.getFullyQualifiedTableName(sourceTable.getCatalog(), sourceTable.getSchema(),
sourceTable.getName(), info.getDelimiterToken(), info.getCatalogSeparator(), info.getSchemaSeparator());
String refSql= "select " + selfRefChildColumnName + " from " + tableName +
Expand All @@ -3036,14 +3045,14 @@ protected void startNewCursor(final TriggerHistory triggerHistory,

Channel channel = configurationService.getChannel(triggerRouter.getTrigger().getReloadChannelId());

if (channel.isReloadFlag() && getSymmetricDialect().isInitialLoadTwoPassLob(this.sourceTable)) {
if (channel.isReloadFlag() && symmetricDialectToUse.isInitialLoadTwoPassLob(this.sourceTable)) {
channel = new Channel();
channel.setContainsBigLob(!this.isLobFirstPass);
selectSql = getSymmetricDialect().getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass);
selectSql = symmetricDialectToUse.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass);
log.info("Querying {} pass LOB for table {}: {}", (this.isLobFirstPass ? "first" : "second"), sourceTable.getName(), selectSql);
}

String sql = getSymmetricDialect().createInitialLoadSqlFor(
String sql = symmetricDialectToUse.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable, triggerHistory, channel, selectSql);

for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
Expand All @@ -3052,16 +3061,16 @@ protected void startNewCursor(final TriggerHistory triggerHistory,

final String initialLoadSql = sql;
final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
final boolean selectedAsCsv = getSymmetricDialect().getParameterService().is(
final boolean selectedAsCsv = symmetricDialectToUse.getParameterService().is(
ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED);
final boolean objectValuesWillNeedEscaped = !getSymmetricDialect().getTriggerTemplate()
final boolean objectValuesWillNeedEscaped = !symmetricDialectToUse.getTriggerTemplate()
.useTriggerTemplateForColumnTemplatesDuringInitialLoad();
final boolean[] isColumnPositionUsingTemplate = getSymmetricDialect().getColumnPositionUsingTemplate(sourceTable, triggerHistory);
final boolean[] isColumnPositionUsingTemplate = symmetricDialectToUse.getColumnPositionUsingTemplate(sourceTable, triggerHistory);
final boolean checkRowLength = parameterService.is(ParameterConstants.EXTRACT_CHECK_ROW_SIZE, false);
final long rowMaxLength = parameterService.getLong(ParameterConstants.EXTRACT_ROW_MAX_LENGTH, 1000000000);
log.debug(sql);

this.cursor = getSymmetricDialect().getPlatform().getSqlTemplate().queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
this.cursor = symmetricDialectToUse.getPlatform().getSqlTemplate().queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
public Data mapRow(Row row) {
if (checkRowLength) {
// Account for double byte characters and encoding
Expand Down

0 comments on commit a85760c

Please sign in to comment.