Skip to content

Commit

Permalink
0005051: Improve performance with sync trigger call for list of triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 9, 2021
1 parent 37965d2 commit 5883dd1
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 67 deletions.
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.load;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -40,8 +41,11 @@
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -203,8 +207,9 @@ private void recordLoadFilterFlushNeeded(DataContext context, Table table) {
}

private void recordSyncNeeded(DataContext context, Table table, CsvData data) {
if (isSyncTriggersNeeded(context, table)) {
context.put(CTX_KEY_RESYNC_NEEDED, true);
if (engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED) ||
context.getBatch().getBatchId() == Constants.VIRTUAL_BATCH_FOR_REGISTRATION) {
queueSyncTriggers(context, table, data);
}

if (data.getDataEventType() == DataEventType.CREATE) {
Expand Down Expand Up @@ -244,6 +249,72 @@ private void recordSyncNeeded(DataContext context, Table table, CsvData data) {

}

@SuppressWarnings("unchecked")
private void queueSyncTriggers(DataContext context, Table table, CsvData data) {
if ((matchesTable(table, TableConstants.SYM_TRIGGER) || matchesTable(table, TableConstants.SYM_TRIGGER_ROUTER))) {
Object needResync = context.get(CTX_KEY_RESYNC_NEEDED);
if (needResync == null || needResync instanceof Set) {
if (needResync == null) {
needResync = new HashSet<Object>();
context.put(CTX_KEY_RESYNC_NEEDED, needResync);
}

Map<String, String> columnValues = null;
if (data.getDataEventType() == DataEventType.DELETE) {
columnValues = data.toColumnNameValuePairs(table.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
} else {
columnValues = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
}
String triggerId = columnValues.get("TRIGGER_ID");
if (matchesTable(table, TableConstants.SYM_TRIGGER_ROUTER)) {
String routerId = columnValues.get("ROUTER_ID");
TriggerRouter triggerRouter = new TriggerRouter();
triggerRouter.setTriggerId(triggerId);
triggerRouter.setRouterId(routerId);
((Set<Object>) needResync).add(triggerRouter);
} else {
Trigger trigger = new Trigger();
trigger.setTriggerId(triggerId);
((Set<Object>) needResync).add(trigger);
}
}
} else if (matchesTable(table, TableConstants.SYM_ROUTER) || matchesTable(table, TableConstants.SYM_NODE_GROUP_LINK)
|| matchesTable(table, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET) || matchesTable(table, TableConstants.SYM_GROUPLET_LINK)) {
context.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}
}

@SuppressWarnings("unchecked")
private List<Trigger> convertNeedsSynced(Object needsSynced) {
if (needsSynced instanceof Set) {
Set<Trigger> triggers = new HashSet<Trigger>();
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();
triggerRouterService.clearCache();

for (Object object : (Set<Object>) needsSynced) {
Trigger trigger = null;
if (object instanceof Trigger) {
trigger = (Trigger) object;
trigger = triggerRouterService.getTriggerById(trigger.getTriggerId(), false);
} else if (object instanceof TriggerRouter) {
TriggerRouter tr = (TriggerRouter) object;
tr = triggerRouterService.findTriggerRouterById(tr.getTriggerId(), tr.getRouterId(), false);
if (tr != null) {
trigger = tr.getTrigger();
}
}

if (trigger != null) {
triggers.add(trigger);
} else {
return null;
}
}
return new ArrayList<Trigger>(triggers);
}
return null;
}

private void recordJobManagerRestartNeeded(DataContext context, Table table, CsvData data) {
if (isJobManagerRestartNeeded(table, data)) {
context.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED, true);
Expand Down Expand Up @@ -303,17 +374,6 @@ private void recordFileSyncEnabled(DataContext context, Table table, CsvData dat
context.put(CTX_KEY_FILE_SYNC_ENABLED, true);
}
}

private boolean isSyncTriggersNeeded(DataContext context, Table table) {
boolean autoSync = engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED) ||
context.getBatch().getBatchId() == Constants.VIRTUAL_BATCH_FOR_REGISTRATION;
return autoSync && (matchesTable(table, TableConstants.SYM_TRIGGER)
|| matchesTable(table, TableConstants.SYM_ROUTER)
|| matchesTable(table, TableConstants.SYM_TRIGGER_ROUTER)
|| matchesTable(table, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET)
|| matchesTable(table, TableConstants.SYM_GROUPLET_LINK)
|| matchesTable(table, TableConstants.SYM_NODE_GROUP_LINK));
}

private boolean isGroupletFlushNeeded(Table table) {
return matchesTable(table, TableConstants.SYM_GROUPLET_LINK) ||
Expand Down Expand Up @@ -401,13 +461,18 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
* No need to sync triggers until the entire sync process has finished just in case there
* are multiple batches that contain configuration changes
*/
if (context.get(CTX_KEY_RESYNC_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
Object needsSynced = context.get(CTX_KEY_RESYNC_NEEDED);
if (needsSynced != null && parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers because new configuration came through the data loader");
engine.getClusterService().refreshLockEntries(); // Needed in case cluster.lock.enabled changed during config change.
engine.getTriggerRouterService().syncTriggers();
List<Trigger> triggers = convertNeedsSynced(needsSynced);
if (triggers != null && triggers.size() > 0) {
engine.getTriggerRouterService().syncTriggers(triggers, null, false, true);
} else {
engine.getClusterService().refreshLockEntries(); // Needed in case cluster.lock.enabled changed during config change.
engine.getTriggerRouterService().syncTriggers();
engine.getRegistrationService().setAllowClientRegistration(true);
}
context.remove(CTX_KEY_RESYNC_NEEDED);
engine.getRegistrationService().setAllowClientRegistration(true);
}

if (context.get(CTX_KEY_RESYNC_TABLE_NEEDED) != null
Expand Down
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -479,39 +480,21 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat
}

Trigger trigger = null;
Date lastUpdateTime = null;
String triggerId = columnValues.get("TRIGGER_ID");
if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)) {
String routerId = columnValues.get("ROUTER_ID");
TriggerRouter tr = triggerRouterService.findTriggerRouterById(triggerId,
routerId, refreshCache);
if (tr != null) {
trigger = tr.getTrigger();
lastUpdateTime = tr.getLastUpdateTime();
}
} else {
trigger = triggerRouterService.getTriggerById(triggerId, refreshCache);
if (trigger != null) {
lastUpdateTime = trigger.getLastUpdateTime();
}
}
if (trigger != null) {
List<TriggerHistory> histories = triggerRouterService
.getActiveTriggerHistories(trigger);
boolean sync = false;
if (histories != null && histories.size() > 0) {
for (TriggerHistory triggerHistory : histories) {
if (triggerHistory.getCreateTime().before(lastUpdateTime)) {
sync = true;
}
}
} else {
sync = true;
}

if (sync) {
((Set<Trigger>) needResync).add(trigger);
}
((Set<Trigger>) needResync).add(trigger);
} else {
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}
}
} else if (tableMatches(dataMetaData, TableConstants.SYM_ROUTER)
Expand All @@ -523,7 +506,6 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat
routingContext.put(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED, Boolean.TRUE);
}
}

}

