Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0001575: Offline data synchronization
  • Loading branch information
erilong committed Aug 13, 2015
1 parent 39f8fec commit fcabdf8
Show file tree
Hide file tree
Showing 22 changed files with 1,124 additions and 10 deletions.
Expand Up @@ -50,6 +50,8 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new RouterJob(engine, taskScheduler));
this.jobs.add(new PushJob(engine, taskScheduler));
this.jobs.add(new PullJob(engine, taskScheduler));
this.jobs.add(new OfflinePushJob(engine, taskScheduler));
this.jobs.add(new OfflinePullJob(engine, taskScheduler));
this.jobs.add(new OutgoingPurgeJob(engine, taskScheduler));
this.jobs.add(new IncomingPurgeJob(engine, taskScheduler));
this.jobs.add(new StatisticFlushJob(engine, taskScheduler));
Expand Down
@@ -0,0 +1,46 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that pulls data from remote nodes and then loads it.
*/
public class OfflinePullJob extends AbstractJob {

public OfflinePullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.offline.pull", false, engine.getParameterService().is("start.offline.pull.job"),
engine, taskScheduler);
}

@Override
public void doJob(boolean force) throws Exception {
engine.getOfflinePullService().pullData(force);
}

public String getClusterLockName() {
return ClusterConstants.OFFLINE_PULL;
}

}
@@ -0,0 +1,48 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that is responsible for pushing data to linked nodes.
*/
public class OfflinePushJob extends AbstractJob {

public OfflinePushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.offline.push", true, engine.getParameterService().is("start.offline.push.job"), engine,
taskScheduler);
}

@Override
public void doJob(boolean force) throws Exception {
if (engine != null) {
engine.getOfflinePushService().pushData(force).getDataProcessedCount();
}
}

