Skip to content

Commit

Permalink
0002625: Hybrid Pull Feature
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Jul 11, 2016
1 parent a99713a commit acc6775
Show file tree
Hide file tree
Showing 21 changed files with 433 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new FileSyncPushJob(engine,taskScheduler));
this.jobs.add(new InitialLoadExtractorJob(engine,taskScheduler));
this.jobs.add(new MonitorJob(engine, taskScheduler));
this.jobs.add(new ReportStatusJob(engine, taskScheduler));
}

public IJob getJob(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.job;

import java.util.HashMap;
import java.util.Map;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.NetworkedNode;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.http.HttpOutgoingTransport;
import org.jumpmind.symmetric.web.WebConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ReportStatusJob extends AbstractJob {

protected ReportStatusJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.report.status", true, engine.getParameterService().is(ParameterConstants.HYBRID_PUSH_PULL_ENABLED),
engine, taskScheduler);
}

@Override
void doJob(boolean force) throws Exception {
Node identity = engine.getNodeService().findIdentity();

int batchesToSend = engine.getOutgoingBatchService().countOutgoingBatchesPending(identity.getNodeId());

if (batchesToSend > 0) {
NodeSecurity identitySecurity = engine.getNodeService().findNodeSecurity(identity.getNodeId(), true);

NetworkedNode remote = engine.getNodeService().getRootNetworkedNode();

Map<String, String> requestParams = new HashMap<String, String>();
requestParams.put(WebConstants.BATCH_TO_SEND_COUNT, String.valueOf(batchesToSend));

IOutgoingWithResponseTransport transport = engine.getTransportManager().getPushTransport(remote.getNode(),
identity,
identitySecurity.getNodePassword(), requestParams, engine.getParameterService().getRegistrationUrl());

if (transport instanceof HttpOutgoingTransport) {
HttpOutgoingTransport httpTransport = (HttpOutgoingTransport) transport;
httpTransport.openStream().close(); // Effectively just sending over a header.
}
}


}

@Override
public String getClusterLockName() {
return ClusterConstants.REPORT_STATUS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ private ParameterConstants() {
public final static String CHANNEL_THREADING = "channel.threading.enabled";

public final static String MONITOR_EVENTS_CAPTURE_ENABLED = "monitor.events.capture.enabled";

public final static String HYBRID_PUSH_PULL_ENABLED = "hybrid.push.pull.enabled";

public final static String HYBRID_PUSH_PULL_TIMEOUT = "hybrid.push.pull.timeout";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT;

public static boolean isPullType(CommunicationType communicationType) {
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL;
}
};

private transient Node node;
Expand Down Expand Up @@ -58,6 +62,10 @@ public enum CommunicationType {
private long totalSuccessMillis;

private long totalFailMillis;

private long batchToSendCount;

private int nodePriority;

public String getNodeId() {
return nodeId;
Expand Down Expand Up @@ -163,6 +171,22 @@ public Date getLastLockTime() {
return lastLockTime;
}

public long getBatchToSendCount() {
return batchToSendCount;
}

public void setBatchToSendCount(long batchToSendCount) {
this.batchToSendCount = batchToSendCount;
}

public int getNodePriority() {
return nodePriority;
}

public void setNodePriority(int nodePriority) {
this.nodePriority = nodePriority;
}

public void setNode(Node node) {
this.node = node;
}
Expand Down Expand Up @@ -195,4 +219,9 @@ public String getIdentifier() {
return getNodeId() + "-" + getQueue();
}

@Override
public String toString() {
return "NodeCommunication [nodeId=" + nodeId + ", queue=" + queue + ", communicationType=" + communicationType + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ClusterConstants {
public static final String PURGE_OUTGOING = "PURGE_OUTGOING";
public static final String PURGE_INCOMING = "PURGE_INCOMING";
public static final String PURGE_STATISTICS = "PURGE_STATISTICS";
public static final String REPORT_STATUS = "REPORT_STATUS";
public static final String PURGE_DATA_GAPS = "PURGE_DATA_GAPS";
public static final String HEARTBEAT = "HEARTBEAT";
public static final String INITIAL_LOAD_EXTRACT = "INITIAL_LOAD_EXTRACT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public interface IOutgoingBatchService {

public int countOutgoingBatchesInError(String channelId);

public int countOutgoingBatchesUnsent(String channelId);
public int countOutgoingBatchesUnsent(String channelId);

public int countOutgoingBatchesPending(String nodeId);

public List<OutgoingBatchSummary> findOutgoingBatchSummary(OutgoingBatch.Status ... statuses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.ILoadFilterService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.ITransformService;
Expand Down Expand Up @@ -156,6 +157,8 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer
private IStagingManager stagingManager;

private IExtensionService extensionService;

private INodeCommunicationService nodeCommunicationService;

private Map<NodeGroupLink, List<ConflictNodeGroupLink>> conflictSettingsCache = new HashMap<NodeGroupLink, List<ConflictNodeGroupLink>>();

Expand All @@ -181,6 +184,7 @@ public DataLoaderService(ISymmetricEngine engine) {
extensionService = engine.getExtensionService();
extensionService.addExtensionPoint(new DefaultDataLoaderFactory(parameterService));
extensionService.addExtensionPoint(new ConfigurationChangedDatabaseWriterFilter(engine));
this.nodeCommunicationService = engine.getNodeCommunicationService();
this.engine = engine;
}

Expand Down Expand Up @@ -307,6 +311,9 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
} else {
processInfo.setStatus(ProcessInfo.Status.OK);
}

updateBatchToSendCount(remote, transport);

} catch (RuntimeException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw e;
Expand Down Expand Up @@ -337,6 +344,14 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
}
}

protected void updateBatchToSendCount(Node remote, IIncomingTransport transport) {
Map<String, String> headers = transport.getHeaders();
if (headers != null && headers.containsKey(WebConstants.BATCH_TO_SEND_COUNT)) {
// TODO save this batch to send to node communication... Figure out how queues fit in.
int batchToSendCount = Integer.parseInt(headers.get(WebConstants.BATCH_TO_SEND_COUNT));
}
}

private boolean containsError(List<IncomingBatch> list) {
for (IncomingBatch incomingBatch : list) {
if (incomingBatch.getStatus() == Status.ER) {
Expand Down
Loading

0 comments on commit acc6775

Please sign in to comment.