Skip to content

Commit

Permalink
0002330: Make channels push asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 8, 2015
1 parent 09181db commit ba66860
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
Expand Up @@ -40,9 +40,9 @@ public String toString() {
case PULL_JOB:
return "Database Pull";
case PUSH_HANDLER:
return "Service Database Push";
return "Load From Push";
case PULL_HANDLER:
return "Service Database Pull";
return "Extract For Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.jumpmind.symmetric.model.IncomingError;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.impl.DataLoaderService.ConflictNodeGroupLink;
import org.jumpmind.symmetric.service.impl.DataLoaderService.DataLoaderWorker;
Expand Down Expand Up @@ -59,7 +60,7 @@ public interface IDataLoaderService {

public void save(ConflictNodeGroupLink settings);

public DataLoaderWorker createDataLoaderWorker(Node sourceNode);
public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode);

public void clearCache();

Expand Down
Expand Up @@ -208,8 +208,8 @@ public void stop() {
}

@Override
public DataLoaderWorker createDataLoaderWorker(Node sourceNode) {
DataLoaderWorker worker = new DataLoaderWorker(sourceNode);
public DataLoaderWorker createDataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) {
DataLoaderWorker worker = new DataLoaderWorker(processType, channelId, sourceNode);
dataLoadWorkers.execute(worker);
return worker;
}
Expand Down Expand Up @@ -1028,16 +1028,22 @@ public class DataLoaderWorker implements Runnable {
List<IncomingBatch> batchList = new ArrayList<IncomingBatch>();

IncomingBatch currentlyLoading;

String channelId;

Node identityNode;

Node sourceNode;

DataContext ctx = new DataContext();

ProcessInfoKey.ProcessType processType;

public DataLoaderWorker(Node sourceNode) {
public DataLoaderWorker(ProcessInfoKey.ProcessType processType, String channelId, Node sourceNode) {
this.identityNode = nodeService.findIdentity();
this.sourceNode = sourceNode;
this.processType = processType;
this.channelId = channelId;

ctx.put(Constants.DATA_CONTEXT_ENGINE, engine);
if (identityNode != null) {
Expand All @@ -1062,7 +1068,7 @@ public void queueUpLoad(IncomingBatch batch) {
@Override
public void run() {
final ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId(), identityNode.getNodeId(),
ProcessInfoKey.ProcessType.PUSH_HANDLER));
processType, channelId));
try {
currentlyLoading = toLoadQueue.take();
while (!(currentlyLoading instanceof EOM)) {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.impl.DataLoaderService;
Expand All @@ -61,6 +62,7 @@ public PushUriHandler(ISymmetricEngine engine, IInterceptor... interceptors) {

public void handle(HttpServletRequest req, HttpServletResponse res) throws IOException, ServletException {
String nodeId = ServletUtils.getParameter(req, WebConstants.NODE_ID);
String channelId = ServletUtils.getParameter(req, WebConstants.CHANNEL_ID);
Node sourceNode = engine.getNodeService().findNode(nodeId);
log.info("About to service push request for {}", nodeId);

Expand All @@ -87,8 +89,8 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc
writer.write(line);
writer.close();
writer = null;
if (worker == null) {
worker = dataLoaderService.createDataLoaderWorker(sourceNode);
if (worker == null) {
worker = dataLoaderService.createDataLoaderWorker(ProcessInfoKey.ProcessType.PUSH_HANDLER, channelId, sourceNode);
}
worker.queueUpLoad(new IncomingBatch(batchId, nodeId));
batchId = null;
Expand Down Expand Up @@ -116,7 +118,7 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc
status = "in progress";
batch = worker.getCurrentlyLoading();
}
if (batch != null) {
if (batch != null && !(batch instanceof DataLoaderService.EOM)) {
ArrayList<IncomingBatch> list = new ArrayList<IncomingBatch>(1);
list.add(batch);
log.info("sending {} ack ... for {}", status, batch);
Expand Down

0 comments on commit ba66860

Please sign in to comment.