From a146f60e7caa6b7b265322e8aa463757364d4edf Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Thu, 21 Sep 2017 08:53:47 -0400 Subject: [PATCH] 0003253: ProcessInfo which is used to gather information about processes can be corrupted on push and pull because of threading in 3.8 --- .../io/stage}/SimpleStagingDataWriter.java | 26 +++++- .../jumpmind/symmetric/model/ProcessInfo.java | 9 +- .../model/ProcessInfoDataWriter.java | 7 ++ .../symmetric/model/ProcessInfoKey.java | 24 ++--- .../symmetric/model/RemoteNodeStatus.java | 12 +-- .../symmetric/model/RemoteNodeStatuses.java | 4 +- .../symmetric/route/DataGapDetector.java | 2 +- .../symmetric/route/DataGapFastDetector.java | 2 +- .../symmetric/route/DataGapRouteReader.java | 2 +- .../service/impl/DataExtractorService.java | 64 ++++++++----- .../service/impl/DataLoaderService.java | 91 ++++++++++--------- .../impl/FileSyncExtractorService.java | 7 +- .../service/impl/FileSyncService.java | 8 +- .../service/impl/OfflinePushService.java | 6 +- .../symmetric/service/impl/PushService.java | 8 +- .../service/impl/RegistrationService.java | 3 +- .../symmetric/service/impl/RouterService.java | 2 +- .../internal/InternalTransportManager.java | 6 +- .../symmetric/web/FileSyncPullUriHandler.java | 2 +- .../symmetric/web/PullUriHandler.java | 4 +- .../symmetric/web/rest/RestService.java | 4 +- 21 files changed, 166 insertions(+), 127 deletions(-) rename {symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer => symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage}/SimpleStagingDataWriter.java (92%) diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java similarity index 92% rename from symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java rename to symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java index 1bcc8355ca..83c6de0f4a 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java @@ -18,7 +18,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.jumpmind.symmetric.io.data.writer; +package org.jumpmind.symmetric.io.stage; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -31,12 +31,15 @@ import org.jumpmind.symmetric.csv.CsvReader; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.Batch.BatchType; +import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener; import org.jumpmind.symmetric.io.data.CsvConstants; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.util.Statistics; import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.model.ProcessInfo; +import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +57,11 @@ public class SimpleStagingDataWriter { protected BatchType batchType; protected String targetNodeId; protected DataContext context; - + protected ProcessInfo processInfo; protected BufferedWriter writer; protected Batch batch; - public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes, + public SimpleStagingDataWriter(ProcessInfo processInfo, BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes, BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) { this.reader = new CsvReader(reader); this.reader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH); @@ -70,6 +73,7 @@ public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingMan this.targetNodeId = targetNodeId; this.listeners = listeners; this.context = context; + this.processInfo = processInfo; } public void process() throws IOException { @@ -96,7 +100,6 @@ public void process() throws IOException { } else if (line.startsWith(CsvConstants.TABLE)) { tableLine = new TableLine(catalogLine, schemaLine, line); TableLine batchTableLine = batchTableLines.get(tableLine); - if (batchTableLine != null) { tableLine = batchTableLine; writeLine(line); @@ -124,6 +127,7 @@ public void process() throws IOException { } else if (line.startsWith(CsvConstants.BATCH)) { batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false); + processInfo.incrementBatchCount(); String location = batch.getStagedLocation(); resource = stagingManager.create(category, location, batch.getBatchId()); writer = resource.getWriter(memoryThresholdInBytes); @@ -136,7 +140,7 @@ public void process() throws IOException { for (IProtocolDataWriterListener listener : listeners) { listener.start(context, batch); } - } + } } else if (line.startsWith(CsvConstants.COMMIT)) { if (writer != null) { writeLine(line); @@ -157,6 +161,7 @@ public void process() throws IOException { } else if (line.startsWith(CsvConstants.RETRY)) { batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false); + processInfo.incrementBatchCount(); String location = batch.getStagedLocation(); resource = stagingManager.find(category, location, batch.getBatchId()); if (resource == null || resource.getState() == State.CREATE) { @@ -204,6 +209,13 @@ public void process() throws IOException { writeLine(syncLine.columnsLine); } } + + if (line.startsWith(CsvConstants.INSERT) || line.startsWith(CsvConstants.DELETE) || line.startsWith(CsvConstants.UPDATE) + || line.startsWith(CsvConstants.CREATE) || line.startsWith(CsvConstants.SQL) + || line.startsWith(CsvConstants.BSH)) { + processInfo.incrementCurrentDataCount(); + } + int size = line.length(); if (size > MAX_WRITE_LENGTH) { log.debug("Exceeded max line length with {}", size); @@ -227,11 +239,15 @@ public void process() throws IOException { ts = System.currentTimeMillis(); } } + + processInfo.setStatus(ProcessStatus.OK); } catch (IOException ex) { if (resource != null) { resource.delete(); } + processInfo.setStatus(ProcessStatus.ERROR); + /* * Just log an error here. We want batches that come before us to continue to process and to be acknowledged */ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java index 05a2fcd9a0..c2d0475da6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java @@ -208,11 +208,12 @@ public long getCurrentLoadId() { return currentLoadId; } - public String getCurrentChannelThread() { - if (getKey().getChannelId() != null && getKey().getChannelId().length() > 0) { - return getKey().getChannelId(); + public String getQueue() { + String queue = key.getQueue(); + if (queue == null) { + queue = ""; } - return ""; + return queue; } public String getCurrentChannelId() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoDataWriter.java index 5b497f514a..4bcc3aeaa9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoDataWriter.java @@ -26,6 +26,7 @@ import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.writer.NestedDataWriter; +import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; public class ProcessInfoDataWriter extends NestedDataWriter { @@ -57,6 +58,12 @@ public boolean start(Table table) { } return super.start(table); } + + @Override + public void end(Batch batch, boolean inError) { + processInfo.setStatus(!inError ? ProcessStatus.OK : ProcessStatus.ERROR); + super.end(batch, inError); + } public void write(CsvData data) { if (data != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java index 941e97f27e..7a94accf71 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java @@ -32,20 +32,20 @@ public class ProcessInfoKey implements Serializable { private ProcessType processType; - private String channelId; + private String queue; public ProcessInfoKey(String sourceNodeId, String targetNodeId, ProcessType processType) { this.sourceNodeId = sourceNodeId; this.targetNodeId = targetNodeId; this.processType = processType; - this.channelId = null; + this.queue = null; } - public ProcessInfoKey(String sourceNodeId, String channelId, String targetNodeId, ProcessType processType) { + public ProcessInfoKey(String sourceNodeId, String queue, String targetNodeId, ProcessType processType) { this.sourceNodeId = sourceNodeId; this.targetNodeId = targetNodeId; this.processType = processType; - this.channelId = channelId; + this.queue = queue; } public String getSourceNodeId() { @@ -60,8 +60,8 @@ public ProcessType getProcessType() { return processType; } - public String getChannelId() { - return channelId; + public String getQueue() { + return queue; } @Override @@ -71,7 +71,7 @@ public int hashCode() { result = prime * result + ((processType == null) ? 0 : processType.hashCode()); result = prime * result + ((sourceNodeId == null) ? 0 : sourceNodeId.hashCode()); result = prime * result + ((targetNodeId == null) ? 0 : targetNodeId.hashCode()); - result = prime * result + ((channelId == null) ? 0 : channelId.hashCode()); + result = prime * result + ((queue == null) ? 0 : queue.hashCode()); return result; } @@ -96,18 +96,18 @@ public boolean equals(Object obj) { return false; } else if (!targetNodeId.equals(other.targetNodeId)) return false; - if (channelId == null) { - if (other.channelId != null) + if (queue == null) { + if (other.queue != null) return false; - } else if (!channelId.equals(other.channelId)) + } else if (!queue.equals(other.queue)) return false; return true; } @Override public String toString() { - return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,channelId=%s", processType.toString(), sourceNodeId, - targetNodeId, channelId); + return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,queue=%s", processType.toString(), sourceNodeId, + targetNodeId, queue); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java index 28d7ed60a7..838f00de53 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java @@ -37,7 +37,7 @@ public static enum Status { }; private String nodeId; - private String channelId; + private String queue; private Status status; private long dataProcessed; private long batchesProcessed; @@ -49,7 +49,7 @@ public RemoteNodeStatus(String nodeId, String channelId, Map ch this.status = Status.NO_DATA; this.nodeId = nodeId; this.channels = channels; - this.channelId = channelId; + this.queue = channelId; } public boolean failed() { @@ -64,12 +64,12 @@ public void setNodeId(String nodeId) { this.nodeId = nodeId; } - public String getChannelId() { - return channelId; + public String getQueue() { + return queue; } - public void setChannelId(String channelId) { - this.channelId = channelId; + public void setQueue(String queue) { + this.queue = queue; } public Status getStatus() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java index b22b48b897..76832f7a7e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java @@ -68,10 +68,10 @@ public boolean errorOccurred() { return errorOccurred; } - public RemoteNodeStatus add(String nodeId, String channelId) { + public RemoteNodeStatus add(String nodeId, String queue) { RemoteNodeStatus status = null; if (nodeId != null) { - status = new RemoteNodeStatus(nodeId, channelId, channels); + status = new RemoteNodeStatus(nodeId, queue, channels); add(status); } return status; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java index f8ccbfcbf8..307b05de98 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java @@ -35,7 +35,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java index c037b6d12b..3a711abc58 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java @@ -42,7 +42,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.service.IContextService; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index d34f24abee..8529e72664 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -48,7 +48,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.util.AppUtils; import org.jumpmind.util.FormatUtils; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index e4573bdd0f..50ef876a50 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -104,6 +104,7 @@ import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.ChannelMap; import org.jumpmind.symmetric.model.Data; @@ -114,16 +115,16 @@ import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeCommunication; import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; +import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.OutgoingBatch; -import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.OutgoingBatchWithPayload; import org.jumpmind.symmetric.model.OutgoingBatches; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfoDataWriter; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.model.Router; @@ -513,12 +514,12 @@ public List extract(ProcessInfo processInfo, Node targetNode, Str if (queue != null) { NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(), targetNode.getNodeGroupId(), false).getDataEventAction(); - ProcessInfoKey.ProcessType processType = processInfo.getKey().getProcessType(); + ProcessType processType = processInfo.getKey().getProcessType(); NodeGroupLinkAction action = null; - if (processType.equals(ProcessInfoKey.ProcessType.PUSH_JOB)) { + if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) { action = NodeGroupLinkAction.P; - } else if (processType.equals(ProcessInfoKey.ProcessType.PULL_HANDLER)) { + } else if (processType.equals(ProcessType.PULL_HANDLER_EXTRACT)) { action = NodeGroupLinkAction.W; } batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false); @@ -574,7 +575,7 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ return extracted; } - protected List extract(final ProcessInfo processInfo, final Node targetNode, + protected List extract(final ProcessInfo extractInfo, final Node targetNode, final List activeBatches, final IDataWriter dataWriter, final BufferedWriter writer, final ExtractMode mode) { if (activeBatches.size() > 0) { final List processedBatches = new ArrayList(activeBatches.size()); @@ -595,22 +596,22 @@ protected List extract(final ProcessInfo processInfo, final Node List> futures = new ArrayList>(); - processInfo.setBatchCount(activeBatches.size()); + extractInfo.setBatchCount(activeBatches.size()); for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); - processInfo.setCurrentLoadId(currentBatch.getLoadId()); - processInfo.setDataCount(currentBatch.getDataRowCount()); - processInfo.setCurrentBatchId(currentBatch.getBatchId()); + extractInfo.setCurrentLoadId(currentBatch.getLoadId()); + extractInfo.setDataCount(currentBatch.getDataRowCount()); + extractInfo.setCurrentBatchId(currentBatch.getBatchId()); channelsProcessed.add(currentBatch.getChannelId()); currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch); - processInfo.setStatus(ProcessInfo.ProcessStatus.EXTRACTING); + extractInfo.setStatus(ProcessInfo.ProcessStatus.EXTRACTING); final OutgoingBatch extractBatch = currentBatch; Callable callable = new Callable() { public FutureOutgoingBatch call() throws Exception { - return extractBatch(extractBatch, status, processInfo, targetNode, dataWriter, mode, activeBatches); + return extractBatch(extractBatch, status, extractInfo, targetNode, dataWriter, mode, activeBatches); } }; @@ -635,9 +636,16 @@ public FutureOutgoingBatch call() throws Exception { } } + ProcessInfo transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeService.findIdentityNodeId(), + extractInfo.getQueue(), targetNode.getNodeId(), extractInfo.getProcessType() == ProcessType.PUSH_JOB_EXTRACT ? ProcessType.PUSH_JOB_TRANSFER : ProcessType.PULL_HANDLER_TRANSFER)); + transferInfo.setTotalDataCount(extractInfo.getTotalDataCount()); Iterator activeBatchIter = activeBatches.iterator(); - for (Future future : futures) { + for (int i = 0; i < futures.size(); i++) { + Future future = futures.get(i); currentBatch = activeBatchIter.next(); + if (i == futures.size() - 1) { + extractInfo.setStatus(ProcessStatus.OK); + } boolean isProcessed = false; while (!isProcessed) { try { @@ -649,23 +657,25 @@ public FutureOutgoingBatch call() throws Exception { } if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) { - processInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); - processInfo.setCurrentLoadId(currentBatch.getLoadId()); + transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); + transferInfo.setCurrentLoadId(currentBatch.getLoadId()); boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG; - currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, isRetry, - dataWriter, writer, mode); + currentBatch = sendOutgoingBatch(transferInfo, targetNode, currentBatch, isRetry, + dataWriter, writer, mode); } processedBatches.add(currentBatch); - isProcessed = true; if (currentBatch.getStatus() != Status.OK) { currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); changeBatchStatus(Status.LD, currentBatch, mode); - processInfo.setStatus(ProcessInfo.ProcessStatus.LOADING); - processInfo.setCurrentTableName(currentBatch.getSummary()); } + + transferInfo.setCurrentTableName(currentBatch.getSummary()); + transferInfo.setStatus(ProcessStatus.OK); + + isProcessed = true; } catch (ExecutionException e) { if (isNotBlank(e.getMessage()) && e.getMessage().contains("string truncation")) { throw new RuntimeException("There is a good chance that the truncation error you are receiving is because contains_big_lobs on the '" @@ -677,6 +687,10 @@ public FutureOutgoingBatch call() throws Exception { throw new RuntimeException(e); } catch (TimeoutException e) { writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); + } finally { + if (transferInfo.getStatus() != ProcessStatus.OK) { + transferInfo.setStatus(ProcessStatus.ERROR); + } } } } @@ -718,7 +732,7 @@ public FutureOutgoingBatch call() throws Exception { log.error("Failed to extract batch {}", currentBatch, e); } } - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); + extractInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); } } else { log.error("Could not log the outgoing batch status because the batch was null", @@ -1760,16 +1774,16 @@ protected boolean isApplicable(NodeCommunication nodeCommunication, RemoteNodeSt return nodeCommunication.getCommunicationType() != CommunicationType.FILE_XTRCT; } - protected ProcessType getProcessType() { - return ProcessInfoKey.ProcessType.INITIAL_LOAD_EXTRACT_JOB; - } - protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, Node sourceNode, Node targetNode, List batches, ProcessInfo processInfo, Channel channel) { MultiBatchStagingWriter multiBatchStatingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager, batches, channel.getMaxBatchSize(), processInfo); return multiBatchStatingWriter; } + + protected ProcessType getProcessType() { + return ProcessType.INITIAL_LOAD_EXTRACT_JOB; + } class ExtractRequestMapper implements ISqlRowMapper { public ExtractRequest mapRow(Row row) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 6132b9949b..89ebf2326a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -83,11 +83,11 @@ import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener; import org.jumpmind.symmetric.io.data.writer.ResolvedData; -import org.jumpmind.symmetric.io.data.writer.SimpleStagingDataWriter; import org.jumpmind.symmetric.io.data.writer.TransformWriter; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.io.stage.SimpleStagingDataWriter; import org.jumpmind.symmetric.load.ConfigurationChangedDatabaseWriterFilter; import org.jumpmind.symmetric.load.DefaultDataLoaderFactory; import org.jumpmind.symmetric.load.DynamicDatabaseWriterFilter; @@ -107,7 +107,7 @@ import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoDataWriter; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import static org.jumpmind.symmetric.model.ProcessType.*; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataLoaderService; @@ -221,7 +221,7 @@ public List loadDataBatch(String batchData) { String nodeId = nodeService.findIdentityNodeId(); if (StringUtils.isNotBlank(nodeId)) { ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeId, - nodeId, ProcessInfoKey.ProcessType.MANUAL_LOAD)); + nodeId, MANUAL_LOAD)); try { InternalIncomingTransport transport = new InternalIncomingTransport( new BufferedReader(new StringReader(batchData))); @@ -245,9 +245,9 @@ public List loadDataBatch(String batchData) { * Connect to the remote node and pull data. The acknowledgment of * commit/error status is sent separately after the data is processed. */ - public RemoteNodeStatus loadDataFromPull(Node remote, String channelId) throws IOException { + public RemoteNodeStatus loadDataFromPull(Node remote, String queue) throws IOException { RemoteNodeStatus status = new RemoteNodeStatus(remote != null ? remote.getNodeId() : null, - channelId, + queue, configurationService.getChannels(false)); loadDataFromPull(remote, status); return status; @@ -270,7 +270,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce suspendIgnoreChannels.getSuspendChannelsAsString()); requestProperties.put(WebConstants.IGNORED_CHANNELS, suspendIgnoreChannels.getIgnoreChannelsAsString()); - requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getChannelId()); + requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getQueue()); transport = transportManager.getPullTransport(remote, local, localSecurity.getNodePassword(), requestProperties, parameterService.getRegistrationUrl()); @@ -289,12 +289,12 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce isRegisterTransport = true; } - ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote - .getNodeId(), status.getChannelId(), local.getNodeId(), ProcessType.PULL_JOB)); + ProcessInfo transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote + .getNodeId(), status.getQueue(), local.getNodeId(), PULL_JOB_TRANSFER)); try { - List list = loadDataFromTransport(processInfo, remote, transport, null); + List list = loadDataFromTransport(transferInfo, remote, transport, null); if (list.size() > 0) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ACKING); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ACKING); status.updateIncomingStatus(list); local = nodeService.findIdentity(); if (local != null) { @@ -317,18 +317,18 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce } if (containsError(list)) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); } else { - processInfo.setStatus(ProcessInfo.ProcessStatus.OK); + transferInfo.setStatus(ProcessInfo.ProcessStatus.OK); } updateBatchToSendCount(remote, transport); } catch (RuntimeException e) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); throw e; } catch (IOException e) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); throw e; } @@ -387,27 +387,23 @@ public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out) * stream. This is used for a "push" request with a response of an * acknowledgment. */ - public void loadDataFromPush(Node sourceNode, String channelId, InputStream in, OutputStream out) + public void loadDataFromPush(Node sourceNode, String queue, InputStream in, OutputStream out) throws IOException { Node local = nodeService.findIdentity(); if (local != null) { - ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode - .getNodeId(), channelId, local.getNodeId(), ProcessInfoKey.ProcessType.PUSH_HANDLER)); + ProcessInfo transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode + .getNodeId(), queue, local.getNodeId(), PUSH_HANDLER_TRANSFER)); try { - List batchList = loadDataFromTransport(processInfo, sourceNode, + List batchList = loadDataFromTransport(transferInfo, sourceNode, new InternalIncomingTransport(in), out); - logDataReceivedFromPush(sourceNode, batchList, processInfo); + logDataReceivedFromPush(sourceNode, batchList, transferInfo); NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId()); - processInfo.setStatus(ProcessInfo.ProcessStatus.ACKING); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ACKING); transportManager.writeAcknowledgement(out, sourceNode, batchList, local, - security != null ? security.getNodePassword() : null); - if (containsError(batchList)) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); - } else { - processInfo.setStatus(ProcessInfo.ProcessStatus.OK); - } + security != null ? security.getNodePassword() : null); + transferInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (Exception e) { - processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); + transferInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); if (e instanceof RuntimeException) { throw (RuntimeException) e; } else if (e instanceof IOException) { @@ -451,7 +447,7 @@ private void logDataReceivedFromPush(Node sourceNode, List batchL public List loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException { Node local = nodeService.findIdentity(); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote - .getNodeId(), local.getNodeId(), ProcessType.OFFLINE_PULL)); + .getNodeId(), local.getNodeId(), OFFLINE_PULL)); List list = null; try { list = loadDataFromTransport(processInfo, remote, transport, null); @@ -488,7 +484,7 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for localSecurity.getNodePassword(), Version.version(), configVersion, remote.getSyncUrl()); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote - .getNodeId(), Constants.CHANNEL_CONFIG, local.getNodeId(), ProcessType.PULL_CONFIG_JOB)); + .getNodeId(), Constants.CHANNEL_CONFIG, local.getNodeId(), PULL_CONFIG_JOB)); try { log.info("Requesting current configuration {symmetricVersion={}, configVersion={}}", Version.version(), local.getConfigVersion()); @@ -525,9 +521,9 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for * is used for a pull request that responds with data, and the * acknowledgment is sent later. */ - protected List loadDataFromTransport(final ProcessInfo processInfo, + protected List loadDataFromTransport(final ProcessInfo transferInfo, final Node sourceNode, IIncomingTransport transport, OutputStream out) throws IOException { - final ManageIncomingBatchListener listener = new ManageIncomingBatchListener(processInfo); + final ManageIncomingBatchListener listener = new ManageIncomingBatchListener(transferInfo); final DataContext ctx = new DataContext(); Throwable error = null; try { @@ -555,17 +551,17 @@ protected List loadDataFromTransport(final ProcessInfo processInf long memoryThresholdInBytes = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); String targetNodeId = nodeService.findIdentityNodeId(); if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { - processInfo.setStatus(ProcessStatus.TRANSFERRING); + transferInfo.setStatus(ProcessStatus.TRANSFERRING); if (threadFactory == null) { threadFactory = new CustomizableThreadFactory(parameterService.getEngineName().toLowerCase() + "-dataloader"); } ExecutorService executor = Executors.newFixedThreadPool(1, threadFactory); - LoadIntoDatabaseOnArrivalListener loadListener = new LoadIntoDatabaseOnArrivalListener(processInfo, + LoadIntoDatabaseOnArrivalListener loadListener = new LoadIntoDatabaseOnArrivalListener(transferInfo, sourceNode.getNodeId(), listener, executor); - new SimpleStagingDataWriter(transport.openReader(), stagingManager, Constants.STAGING_CATEGORY_INCOMING, + new SimpleStagingDataWriter(transferInfo, transport.openReader(), stagingManager, Constants.STAGING_CATEGORY_INCOMING, memoryThresholdInBytes, BatchType.LOAD, targetNodeId, ctx, loadListener).process(); /* Previously submitted tasks will still be executed */ @@ -589,11 +585,14 @@ protected List loadDataFromTransport(final ProcessInfo processInf loadListener.isDone(); } else { + transferInfo.setStatus(ProcessStatus.OK); + ProcessInfo loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId() + , transferInfo.getQueue(), nodeService.findIdentityNodeId(), PULL_JOB_LOAD)); DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD, targetNodeId, transport.openReader()), null, listener, "data load") { @Override protected IDataWriter chooseDataWriter(Batch batch) { - return buildDataWriter(processInfo, sourceNode.getNodeId(), + return buildDataWriter(loadInfo, sourceNode.getNodeId(), batch.getChannelId(), batch.getBatchId(), ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry()); } @@ -941,8 +940,10 @@ class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener { private long batchStartsToArriveTimeInMs; private String sourceNodeId; + + private ProcessInfo transferInfo; - private ProcessInfo processInfo; + private ProcessInfo loadInfo; private ExecutorService executor; @@ -950,20 +951,16 @@ class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener { private boolean isError; - public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceNodeId, + public LoadIntoDatabaseOnArrivalListener(ProcessInfo transferInfo, String sourceNodeId, ManageIncomingBatchListener listener, ExecutorService executor) { this.sourceNodeId = sourceNodeId; this.listener = listener; - this.processInfo = processInfo; this.executor = executor; + this.transferInfo = transferInfo; } public void start(DataContext ctx, Batch batch) { batchStartsToArriveTimeInMs = System.currentTimeMillis(); - processInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING); - if (batch.getStatistics() != null) { - processInfo.setDataCount(batch.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT)); - } } public void end(final DataContext ctx, final Batch batchInStaging, final IStagedResource resource) { @@ -974,7 +971,13 @@ public IncomingBatch call() throws Exception { IncomingBatch incomingBatch = null; if (!isError && resource != null && resource.exists()) { try { - processInfo.setStatus(ProcessInfo.ProcessStatus.LOADING); + loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(transferInfo.getSourceNodeId(), + transferInfo.getQueue(), transferInfo.getTargetNodeId(), transferInfo.getProcessType() == PULL_JOB_TRANSFER ? PULL_JOB_LOAD : PUSH_HANDLER_LOAD)); + if (batchInStaging.getStatistics() != null) { + loadInfo.setTotalDataCount(batchInStaging.getStatistics().get(DataReaderStatistics.DATA_ROW_COUNT)); + } + + loadInfo.setStatus(ProcessInfo.ProcessStatus.LOADING); ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, batchInStaging.getTargetNodeId(), resource) { @Override @@ -998,7 +1001,7 @@ public Batch nextBatch() { @Override protected IDataWriter chooseDataWriter(Batch batch) { boolean isRetry = ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry(); - return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId(), isRetry); + return buildDataWriter(loadInfo, sourceNodeId, batch.getChannelId(), batch.getBatchId(), isRetry); } }; processor.process(ctx); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java index f2267b57bf..e683631e29 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java @@ -32,13 +32,12 @@ import org.jumpmind.symmetric.model.ExtractRequest; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeCommunication; +import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.ProcessInfo; -import org.jumpmind.symmetric.model.ProcessInfoKey; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatuses; -import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IFileSyncService; import org.jumpmind.symmetric.service.INodeCommunicationService; @@ -136,6 +135,6 @@ protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) { @Override protected ProcessType getProcessType() { - return ProcessInfoKey.ProcessType.FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB; + return ProcessType.FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 072f366b5e..66a97d9e79 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -61,6 +61,7 @@ import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.BatchAck; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Data; @@ -75,11 +76,10 @@ import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; -import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.OutgoingBatches; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.ClusterConstants; @@ -129,7 +129,7 @@ public void trackChanges(boolean force) { log.debug("Tracking changes for file sync"); Node local = engine.getNodeService().findIdentity(); ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo( - new ProcessInfoKey(local.getNodeId(), null, ProcessInfoKey.ProcessType.FILE_SYNC_TRACKER)); + new ProcessInfoKey(local.getNodeId(), null, ProcessType.FILE_SYNC_TRACKER)); boolean useCrc = engine.getParameterService().is(ParameterConstants.FILE_SYNC_USE_CRC); if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_FAST_SCAN)) { @@ -756,7 +756,7 @@ public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) { if (local != null && sourceNode != null) { ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo( new ProcessInfoKey(nodeId, local.getNodeId(), - ProcessInfoKey.ProcessType.FILE_SYNC_PUSH_HANDLER)); + ProcessType.FILE_SYNC_PUSH_HANDLER)); try { List list = processZip(in, nodeId, processInfo); NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId(), true); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OfflinePushService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OfflinePushService.java index 7336f3f139..4b8166dd83 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OfflinePushService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OfflinePushService.java @@ -31,7 +31,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.ClusterConstants; @@ -127,13 +127,13 @@ private void pushToNode(Node remote, RemoteNodeStatus status) { Node identity = nodeService.findIdentity(); FileOutgoingTransport transport = null; ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey( - identity.getNodeId(), status.getChannelId(), remote.getNodeId(), ProcessType.OFFLINE_PUSH)); + identity.getNodeId(), status.getQueue(), remote.getNodeId(), ProcessType.OFFLINE_PUSH)); List extractedBatches = null; try { transport = (FileOutgoingTransport) transportManager.getPushTransport(remote, identity, null, null); - extractedBatches = dataExtractorService.extract(processInfo, remote, status.getChannelId(), transport); + extractedBatches = dataExtractorService.extract(processInfo, remote, status.getQueue(), transport); if (extractedBatches.size() > 0) { log.info("Offline push data written for {} at {}", remote, transport.getOutgoingDir()); List batchAcks = readAcks(extractedBatches, transport, transportManager, acknowledgeService, dataExtractorService); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index ba417ba524..2846d9b9b2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -37,7 +37,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.ClusterConstants; @@ -199,15 +199,15 @@ private void pushToNode(Node remote, RemoteNodeStatus status) { NodeSecurity identitySecurity = nodeService.findNodeSecurity(identity.getNodeId(), true); IOutgoingWithResponseTransport transport = null; ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity - .getNodeId(), status.getChannelId(), remote.getNodeId(), ProcessType.PUSH_JOB)); + .getNodeId(), status.getQueue(), remote.getNodeId(), ProcessType.PUSH_JOB_EXTRACT)); Map requestProperties = new HashMap(); - requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getChannelId()); + requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getQueue()); try { transport = transportManager.getPushTransport(remote, identity, identitySecurity.getNodePassword(), requestProperties, parameterService.getRegistrationUrl()); - List extractedBatches = dataExtractorService.extract(processInfo, remote, status.getChannelId(), transport); + List extractedBatches = dataExtractorService.extract(processInfo, remote, status.getQueue(), transport); if (extractedBatches.size() > 0) { log.info("Push data sent to {}", remote); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index 4f63605a9f..ba7bdabd65 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -463,8 +463,7 @@ public synchronized boolean attemptToRegisterWithServer(int maxNumberOfAttempts) l.registrationStarting(); } log.info("This node is unregistered. It will attempt to register using the registration.url"); - String channelId = null; - registered = dataLoaderService.loadDataFromPull(null, channelId).getStatus() == Status.DATA_PROCESSED; + registered = dataLoaderService.loadDataFromPull(null, (String)null).getStatus() == Status.DATA_PROCESSED; } catch (ConnectException e) { log.warn( "The request to register failed because the client failed to connect to the server. The connection error message was: {}", diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index bf1a108c0a..fb28e9598d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -60,7 +60,7 @@ import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.Trigger; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java index f262400c1a..925c0d58cb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java @@ -40,9 +40,9 @@ import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.ProcessInfo; -import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessInfoKey; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.transport.AbstractTransportManager; import org.jumpmind.symmetric.transport.IIncomingTransport; import org.jumpmind.symmetric.transport.IOutgoingTransport; @@ -106,7 +106,7 @@ public void run(ISymmetricEngine engine, InputStream is, OutputStream os) suspendIgnoreChannels, IoConstants.ENCODING); ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo( new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), local - .getNodeId(), ProcessType.PULL_HANDLER)); + .getNodeId(), ProcessType.PULL_HANDLER_EXTRACT)); try { engine.getDataExtractorService().extract(processInfo, local, transport); processInfo.setStatus(ProcessStatus.OK); diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/FileSyncPullUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/FileSyncPullUriHandler.java index 2ba51b822b..35ee1cd40e 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/FileSyncPullUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/FileSyncPullUriHandler.java @@ -32,7 +32,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.transport.IOutgoingTransport; public class FileSyncPullUriHandler extends AbstractUriHandler { diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PullUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PullUriHandler.java index 78d0e7937b..8d7f7230c7 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PullUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PullUriHandler.java @@ -38,7 +38,7 @@ import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.INodeService; @@ -123,7 +123,7 @@ public void pull(String nodeId, String remoteHost, String remoteAddress, IOutgoingTransport outgoingTransport = createOutgoingTransport(outputStream, encoding, map); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey( - nodeService.findIdentityNodeId(), map.getChannelQueue(), nodeId, ProcessType.PULL_HANDLER)); + nodeService.findIdentityNodeId(), map.getChannelQueue(), nodeId, ProcessType.PULL_HANDLER_EXTRACT)); try { Node targetNode = nodeService.findNode(nodeId, true); diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java index 58010e2b24..f965426e41 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/rest/RestService.java @@ -55,10 +55,10 @@ import org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType; import org.jumpmind.symmetric.job.IJob; import org.jumpmind.symmetric.job.IJobManager; +import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.BatchAck; import org.jumpmind.symmetric.model.BatchAckResult; import org.jumpmind.symmetric.model.IncomingBatch; -import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.NetworkedNode; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeGroupLink; @@ -69,7 +69,7 @@ import org.jumpmind.symmetric.model.OutgoingBatchWithPayload; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfoKey; -import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; +import org.jumpmind.symmetric.model.ProcessType; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.service.IAcknowledgeService;