From ba668607daa908b830cdc2e59e40a41fc7c764df Mon Sep 17 00:00:00 2001 From: chenson42 Date: Wed, 8 Jul 2015 19:04:06 +0000 Subject: [PATCH] 0002330: Make channels push asynchronously --- .../jumpmind/symmetric/model/ProcessInfoKey.java | 4 ++-- .../symmetric/service/IDataLoaderService.java | 3 ++- .../symmetric/service/impl/DataLoaderService.java | 14 ++++++++++---- .../org/jumpmind/symmetric/web/PushUriHandler.java | 8 +++++--- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java index c1b3a6d12d..8b8c5ed5ab 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java @@ -40,9 +40,9 @@ public String toString() { case PULL_JOB: return "Database Pull"; case PUSH_HANDLER: - return "Service Database Push"; + return "Load From Push"; case PULL_HANDLER: - return "Service Database Pull"; + return "Extract For Pull"; case ROUTER_JOB: return "Routing"; case ROUTER_READER: diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java index dee7b5ee8c..f6a4c45b3e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java @@ -29,6 +29,7 @@ import org.jumpmind.symmetric.model.IncomingError; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeGroupLink; +import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.service.impl.DataLoaderService.ConflictNodeGroupLink; import org.jumpmind.symmetric.service.impl.DataLoaderService.DataLoaderWorker; @@ -59,7 +60,7 @@ public interface IDataLoaderService { public void save(ConflictNodeGroupLink settings); - public DataLoaderWorker createDataLoaderWorker(Node sourceNode); + public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode); public void clearCache(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 638e55ed93..337b162999 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -208,8 +208,8 @@ public void stop() { } @Override - public DataLoaderWorker createDataLoaderWorker(Node sourceNode) { - DataLoaderWorker worker = new DataLoaderWorker(sourceNode); + public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) { + DataLoaderWorker worker = new DataLoaderWorker(processType, channelId, sourceNode); dataLoadWorkers.execute(worker); return worker; } @@ -1028,16 +1028,22 @@ public class DataLoaderWorker implements Runnable { List batchList = new ArrayList(); IncomingBatch currentlyLoading; + + String channelId; Node identityNode; Node sourceNode; DataContext ctx = new DataContext(); + + ProcessInfoKey.ProcessType processType; - public DataLoaderWorker(Node sourceNode) { + public DataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) { this.identityNode = nodeService.findIdentity(); this.sourceNode = sourceNode; + this.processType = processType; + this.channelId = channelId; ctx.put(Constants.DATA_CONTEXT_ENGINE, engine); if (identityNode != null) { @@ -1062,7 +1068,7 @@ public void queueUpLoad(IncomingBatch batch) { @Override public void run() { final ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId(), identityNode.getNodeId(), - ProcessInfoKey.ProcessType.PUSH_HANDLER)); + processType, channelId)); try { currentlyLoading = toLoadQueue.take(); while (!(currentlyLoading instanceof EOM)) { diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java index 5337203060..22e8a4eb96 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java @@ -42,6 +42,7 @@ import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.service.IDataLoaderService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.impl.DataLoaderService; @@ -61,6 +62,7 @@ public PushUriHandler(ISymmetricEngine engine, IInterceptor... interceptors) { public void handle(HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException { String nodeId = ServletUtils.getParameter(req, WebConstants.NODE_ID); + String channelId = ServletUtils.getParameter(req, WebConstants.CHANNEL_ID); Node sourceNode = engine.getNodeService().findNode(nodeId); log.info("About to service push request for {}", nodeId); @@ -87,8 +89,8 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc writer.write(line); writer.close(); writer = null; - if (worker == null) { - worker = dataLoaderService.createDataLoaderWorker(sourceNode); + if (worker == null) { + worker = dataLoaderService.createDataLoaderWorker(ProcessInfoKey.ProcessType.PUSH_HANDLER, channelId, sourceNode); } worker.queueUpLoad(new IncomingBatch(batchId, nodeId)); batchId = null; @@ -116,7 +118,7 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc status = "in progress"; batch = worker.getCurrentlyLoading(); } - if (batch != null) { + if (batch != null && !(batch instanceof DataLoaderService.EOM)) { ArrayList list = new ArrayList(1); list.add(batch); log.info("sending {} ack ... for {}", status, batch);