Skip to content

Commit

Permalink
0000841: 2 triggers for the same table that sync different subsets ca…
Browse files Browse the repository at this point in the history
…n end up causing errors
  • Loading branch information
chenson42 committed Oct 2, 2012
1 parent 29d476f commit 0d19f3c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 65 deletions.
Expand Up @@ -638,68 +638,37 @@ protected TransformWriter createTransformDataWriter(Node identity, Node targetNo
return transformExtractWriter;
}

protected boolean doesCurrentTableMatch(String routerId, TriggerHistory triggerHistory,
Table currentTable, boolean setTargetTableName) {
if (currentTable != null) {
String catalogName = triggerHistory.getSourceCatalogName();
String schemaName = triggerHistory.getSourceSchemaName();
String tableName = triggerHistory.getSourceTableName();
Router router = triggerRouterService.getRouterById(routerId, false);
if (router != null && setTargetTableName) {
if (StringUtils.isNotBlank(router.getTargetCatalogName())) {
catalogName = router.getTargetCatalogName();
}

if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
schemaName = router.getTargetSchemaName();
}

if (StringUtils.isNotBlank(router.getTargetTableName())) {
tableName = router.getTargetTableName();
}
}

return currentTable.getFullyQualifiedTableName().equals(
Table.getFullyQualifiedTableName(catalogName, schemaName, tableName));

} else {
return false;
}
}

protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
TriggerHistory triggerHistory, Table currentTable, boolean setTargetTableName) {
TriggerHistory triggerHistory, boolean setTargetTableName) {
String catalogName = triggerHistory.getSourceCatalogName();
String schemaName = triggerHistory.getSourceSchemaName();
String tableName = triggerHistory.getSourceTableName();
if (!doesCurrentTableMatch(routerId, triggerHistory, currentTable, setTargetTableName)) {
currentTable = symmetricDialect.getPlatform().getTableFromCache(catalogName,
schemaName, tableName, false);
if (currentTable == null) {
throw new RuntimeException(String.format(
"Could not find the table, %s, to extract",
Table.getFullyQualifiedTableName(catalogName, schemaName, tableName)));
Table table = symmetricDialect.getPlatform().getTableFromCache(catalogName, schemaName,
tableName, false);
if (table == null) {
throw new RuntimeException(String.format("Could not find the table, %s, to extract",
Table.getFullyQualifiedTableName(catalogName, schemaName, tableName)));
}
table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true);
table.setCatalog(catalogName);
table.setSchema(schemaName);

Router router = triggerRouterService.getRouterById(routerId, false);
if (router != null && setTargetTableName) {
if (StringUtils.isNotBlank(router.getTargetCatalogName())) {
table.setCatalog(router.getTargetCatalogName());
}
currentTable = currentTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(), true);
currentTable.setCatalog(catalogName);
currentTable.setSchema(schemaName);

Router router = triggerRouterService.getRouterById(routerId, false);
if (router != null && setTargetTableName) {
if (StringUtils.isNotBlank(router.getTargetCatalogName())) {
currentTable.setCatalog(router.getTargetCatalogName());
}

if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
currentTable.setSchema(router.getTargetSchemaName());
}
if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
table.setSchema(router.getTargetSchemaName());
}

if (StringUtils.isNotBlank(router.getTargetTableName())) {
currentTable.setName(router.getTargetTableName());
}
if (StringUtils.isNotBlank(router.getTargetTableName())) {
table.setName(router.getTargetTableName());
}
}
return currentTable;
return table;
}

