Skip to content

Commit

Permalink
0002625: Hybrid Pull Feature
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Jul 21, 2016
1 parent 5800903 commit 2fd3d10
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.http.HttpOutgoingTransport;
import org.jumpmind.symmetric.web.WebConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ReportStatusJob extends AbstractJob {

protected final Logger log = LoggerFactory.getLogger(getClass());

protected ReportStatusJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.report.status", true, engine.getParameterService().is(ParameterConstants.HYBRID_PUSH_PULL_ENABLED),
Expand All @@ -43,26 +47,22 @@ protected ReportStatusJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskS

@Override
void doJob(boolean force) throws Exception {
Node identity = engine.getNodeService().findIdentity();
Node local = engine.getNodeService().findIdentity();
NetworkedNode remote = engine.getNodeService().getRootNetworkedNode();
if (local.getNodeId().equals(remote.getNode().getNodeId())) {
log.debug("Node %s will not send status to itself.", local.getNodeId());
return;
}

// int batchesToSend = engine.getOutgoingBatchService().countOutgoingBatchesPending(local.getNodeId());

int batchesToSend = engine.getOutgoingBatchService().countOutgoingBatchesPending(identity.getNodeId());
int batchesToSend=9999; // TODO Testing.

if (batchesToSend > 0) {
NodeSecurity identitySecurity = engine.getNodeService().findNodeSecurity(identity.getNodeId(), true);

NetworkedNode remote = engine.getNodeService().getRootNetworkedNode();

Map<String, String> requestParams = new HashMap<String, String>();
requestParams.put(WebConstants.BATCH_TO_SEND_COUNT, String.valueOf(batchesToSend));

IOutgoingWithResponseTransport transport = engine.getTransportManager().getPushTransport(remote.getNode(),
identity,
identitySecurity.getNodePassword(), requestParams, engine.getParameterService().getRegistrationUrl());

if (transport instanceof HttpOutgoingTransport) {
HttpOutgoingTransport httpTransport = (HttpOutgoingTransport) transport;
httpTransport.openStream().close(); // Effectively just sending over a header.
}
engine.getTransportManager().sendStatus(local, remote.getNode(), requestParams);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static enum Status {
private long dataProcessed;
private long batchesProcessed;
private long reloadBatchesProcessed;
private long batchToSendCount;
private boolean complete = false;
private Map<String, Channel> channels;

Expand Down Expand Up @@ -145,4 +146,12 @@ public boolean isComplete() {
return complete;
}

public long getBatchToSendCount() {
return batchToSendCount;
}

public void setBatchToSendCount(long batchToSendCount) {
this.batchToSendCount = batchToSendCount;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
package org.jumpmind.symmetric.service;

import java.util.Date;
import java.util.List;
import java.util.List;
import java.util.Map;

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;

/**
* This service provides an API to access to the outgoing batch table.
Expand Down Expand Up @@ -87,7 +88,7 @@ public interface IOutgoingBatchService {

public int countOutgoingBatchesUnsent(String channelId);

public int countOutgoingBatchesPending(String nodeId);
public Map<String, Integer> countOutgoingBatchesPending(String nodeId);

public List<OutgoingBatchSummary> findOutgoingBatchSummary(OutgoingBatch.Status ... statuses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
processInfo.setStatus(ProcessInfo.Status.OK);
}

updateBatchToSendCount(remote, transport);
updateBatchToSendCount(status, status.getChannelId(), transport);

} catch (RuntimeException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
Expand Down Expand Up @@ -344,11 +344,12 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
}
}

protected void updateBatchToSendCount(Node remote, IIncomingTransport transport) {
protected void updateBatchToSendCount(RemoteNodeStatus status, String queue, IIncomingTransport transport) {
Map<String, String> headers = transport.getHeaders();
if (headers != null && headers.containsKey(WebConstants.BATCH_TO_SEND_COUNT)) {
// TODO save this batch to send to node communication... Figure out how queues fit in.
int batchToSendCount = Integer.parseInt(headers.get(WebConstants.BATCH_TO_SEND_COUNT));
int batchToSendCount = Integer.parseInt(headers.get(WebConstants.BATCH_TO_SEND_COUNT));
status.setBatchToSendCount(batchToSendCount);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ public void run() {
boolean failed = false;
try {
executor.execute(nodeCommunication, status);
nodeCommunication.setBatchToSendCount(status.getBatchToSendCount());
failed = status.failed();
} catch (Throwable ex) {
failed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,24 +296,39 @@ public int countOutgoingBatchesUnsent(String channelId) {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId);
}

public int countOutgoingBatchesPending(String nodeId) {
public Map<String, Integer> countOutgoingBatchesPending(String nodeId) {
List<String> nodeIds;
if (nodeId != null) {
nodeIds = Arrays.asList(nodeId);
} else {
nodeIds = Collections.emptyList();
}
// Select only PULL channels?
// TODO: Select only PULL channels?


List<OutgoingBatch.Status> pendingStatuses = Arrays.asList(OutgoingBatch.Status.ER,
OutgoingBatch.Status.RQ,
OutgoingBatch.Status.NE,
OutgoingBatch.Status.QY,
OutgoingBatch.Status.RT);

List<String> emptyList = Collections.emptyList();
Map<String, Object> params = new HashMap<String, Object>();
// params.put("NODES", nodeIds);
// params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(pendingStatuses));

// List<String> emptyList = Collections.emptyList();

// return sqlTemplate
// .queryForInt(
// getSql("selectCountBatchesPrefixSql",
// buildBatchWhere(nodeIds, channels, statuses, loads)), params);

// TODO: switch to countOutgoingBatchesPendingByChannelSql

// return countOutgoingBatches(nodeIds, emptyList, pendingStatuses, emptyList);

return countOutgoingBatches(nodeIds, emptyList, pendingStatuses, emptyList);
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " last_update_hostname, last_update_time, create_time, create_by) "
+ " (select batch_id, ?, channel_id, 'NE', load_id, extract_job_flag, load_flag, common_flag, reload_event_count, other_event_count, "
+ " last_update_hostname, current_timestamp, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) ");

putSql("countOutgoingBatchesPendingByChannelSql",
"SELECT channel_id, count(*) AS batch_count FROM $(outgoing_batch) where ere status in (:STATUS_LIST) GROUP BY channel_id");


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,24 +129,24 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}

try {
dataLoaderService.loadDataFromPull(node, status);
} catch (ConnectException ex) {
log.warn(
"Failed to connect to the transport: {}",
(node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
.getSyncUrl()));
fireOffline(ex, node, status);
} catch (OfflineException ex) {
fireOffline(ex, node, status);
} catch (UnknownHostException ex) {
fireOffline(ex, node, status);
} catch (SocketException ex) {
log.warn("{}", ex.getMessage());
fireOffline(ex, node, status);
} catch (IOException ex) {
log.error("An IO exception happened while attempting to pull data", ex);
fireOffline(ex, node, status);
}
dataLoaderService.loadDataFromPull(node, status);
} catch (ConnectException ex) {
log.warn(
"Failed to connect to the transport: {}",
(node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
.getSyncUrl()));
fireOffline(ex, node, status);
} catch (OfflineException ex) {
fireOffline(ex, node, status);
} catch (UnknownHostException ex) {
fireOffline(ex, node, status);
} catch (SocketException ex) {
log.warn("{}", ex.getMessage());
fireOffline(ex, node, status);
} catch (IOException ex) {
log.error("An IO exception happened while attempting to pull data", ex);
fireOffline(ex, node, status);
}

if (!status.failed() &&
(status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,7 @@ public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node loc
public String resolveURL(String url, String registrationUrl);

public int sendCopyRequest(Node local) throws IOException;

public int sendStatus(Node local, Node remote, Map<String, String> statuses) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public HttpTransportManager(ISymmetricEngine engine) {
super(engine.getExtensionService());
this.engine = engine;
}

public int sendStatus(Node local, Node remote, Map<String, String> statuses) throws IOException {
String securityToken = engine.getNodeService().findNodeSecurity(local.getNodeId())
.getNodePassword();
String url = addSecurityToken(remote.getSyncUrl() + "/pushstatus",
"&", local.getNodeId(), securityToken);
url = add(url, WebConstants.EXTERNAL_ID, engine.getParameterService().getExternalId(), "&");
url = add(url, WebConstants.NODE_GROUP_ID, engine.getParameterService().getNodeGroupId(), "&");

log.debug("Send Status: " + url);
return sendMessage(new URL(url), "");
}

public int sendCopyRequest(Node local) throws IOException {
StringBuilder data = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public void run(ISymmetricEngine engine, InputStream is, OutputStream os)
public int sendCopyRequest(Node local) throws IOException {
return -1;
}

@Override
public int sendStatus(Node local, Node remote, Map<String, String> statuses) throws IOException {
return -1;
}

public int sendAcknowledgement(Node remote, List<IncomingBatch> list, Node local,
String securityToken, String registrationUrl) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local,
return outgoingTransport;
}

@Override
public int sendStatus(Node local, Node remote, Map<String, String> statuses) throws IOException {
return -1;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ public void pull(String nodeId, String remoteHost, String remoteAddress,
}

private void addPendingBatchesCount(String targetNodeId, HttpServletResponse res) {
if (this.parameterService.is(ParameterConstants.HYBRID_PUSH_PULL_ENABLED)) {
int pendingBatchCount =
this.outgoingBatchService.countOutgoingBatchesPending(targetNodeId);
if (this.parameterService.is(ParameterConstants.HYBRID_PUSH_PULL_ENABLED)) {
// TODO
// int pendingBatchCount =
// this.outgoingBatchService.countOutgoingBatchesPending(targetNodeId);
int pendingBatchCount = 99999;
res.addHeader(WebConstants.BATCH_TO_SEND_COUNT, String.valueOf(pendingBatchCount));
}
}
Expand Down

0 comments on commit 2fd3d10

Please sign in to comment.