Skip to content

Commit

Permalink
0006173: Could not find trigger history, causes error of had X columns
Browse files Browse the repository at this point in the history
but expected Y
  • Loading branch information
erilong committed Jan 3, 2024
1 parent 280b3ea commit f70233d
Showing 1 changed file with 36 additions and 9 deletions.
Expand Up @@ -3268,6 +3268,7 @@ protected void checkInterrupted() throws InterruptedException {
public class DataMapper implements ISqlRowMapper<Data> {
private List<TriggerRouter> triggerRouters;
private List<TriggerHistory> activeTriggerHistories;
private Collection<TriggerHistory> allTriggerHistories;
private HashMap<String, TriggerHistory> mismatchedTableName;
private HashSet<Integer> missingConfigTriggerHist;
private HashSet<Integer> mismatchedTriggerHist;
Expand All @@ -3294,7 +3295,7 @@ public Data mapRow(Row row) {
data.putAttribute(CsvData.ATTRIBUTE_TABLE_ID, triggerHistId);
TriggerHistory triggerHistory = engine.getTriggerRouterService().getTriggerHistory(triggerHistId);
if (triggerHistory == null) {
triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data.getDataId(), false);
triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data, false);
} else if (!triggerHistory.getSourceTableName().equals(tableName)) {
if (mismatchedTableName == null) {
mismatchedTableName = new HashMap<String, TriggerHistory>();
Expand All @@ -3305,7 +3306,7 @@ public Data mapRow(Row row) {
+ "table name {} for data_id {}. Attempting to look up a valid trigger_hist row by table name",
new Object[] { data.getTableName(),
triggerHistory.getSourceTableName(), data.getDataId() });
triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data.getDataId(), true);
triggerHistory = findOrCreateTriggerHistory(tableName, triggerHistId, data, true);
mismatchedTableName.put(data.getTableName(), triggerHistory);
} else {
triggerHistory = cachedTriggerHistory;
Expand All @@ -3316,10 +3317,11 @@ public Data mapRow(Row row) {
return data;
}

private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerHistId, long dataId, boolean isExistingTriggerHist) {
private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerHistId, Data data, boolean isExistingTriggerHist) {
TriggerHistory triggerHistory = null;
Trigger trigger = null;
Table table = null;
long dataId = data.getDataId();
if (triggerRouters == null) {
triggerRouters = engine.getTriggerRouterService().getAllTriggerRoutersForCurrentNode(engine.getNodeService().findIdentity().getNodeGroupId());
}
Expand All @@ -3333,13 +3335,9 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH
if (table != null && trigger != null) {
if (activeTriggerHistories == null) {
activeTriggerHistories = engine.getTriggerRouterService().getActiveTriggerHistories();
allTriggerHistories = engine.getTriggerRouterService().getHistoryRecords().values();
}
for (TriggerHistory hist : activeTriggerHistories) {
if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName)) {
triggerHistory = hist;
break;
}
}
triggerHistory = findMatchingTriggerHistory(trigger, data, tableName);
if (triggerHistory == null) {
triggerHistory = new TriggerHistory(table, trigger, engine.getSymmetricDialect().getTriggerTemplate());
triggerHistory.setTriggerHistoryId(isExistingTriggerHist ? 0 : triggerHistId);
Expand All @@ -3355,6 +3353,7 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH
triggerHistId, tableName, dataId);
engine.getTriggerRouterService().insert(triggerHistory);
activeTriggerHistories.add(triggerHistory);
allTriggerHistories.add(triggerHistory);
} else {
if (mismatchedTriggerHist == null) {
mismatchedTriggerHist = new HashSet<Integer>();
Expand All @@ -3377,6 +3376,34 @@ private TriggerHistory findOrCreateTriggerHistory(String tableName, int triggerH
}
return triggerHistory;
}

private TriggerHistory findMatchingTriggerHistory(Trigger trigger, Data data, String tableName) {
TriggerHistory triggerHistory = null;
int columnCount = 0, pkColumnCount = 0;
if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) {
columnCount = data.getParsedData(CsvData.ROW_DATA).length;
}
if (data.getDataEventType() == DataEventType.DELETE || data.getDataEventType() == DataEventType.UPDATE) {
pkColumnCount = data.getParsedData(CsvData.PK_DATA).length;
}
for (TriggerHistory hist : allTriggerHistories) {
if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName)
&& (columnCount == 0 || columnCount == hist.getParsedColumnNames().length)
&& (pkColumnCount == 0 || pkColumnCount == hist.getParsedPkColumnNames().length)) {
triggerHistory = hist;
break;
}
}
if (triggerHistory == null) {
for (TriggerHistory hist : activeTriggerHistories) {
if (hist.getTriggerId().equals(trigger.getTriggerId()) && hist.getSourceTableName().equalsIgnoreCase(tableName)) {
triggerHistory = hist;
break;
}
}
}
return triggerHistory;
}
}

public int resendBatchAsReload(long batchId, String nodeId) {
Expand Down

0 comments on commit f70233d

Please sign in to comment.