class SelectFromSymDataSource implements IExtractDataReaderSource {
Expand All @@ -709,6 +678,8 @@ class SelectFromSymDataSource implements IExtractDataReaderSource {
private OutgoingBatch outgoingBatch;

private Table currentTable;

private TriggerHistory lastTriggerHistory;

private boolean requiresLobSelectedFromSource;

Expand Down Expand Up @@ -752,7 +723,6 @@ public CsvData next() {
if (data == null) {
data = this.cursor.next();
if (data != null) {
// TODO add null checks
TriggerHistory triggerHistory = data.getTriggerHistory();
String routerId = data.getAttribute(CsvData.ATTRIBUTE_ROUTER_ID);

Expand All @@ -774,9 +744,12 @@ public CsvData next() {
Trigger trigger = triggerRouterService.getTriggerById(
triggerHistory.getTriggerId(), false);
if (trigger != null) {
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
routerId, triggerHistory, currentTable, true);
this.requiresLobSelectedFromSource = trigger.isUseStreamLobs();
if ((lastTriggerHistory == null || lastTriggerHistory
.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId())) {
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
routerId, triggerHistory, true);
this.requiresLobSelectedFromSource = trigger.isUseStreamLobs();
}
} else {
log.error(
"Could not locate a trigger with the id of {} for {}. It was recorded in the hist table with a hist id of {}",
Expand All @@ -785,6 +758,8 @@ public CsvData next() {
triggerHistory.getTriggerHistoryId() });
}
}

lastTriggerHistory = triggerHistory;
} else {
closeCursor();
}
Expand Down Expand Up @@ -891,18 +866,18 @@ protected CsvData selectNext() {
this.currentInitialLoadEvent = null;
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
(String) data.getAttribute(CsvData.ATTRIBUTE_ROUTER_ID), history,
currentTable, true);
true);
} else {
this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter();
NodeChannel channel = batch != null ? configurationService.getNodeChannel(
batch.getChannelId(), false) : new NodeChannel(this.triggerRouter
.getTrigger().getChannelId());
this.routingContext = new SimpleRouterContext(batch.getTargetNodeId(), channel);
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
triggerRouter.getRouter().getRouterId(), history, currentTable, false);
triggerRouter.getRouter().getRouterId(), history, false);
this.startNewCursor(history, triggerRouter);
this.currentTable = lookupAndOrderColumnsAccordingToTriggerHistory(
triggerRouter.getRouter().getRouterId(), history, currentTable, true);
triggerRouter.getRouter().getRouterId(), history, true);

}

Expand Down
Expand Up @@ -198,6 +198,7 @@ protected Object readNext() {
} else if (tokens[0].equals(CsvConstants.UPDATE)) {
CsvData data = new CsvData();
data.setDataEventType(DataEventType.UPDATE);
// TODO check for invalid range and print results
data.putParsedData(CsvData.ROW_DATA,
CollectionUtils.copyOfRange(tokens, 1, table.getColumnCount() + 1));
data.putParsedData(CsvData.PK_DATA, CollectionUtils.copyOfRange(tokens,
Expand Down
Expand Up @@ -21,10 +21,8 @@
package org.jumpmind.symmetric.io.data.writer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
Expand All @@ -49,7 +47,7 @@ abstract public class AbstractProtocolDataWriter implements IDataWriter {

protected Table table;

protected Set<Table> processedTables = new HashSet<Table>();
protected Map<String, Table> processedTables = new HashMap<String, Table>();

protected String delimiter = ",";

Expand Down Expand Up @@ -112,11 +110,12 @@ public boolean start(Table table) {
String schemaName = table.getSchema();
println(CsvConstants.SCHEMA, StringUtils.isNotBlank(schemaName) ? schemaName : "");
println(CsvConstants.TABLE, table.getName());
if (!processedTables.contains(table)) {
if (!processedTables.containsKey(table.getName()) ||
!processedTables.get(table.getName()).equals(table)) {
println(CsvConstants.KEYS, table.getPrimaryKeyColumns());
println(CsvConstants.COLUMNS, table.getColumns());
this.processedTables.put(table.getName(), table);
}
this.processedTables.add(table);
return true;
} else {
return false;
Expand Down Expand Up @@ -160,6 +159,10 @@ public void write(CsvData data) {
case SQL:
println(CsvConstants.SQL, data.getCsvData(CsvData.ROW_DATA));
break;

case RELOAD:
default:
break;
}
}
}
Expand Down
Expand Up @@ -46,6 +46,7 @@ public static <T> Map<String, T> toMap(String[] keyNames, T[] values) {
* @throws NullPointerException if <tt>original</tt> is null
* @since 1.6
*/
@SuppressWarnings("unchecked")
public static <T> T[] copyOfRange(T[] original, int from, int to) {
return copyOfRange(original, from, to, (Class<T[]>) original.getClass());
}
Expand Down Expand Up @@ -81,6 +82,7 @@ public static <T> T[] copyOfRange(T[] original, int from, int to) {
* an array of class <tt>newType</tt>.
* @since 1.6
*/
@SuppressWarnings("unchecked")
public static <T,U> T[] copyOfRange(U[] original, int from, int to, Class<? extends T[]> newType) {
int newLength = to - from;
if (newLength < 0)
Expand Down

0 comments on commit 0d19f3c

Please sign in to comment.