Skip to content
Permalink
Browse files

0003802: Faster two-pass initial load for tables with LOB on Oracle

  • Loading branch information...
erilong committed Nov 21, 2018
1 parent c5ffcfc commit 98deedd275f87232ef3827042785324d920ee11d
@@ -27,6 +27,7 @@

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.PermissionType;
@@ -367,6 +368,21 @@ public String massageForLob(String sql, Channel channel) {
}
}

@Override
public boolean isInitialLoadTwoPassLob(Table table) {
return parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB)
&& table.containsLobColumns(this.platform);
}

@Override
public String getInitialLoadTwoPassLobLengthSql(Column column, boolean isFirstPass) {
String quote = this.platform.getDdlBuilder().getDatabaseInfo().getDelimiterToken();
if (isFirstPass) {
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") <= 4000";
}
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") > 4000";
}

@Override
protected String getDbSpecificDataHasChangedCondition(Trigger trigger) {
if (!trigger.isUseCaptureLobs()) {
@@ -7,6 +7,7 @@

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.PermissionType;
@@ -330,6 +331,21 @@ public String massageForLob(String sql, Channel channel) {
}
}

@Override
public boolean isInitialLoadTwoPassLob(Table table) {
return parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB)
&& table.containsLobColumns(this.platform);
}

@Override
public String getInitialLoadTwoPassLobLengthSql(Column column, boolean isFirstPass) {
String quote = this.platform.getDdlBuilder().getDatabaseInfo().getDelimiterToken();
if (isFirstPass) {
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") <= 4000";
}
return "dbms_lob.getlength(t." + quote + column.getName() + quote + ") > 4000";
}

@Override
protected String getDbSpecificDataHasChangedCondition(Trigger trigger) {
if (!trigger.isUseCaptureLobs()) {
@@ -155,6 +155,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED = "initial.load.concat.csv.in.sql.enabled";
public final static String INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER = "initial.load.extract.thread.per.server.count";
public final static String INITIAL_LOAD_EXTRACT_TIMEOUT_MS = "initial.load.extract.timeout.ms";
public final static String INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB = "initial.load.extract.use.two.pass.lob";
public final static String INITIAL_LOAD_EXTRACT_JOB_START = "start.initial.load.extract.job";
public final static String INITIAL_LOAD_SCHEMA_DUMP_COMMAND = "initial.load.schema.dump.command";
public final static String INITIAL_LOAD_SCHEMA_LOAD_COMMAND = "initial.load.schema.load.command";
@@ -855,6 +855,45 @@ public String massageForLob(String sql, Channel channel) {
return sql;
}

public boolean isInitialLoadTwoPassLob(Table table) {
return false;
}

public String getInitialLoadTwoPassLobSql(String sql, Table table, boolean isFirstPass) {
List<Column> columns = table.getLobColumns(this.platform);
boolean isFirstColumn = true;

if (columns.size() > 0) {
sql = sql == null ? "" : sql;
if (!sql.equals("")) {
sql += " and ";
}
sql += "(";
}

for (Column column : table.getLobColumns(this.platform)) {
if (isFirstColumn) {
isFirstColumn = false;
} else {
if (isFirstPass) {
sql += " and ";
} else {
sql += " or ";
}
}
sql += getInitialLoadTwoPassLobLengthSql(column, isFirstPass);
}

if (columns.size() > 0) {
sql += ")";
}
return sql;
}

public String getInitialLoadTwoPassLobLengthSql(Column column, boolean isFirstPass) {
return null;
}

public boolean escapesTemplatesForDatabaseInserts() {
return false;
}
@@ -178,40 +178,32 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
if (i > 0) {
columnList.append(",");
}
boolean isLob = symmetricDialect.getPlatform()
.isLob(column.getMappedTypeCode());
if (!(isLob && triggerRouter.getTrigger().isUseStreamLobs())) {

String columnExpression = null;
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad()) {
ColumnString columnString = fillOutColumnTemplate(tableAlias,
tableAlias, "", table, column, DataEventType.INSERT, false, channel,
triggerRouter.getTrigger());
columnExpression = columnString.columnString;
if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
columnExpression = textColumnExpression.replace("$(columnName)",
columnExpression);
}
} else {
columnExpression = SymmetricUtils.quote(symmetricDialect,
column.getName());

if (dateTimeAsString
&& TypeMap.isDateTimeType(column.getMappedTypeCode())) {
columnExpression = castDatetimeColumnToString(column.getName());
} else if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
columnExpression = textColumnExpression.replace("$(columnName)",
columnExpression);
}
String columnExpression = null;
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad()) {
ColumnString columnString = fillOutColumnTemplate(tableAlias,
tableAlias, "", table, column, DataEventType.INSERT, false, channel,
triggerRouter.getTrigger());
columnExpression = columnString.columnString;
if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
columnExpression = textColumnExpression.replace("$(columnName)",
columnExpression);
}

