Skip to content

Commit

Permalink
0003335: Extension point during reloads to adjust the active trigger
Browse files Browse the repository at this point in the history
histories
  • Loading branch information
jumpmind-josh committed Dec 12, 2017
1 parent a2f0eab commit 1319a98
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
@@ -0,0 +1,24 @@
package org.jumpmind.symmetric.load;

import java.util.List;

import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.TriggerHistory;

public class DefaultReloadGenerator implements IReloadGenerator, IBuiltInExtensionPoint {

ISymmetricEngine engine;

@Override
public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
}

@Override
public List<TriggerHistory> getActiveTriggerHistories(Node targetNode) {
return engine.getTriggerRouterService().getActiveTriggerHistories();
}

}
@@ -0,0 +1,13 @@
package org.jumpmind.symmetric.load;

import java.util.List;

import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.TriggerHistory;

public interface IReloadGenerator extends IExtensionPoint, ISymmetricEngineAware {

List<TriggerHistory> getActiveTriggerHistories(Node targetNode);
}
Expand Up @@ -47,6 +47,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataGap;
Expand Down Expand Up @@ -246,7 +247,6 @@ protected void insertInitialLoadEvents() {
.getRegistrationTime() != null)) {

List<NodeSecurity> nodeSecurities = findNodesThatAreReadyForInitialLoad();
List<TriggerHistory> activeHistories = triggerRouterService.getActiveTriggerHistories();
Map<String, List<TriggerRouter>> triggerRoutersByTargetNodeGroupId = new HashMap<String, List<TriggerRouter>>();

if (nodeSecurities != null && nodeSecurities.size() > 0) {
Expand All @@ -257,6 +257,8 @@ protected void insertInitialLoadEvents() {


for (NodeSecurity security : nodeSecurities) {
List<TriggerHistory> activeHistories = triggerRouterService.getActiveTriggerHistories();

if (activeHistories.size() > 0) {
Node targetNode = engine.getNodeService().findNode(security.getNodeId());
boolean thisMySecurityRecord = security.getNodeId().equals(
Expand Down Expand Up @@ -312,7 +314,7 @@ protected void insertInitialLoadEvents() {
}
}

processTableRequestLoads(identity, processInfo, activeHistories, triggerRoutersByTargetNodeGroupId);
processTableRequestLoads(identity, processInfo, triggerRoutersByTargetNodeGroupId);
}
}

Expand All @@ -324,22 +326,28 @@ protected void insertInitialLoadEvents() {

}

public void processTableRequestLoads(Node source, ProcessInfo processInfo, List<TriggerHistory> activeHistories, Map<String, List<TriggerRouter>> triggerRoutersByTargetNodeGroupId) {
public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<String, List<TriggerRouter>> triggerRoutersByTargetNodeGroupId) {
List<TableReloadRequest> loadsToProcess = engine.getDataService().getTableReloadRequestToProcess(source.getNodeId());
if (loadsToProcess.size() > 0) {
processInfo.setStatus(ProcessInfo.Status.CREATING);
processInfo.setStatus(ProcessInfo.Status.CREATING);
log.info("Found " + loadsToProcess.size() + " table reload requests to process.");
gapDetector.setFullGapAnalysis(true);

Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
for (TableReloadRequest load : loadsToProcess) {
Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId());

if (load.isFullLoadRequest() && isValidLoadTarget(load.getTargetNodeId())) {
List<TableReloadRequest> fullLoad = new ArrayList<TableReloadRequest>();
fullLoad.add(load);

List<TriggerRouter> triggerRouters = engine.getTriggerRouterService()
.getAllTriggerRoutersForReloadForCurrentNode(parameterService.getNodeGroupId(), targetNode.getNodeGroupId());

List<TriggerHistory> activeHistories = extensionService.getExtensionPoint(IReloadGenerator.class).getActiveTriggerHistories(targetNode);

engine.getDataService().insertReloadEvents(
engine.getNodeService().findNode(load.getTargetNodeId()),
false, fullLoad, processInfo);
targetNode,false, fullLoad, processInfo, activeHistories, triggerRouters);
}
else {
NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(load.getTargetNodeId());
Expand Down Expand Up @@ -367,6 +375,7 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, List
triggerRouters = triggerRouterService.getAllTriggerRoutersForReloadForCurrentNode(parameterService.getNodeGroupId(), targetNode.getNodeGroupId());
triggerRoutersByTargetNodeGroupId.put(targetNode.getNodeGroupId(), triggerRouters);
}
List<TriggerHistory> activeHistories = extensionService.getExtensionPoint(IReloadGenerator.class).getActiveTriggerHistories(targetNode);

engine.getDataService().insertReloadEvents(
targetNode,
Expand Down

0 comments on commit 1319a98

Please sign in to comment.