protected Node findIdentity() {
Expand Down Expand Up @@ -641,18 +623,14 @@ public void contextCommitted(SimpleRouterContext routingContext) {
&& engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS)
&& engine.getParameterService().is(
ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED)) {
if (Boolean.TRUE.equals(needsSynced)) {
log.info("About to syncTriggers because new configuration came through the data router");

log.info("About to syncTriggers because new configuration came through the data router");
@SuppressWarnings("unchecked")
Set<Trigger> triggers = needsSynced instanceof Set ? (Set<Trigger>) needsSynced : null;
if (triggers != null && triggers.size() > 0) {
engine.getTriggerRouterService().syncTriggers(new ArrayList<Trigger>(triggers), null, false, true);
} else {
engine.getTriggerRouterService().syncTriggers();
} else if (needsSynced instanceof Set) {
@SuppressWarnings("unchecked")
Set<Trigger> triggers = (Set<Trigger>) needsSynced;
for (Trigger trigger : triggers) {
log.info("About to sync the "
+ trigger.getTriggerId()
+ " trigger because a change was detected by the config data router");
engine.getTriggerRouterService().syncTrigger(trigger, null, false);
}
}
}

Expand Down
Expand Up @@ -187,6 +187,8 @@ public String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigge

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyTrigger);