columnList.append(columnExpression).append(" as ").append("x__").append(i);

} else {
columnList.append(" ").append(emptyColumnTemplate).append(" as ").append("x__").append(i);
columnExpression = SymmetricUtils.quote(symmetricDialect,
column.getName());

if (dateTimeAsString
&& TypeMap.isDateTimeType(column.getMappedTypeCode())) {
columnExpression = castDatetimeColumnToString(column.getName());
} else if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
columnExpression = textColumnExpression.replace("$(columnName)",
columnExpression);
}
}

columnList.append(columnExpression).append(" as ").append("x__").append(i);
}
}
sql = FormatUtils.replace("columns", columnList.toString(), sql);
@@ -224,6 +224,10 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
public String massageDataExtractionSql(String sql, Channel channel);

public String massageForLob(String sql, Channel channel);

public boolean isInitialLoadTwoPassLob(Table table);

public String getInitialLoadTwoPassLobSql(String sql, Table table, boolean isFirstPass);

/*
* Indicates that the dialect relies on SQL that is to be inserted into the database for use
@@ -2421,6 +2421,8 @@ public void close() {
private String selfRefChildColumnName;

boolean isFirstRow;

boolean isLobFirstPass;

public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch,
SelectFromTableEvent event) {
@@ -2527,6 +2529,10 @@ protected CsvData selectNext() {
}
}

if (symmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) {
this.isLobFirstPass = true;
}

this.startNewCursor(history, triggerRouter);
}
}
@@ -2539,6 +2545,9 @@ protected CsvData selectNext() {
this.selfRefLevel++;
this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter);
this.isFirstRow = true;
} else if (symmetricDialect.isInitialLoadTwoPassLob(this.sourceTable) && this.isLobFirstPass) {
this.isLobFirstPass = false;
this.startNewCursor(this.currentInitialLoadEvent.getTriggerHistory(), triggerRouter);
} else {
this.currentInitialLoadEvent = null;
}
@@ -2587,12 +2596,19 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
}
log.info("Querying level {} for table {}: {}", selfRefLevel, sourceTable.getName(), selectSql);
}

Channel channel = configurationService.getChannel(triggerRouter.getTrigger().getChannelId());

if (symmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) {
channel = new Channel();
channel.setContainsBigLob(!this.isLobFirstPass);
selectSql = symmetricDialect.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass);
log.info("Querying {} pass LOB for table {}: {}", (this.isLobFirstPass ? "first" : "second"), sourceTable.getName(), selectSql);
}

String sql = symmetricDialect.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable,
triggerHistory,
configurationService.getChannel(triggerRouter.getTrigger().getChannelId()),
selectSql);
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable, triggerHistory, channel, selectSql);

for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
sql = filter.filterInitalLoadSql(sql, node, targetTable);
}
@@ -628,6 +628,17 @@ initial.load.extract.thread.per.server.count=20
# Tags: load
initial.load.extract.timeout.ms=7200000

# Some dialects can extract small LOBs faster using a substring function.
# When the dialect supports it and the parameter is enabled, the initial load
# will extract tables with LOBs using two passes. The first pass gets rows
# with LOBs under the character limit, and the second pass gets all other rows.
# Currently implemented for Oracle/Tibero.
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
initial.load.extract.use.two.pass.lob=true

# Indicates that the SQL used to extract data from a
# table for an initial load should concatenate the data using
# the same SQL expression that a trigger uses versus concatenating

0 comments on commit 98deedd

Please sign in to comment.
You can’t perform that action at this time.