public String getClusterLockName() {
return ClusterConstants.OFFLINE_PUSH;
}

}
Expand Up @@ -81,6 +81,8 @@
import org.jumpmind.symmetric.service.ILoadFilterService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOfflinePullService;
import org.jumpmind.symmetric.service.IOfflinePushService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPullService;
Expand All @@ -106,6 +108,8 @@
import org.jumpmind.symmetric.service.impl.LoadFilterService;
import org.jumpmind.symmetric.service.impl.NodeCommunicationService;
import org.jumpmind.symmetric.service.impl.NodeService;
import org.jumpmind.symmetric.service.impl.OfflinePullService;
import org.jumpmind.symmetric.service.impl.OfflinePushService;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService;
import org.jumpmind.symmetric.service.impl.ParameterService;
import org.jumpmind.symmetric.service.impl.PullService;
Expand Down Expand Up @@ -166,6 +170,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {
protected IConcurrentConnectionManager concurrentConnectionManager;

protected ITransportManager transportManager;

protected ITransportManager offlineTransportManager;

protected IClusterService clusterService;

Expand Down Expand Up @@ -197,6 +203,10 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {

protected IPullService pullService;

protected IOfflinePushService offlinePushService;

protected IOfflinePullService offlinePullService;

protected IJobManager jobManager;

protected ISequenceService sequenceService;
Expand Down Expand Up @@ -317,6 +327,7 @@ protected void init() {
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.dataExtractorService = new DataExtractorService(this);
this.transportManager = new TransportManagerFactory(this).create();
this.offlineTransportManager = new TransportManagerFactory(this).create(Constants.PROTOCOL_FILE);
this.dataLoaderService = new DataLoaderService(this);
this.registrationService = new RegistrationService(this);
this.acknowledgeService = new AcknowledgeService(this);
Expand All @@ -326,6 +337,12 @@ protected void init() {
this.pullService = new PullService(parameterService, symmetricDialect,
nodeService, dataLoaderService, registrationService, clusterService, nodeCommunicationService,
configurationService, extensionService);
this.offlinePushService = new OfflinePushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, offlineTransportManager, nodeService,
clusterService, nodeCommunicationService, statisticManager, configurationService, extensionService);
this.offlinePullService = new OfflinePullService(parameterService, symmetricDialect,
nodeService, dataLoaderService, clusterService, nodeCommunicationService,
configurationService, extensionService, offlineTransportManager);
this.fileSyncService = new FileSyncService(this);
this.jobManager = createJobManager();

Expand Down Expand Up @@ -967,6 +984,14 @@ public IPushService getPushService() {
return this.pushService;
}

public IOfflinePullService getOfflinePullService() {
return this.offlinePullService;
}

public IOfflinePushService getOfflinePushService() {
return this.offlinePushService;
}

public IRouterService getRouterService() {
return this.routerService;
}
Expand Down Expand Up @@ -1011,6 +1036,10 @@ public ITransportManager getTransportManager() {
return transportManager;
}

public ITransportManager getOfflineTransportManager() {
return offlineTransportManager;
}

public IExtensionService getExtensionService() {
return extensionService;
}
Expand Down
Expand Up @@ -47,6 +47,8 @@
import org.jumpmind.symmetric.service.ILoadFilterService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOfflinePullService;
import org.jumpmind.symmetric.service.IOfflinePushService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPullService;
Expand Down Expand Up @@ -251,6 +253,10 @@ public interface ISymmetricEngine {

public IPushService getPushService();

public IOfflinePullService getOfflinePullService();

public IOfflinePushService getOfflinePushService();

public IRouterService getRouterService();

public ISecurityService getSecurityService();
Expand Down
Expand Up @@ -140,6 +140,8 @@ private Constants() {
public static final String PROTOCOL_HTTP = "http";

public static final String PROTOCOL_INTERNAL = "internal";

public static final String PROTOCOL_FILE = "file";

public static final String PROTOCOL_EXT = "ext";

Expand Down
Expand Up @@ -65,6 +65,12 @@ private ParameterConstants() {
public final static String PUSH_MINIMUM_PERIOD_MS = "push.period.minimum.ms";
public final static String PUSH_LOCK_TIMEOUT_MS = "push.lock.timeout.ms";

public final static String OFFLINE_PULL_THREAD_COUNT_PER_SERVER = "offline.pull.thread.per.server.count";
public final static String OFFLINE_PULL_LOCK_TIMEOUT_MS = "offline.pull.lock.timeout.ms";

public final static String OFFLINE_PUSH_THREAD_COUNT_PER_SERVER = "offline.push.thread.per.server.count";
public final static String OFFLINE_PUSH_LOCK_TIMEOUT_MS = "offline.push.lock.timeout.ms";

public final static String FILE_PULL_THREAD_COUNT_PER_SERVER = "file.pull.thread.per.server.count";
public final static String FILE_PULL_MINIMUM_PERIOD_MS = "file.pull.period.minimum.ms";
public final static String FILE_PULL_LOCK_TIMEOUT_MS = "file.pull.lock.timeout.ms";
Expand Down Expand Up @@ -241,6 +247,14 @@ private ParameterConstants() {
public final static String WEB_BATCH_URI_HANDLER_ENABLE = "web.batch.servlet.enable";

public final static String NODE_COPY_MODE_ENABLED = "node.copy.mode.enabled";

public final static String NODE_OFFLINE = "node.offline";
public final static String NODE_OFFLINE_INCOMING_DIR = "node.offline.incoming.dir";
public final static String NODE_OFFLINE_INCOMING_ACCEPT_ALL = "node.offline.incoming.accept.all";
public final static String NODE_OFFLINE_OUTGOING_DIR = "node.offline.outgoing.dir";
public final static String NODE_OFFLINE_ERROR_DIR = "node.offline.error.dir";
public final static String NODE_OFFLINE_ARCHIVE_DIR = "node.offline.archive.dir";

public final static String OFFLINE_NODE_DETECTION_PERIOD_MINUTES = "offline.node.detection.period.minutes";
public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC = "heartbeat.sync.on.push.period.sec";
public final static String HEARTBEAT_JOB_PERIOD_MS = "job.heartbeat.period.time.ms";
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, EXTRACT
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT
};

private transient Node node;
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;

public String toString() {
switch (this) {
Expand All @@ -43,6 +43,10 @@ public String toString() {
return "Service Database Push";
case PULL_HANDLER:
return "Service Database Pull";
case OFFLINE_PUSH:
return "Offline Push";
case OFFLINE_PULL:
return "Offline Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
Expand Down
Expand Up @@ -29,6 +29,8 @@ public class ClusterConstants {
public static final String ROUTE = "ROUTE";
public static final String PUSH = "PUSH";
public static final String PULL = "PULL";
public static final String OFFLINE_PUSH = "OFFLINE_PUSH";
public static final String OFFLINE_PULL = "OFFLINE_PULL";
public static final String REFRESH_CACHE = "REFRESH_CACHE";
public static final String PURGE_OUTGOING = "PURGE_OUTGOING";
public static final String PURGE_INCOMING = "PURGE_INCOMING";
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.impl.DataLoaderService.ConflictNodeGroupLink;
import org.jumpmind.symmetric.transport.IIncomingTransport;

/**
* This service provides an API to load data into a SymmetricDS node's database
Expand All @@ -45,7 +46,9 @@ public interface IDataLoaderService {
public void loadDataFromPull(Node sourceNode, RemoteNodeStatus status) throws IOException;

public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out) throws IOException;


public List<IncomingBatch> loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException;

public List<String> getAvailableDataLoaderFactories();

public List<IncomingBatch> loadDataBatch(String batchData);
Expand Down
@@ -0,0 +1,35 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;

/**
* Service API that is responsible for pulling data from the list of configured
* {@link Node}s that are configured to {@link NodeGroupLinkAction#W}
*/
public interface IOfflinePullService {

public RemoteNodeStatuses pullData(boolean force);

}
@@ -0,0 +1,35 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;

/**
* Service API that is responsible for pushing data to the list of configured
* {@link Node}s that are configured to {@link NodeGroupLinkAction#P}
*/
public interface IOfflinePushService extends IOfflineDetectorService {

public RemoteNodeStatuses pushData(boolean force);

}

0 comments on commit fcabdf8

Please sign in to comment.