Skip to content

Commit

Permalink
0001959: Add feature where character data can be modified by a prope…
Browse files Browse the repository at this point in the history
…rty expression in order to support "weird" character issues
  • Loading branch information
chenson42 committed Sep 8, 2014
1 parent 79dcd04 commit 969795b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 14 deletions.
Expand Up @@ -314,10 +314,11 @@ public boolean supportsTransactionViews() {
@Override
public String massageDataExtractionSql(String sql, Channel channel) {
    /*
     * For channels that are declared not to contain big lobs, read at most
     * the first 4000 characters of each captured lob column directly in
     * SQL so the driver does not have to stream the full CLOB contents.
     * The column aliases are supplied by the select SQL itself (e.g.
     * "d.row_data as row_data"), so no alias is appended here — appending
     * one would have broken the textColumnExpression substitution done by
     * the superclass.
     */
    if (channel != null && !channel.isContainsBigLob()) {
        sql = StringUtils.replace(sql, "d.row_data", "dbms_lob.substr(d.row_data, 4000, 1 )");
        sql = StringUtils.replace(sql, "d.old_data", "dbms_lob.substr(d.old_data, 4000, 1 )");
        sql = StringUtils.replace(sql, "d.pk_data", "dbms_lob.substr(d.pk_data, 4000, 1 )");
    }
    // Let the base dialect apply the configured text column expression
    // (dataextractor.text.column.expression), if any, on top of the
    // dbms_lob.substr wrapping.
    sql = super.massageDataExtractionSql(sql, channel);
    return sql;
}

Expand Down
Expand Up @@ -158,6 +158,7 @@ private ParameterConstants() {
public final static String DATA_RELOAD_IS_BATCH_INSERT_TRANSACTIONAL = "datareload.batch.insert.transactional";

public final static String DATA_EXTRACTOR_ENABLED = "dataextractor.enable";
public final static String DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION = "dataextractor.text.column.expression";
public final static String OUTGOING_BATCH_MAX_BATCHES_TO_SELECT = "outgoing.batches.max.to.select";

public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view";
Expand Down
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.db;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.IOException;
import java.sql.Types;
import java.util.ArrayList;
Expand Down Expand Up @@ -677,6 +679,12 @@ public boolean canGapsOccurInCapturedDataIds() {
}

public String massageDataExtractionSql(String sql, Channel channel) {
    /*
     * Optionally wrap each captured text column (old_data, row_data,
     * pk_data) in a user supplied SQL expression taken from the
     * dataextractor.text.column.expression parameter.  The expression
     * uses $(columnName) as a placeholder for the column reference, which
     * supports scenario based casts such as character set conversions.
     */
    String expression = parameterService
            .getString(ParameterConstants.DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION);
    if (isNotBlank(expression)) {
        String[] columnRefs = { "d.old_data", "d.row_data", "d.pk_data" };
        for (String columnRef : columnRefs) {
            sql = sql.replace(columnRef, expression.replace("$(columnName)", columnRef));
        }
    }
    return sql;
}

Expand Down
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.db;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.sql.Types;
import java.util.Map;

Expand All @@ -38,6 +40,7 @@
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.util.SymmetricUtils;
import org.jumpmind.util.FormatUtils;

Expand Down Expand Up @@ -110,22 +113,29 @@ protected AbstractTriggerTemplate(ISymmetricDialect symmetricDialect) {
public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table originalTable,
TriggerHistory triggerHistory, Channel channel, String overrideSelectSql) {

IParameterService parameterService = symmetricDialect.getParameterService();

Table table = originalTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true);

Column[] columns = table.getColumns();

String textColumnExpression = parameterService.getString(ParameterConstants.DATA_EXTRACTOR_TEXT_COLUMN_EXPRESSION);

String sql = null;

if (symmetricDialect.getParameterService().is(
if (parameterService.is(
ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED)) {
sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);
String columnsText = buildColumnString(symmetricDialect.getInitialLoadTableAlias(),
symmetricDialect.getInitialLoadTableAlias(), "", columns, DataEventType.INSERT,
false, channel, triggerRouter.getTrigger()).columnString;
if (isNotBlank(textColumnExpression)) {
columnsText = textColumnExpression.replace("$(columnName)", columnsText);
}
sql = FormatUtils.replace("columns", columnsText, sql);
} else {
boolean dateTimeAsString = symmetricDialect.getParameterService().is(
} else {
boolean dateTimeAsString = parameterService.is(
ParameterConstants.DATA_LOADER_TREAT_DATETIME_AS_VARCHAR);
sql = "select $(columns) from $(schemaName)$(tableName) t where $(whereClause)";
StringBuilder columnList = new StringBuilder();
Expand All @@ -139,12 +149,17 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
columnList.append(",");
}

String columnExpression = SymmetricUtils.quote(symmetricDialect,
column.getName());

if (dateTimeAsString && TypeMap.isDateTimeType(column.getMappedTypeCode())) {
columnList.append(castDatetimeColumnToString(column.getName()));
} else {
columnList.append(SymmetricUtils.quote(symmetricDialect,
column.getName()));
columnExpression = castDatetimeColumnToString(column.getName());
} else if (isNotBlank(textColumnExpression) && TypeMap.isTextType(column.getMappedTypeCode())) {
columnExpression = textColumnExpression.replace("$(columnName)", columnExpression) + " as " + column.getName();
}

columnList.append(columnExpression);

}
}
}
Expand Down Expand Up @@ -613,7 +628,7 @@ protected ColumnString buildColumnString(String origTableAlias, String tableAlia
String lastCommandToken = symmetricDialect.escapesTemplatesForDatabaseInserts() ? (triggerConcatCharacter
+ "'',''" + triggerConcatCharacter)
: (triggerConcatCharacter + "','" + triggerConcatCharacter);

for (int i = 0; i < columns.length; i++) {
Column column = columns[i];
if (column != null) {
Expand Down Expand Up @@ -654,7 +669,7 @@ protected ColumnString buildColumnString(String origTableAlias, String tableAlia
templateToUse = emptyColumnTemplate;
} else {
templateToUse = clobColumnTemplate;
}
}
break;
case Types.BLOB:
if (requiresWrappedBlobTemplateForBlobType()) {
Expand Down
Expand Up @@ -40,14 +40,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
// Note that the order by data_id is done appended in code
putSql("selectEventDataToExtractSql",
""
+ "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, "
+ "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? and o.node_id = ? ");

putSql("selectEventDataByBatchIdSql",
""
+ "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, "
+ "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? ");
Expand Down
12 changes: 12 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -894,6 +894,18 @@ outgoing.batches.peek.ahead.batch.commit.size=10
# Type: boolean
dataextractor.enable=true

# Provide an expression that will be used in the trigger templates, and in the initial load
# and the sym_data extraction SQL for all text based column values (like varchar, char, nvarchar,
# clob and nchar columns). The expression can be used to make scenario based casts. For example,
# if the data in the database was inserted under a different character set than the default character set
# on Oracle, then a helpful expression might be something like this:
# convert($(columnName), 'AR8ISO8859P6', 'AR8MSWIN1256')
#
# DatabaseOverridable: true
# Tags: extract
# Type: textbox
dataextractor.text.column.expression=

# This instructs symmetric to attempt to skip duplicate batches that are received. Symmetric might
# be more efficient when recovering from error conditions if this is set to true, but you run the
# risk of missing data if the batch ids get reset (on one node, but not another) somehow (which is unlikely in production, but
Expand Down

0 comments on commit 969795b

Please sign in to comment.