Skip to content

Commit

Permalink
0005676: Initial load sends too many tables when using
Browse files Browse the repository at this point in the history
$(targetExternalId) variable
  • Loading branch information
erilong committed Feb 1, 2023
1 parent 385f8e8 commit 2df0244
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
Expand Up @@ -254,13 +254,14 @@ public String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigge
public Map<Trigger, Exception> getFailedTriggers();

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories,
List<TriggerRouter> triggerRouters);

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories);

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories);

public TriggerHistory findTriggerHistoryForGenericSync();

Expand Down
Expand Up @@ -1963,7 +1963,7 @@ protected void checkSendDeferredConstraints(ExtractRequest request, List<Extract
if (histories != null && histories.size() > 0) {
for (TriggerHistory history : histories) {
Channel channel = configurationService.getChannel(trigger.getReloadChannelId());
if (!channel.isFileSyncFlag()) {
if (!channel.isFileSyncFlag() && history.getSourceTableName().equalsIgnoreCase(request.getTableName())) {
Data data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(targetNode.getNodeId());
Expand Down
Expand Up @@ -946,7 +946,7 @@ public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean
}
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), triggerHistories, triggerRouters);
targetNode.getNodeGroupId(), targetNode.getExternalId(), triggerHistories, triggerRouters);
if (isFullLoad) {
if (!reverse) {
nodeService.setInitialLoadEnabled(transaction, nodeIdRecord, false, true, loadId, createBy);
Expand Down Expand Up @@ -2296,7 +2296,7 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam
catalogName, schemaName, tableName);
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), triggerHistories);
targetNode.getNodeGroupId(), targetNode.getExternalId(), triggerHistories);
int eventCount = 0;
ISqlTransaction transaction = null;
try {
Expand Down Expand Up @@ -2468,6 +2468,10 @@ public void reloadMissingForeignKeyRows(long batchId, String nodeId, long dataId
protected void reloadMissingForeignKeyRows(Data data, long batchId, String nodeId, long dataId, long rowNumber) {
String batchName = nodeId + "-" + batchId + " " + (dataId == -1 ? "row " + rowNumber : "data " + dataId);
try {
if (data == null) {
log.warn("Unable to reload missing foreign data for data ID {} because data is not found", dataId);
return;
}
log.debug("reloadMissingForeignKeyRows for batch {} table {}", batchName, data.getTableName());
TriggerHistory hist = data.getTriggerHistory();
IDatabasePlatform targetPlatform = getTargetPlatform(data.getTableName());
Expand Down
Expand Up @@ -2580,16 +2580,17 @@ public TriggerHistory findTriggerHistoryForGenericSync() {

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistIdAndSortHist(sourceNodeGroupId, targetNodeGroupId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
sourceNodeGroupId, targetNodeGroupId));
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistIdAndSortHist(sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories,
getAllTriggerRoutersForReloadForCurrentNode(sourceNodeGroupId, targetNodeGroupId));
}

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories,
List<TriggerRouter> triggerRouters) {
final Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = fillTriggerRoutersByHistId(
sourceNodeGroupId, targetNodeGroupId, triggerHistories, triggerRouters);
sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories, triggerRouters);
final List<Table> sortedTables = getSortedTablesFor(triggerHistories);
Comparator<TriggerHistory> comparator = new Comparator<TriggerHistory>() {
public int compare(TriggerHistory o1, TriggerHistory o2) {
Expand Down Expand Up @@ -2636,13 +2637,14 @@ public int compare(TriggerHistory o1, TriggerHistory o2) {

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistId(sourceNodeGroupId, targetNodeGroupId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistId(sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
sourceNodeGroupId, targetNodeGroupId));
}

protected Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories,
List<TriggerRouter> triggerRouters) {
triggerRouters = new ArrayList<TriggerRouter>(triggerRouters);
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = new HashMap<Integer, List<TriggerRouter>>(
triggerHistories.size());
Expand All @@ -2653,7 +2655,11 @@ protected Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String triggerId = triggerHistory.getTriggerId();
for (TriggerRouter triggerRouter : triggerRouters) {
if (triggerRouter.getTrigger().getTriggerId().equals(triggerId)) {
triggerRoutersForTriggerHistory.add(triggerRouter);
if (!triggerRouter.getTrigger().getSourceTableName().contains("$(targetExternalId)")
|| triggerRouter.getTrigger().getSourceTableName().replace("$(targetExternalId)", targetExternalId)
.equalsIgnoreCase(triggerHistory.getSourceTableName())) {
triggerRoutersForTriggerHistory.add(triggerRouter);
}
}
}
}
Expand Down

0 comments on commit 2df0244

Please sign in to comment.