Skip to content

Commit

Permalink
0002330: Make channels push asynchronously. Working on processinfo fo…
Browse files Browse the repository at this point in the history
…r what are now async processes
  • Loading branch information
chenson42 committed Jul 8, 2015
1 parent 12a2ea4 commit d4c04fc
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 110 deletions.
Expand Up @@ -26,8 +26,6 @@
import java.lang.management.ThreadMXBean;
import java.util.Date;

import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;

public class ProcessInfo implements Serializable, Comparable<ProcessInfo>, Cloneable {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -245,11 +243,11 @@ public String showInError(String identityNodeId) {
switch (key.getProcessType()) {
case MANUAL_LOAD:
return null;
case PUSH_JOB:
case EXTRACT_FOR_PUSH:
return key.getTargetNodeId();
case PULL_JOB:
return key.getSourceNodeId();
case PUSH_HANDLER:
case LOAD_FROM_PUSH:
return key.getSourceNodeId();
case PULL_HANDLER:
return key.getTargetNodeId();
Expand Down
Expand Up @@ -26,49 +26,6 @@ public class ProcessInfoKey implements Serializable {

private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;

public String toString() {
switch (this) {
case ANY:
return "<Any>";
case MANUAL_LOAD:
return "Manual Load";
case PUSH_JOB:
return "Database Push";
case PULL_JOB:
return "Database Pull";
case PUSH_HANDLER:
return "Load From Push";
case PULL_HANDLER:
return "Extract For Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
return "Routing Reader";
case GAP_DETECT:
return "Gap Detection";
case FILE_SYNC_PULL_JOB:
return "File Sync Pull";
case FILE_SYNC_PUSH_JOB:
return "File Sync Push";
case FILE_SYNC_PULL_HANDLER:
return "Service File Sync Pull";
case FILE_SYNC_PUSH_HANDLER:
return "Service File Sync Push";
case REST_PULL_HANLDER:
return "REST Pull";
case INSERT_LOAD_EVENTS:
return "Inserting Load Events";
case INITIAL_LOAD_EXTRACT_JOB:
return "Initial Load Extractor";
default:
return name();
}
}
};

private String sourceNodeId;

private String targetNodeId;
Expand Down
@@ -0,0 +1,48 @@
package org.jumpmind.symmetric.model;

public enum ProcessType {
ANY, EXTRACT_FOR_PUSH, TRANSFER_TO, TRANSFER_FROM, PULL_JOB, LOAD_FROM_PUSH, PULL_HANDLER, REST_PULL_HANLDER, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;

public String toString() {
switch (this) {
case ANY:
return "<Any>";
case MANUAL_LOAD:
return "Manual Load";
case EXTRACT_FOR_PUSH:
return "Extract For Push";
case TRANSFER_FROM:
return "Transfer From";
case TRANSFER_TO:
return "Transfer To";
case PULL_JOB:
return "Database Pull";
case LOAD_FROM_PUSH:
return "Load From Push";
case PULL_HANDLER:
return "Extract For Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
return "Routing Reader";
case GAP_DETECT:
return "Gap Detection";
case FILE_SYNC_PULL_JOB:
return "File Sync Pull";
case FILE_SYNC_PUSH_JOB:
return "File Sync Push";
case FILE_SYNC_PULL_HANDLER:
return "Service File Sync Pull";
case FILE_SYNC_PUSH_HANDLER:
return "Service File Sync Push";
case REST_PULL_HANLDER:
return "REST Pull";
case INSERT_LOAD_EVENTS:
return "Inserting Load Events";
case INITIAL_LOAD_EXTRACT_JOB:
return "Initial Load Extractor";
default:
return name();
}
}
}
Expand Up @@ -25,6 +25,7 @@

import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand All @@ -33,7 +34,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
Expand Down
Expand Up @@ -48,7 +48,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.jumpmind.symmetric.model.IncomingError;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.impl.DataLoaderService.ConflictNodeGroupLink;
import org.jumpmind.symmetric.service.impl.DataLoaderService.DataLoaderWorker;
Expand Down Expand Up @@ -60,7 +60,7 @@ public interface IDataLoaderService {

public void save(ConflictNodeGroupLink settings);

public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode);
public DataLoaderWorker createDataLoaderWorker(ProcessType processType, String channelId, Node sourceNode);

public void clearCache();

Expand Down
Expand Up @@ -98,6 +98,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
Expand Down Expand Up @@ -1025,7 +1026,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
.getBatches();

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity.getNodeId(), nodeCommunication.getNodeId(),
ProcessInfoKey.ProcessType.INITIAL_LOAD_EXTRACT_JOB));
ProcessType.INITIAL_LOAD_EXTRACT_JOB));
try {
boolean areBatchesOk = true;

Expand Down
Expand Up @@ -104,7 +104,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataLoaderService;
Expand Down Expand Up @@ -208,7 +208,7 @@ public void stop() {
}

@Override
public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) {
public DataLoaderWorker createDataLoaderWorker(ProcessType processType, String channelId, Node sourceNode) {
DataLoaderWorker worker = new DataLoaderWorker(processType, channelId, sourceNode);
dataLoadWorkers.execute(worker);
return worker;
Expand Down Expand Up @@ -245,7 +245,7 @@ public List<IncomingBatch> loadDataBatch(String batchData) {
String nodeId = nodeService.findIdentityNodeId();
if (StringUtils.isNotBlank(nodeId)) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeId, nodeId,
ProcessInfoKey.ProcessType.MANUAL_LOAD));
ProcessType.MANUAL_LOAD));
try {
InternalIncomingTransport transport = new InternalIncomingTransport(new BufferedReader(new StringReader(batchData)));
List<IncomingBatch> list = loadDataFromTransport(processInfo, nodeService.findIdentity(), transport);
Expand Down Expand Up @@ -368,7 +368,7 @@ public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out)
Node local = nodeService.findIdentity();
if (local != null) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId(), local.getNodeId(),
ProcessInfoKey.ProcessType.PUSH_HANDLER));
ProcessType.LOAD_FROM_PUSH));
try {
List<IncomingBatch> batchList = loadDataFromTransport(processInfo, sourceNode, new InternalIncomingTransport(in));
logDataReceivedFromPush(sourceNode, batchList);
Expand Down Expand Up @@ -1037,9 +1037,9 @@ public class DataLoaderWorker implements Runnable {

DataContext ctx = new DataContext();

ProcessInfoKey.ProcessType processType;
ProcessType processType;

public DataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) {
public DataLoaderWorker(ProcessType processType, String channelId, Node sourceNode) {
this.identityNode = nodeService.findIdentity();
this.sourceNode = sourceNode;
this.processType = processType;
Expand Down
Expand Up @@ -73,7 +73,7 @@
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.ClusterConstants;
Expand Down Expand Up @@ -563,7 +563,7 @@ public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
if (local != null && sourceNode != null) {
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(nodeId, local.getNodeId(),
ProcessInfoKey.ProcessType.FILE_SYNC_PUSH_HANDLER));
ProcessType.FILE_SYNC_PUSH_HANDLER));
try {
List<IncomingBatch> list = processZip(in, nodeId, processInfo);
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -54,7 +55,7 @@
import org.jumpmind.symmetric.model.OutgoingBatchByNodeChannelCount;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.ClusterConstants;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void run() {
String channelId = nodeChannel.getChannelId();

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identitySecurity.getNodeId(), nodeChannel
.getNodeId(), ProcessType.PUSH_JOB, channelId));
.getNodeId(), ProcessType.EXTRACT_FOR_PUSH, channelId));

