Skip to content

Commit

Permalink
0001215: The initial load sql is not overridden in the reloadTable me…
Browse files Browse the repository at this point in the history
…thod
  • Loading branch information
chenson42 committed May 14, 2013
1 parent 6216bdc commit 058cfb5
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
Expand Up @@ -41,8 +41,8 @@ public AbstractEmbeddedSymmetricDialect(IParameterService parameterService,
* When returning the raw SQL for use as SQL it needs to be un-escaped.
*/
@Override
public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table, TriggerHistory triggerHistory, Channel channel) {
String sql = super.createInitialLoadSqlFor(node, trigger, table, triggerHistory, channel);
public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table, TriggerHistory triggerHistory, Channel channel, String overrideSelectSql) {
String sql = super.createInitialLoadSqlFor(node, trigger, table, triggerHistory, channel, overrideSelectSql);
sql = sql.replace("''", "'");
return sql;
}
Expand Down
Expand Up @@ -210,8 +210,8 @@ public String getTransactionTriggerExpression(String defaultCatalog, String defa
}

public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table,
TriggerHistory triggerHistory, Channel channel) {
return triggerTemplate.createInitalLoadSql(node, trigger, table, triggerHistory, channel)
TriggerHistory triggerHistory, Channel channel, String overrideSelectSql) {
return triggerTemplate.createInitalLoadSql(node, trigger, table, triggerHistory, channel, overrideSelectSql)
.trim();
}

Expand Down
Expand Up @@ -110,7 +110,7 @@ protected AbstractTriggerTemplate(ISymmetricDialect symmetricDialect) {
}

public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table originalTable,
TriggerHistory triggerHistory, Channel channel) {
TriggerHistory triggerHistory, Channel channel, String overrideSelectSql) {

Table table = originalTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true);
Expand All @@ -123,11 +123,15 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
false, channel, triggerRouter.getTrigger()).columnString;

sql = FormatUtils.replace("columns", columnsText, sql);
String initialLoadSelect = StringUtils.isBlank(triggerRouter.getInitialLoadSelect()) ? Constants.ALWAYS_TRUE_CONDITION
: triggerRouter.getInitialLoadSelect();
if (StringUtils.isNotBlank(overrideSelectSql)) {
initialLoadSelect = overrideSelectSql;
}
sql = FormatUtils
.replace(
"whereClause",
StringUtils.isBlank(triggerRouter.getInitialLoadSelect()) ? Constants.ALWAYS_TRUE_CONDITION
: triggerRouter.getInitialLoadSelect(), sql);
"whereClause", initialLoadSelect
, sql);
sql = FormatUtils.replace("tableName", SymmetricUtils.quote(symmetricDialect, table.getName()), sql);
sql = FormatUtils.replace("schemaName",
triggerHistory == null ? getSourceTablePrefix(triggerRouter.getTrigger())
Expand Down
Expand Up @@ -80,7 +80,7 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc

public String getTransactionTriggerExpression(String defaultCatalog, String defaultSchema, Trigger trigger);

public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table, TriggerHistory triggerHistory, Channel channel);
public String createInitialLoadSqlFor(Node node, TriggerRouter trigger, Table table, TriggerHistory triggerHistory, Channel channel, String overrideSelectSql);

public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory);

Expand Down
Expand Up @@ -220,7 +220,7 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,
if (!triggerRouter.getTrigger().getSourceTableName()
.endsWith(TableConstants.SYM_NODE_IDENTITY)) {
initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter,
triggerHistory));
triggerHistory, null));
} else {
Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT,
triggerHistory.getSourceTableName(), null, triggerHistory,
Expand Down Expand Up @@ -789,7 +789,7 @@ public CsvData next() {
TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(triggerId, routerId, false);
if (triggerRouter != null) {
SelectFromTableEvent event = new SelectFromTableEvent(targetNode,
triggerRouter, triggerHistory);
triggerRouter, triggerHistory, data.getRowData());
this.reloadSource = new SelectFromTableSource(outgoingBatch, batch,
event);
data = (Data) this.reloadSource.next();
Expand Down Expand Up @@ -953,8 +953,8 @@ protected CsvData selectNext() {
this.sourceTable = lookupAndOrderColumnsAccordingToTriggerHistory(
triggerRouter.getRouter().getRouterId(), history, false);
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
triggerRouter.getRouter().getRouterId(), history, true);
this.startNewCursor(history, triggerRouter);
triggerRouter.getRouter().getRouterId(), history, true);
this.startNewCursor(history, triggerRouter, this.currentInitialLoadEvent.getInitialLoadSelect());

}

Expand All @@ -980,12 +980,12 @@ protected void closeCursor() {
}

protected void startNewCursor(final TriggerHistory triggerHistory,
final TriggerRouter triggerRouter) {
final TriggerRouter triggerRouter, String overrideSelectSql) {
final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
String initialLoadSql = symmetricDialect.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, this.sourceTable,
triggerHistory,
configurationService.getChannel(triggerRouter.getTrigger().getChannelId()));
configurationService.getChannel(triggerRouter.getTrigger().getChannelId()), overrideSelectSql);
this.cursor = sqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
public Data mapRow(Row rs) {
String csvRow = rs.stringValue();
Expand Down Expand Up @@ -1028,11 +1028,13 @@ class SelectFromTableEvent {
private TriggerHistory triggerHistory;
private Node node;
private Data data;
private String initialLoadSelect;

public SelectFromTableEvent(Node node, TriggerRouter triggerRouter,
TriggerHistory triggerHistory) {
TriggerHistory triggerHistory, String initialLoadSelect) {
this.node = node;
this.triggerRouter = triggerRouter;
this.initialLoadSelect = initialLoadSelect;
Trigger trigger = triggerRouter.getTrigger();
this.triggerHistory = triggerHistory != null ? triggerHistory : triggerRouterService
.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
Expand Down Expand Up @@ -1064,6 +1066,10 @@ public Node getNode() {
public boolean containsData() {
return data != null;
}

public String getInitialLoadSelect() {
return initialLoadSelect;
}

}

Expand Down
Expand Up @@ -164,7 +164,7 @@ public void testInitialLoadSql() throws Exception {
triggerRouterService.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(),
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName()),
getConfigurationService().getChannel(triggerRouter.getTrigger().getChannelId()));
getConfigurationService().getChannel(triggerRouter.getTrigger().getChannelId()), null);
List<String> csvStrings = getSqlTemplate().query(sql, new StringMapper());
assertTrue(csvStrings.size() > 0);
String csvString = csvStrings.get(0);
Expand Down

0 comments on commit 058cfb5

Please sign in to comment.