diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java index 2bc66e6806..0f48679412 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java @@ -39,7 +39,7 @@ public StatisticFlushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSc @Override public void doJob(boolean force) throws Exception { engine.getStatisticManager().flush(); - engine.getPurgeService().purgeStats(); + engine.getPurgeService().purgeStats(force); } public String getClusterLockName() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index f77443baa2..63beec7dfa 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -229,12 +229,12 @@ protected void init() { this.nodeService = new NodeService(parameterService, symmetricDialect); this.configurationService = new ConfigurationService(parameterService, symmetricDialect, nodeService); + this.clusterService = new ClusterService(parameterService, symmetricDialect); this.statisticService = new StatisticService(parameterService, symmetricDialect); this.statisticManager = new StatisticManager(parameterService, nodeService, - configurationService, statisticService); + configurationService, statisticService, clusterService); this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService, statisticManager); - this.clusterService = new ClusterService(parameterService, symmetricDialect); this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService, statisticManager); this.transformService = new TransformService(parameterService, symmetricDialect, @@ -244,13 +244,13 @@ protected void init() { this.triggerRouterService = new TriggerRouterService(parameterService, symmetricDialect, clusterService, configurationService, statisticManager); this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect, - nodeService, configurationService, sequenceService); + nodeService, configurationService, sequenceService, clusterService); this.dataService = new DataService(this); this.routerService = buildRouterService(); this.dataExtractorService = new DataExtractorService(parameterService, symmetricDialect, outgoingBatchService, routerService, configurationService, triggerRouterService, nodeService, dataService, transformService, statisticManager, stagingManager); - this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect); + this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService); this.transportManager = new TransportManagerFactory(this).create(); this.dataLoaderService = new DataLoaderService(this); this.registrationService = new RegistrationService(parameterService, symmetricDialect, @@ -258,7 +258,7 @@ protected void init() { transportManager, statisticManager); this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect, outgoingBatchService, registrationService, stagingManager); - this.nodeCommunicationService = buildNodeCommunicationService(); + this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect); this.pushService = new PushService(parameterService, symmetricDialect, dataExtractorService, acknowledgeService, transportManager, nodeService, clusterService, nodeCommunicationService); @@ -284,8 +284,8 @@ protected IRouterService buildRouterService() { return new RouterService(this); } - protected INodeCommunicationService buildNodeCommunicationService() { - return new NodeCommunicationService(nodeService, parameterService, symmetricDialect); + protected INodeCommunicationService buildNodeCommunicationService(IClusterService clusterService, INodeService nodeService, IParameterService parameterService, ISymmetricDialect symmetricDialect) { + return new NodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect); } public static ISecurityService createSecurityService(TypedProperties properties) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index d6e4f096f1..93e17851c4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -179,6 +179,7 @@ private ParameterConstants() { public final static String NODE_ID_CREATOR_SCRIPT = "node.id.creator.script"; public final static String NODE_ID_CREATOR_MAX_NODES = "node.id.creator.max.nodes"; + public final static String CLUSTER_SERVER_ID = "cluster.server.id"; public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled"; public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java index 72ffe0b64a..2fd71f7c60 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IClusterService.java @@ -30,7 +30,7 @@ */ public interface IClusterService { - public void init(); + public void init(); public void initLockTable(final String action); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java index 17d4168678..11e048e160 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java @@ -44,6 +44,6 @@ public interface IPurgeService { public void purgeAllIncomingEventsForNode(String nodeId); - public void purgeStats(); + public void purgeStats(boolean force); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java index 207365d8ab..fc439f1137 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; @@ -54,7 +55,7 @@ */ public class ClusterService extends AbstractService implements IClusterService { - private String serverId = AppUtils.getServerId(); + private String serverId = null; public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) { super(parameterService, dialect); @@ -96,7 +97,7 @@ public boolean lock(final String action) { if (isClusteringEnabled()) { final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND, (int) -parameterService.getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS)); - return lock(action, timeout, new Date(), serverId); + return lock(action, timeout, new Date(), getServerId()); } else { return true; } @@ -127,14 +128,36 @@ public Lock mapRow(Row rs) { return locks; } + /** + * Get a unique identifier that represents the JVM instance this server is + * currently running in. + */ public String getServerId() { + if (StringUtils.isBlank(serverId)) { + serverId = parameterService.getString(ParameterConstants.CLUSTER_SERVER_ID); + if (StringUtils.isBlank(serverId)) { + serverId = System.getProperty("runtime.symmetric.cluster.server.id", null); + if (StringUtils.isBlank(serverId)) { + // JBoss uses this system property to identify a server in a + // cluster + serverId = System.getProperty("bind.address", null); + if (StringUtils.isBlank(serverId)) { + try { + serverId = AppUtils.getHostName(); + } catch (Exception ex) { + serverId = "unknown"; + } + } + } + } + } return serverId; } public void unlock(final String action) { if (isClusteringEnabled()) { - if (!unlock(action, serverId)) { - log.warn("Failed to release lock for action:{} server:{}", action, serverId); + if (!unlock(action, getServerId())) { + log.warn("Failed to release lock for action:{} server:{}", action, getServerId()); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 0dd6b06983..91ce8df175 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; import org.jumpmind.db.sql.UniqueKeyException; @@ -36,9 +35,9 @@ import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.IncomingBatch.Status; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IIncomingBatchService; import org.jumpmind.symmetric.service.IParameterService; -import org.jumpmind.util.AppUtils; import org.jumpmind.util.FormatUtils; /** @@ -46,9 +45,12 @@ */ public class IncomingBatchService extends AbstractService implements IIncomingBatchService { + protected IClusterService clusterService; + public IncomingBatchService(IParameterService parameterService, - ISymmetricDialect symmetricDialect) { + ISymmetricDialect symmetricDialect, IClusterService clusterService) { super(parameterService, symmetricDialect); + this.clusterService = clusterService; setSqlMap(new IncomingBatchServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } @@ -189,7 +191,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) { public void insertIncomingBatch(IncomingBatch batch) { if (batch.isPersistable()) { - batch.setLastUpdatedHostName(AppUtils.getServerId()); + batch.setLastUpdatedHostName(clusterService.getServerId()); batch.setLastUpdatedTime(new Date()); sqlTemplate.update( getSql("insertIncomingBatchSql"), @@ -226,7 +228,7 @@ public int updateIncomingBatch(IncomingBatch batch) { } else if (batch.getStatus() == IncomingBatch.Status.OK) { batch.setErrorFlag(false); } - batch.setLastUpdatedHostName(AppUtils.getServerId()); + batch.setLastUpdatedHostName(clusterService.getServerId()); batch.setLastUpdatedTime(new Date()); count = sqlTemplate.update( getSql("updateIncomingBatchSql"), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java index 30aef7c5c1..ad5b32b462 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java @@ -25,35 +25,36 @@ import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType; import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.INodeCommunicationService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; -import org.jumpmind.util.AppUtils; public class NodeCommunicationService extends AbstractService implements INodeCommunicationService { - private String serverId = AppUtils.getServerId(); - private Map executors = new HashMap(); private INodeService nodeService; + + private IClusterService clusterService; private boolean initialized = false; - public NodeCommunicationService(INodeService nodeService, IParameterService parameterService, + public NodeCommunicationService(IClusterService clusterService, INodeService nodeService, IParameterService parameterService, ISymmetricDialect symmetricDialect) { super(parameterService, symmetricDialect); setSqlMap(new NodeCommunicationServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); + this.clusterService = clusterService; this.nodeService = nodeService; } private final void initialize() { if (!initialized) { try { - int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), serverId); + int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), clusterService.getServerId()); if (locksCleared > 0) { - log.info("Cleared {} node communication locks for {}", locksCleared, serverId); + log.info("Cleared {} node communication locks for {}", locksCleared, clusterService.getServerId()); } } finally { initialized = true; @@ -219,12 +220,12 @@ public boolean execute(final NodeCommunication nodeCommunication, RemoteNodeStat final INodeCommunicationExecutor executor) { Date now = new Date(); Date lockTimeout = getLockTimeoutDate(nodeCommunication.getCommunicationType()); - boolean locked = sqlTemplate.update(getSql("aquireLockSql"), serverId, now, now, + boolean locked = sqlTemplate.update(getSql("aquireLockSql"), clusterService.getServerId(), now, now, nodeCommunication.getNodeId(), nodeCommunication.getCommunicationType().name(), - lockTimeout, serverId) == 1; + lockTimeout, clusterService.getServerId()) == 1; if (locked) { nodeCommunication.setLastLockTime(now); - nodeCommunication.setLockingServerId(serverId); + nodeCommunication.setLockingServerId(clusterService.getServerId()); final RemoteNodeStatus status = statuses.add(nodeCommunication.getNode()); ThreadPoolExecutor service = getExecutor(nodeCommunication.getCommunicationType()); service.execute(new Runnable() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index ad52dbe0d1..8f7ffd1719 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -44,12 +44,12 @@ import org.jumpmind.symmetric.model.OutgoingBatch.Status; import org.jumpmind.symmetric.model.OutgoingBatchSummary; import org.jumpmind.symmetric.model.OutgoingBatches; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.ISequenceService; -import org.jumpmind.util.AppUtils; import org.jumpmind.util.FormatUtils; /** @@ -62,14 +62,17 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa private IConfigurationService configurationService; private ISequenceService sequenceService; + + private IClusterService clusterService; public OutgoingBatchService(IParameterService parameterService, ISymmetricDialect symmetricDialect, INodeService nodeService, - IConfigurationService configurationService, ISequenceService sequenceService) { + IConfigurationService configurationService, ISequenceService sequenceService, IClusterService clusterService) { super(parameterService, symmetricDialect); this.nodeService = nodeService; this.configurationService = configurationService; this.sequenceService = sequenceService; + this.clusterService = clusterService; setSqlMap(new OutgoingBatchServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } @@ -99,7 +102,7 @@ public void updateOutgoingBatches(List outgoingBatches) { public void updateOutgoingBatch(OutgoingBatch outgoingBatch) { outgoingBatch.setLastUpdatedTime(new Date()); - outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId()); + outgoingBatch.setLastUpdatedHostName(clusterService.getServerId()); sqlTemplate.update( getSql("updateOutgoingBatchSql"), new Object[] { outgoingBatch.getStatus().name(), @@ -137,7 +140,7 @@ public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { } public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) { - outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId()); + outgoingBatch.setLastUpdatedHostName(clusterService.getServerId()); long batchId = outgoingBatch.getBatchId(); if (batchId <= 0) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 42c6c9b32d..2787d41550 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -330,15 +330,26 @@ private int purgeByNodeBatchRangeList(List nodeBatchRangeList) { return totalCount; } - public void purgeStats() { + public void purgeStats(boolean force) { Calendar retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService.getInt(ParameterConstants.PURGE_STATS_RETENTION_MINUTES)); - int purgedCount = sqlTemplate.update(getSql("purgeNodeHostChannelStatsSql"), retentionCutoff.getTime()); - purgedCount += sqlTemplate.update(getSql("purgeNodeHostStatsSql"), retentionCutoff.getTime()); - purgedCount += sqlTemplate.update(getSql("purgeNodeHostJobStatsSql"), retentionCutoff.getTime()); - if (purgedCount > 0) { - log.info("{} stats rows were purged", purgedCount); + if (force || clusterService.lock(ClusterConstants.PURGE_STATISTICS)) { + try { + int purgedCount = sqlTemplate.update(getSql("purgeNodeHostChannelStatsSql"), + retentionCutoff.getTime()); + purgedCount += sqlTemplate.update(getSql("purgeNodeHostStatsSql"), + retentionCutoff.getTime()); + purgedCount += sqlTemplate.update(getSql("purgeNodeHostJobStatsSql"), + retentionCutoff.getTime()); + if (purgedCount > 0) { + log.info("{} stats rows were purged", purgedCount); + } + } finally { + if (!force) { + clusterService.unlock(ClusterConstants.PURGE_STATISTICS); + } + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 7eb20668b4..57699ef15d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -30,11 +30,11 @@ import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IStatisticService; -import org.jumpmind.util.AppUtils; /** * @see IStatisticManager @@ -56,6 +56,8 @@ public class StatisticManager implements IStatisticManager { protected IParameterService parameterService; protected IConfigurationService configurationService; + + protected IClusterService clusterService; private static final int NUMBER_OF_PERMITS = 1000; @@ -66,11 +68,12 @@ public class StatisticManager implements IStatisticManager { Semaphore jobStatsLock = new Semaphore(NUMBER_OF_PERMITS, true); public StatisticManager(IParameterService parameterService, INodeService nodeService, - IConfigurationService configurationService, IStatisticService statisticsService) { + IConfigurationService configurationService, IStatisticService statisticsService, IClusterService clusterService) { this.parameterService = parameterService; this.nodeService = nodeService; this.configurationService = configurationService; this.statisticService = statisticsService; + this.clusterService = clusterService; } protected void init() { @@ -397,7 +400,7 @@ public void flush() { Node node = nodeService.getCachedIdentity(); if (node != null) { String nodeId = node.getNodeId(); - String serverId = AppUtils.getServerId(); + String serverId = clusterService.getServerId(); for (JobStats stats : toFlush) { stats.setNodeId(nodeId); stats.setHostName(serverId); @@ -444,11 +447,11 @@ protected ChannelStats getChannelStats(String channelId) { if (stats == null) { Node node = nodeService.getCachedIdentity(); if (node != null) { - stats = new ChannelStats(node.getNodeId(), AppUtils.getServerId(), new Date(), + stats = new ChannelStats(node.getNodeId(), clusterService.getServerId(), new Date(), null, channelId); channelStats.put(channelId, stats); } else { - stats = new ChannelStats(UNKNOWN, AppUtils.getServerId(), new Date(), null, + stats = new ChannelStats(UNKNOWN, clusterService.getServerId(), new Date(), null, channelId); } @@ -460,10 +463,10 @@ protected HostStats getHostStats() { if (hostStats == null) { Node node = nodeService.getCachedIdentity(); if (node != null) { - hostStats = new HostStats(node.getNodeId(), AppUtils.getServerId(), new Date(), + hostStats = new HostStats(node.getNodeId(), clusterService.getServerId(), new Date(), null); } else { - hostStats = new HostStats(UNKNOWN, AppUtils.getServerId(), new Date(), null); + hostStats = new HostStats(UNKNOWN, clusterService.getServerId(), new Date(), null); } } diff --git a/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java b/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java index 8f511ef4d1..f695ce5ae1 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java @@ -30,7 +30,6 @@ import java.util.Enumeration; import java.util.TimeZone; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.FastDateFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,33 +43,8 @@ public class AppUtils { private static Logger log = LoggerFactory.getLogger(AppUtils.class); - private static String serverId; - private static FastDateFormat timezoneFormatter = FastDateFormat.getInstance("Z"); - /** - * Get a unique identifier that represents the JVM instance this server is - * currently running in. - */ - public static String getServerId() { - if (StringUtils.isBlank(serverId)) { - serverId = System.getProperty("runtime.symmetric.cluster.server.id", null); - if (StringUtils.isBlank(serverId)) { - // JBoss uses this system property to identify a server in a - // cluster - serverId = System.getProperty("bind.address", null); - if (StringUtils.isBlank(serverId)) { - try { - serverId = getHostName(); - } catch (Exception ex) { - serverId = "unknown"; - } - } - } - } - return serverId; - } - public static String getHostName() { String hostName = System.getProperty("host.name", UNKNOWN); if (UNKNOWN.equals(hostName)) {