Skip to content

Commit

Permalink
0004974: Routing and extracting had X columns but expected Y
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 4, 2021
1 parent 411d7fc commit c6c5758
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public enum TriggerReBuildReason {
FORCED("F"),
TRIGGERS_MISSING("T"),
TRIGGER_TEMPLATE_CHANGED("E"),
TRIGGER_HIST_MISSIG("H");
TRIGGER_HIST_MISSING("H");

private String code;

Expand All @@ -54,8 +54,12 @@ public static TriggerReBuildReason fromCode(String code) {
return TABLE_SYNC_CONFIGURATION_CHANGED;
} else if (code.equals(FORCED.code)) {
return FORCED;
} else if (code.equals(TRIGGERS_MISSING.code)) {
return TRIGGERS_MISSING;
} else if (code.equals(TRIGGER_TEMPLATE_CHANGED.code)) {
return TRIGGER_TEMPLATE_CHANGED;
} else if (code.equals(TRIGGER_HIST_MISSING.code)) {
return TRIGGER_HIST_MISSING;
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3245,14 +3245,14 @@ public Data mapRow(Row row) {
List<TriggerHistory> activeTriggerHistories = engine.getTriggerRouterService().getActiveTriggerHistories();
triggerHistory = new TriggerHistory(table, trigger, engine.getSymmetricDialect().getTriggerTemplate());
triggerHistory.setTriggerHistoryId(triggerHistId);
triggerHistory.setLastTriggerBuildReason(TriggerReBuildReason.TRIGGER_HIST_MISSIG);
triggerHistory.setLastTriggerBuildReason(TriggerReBuildReason.TRIGGER_HIST_MISSING);
triggerHistory.setNameForInsertTrigger(engine.getTriggerRouterService().getTriggerName(DataEventType.INSERT,
symmetricDialect.getMaxTriggerNameLength(), trigger, table, activeTriggerHistories, null));
triggerHistory.setNameForUpdateTrigger(engine.getTriggerRouterService().getTriggerName(DataEventType.UPDATE,
symmetricDialect.getMaxTriggerNameLength(), trigger, table, activeTriggerHistories, null));
triggerHistory.setNameForDeleteTrigger(engine.getTriggerRouterService().getTriggerName(DataEventType.DELETE,
symmetricDialect.getMaxTriggerNameLength(), trigger, table, activeTriggerHistories, null));
log.warn("Could not find a trigger history row for the table {} for data_id {}. \"Attempting\" to generate a new trigger history row", tableName, data.getDataId());
log.warn("Could not find a trigger history row for the table {} for data_id {}. Generating a new trigger history row.", tableName, data.getDataId());
engine.getTriggerRouterService().insert(triggerHistory);
} else {
if (missingConfigTriggerHist == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerReBuildReason;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.AbstractFileParsingRouter;
import org.jumpmind.symmetric.route.AuditTableDataRouter;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class RouterService extends AbstractService implements IRouterService {

protected Map<String, CounterStat> invalidRouterType = new HashMap<String, CounterStat>();

protected Map<Integer, CounterStat> missingColumns = new HashMap<Integer, CounterStat>();

protected long triggerRouterCacheTime = 0;

protected Map<String, Boolean> commonBatchesLastKnownState = new HashMap<String, Boolean>();
Expand Down Expand Up @@ -247,6 +250,15 @@ synchronized public long routeData(boolean force) {
data.getTableName(), data.getDataId(), data.getTriggerHistory().getTriggerHistoryId(), counterStat.getCount());
}
missingTriggerRouter.clear();

for (CounterStat counterStat : missingColumns.values()) {
Data data = (Data) counterStat.getObject();
log.warn("Ignoring data captured for table '{}' with trigger hist id {} because the number of columns and values don't match. "
+ "This can happen when you manually remove rows from sym_trigger_hist. "
+ "Starting with data id {}, there were {} occurrences.",
data.getTableName(), data.getTriggerHistory().getTriggerHistoryId(), data.getDataId(), counterStat.getCount());
}
missingColumns.clear();
}
}
}
Expand Down Expand Up @@ -866,6 +878,14 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
"None of the target nodes specified in the data.node_list field ({}) were qualified nodes. Data id {} for table '{}' will not be routed using the {} router",
new Object[] {targetNodeIds, data.getDataId(), data.getTableName(), triggerRouter.getRouter().getRouterId() });
}
} else if (data.getTriggerHistory().getLastTriggerBuildReason() == TriggerReBuildReason.TRIGGER_HIST_MISSING && !doesColumnCountMatchValues(dataMetaData, data)) {
Integer triggerHistId = data.getTriggerHistory().getTriggerHistoryId();
CounterStat counterStat = missingColumns.get(triggerHistId);
if (counterStat == null) {
counterStat = new CounterStat(data);
missingColumns.put(triggerHistId, counterStat);
}
counterStat.incrementCount();
} else {
try {
IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
Expand All @@ -886,12 +906,8 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
throw ex;
}

StringBuilder failureMessage = new StringBuilder(
"Failed to route data: ");
failureMessage.append(data.getDataId());
failureMessage.append(" for table: ");
failureMessage.append(data.getTableName());
failureMessage.append(".\n");
StringBuilder failureMessage = new StringBuilder("Failed to route data: ");
failureMessage.append(data.getDataId()).append(" for table: ").append(data.getTableName()).append(".\n");
data.writeCsvDataDetails(failureMessage);
throw new SymmetricException(failureMessage.toString(), ex);
}
Expand Down Expand Up @@ -1166,4 +1182,17 @@ protected Table buildTableFromTriggerHistory(TriggerHistory triggerHistory) {
return table;
}

protected boolean doesColumnCountMatchValues(DataMetaData dataMetaData, Data data) {
if (data.getCreateTime() == null || data.getTriggerHistory().getCreateTime() == null ||
data.getTriggerHistory().getCreateTime().compareTo(data.getCreateTime()) > 0) {
String[] rowData = null;
if (dataMetaData.getData().getDataEventType() == DataEventType.DELETE) {
rowData = dataMetaData.getData().toParsedOldData();
} else {
rowData = dataMetaData.getData().toParsedRowData();
}
return dataMetaData.getTable().getColumnCount() == rowData.length;
}
return true;
}
}

0 comments on commit c6c5758

Please sign in to comment.