Skip to content
Permalink
Browse files

0003806: Initial load large character (varchar 4000) columns and stay

under SQL limit
  • Loading branch information...
erilong committed Nov 27, 2018
1 parent 97be356 commit 7cb365cd3b4d8de1141c5f304e8fe1d3bfa6939a
@@ -1,5 +1,6 @@
package org.jumpmind.symmetric.db.db2;

import org.jumpmind.db.model.Column;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;

@@ -126,5 +127,10 @@ public Db2As400TriggerTemplate(ISymmetricDialect symmetricDialect) {
public boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad() {
return false;
}

@Override
protected boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad(Column column) {
return false;
}

}
@@ -22,6 +22,7 @@

import java.util.HashMap;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.platform.firebird.FirebirdDialect1DatabasePlatform;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;
@@ -186,4 +187,8 @@ public boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad() {
return false;
}

@Override
protected boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad(Column column) {
return false;
}
}
@@ -22,6 +22,7 @@

import java.util.HashMap;

import org.jumpmind.db.model.Column;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;

@@ -204,4 +205,8 @@ public boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad() {
return false;
}

@Override
protected boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad(Column column) {
return false;
}
}
@@ -153,6 +153,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_REVERSE_FIRST = "initial.load.reverse.first";
public final static String INITIAL_LOAD_USE_EXTRACT_JOB = "initial.load.use.extract.job.enabled";
public final static String INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED = "initial.load.concat.csv.in.sql.enabled";
public final static String INITIAL_LOAD_USE_COLUMN_TEMPLATES_ENABLED = "initial.load.use.column.templates.enabled";
public final static String INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER = "initial.load.extract.thread.per.server.count";
public final static String INITIAL_LOAD_EXTRACT_TIMEOUT_MS = "initial.load.extract.timeout.ms";
public final static String INITIAL_LOAD_EXTRACT_USE_TWO_PASS_LOB = "initial.load.extract.use.two.pass.lob";
@@ -230,6 +230,9 @@ public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table ta
.trim();
}

public boolean[] getColumnPositionUsingTemplate(Table originalTable, TriggerHistory triggerHistory) {
return triggerTemplate.getColumnPositionUsingTemplate(originalTable, triggerHistory);
}

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory) {
return createPurgeSqlFor(node, triggerRouter, triggerHistory, null);
@@ -135,6 +135,26 @@ protected AbstractTriggerTemplate(ISymmetricDialect symmetricDialect) {
* the dialect that we want to select the columns straight up.
*/
public boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad() {
return this.symmetricDialect.getParameterService().is(ParameterConstants.INITIAL_LOAD_USE_COLUMN_TEMPLATES_ENABLED);
}

/**
* When INITIAL_LOAD_USE_COLUMN_TEMPLATES_ENABLED is true, column templates are used for all columns.
* When false, only specific column types have been implemented to format data in code.
*/
protected boolean useTriggerTemplateForColumnTemplatesDuringInitialLoad(Column column) {
if (!useTriggerTemplateForColumnTemplatesDuringInitialLoad() && column != null) {
int type = column.getJdbcTypeCode();
// These column types can be selected directly without a template
if (type == Types.CHAR || type == Types.NCHAR || type == Types.VARCHAR || type == ColumnTypes.NVARCHAR
|| type == Types.LONGVARCHAR || type == ColumnTypes.LONGNVARCHAR || type == Types.CLOB
|| type == Types.TINYINT || type == Types.SMALLINT || type == Types.INTEGER || type == Types.BIGINT
|| type == Types.NUMERIC || type == Types.BINARY || type == Types.VARBINARY
|| (type == Types.BLOB && !requiresWrappedBlobTemplateForBlobType()) || type == Types.LONGVARBINARY
|| type == ColumnTypes.MSSQL_NTEXT) {
return false;
}
}
return true;
}

@@ -179,7 +199,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
columnList.append(",");
}
String columnExpression = null;
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad()) {
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad(column)) {
ColumnString columnString = fillOutColumnTemplate(tableAlias,
tableAlias, "", table, column, DataEventType.INSERT, false, channel,
triggerRouter.getTrigger());
@@ -240,7 +260,27 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table

return sql;
}


public boolean[] getColumnPositionUsingTemplate(Table originalTable, TriggerHistory triggerHistory) {

IParameterService parameterService = symmetricDialect.getParameterService();
boolean concatInCsv = parameterService.is(ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED);

Table table = originalTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true);

Column[] columns = table.getColumns();
boolean[] isColumnPositionUsingTemplate = new boolean[columns.length];

if (!concatInCsv) {
for (int i = 0; i < columns.length; i++) {
isColumnPositionUsingTemplate[i] = useTriggerTemplateForColumnTemplatesDuringInitialLoad(columns[i]);
}
}

return isColumnPositionUsingTemplate;
}

