Skip to content

Commit

Permalink
SYMMETRICDS-342 AND SYMMETRICDS-388
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 8, 2011
1 parent ee90ba7 commit d60d8a3
Show file tree
Hide file tree
Showing 48 changed files with 660 additions and 229 deletions.
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IBandwidthService;
import org.jumpmind.symmetric.service.IClusterService;
Expand Down Expand Up @@ -312,7 +313,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri
return getDataService().sendSQL(nodeId, catalogName, schemaName, tableName, sql, false);
}

public boolean push() {
public RemoteNodeStatuses push() {
return getPushService().pushData();
}

Expand All @@ -324,7 +325,7 @@ public NodeStatus getNodeStatus() {
return getNodeService().getNodeStatus();
}

public boolean pull() {
public RemoteNodeStatuses pull() {
return getPullService().pullData();
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.jumpmind.symmetric.job.PushJob;
import org.jumpmind.symmetric.job.RouterJob;
import org.jumpmind.symmetric.model.NodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IBandwidthService;
import org.jumpmind.symmetric.service.IClusterService;
Expand Down Expand Up @@ -101,9 +102,9 @@ public interface ISymmetricEngine {
* Will perform a push the same way the {@link PushJob} would have.
*
* @see IPushService#pushData()
* @return true if data was pushed successfully
* @return {@link RemoteNodeStatuses}
*/
public boolean push();
public RemoteNodeStatuses push();

/**
* Call this to resync triggers
Expand All @@ -123,8 +124,9 @@ public interface ISymmetricEngine {
* Will perform a pull the same way the {@link PullJob} would have.
*
* @see IPullService#pullData()
* @return {@link RemoteNodeStatuses}
*/
public boolean pull();
public RemoteNodeStatuses pull();

/**
* Route captured data the same way the {@link RouterJob} would have.
Expand Down
Expand Up @@ -152,6 +152,8 @@ private ParameterConstants() {
public final static String WEB_BATCH_SERVLET_ENABLE = "web.batch.servlet.enable";

public final static String OFFLINE_NODE_DETECTION_PERIOD_MINUTES = "offline.node.detection.period.minutes";
public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC ="heartbeat.sync.on.push.period.sec";
public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC ="heartbeat.sync.on.push.period.sec";

public final static String STATISTIC_RECORD_ENABLE = "statistic.record.enable";

}
Expand Down
Expand Up @@ -45,6 +45,9 @@ public void busy(Node remoteNode) {

public void notAuthenticated(Node remoteNode) {
log.warn("AuthenticationFailed");
}

public void unknownError(Node remoteNode, Exception ex) {
}

public void offline(Node remoteNode) {
Expand Down
Expand Up @@ -56,5 +56,7 @@ public interface IOfflineClientListener extends IExtensionPoint {
* @param remoteNode
*/
public void registrationRequired(Node remoteNode);

public void unknownError(Node remoteNode, Exception ex);

}
Expand Down
Expand Up @@ -27,11 +27,13 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.StandaloneSymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.logging.ILog;
import org.jumpmind.symmetric.common.logging.LogFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.util.RandomTimeSlot;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.jmx.export.annotation.ManagedAttribute;
Expand Down Expand Up @@ -64,7 +66,9 @@ abstract public class AbstractJob implements Runnable, BeanNameAware, IJob {

private long lastExecutionTimeInMs;

private long totalExecutionTimeInMs;
private long totalExecutionTimeInMs;

private long lastExecutionProcessCount = 0;

private long numberOfRuns;

Expand All @@ -82,9 +86,11 @@ abstract public class AbstractJob implements Runnable, BeanNameAware, IJob {

private ScheduledFuture<?> scheduledJob;

private RandomTimeSlot randomTimeSlot;
private RandomTimeSlot randomTimeSlot;

private boolean autoStartConfigured;

private boolean autoStartConfigured;
private IStatisticManager statisticManager;

protected void init() {
this.autoStartConfigured = parameterService.is(autoStartParameterName);
Expand Down Expand Up @@ -155,23 +161,29 @@ public boolean invoke(boolean force) {
running = true;
synchronized (this) {
ran = true;
long ts = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long processCount = 0;
try {
if (!requiresRegistration
|| (requiresRegistration && registrationService
.isRegisteredWithServer())) {
hasNotRegisteredMessageBeenLogged = false;
doJob();
processCount = doJob();
} else {
if (!hasNotRegisteredMessageBeenLogged) {
log.warn("SymmetricEngineNotRegistered", getName());
hasNotRegisteredMessageBeenLogged = true;
}
}
} finally {
lastFinishTime = new Date();
lastExecutionTimeInMs = System.currentTimeMillis() - ts;
totalExecutionTimeInMs += lastExecutionTimeInMs;
lastFinishTime = new Date();
lastExecutionProcessCount = processCount;
long endTime = System.currentTimeMillis();
lastExecutionTimeInMs = endTime - startTime;
totalExecutionTimeInMs += lastExecutionTimeInMs;
if (lastExecutionProcessCount > 0 || lastExecutionTimeInMs > Constants.LONG_OPERATION_THRESHOLD) {
statisticManager.addJobStats(jobName, startTime, endTime, lastExecutionProcessCount);
}
numberOfRuns++;
running = false;
}
Expand All @@ -196,7 +208,7 @@ public void run() {
invoke(false);
}

abstract void doJob() throws Exception;
abstract long doJob() throws Exception;

public void setBeanName(final String beanName) {
this.jobName = beanName;
Expand Down Expand Up @@ -245,6 +257,11 @@ public boolean isStarted() {
@ManagedMetric(description = "The amount of time this job spent in execution during it's last run")
public long getLastExecutionTimeInMs() {
return lastExecutionTimeInMs;
}

@ManagedMetric(description = "The count of elements this job processed during it's last run")
public long getLastExecutionProcessCount() {
return lastExecutionProcessCount;
}

@ManagedAttribute(description = "The last time this job completed execution")
Expand Down Expand Up @@ -304,5 +321,9 @@ public void setAutoStartParameterName(String autoStartParameterName) {

public void setRandomTimeSlot(RandomTimeSlot randomTimeSlot) {
this.randomTimeSlot = randomTimeSlot;
}

public void setStatisticManager(IStatisticManager statisticManager) {
this.statisticManager = statisticManager;
}
}
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.jumpmind.symmetric.service.IDataService;

/**
*
* Background job that is responsible for updating this node's heart beat time.
*/
public class HeartbeatJob extends AbstractJob {

Expand All @@ -35,13 +35,16 @@ public class HeartbeatJob extends AbstractJob {
private IClusterService clusterService;

@Override
public void doJob() throws Exception {
public long doJob() throws Exception {
if (clusterService.lock(ClusterConstants.HEARTBEAT)) {
try {
dataService.heartbeat(false);
dataService.heartbeat(false);
return -1l;
} finally {
clusterService.unlock(ClusterConstants.HEARTBEAT);
}
} else {
return -1l;
}
}

Expand Down
Expand Up @@ -17,15 +17,14 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License. */


package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IPullService;

/**
*
* Background job that pulls data from remote nodes and then loads it.
*/
public class PullJob extends AbstractJob {

Expand All @@ -34,14 +33,16 @@ public class PullJob extends AbstractJob {
private INodeService nodeService;

@Override
public void doJob() throws Exception {
boolean dataPulled = pullService.pullData();
public long doJob() throws Exception {
RemoteNodeStatuses statuses = pullService.pullData();

// Re-pull immediately if we are in the middle of an initial load
// so that the initial load completes as quickly as possible.
while (nodeService.isDataLoadStarted() && dataPulled) {
dataPulled = pullService.pullData();
}
while (nodeService.isDataLoadStarted() && statuses.wasDataProcessed()) {
statuses = pullService.pullData();
}

return statuses.getDataProcessedCount();
}

public void setPullService(IPullService service) {
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.jumpmind.symmetric.service.IPurgeService;

/**
*
* Background job that is responsible for purging already synchronized data
*/
public class PurgeJob extends AbstractJob {

Expand All @@ -34,8 +34,8 @@ public PurgeJob() {
}

@Override
public void doJob() throws Exception {
purgeService.purge();
public long doJob() throws Exception {
return purgeService.purge();
}

public void setPurgeService(IPurgeService service) {
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.jumpmind.symmetric.service.IPushService;

/**
*
* Background job that is responsible for pushing data to linked nodes.
*/
public class PushJob extends AbstractJob {

Expand All @@ -38,8 +38,8 @@ public void setPushService(IPushService service) {
}

@Override
public void doJob() throws Exception {
pushService.pushData();
public long doJob() throws Exception {
return pushService.pushData().getDataProcessedCount();
}

}
Expand Down
Expand Up @@ -33,8 +33,8 @@ public class RouterJob extends AbstractJob {
IRouterService routingService;

@Override
void doJob() throws Exception {
routingService.routeData();
long doJob() throws Exception {
return routingService.routeData();
}

public void setRoutingService(IRouterService routingService) {
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.jumpmind.symmetric.statistic.IStatisticManager;

/**
*
* Background job that is responsible for writing statistics to database tables.
*/
public class StatisticFlushJob extends AbstractJob {

Expand All @@ -34,8 +34,9 @@ public StatisticFlushJob() {
}

@Override
public void doJob() throws Exception {
statisticManager.flush();
public long doJob() throws Exception {
statisticManager.flush();
return -1l;
}

public void setStatisticManager(IStatisticManager statisticManager) {
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.jumpmind.symmetric.service.ITriggerRouterService;

/**
*
* Background job that checks to see if triggers need to be regenerated.
*/
public class SyncTriggersJob extends AbstractJob {

Expand All @@ -34,8 +34,9 @@ public SyncTriggersJob() {
}

@Override
public void doJob() throws Exception {
triggerRouterService.syncTriggers();
public long doJob() throws Exception {
triggerRouterService.syncTriggers();
return -1l;
}

public void setTriggerRouterService(ITriggerRouterService triggerService) {
Expand Down
Expand Up @@ -24,15 +24,17 @@
import org.jumpmind.symmetric.service.INodeService;

/**
*
* Background job that is responsible for checking on node health. It will disable nodes that
* have been offline for a configurable period of time.
*/
public class WatchdogJob extends AbstractJob {

private INodeService nodeService;

@Override
public void doJob() throws Exception {
nodeService.checkForOfflineNodes();
public long doJob() throws Exception {
nodeService.checkForOfflineNodes();
return -1l;
}

public void setNodeService(INodeService nodeService) {
Expand Down
Expand Up @@ -382,5 +382,9 @@ public Date getCreateTime() {
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public long totalEventCount() {
return insertEventCount + updateEventCount + deleteEventCount + otherEventCount;
}

}

0 comments on commit d60d8a3

Please sign in to comment.