Skip to content

Commit

Permalink
0005667: Initial load sends too many tables when using
Browse files Browse the repository at this point in the history
$(targetExternalId) variable
  • Loading branch information
erilong committed Feb 1, 2023
1 parent 0c02ce0 commit 9cdf060
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
Expand Up @@ -206,13 +206,13 @@ public String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigge
public Map<Trigger, Exception> getFailedTriggers();

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters);

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories);

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories);
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories);

public TriggerHistory findTriggerHistoryForGenericSync();

Expand Down
Expand Up @@ -2301,7 +2301,7 @@ protected void checkSendDeferredConstraints(ExtractRequest request, List<Extract
if (histories != null && histories.size() > 0) {
for (TriggerHistory history : histories) {
Channel channel = configurationService.getChannel(trigger.getReloadChannelId());
if (!channel.isFileSyncFlag()) {
if (!channel.isFileSyncFlag() && history.getSourceTableName().equalsIgnoreCase(request.getTableName())) {
Data data = new Data(history.getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()),
history, trigger.getChannelId(), null, null);
data.setNodeList(targetNode.getNodeId());
Expand Down
Expand Up @@ -962,7 +962,7 @@ public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean

Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), triggerHistories, triggerRouters);
targetNode.getNodeGroupId(), targetNode.getExternalId(), triggerHistories, triggerRouters);

if (isFullLoad) {
if (!reverse) {
Expand Down Expand Up @@ -2412,7 +2412,7 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam
catalogName, schemaName, tableName);
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), triggerHistories);
targetNode.getNodeGroupId(), targetNode.getExternalId(), triggerHistories);
int eventCount = 0;
ISqlTransaction transaction = null;
try {
Expand Down Expand Up @@ -2598,6 +2598,10 @@ public void reloadMissingForeignKeyRows(long batchId, String nodeId, long dataId
protected void reloadMissingForeignKeyRows(Data data, long batchId, String nodeId, long dataId, long rowNumber) {
String batchName = nodeId + "-" + batchId + " " + (dataId == -1 ? "row " + rowNumber : "data " + dataId);
try {
if (data == null) {
log.warn("Unable to reload missing foreign data for data ID {} because data is not found", dataId);
return;
}
log.debug("reloadMissingForeignKeyRows for batch {} table {}", batchName, data.getTableName());
TriggerHistory hist = data.getTriggerHistory();
IDatabasePlatform targetPlatform = getTargetPlatform(data.getTableName());
Expand Down
Expand Up @@ -2476,18 +2476,18 @@ public TriggerHistory findTriggerHistoryForGenericSync() {

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistIdAndSortHist(sourceNodeGroupId, targetNodeGroupId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistIdAndSortHist(sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
sourceNodeGroupId, targetNodeGroupId));
}

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {


final Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = fillTriggerRoutersByHistId(
sourceNodeGroupId, targetNodeGroupId, triggerHistories, triggerRouters);
sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories, triggerRouters);
final List<Table> sortedTables = getSortedTablesFor(triggerHistories);

Comparator<TriggerHistory> comparator = new Comparator<TriggerHistory>() {
Expand Down Expand Up @@ -2535,13 +2535,13 @@ public int compare(TriggerHistory o1, TriggerHistory o2) {

@Override
public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistId(sourceNodeGroupId, targetNodeGroupId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories) {
return fillTriggerRoutersByHistId(sourceNodeGroupId, targetNodeGroupId, targetExternalId, triggerHistories, getAllTriggerRoutersForReloadForCurrentNode(
sourceNodeGroupId, targetNodeGroupId));
}

protected Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String sourceNodeGroupId, String targetNodeGroupId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {
String sourceNodeGroupId, String targetNodeGroupId, String targetExternalId, List<TriggerHistory> triggerHistories, List<TriggerRouter> triggerRouters) {

triggerRouters = new ArrayList<TriggerRouter>(triggerRouters);

Expand All @@ -2556,7 +2556,11 @@ protected Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistId(
String triggerId = triggerHistory.getTriggerId();
for (TriggerRouter triggerRouter : triggerRouters) {
if (triggerRouter.getTrigger().getTriggerId().equals(triggerId)) {
triggerRoutersForTriggerHistory.add(triggerRouter);
if (!triggerRouter.getTrigger().getSourceTableName().contains("$(targetExternalId)")
|| triggerRouter.getTrigger().getSourceTableName().replace("$(targetExternalId)", targetExternalId)
.equalsIgnoreCase(triggerHistory.getSourceTableName())) {
triggerRoutersForTriggerHistory.add(triggerRouter);
}
}
}
}
Expand Down

0 comments on commit 9cdf060

Please sign in to comment.