public void syncTriggers(List<Trigger> triggers, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase);

public void syncTriggers(Table table, boolean genAlways);

public void dropTriggers(TriggerHistory history);
Expand Down
Expand Up @@ -1824,44 +1824,66 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool
}

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase) {
syncTriggers(Collections.singletonList(trigger), listener, force, verifyInDatabase);
}

public void syncTriggers(List<Trigger> triggers, ITriggerCreationListener listener, boolean force, boolean verifyInDatabase) {
StringBuilder sqlBuffer = new StringBuilder();
clearCache();
List<Trigger> triggersForCurrentNode = null;
if (verifyInDatabase) {
triggersForCurrentNode = getTriggersForCurrentNode();
} else {
triggersForCurrentNode = new ArrayList<Trigger>();
triggersForCurrentNode.add(trigger);
triggersForCurrentNode.addAll(triggers);
}
try {
if (listener != null) {
extensionService.addExtensionPoint(listener);
}

triggersToSync = triggersForCurrentNode.size();

log.info("Synchronizing {} triggers", triggers.size());
triggersToSync = triggers.size();
triggersSynced = 0;
for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) {
l.syncTriggersStarted();
}

List<TriggerHistory> allHistories = getActiveTriggerHistories();
if (triggersForCurrentNode.contains(trigger)) {
if (!trigger.isSourceTableNameWildCarded() && !trigger.isSourceTableNameExpanded()) {
for (TriggerHistory triggerHistory : getActiveTriggerHistories(trigger)) {
if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) {
Map<String, List<TriggerHistory>> activeHistoryByTriggerId = new HashMap<String, List<TriggerHistory>>();
for (TriggerHistory hist : allHistories) {
List<TriggerHistory> list = activeHistoryByTriggerId.get(hist.getTriggerId());
if (list == null) {
list = new ArrayList<TriggerHistory>();
activeHistoryByTriggerId.put(hist.getTriggerId(), list);
}
list.add(hist);
}

for (Trigger trigger : triggers) {
if (triggersForCurrentNode.contains(trigger)) {
if (!trigger.isSourceTableNameWildCarded() && !trigger.isSourceTableNameExpanded()) {
List<TriggerHistory> activeHistories = activeHistoryByTriggerId.get(trigger.getTriggerId());
if (activeHistories != null) {
for (TriggerHistory triggerHistory : activeHistories) {
if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) {
dropTriggers(triggerHistory, sqlBuffer);
}
}
}
}
Map<String, List<TriggerTableSupportingInfo>> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo(
Collections.singletonList(trigger), allHistories, true);
updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer,
force, verifyInDatabase, allHistories, false, triggerToTableSupportingInfo);
} else {
List<TriggerHistory> activeHistories = activeHistoryByTriggerId.get(trigger.getTriggerId());
if (activeHistories != null) {
for (TriggerHistory triggerHistory : activeHistories) {
dropTriggers(triggerHistory, sqlBuffer);
}
}
}
List<Trigger> triggers = new ArrayList<Trigger>();
triggers.add(trigger);
Map<String, List<TriggerTableSupportingInfo>> triggerToTableSupportingInfo = getTriggerToTableSupportingInfo(triggers, allHistories, true);
updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer,
force, verifyInDatabase, allHistories, false, triggerToTableSupportingInfo);
} else {
for (TriggerHistory triggerHistory : getActiveTriggerHistories(trigger)) {
dropTriggers(triggerHistory, sqlBuffer);
}
}
} finally {
for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) {
Expand All @@ -1870,6 +1892,7 @@ public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, bool
if (listener != null) {
extensionService.removeExtensionPoint(listener);
}
log.info("Done synchronizing {} triggers", triggers.size());
}
}

Expand Down

0 comments on commit 5883dd1

Please sign in to comment.