Skip to content

Commit

Permalink
0000884: Documented cluster.server.id property is not supported. Add …
Browse files Browse the repository at this point in the history
…support.

0000882: Stats Purge Job needs to be clustered. Running it on multiple nodes can cause deadlocks.
  • Loading branch information
chenson42 committed Oct 28, 2012
1 parent 4d4d4c8 commit f4bc7df
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 71 deletions.
Expand Up @@ -39,7 +39,7 @@ public StatisticFlushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSc
@Override
public void doJob(boolean force) throws Exception {
engine.getStatisticManager().flush();
engine.getPurgeService().purgeStats();
engine.getPurgeService().purgeStats(force);
}

public String getClusterLockName() {
Expand Down
Expand Up @@ -229,12 +229,12 @@ protected void init() {
this.nodeService = new NodeService(parameterService, symmetricDialect);
this.configurationService = new ConfigurationService(parameterService, symmetricDialect,
nodeService);
this.clusterService = new ClusterService(parameterService, symmetricDialect);
this.statisticService = new StatisticService(parameterService, symmetricDialect);
this.statisticManager = new StatisticManager(parameterService, nodeService,
configurationService, statisticService);
configurationService, statisticService, clusterService);
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.clusterService = new ClusterService(parameterService, symmetricDialect);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
statisticManager);
this.transformService = new TransformService(parameterService, symmetricDialect,
Expand All @@ -244,21 +244,21 @@ protected void init() {
this.triggerRouterService = new TriggerRouterService(parameterService, symmetricDialect,
clusterService, configurationService, statisticManager);
this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect,
nodeService, configurationService, sequenceService);
nodeService, configurationService, sequenceService, clusterService);
this.dataService = new DataService(this);
this.routerService = buildRouterService();
this.dataExtractorService = new DataExtractorService(parameterService, symmetricDialect,
outgoingBatchService, routerService, configurationService, triggerRouterService,
nodeService, dataService, transformService, statisticManager, stagingManager);
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect);
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.transportManager = new TransportManagerFactory(this).create();
this.dataLoaderService = new DataLoaderService(this);
this.registrationService = new RegistrationService(parameterService, symmetricDialect,
nodeService, dataExtractorService, dataService, dataLoaderService,
transportManager, statisticManager);
this.acknowledgeService = new AcknowledgeService(parameterService, symmetricDialect,
outgoingBatchService, registrationService, stagingManager);
this.nodeCommunicationService = buildNodeCommunicationService();
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
this.pushService = new PushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, transportManager, nodeService,
clusterService, nodeCommunicationService);
Expand All @@ -284,8 +284,8 @@ protected IRouterService buildRouterService() {
return new RouterService(this);
}