protected String castDatetimeColumnToString(String columnName) {
return SymmetricUtils.quote(symmetricDialect, columnName);
}
@@ -831,7 +871,7 @@ else if (column.getJdbcTypeName() != null
break;
}
case Types.LONGVARBINARY:
case -10: // SQL-Server ntext binary type
case ColumnTypes.MSSQL_NTEXT:
if (column.getJdbcTypeName() != null
&& (column.getJdbcTypeName().toUpperCase().contains(TypeMap.IMAGE))
&& StringUtils.isNotBlank(imageColumnTemplate)) {
@@ -108,6 +108,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
public String getTransactionTriggerExpression(String defaultCatalog, String defaultSchema, Trigger trigger);

public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table, TriggerHistory triggerHistory, Channel channel, String overrideSelectSql);

public boolean[] getColumnPositionUsingTemplate(Table originalTable, TriggerHistory triggerHistory);

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory);

@@ -2619,6 +2619,7 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED);
final boolean objectValuesWillNeedEscaped = !symmetricDialect.getTriggerTemplate()
.useTriggerTemplateForColumnTemplatesDuringInitialLoad();
final boolean[] isColumnPositionUsingTemplate = symmetricDialect.getColumnPositionUsingTemplate(sourceTable, triggerHistory);
log.debug(sql);

this.cursor = sqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
@@ -2627,13 +2628,11 @@ public Data mapRow(Row row) {
if (selectedAsCsv) {
csvRow = row.stringValue();
} else if (objectValuesWillNeedEscaped) {
String[] rowData = platform.getStringValues(
csvRow = platform.getCsvStringValue(
symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(),
row, false, true);
csvRow = CsvUtils.escapeCsvData(rowData, '\0', '"');
row, isColumnPositionUsingTemplate);
} else {
csvRow = row.csvValue();

}
int commaCount = StringUtils.countMatches(csvRow, ",");
if (expectedCommaCount <= commaCount) {
@@ -649,6 +649,17 @@ initial.load.extract.use.two.pass.lob=true
# Type: boolean
initial.load.concat.csv.in.sql.enabled=false

# The initial load SQL to extract a table will wrap each column with
# a template of SQL functions to format the data using the database.
# When disabled, each column is selected directly, and data is
# formatted by the server instead. Currently, the server can format
# string, integer, and LOB types, but other types still use templates.
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
initial.load.use.column.templates.enabled=false

# This is SQL that will run on the client before an initial load starts.
# The default delimiter for these lines is a semicolon. To override, include
# a single line that starts with delimiter and is followed by the new delimiter, then the old
@@ -521,6 +521,44 @@ protected String cleanNumber(String value) {
return values;
}

public String getCsvStringValue(BinaryEncoding encoding, Column[] metaData, Row row, boolean[] isColumnPositionUsingTemplate) {
StringBuilder concatenatedRow = new StringBuilder();
Set<String> names = row.keySet();
int i = 0;
for (String name : names) {
Column column = metaData[i];
int type = column.getJdbcTypeCode();
if (i > 0) {
concatenatedRow.append(",");
}
if (row.get(name) != null) {
if (isColumnPositionUsingTemplate[i]) {
concatenatedRow.append(row.getString(name));
} else if (column.isOfNumericType()) {
concatenatedRow.append(row.getString(name));
} else if (!column.isTimestampWithTimezone() && (type == Types.DATE || type == Types.TIMESTAMP || type == Types.TIME)) {
concatenatedRow.append("\"").append(getDateTimeStringValue(name, type, row, false)).append("\"");
} else if (type == Types.BOOLEAN || type == Types.BIT) {
concatenatedRow.append(row.getBoolean(name) ? "1" : "0");
} else if (column.isOfBinaryType()) {
byte[] bytes = row.getBytes(name);
if (encoding == BinaryEncoding.NONE) {
concatenatedRow.append(row.getString(name));
} else if (encoding == BinaryEncoding.BASE64) {
concatenatedRow.append(new String(Base64.encodeBase64(bytes)));
} else if (encoding == BinaryEncoding.HEX) {
concatenatedRow.append(new String(Hex.encodeHex(bytes)));
}
} else {
concatenatedRow.append("\"").append(row.getString(name).replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
}
}

i++;
}
return concatenatedRow.toString();
}

protected String getDateTimeStringValue(String name, int type, Row row, boolean useVariableDates) {
Object dateObj = row.get(name);
if (dateObj instanceof String) {
@@ -134,6 +134,8 @@ public DmlStatement createDmlStatement(DmlType dmlType, String catalogName, Stri
Column[] orderedMetaData, boolean useVariableDates, boolean fitToColumn);

public String[] getStringValues(BinaryEncoding encoding, Column[] metaData, Row row, boolean useVariableDates, boolean indexByPosition);

public String getCsvStringValue(BinaryEncoding encoding, Column[] metaData, Row row, boolean[] needEscaped);

public Database readDatabaseFromXml(String filePath, boolean alterCaseToMatchDatabaseDefaultCase);

0 comments on commit 7cb365c

Please sign in to comment.
You can’t perform that action at this time.