Exception error = null;
NodeChannelTransportForPushWorker pushWorker = null;
Expand All @@ -246,7 +247,7 @@ public void run() {
dataExtractorService.extractToStaging(processInfo, targetNode, batch);

if (pushWorker == null) {
pushWorker = new NodeChannelTransportForPushWorker(targetNode, identityNode, identitySecurity, status);
pushWorker = new NodeChannelTransportForPushWorker(channelId, targetNode, identityNode, identitySecurity, status);
nodeChannelTransportForPushWorker.execute(pushWorker);
}

Expand Down Expand Up @@ -339,12 +340,15 @@ class NodeChannelTransportForPushWorker implements Runnable {
NodeSecurity identitySecurity;

RemoteNodeStatus status;

public NodeChannelTransportForPushWorker(Node remoteNode, Node identityNode, NodeSecurity identitySecurity, RemoteNodeStatus status) {

String channelId;

public NodeChannelTransportForPushWorker(String channelId, Node remoteNode, Node identityNode, NodeSecurity identitySecurity, RemoteNodeStatus status) {
this.targetNode = remoteNode;
this.identityNode = identityNode;
this.identitySecurity = identitySecurity;
this.status = status;
this.channelId = channelId;
}

public void queueUpSend(OutgoingBatch batch) {
Expand All @@ -360,6 +364,11 @@ public void run() {
IDataExtractorService dataExtractorService = engine.getDataExtractorService();
ITransportManager transportManager = engine.getTransportManager();
IAcknowledgeService acknowledgeService = engine.getAcknowledgeService();
IStatisticManager statisticManager = engine.getStatisticManager();

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identitySecurity.getNodeId(), targetNode.getNodeId(),
ProcessType.TRANSFER_TO, channelId));

IOutgoingWithResponseTransport transport = null;
OutputStream os = null;
List<OutgoingBatch> batchesSent = new ArrayList<OutgoingBatch>();
Expand All @@ -369,6 +378,9 @@ public void run() {
batch.getChannelId(), parameterService.getRegistrationUrl());
while (!(batch instanceof EOM)) {
log.info("sending batch {}", batch);
processInfo.setCurrentBatchId(batch.getBatchId());
processInfo.setCurrentBatchStartTime(new Date());
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
batchesSent.add(batch);
IStagedResource resource = dataExtractorService.getStagedResource(batch);
InputStream is = resource.getInputStream();
Expand All @@ -382,8 +394,11 @@ public void run() {
}
batch = sendQueue.take();
}

processInfo.setStatus(ProcessInfo.Status.OK);

BufferedReader reader = transport.readResponse();

String line = null;
do {
line = reader.readLine();
Expand All @@ -396,9 +411,10 @@ public void run() {
}
status.updateOutgoingStatus(batchesSent, batchAcks);
}
} while (line != null);
} while (line != null);

} catch (Exception ex) {
} catch (Exception ex) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
fireOffline(ex, targetNode, status);
log.error("", ex);
} finally {
Expand Down
Expand Up @@ -56,7 +56,7 @@
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.AuditTableDataRouter;
Expand Down
Expand Up @@ -43,8 +43,8 @@
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.transport.AbstractTransportManager;
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.transport.IOutgoingTransport;

public class FileSyncPullUriHandler extends AbstractUriHandler {
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.INodeService;
Expand Down

0 comments on commit d4c04fc

Please sign in to comment.