protected INodeCommunicationService buildNodeCommunicationService() {
return new NodeCommunicationService(nodeService, parameterService, symmetricDialect);
protected INodeCommunicationService buildNodeCommunicationService(IClusterService clusterService, INodeService nodeService, IParameterService parameterService, ISymmetricDialect symmetricDialect) {
return new NodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
}

public static ISecurityService createSecurityService(TypedProperties properties) {
Expand Down
Expand Up @@ -179,6 +179,7 @@ private ParameterConstants() {
public final static String NODE_ID_CREATOR_SCRIPT = "node.id.creator.script";
public final static String NODE_ID_CREATOR_MAX_NODES = "node.id.creator.max.nodes";

// Parameter naming this server within a cluster; ClusterService.getServerId()
// consults it first, before any system property or the host name.
public final static String CLUSTER_SERVER_ID = "cluster.server.id";
// NOTE(review): presumably switches cluster locking on/off; the code that
// reads it is not visible here, so confirm against ClusterService.
public final static String CLUSTER_LOCKING_ENABLED = "cluster.lock.enabled";
// Read as a long count of milliseconds by ClusterService.lock(String) and
// subtracted from the current time to compute the lock timeout date.
public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms";

Expand Down
Expand Up @@ -30,7 +30,7 @@
*/
public interface IClusterService {

public void init();
public void init();

public void initLockTable(final String action);

Expand Down
Expand Up @@ -44,6 +44,6 @@ public interface IPurgeService {

public void purgeAllIncomingEventsForNode(String nodeId);

public void purgeStats();
public void purgeStats(boolean force);

}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
Expand All @@ -54,7 +55,7 @@
*/
public class ClusterService extends AbstractService implements IClusterService {

private String serverId = AppUtils.getServerId();
private String serverId = null;

public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) {
super(parameterService, dialect);
Expand Down Expand Up @@ -96,7 +97,7 @@ public boolean lock(final String action) {
if (isClusteringEnabled()) {
final Date timeout = DateUtils.add(new Date(), Calendar.MILLISECOND,
(int) -parameterService.getLong(ParameterConstants.CLUSTER_LOCK_TIMEOUT_MS));
return lock(action, timeout, new Date(), serverId);
return lock(action, timeout, new Date(), getServerId());
} else {
return true;
}
Expand Down Expand Up @@ -127,14 +128,36 @@ public Lock mapRow(Row rs) {
return locks;
}

/**
 * Get a unique identifier that represents the JVM instance this server is
 * currently running in.
 * <p>
 * The identifier is resolved on first use and then cached in
 * {@code serverId}. Candidates are tried in this order and the first
 * non-blank one wins:
 * <ol>
 * <li>the {@code cluster.server.id} parameter</li>
 * <li>the {@code runtime.symmetric.cluster.server.id} system property</li>
 * <li>the {@code bind.address} system property (JBoss uses it to identify a
 * server in a cluster)</li>
 * <li>the host name reported by {@code AppUtils.getHostName()}</li>
 * </ol>
 * If the host name lookup throws, the literal {@code "unknown"} is used.
 *
 * @return the cached server id, or the newly resolved one on first use
 */
public String getServerId() {
    // Resolve into a local and publish to the field exactly once. Writing
    // each intermediate candidate to the shared field (and returning by
    // re-reading it) lets a concurrent caller that is still resolving
    // overwrite an already resolved id with a blank candidate, so another
    // thread could hand a null/blank id to lock()/unlock().
    String resolved = serverId;
    if (StringUtils.isBlank(resolved)) {
        resolved = parameterService.getString(ParameterConstants.CLUSTER_SERVER_ID);
        if (StringUtils.isBlank(resolved)) {
            resolved = System.getProperty("runtime.symmetric.cluster.server.id");
        }
        if (StringUtils.isBlank(resolved)) {
            // JBoss uses this system property to identify a server in a
            // cluster
            resolved = System.getProperty("bind.address");
        }
        if (StringUtils.isBlank(resolved)) {
            try {
                resolved = AppUtils.getHostName();
            } catch (Exception ignored) {
                // Best effort: a failed host name lookup must not prevent
                // the service from obtaining some identifier.
                resolved = "unknown";
            }
        }
        serverId = resolved;
    }
    return resolved;
}

public void unlock(final String action) {
if (isClusteringEnabled()) {
if (!unlock(action, serverId)) {
log.warn("Failed to release lock for action:{} server:{}", action, serverId);
if (!unlock(action, getServerId())) {
log.warn("Failed to release lock for action:{} server:{}", action, getServerId());
}
}
}
Expand Down
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.UniqueKeyException;
Expand All @@ -36,19 +35,22 @@
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.IncomingBatch.Status;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;

/**
* @see IIncomingBatchService
*/
public class IncomingBatchService extends AbstractService implements IIncomingBatchService {

protected IClusterService clusterService;

public IncomingBatchService(IParameterService parameterService,
ISymmetricDialect symmetricDialect) {
ISymmetricDialect symmetricDialect, IClusterService clusterService) {
super(parameterService, symmetricDialect);
this.clusterService = clusterService;
setSqlMap(new IncomingBatchServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}
Expand Down Expand Up @@ -189,7 +191,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {

public void insertIncomingBatch(IncomingBatch batch) {
if (batch.isPersistable()) {
batch.setLastUpdatedHostName(AppUtils.getServerId());
batch.setLastUpdatedHostName(clusterService.getServerId());
batch.setLastUpdatedTime(new Date());
sqlTemplate.update(
getSql("insertIncomingBatchSql"),
Expand Down Expand Up @@ -226,7 +228,7 @@ public int updateIncomingBatch(IncomingBatch batch) {
} else if (batch.getStatus() == IncomingBatch.Status.OK) {
batch.setErrorFlag(false);
}
batch.setLastUpdatedHostName(AppUtils.getServerId());
batch.setLastUpdatedHostName(clusterService.getServerId());
batch.setLastUpdatedTime(new Date());
count = sqlTemplate.update(
getSql("updateIncomingBatchSql"),
Expand Down
Expand Up @@ -25,35 +25,36 @@
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;

public class NodeCommunicationService extends AbstractService implements INodeCommunicationService {

private String serverId = AppUtils.getServerId();

private Map<CommunicationType, ThreadPoolExecutor> executors = new HashMap<NodeCommunication.CommunicationType, ThreadPoolExecutor>();

private INodeService nodeService;

private IClusterService clusterService;

private boolean initialized = false;

public NodeCommunicationService(INodeService nodeService, IParameterService parameterService,
public NodeCommunicationService(IClusterService clusterService, INodeService nodeService, IParameterService parameterService,
ISymmetricDialect symmetricDialect) {
super(parameterService, symmetricDialect);
setSqlMap(new NodeCommunicationServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
this.clusterService = clusterService;
this.nodeService = nodeService;
}

private final void initialize() {
if (!initialized) {
try {
int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), serverId);
int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), clusterService.getServerId());
if (locksCleared > 0) {
log.info("Cleared {} node communication locks for {}", locksCleared, serverId);
log.info("Cleared {} node communication locks for {}", locksCleared, clusterService.getServerId());
}
} finally {
initialized = true;
Expand Down Expand Up @@ -219,12 +220,12 @@ public boolean execute(final NodeCommunication nodeCommunication, RemoteNodeStat
final INodeCommunicationExecutor executor) {
Date now = new Date();
Date lockTimeout = getLockTimeoutDate(nodeCommunication.getCommunicationType());
boolean locked = sqlTemplate.update(getSql("aquireLockSql"), serverId, now, now,
boolean locked = sqlTemplate.update(getSql("aquireLockSql"), clusterService.getServerId(), now, now,
nodeCommunication.getNodeId(), nodeCommunication.getCommunicationType().name(),
lockTimeout, serverId) == 1;
lockTimeout, clusterService.getServerId()) == 1;
if (locked) {
nodeCommunication.setLastLockTime(now);
nodeCommunication.setLockingServerId(serverId);
nodeCommunication.setLockingServerId(clusterService.getServerId());
final RemoteNodeStatus status = statuses.add(nodeCommunication.getNode());
ThreadPoolExecutor service = getExecutor(nodeCommunication.getCommunicationType());
service.execute(new Runnable() {
Expand Down
Expand Up @@ -44,12 +44,12 @@
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.ISequenceService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;

/**
Expand All @@ -62,14 +62,17 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
private IConfigurationService configurationService;

private ISequenceService sequenceService;

private IClusterService clusterService;

public OutgoingBatchService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, INodeService nodeService,
IConfigurationService configurationService, ISequenceService sequenceService) {
IConfigurationService configurationService, ISequenceService sequenceService, IClusterService clusterService) {
super(parameterService, symmetricDialect);
this.nodeService = nodeService;
this.configurationService = configurationService;
this.sequenceService = sequenceService;
this.clusterService = clusterService;
setSqlMap(new OutgoingBatchServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}
Expand Down Expand Up @@ -99,7 +102,7 @@ public void updateOutgoingBatches(List<OutgoingBatch> outgoingBatches) {

public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedTime(new Date());
outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId());
outgoingBatch.setLastUpdatedHostName(clusterService.getServerId());
sqlTemplate.update(
getSql("updateOutgoingBatchSql"),
new Object[] { outgoingBatch.getStatus().name(),
Expand Down Expand Up @@ -137,7 +140,7 @@ public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
}

public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId());
outgoingBatch.setLastUpdatedHostName(clusterService.getServerId());

long batchId = outgoingBatch.getBatchId();
if (batchId <= 0) {
Expand Down
Expand Up @@ -330,15 +330,26 @@ private int purgeByNodeBatchRangeList(List<NodeBatchRange> nodeBatchRangeList) {
return totalCount;
}

public void purgeStats() {
public void purgeStats(boolean force) {
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE,
-parameterService.getInt(ParameterConstants.PURGE_STATS_RETENTION_MINUTES));
int purgedCount = sqlTemplate.update(getSql("purgeNodeHostChannelStatsSql"), retentionCutoff.getTime());
purgedCount += sqlTemplate.update(getSql("purgeNodeHostStatsSql"), retentionCutoff.getTime());
purgedCount += sqlTemplate.update(getSql("purgeNodeHostJobStatsSql"), retentionCutoff.getTime());
if (purgedCount > 0) {
log.info("{} stats rows were purged", purgedCount);
if (force || clusterService.lock(ClusterConstants.PURGE_STATISTICS)) {
try {
int purgedCount = sqlTemplate.update(getSql("purgeNodeHostChannelStatsSql"),
retentionCutoff.getTime());
purgedCount += sqlTemplate.update(getSql("purgeNodeHostStatsSql"),
retentionCutoff.getTime());
purgedCount += sqlTemplate.update(getSql("purgeNodeHostJobStatsSql"),
retentionCutoff.getTime());
if (purgedCount > 0) {
log.info("{} stats rows were purged", purgedCount);
}
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.PURGE_STATISTICS);
}
}
}
}

Expand Down

0 comments on commit f4bc7df

Please sign in to comment.