Skip to content

Commit

Permalink
0002895: node_communication and lock tables no longer export in snapshot
Browse files Browse the repository at this point in the history
when NOT in cluster mode
  • Loading branch information
chenson42 committed Nov 10, 2016
1 parent 63592e8 commit 3c8da84
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 19 deletions.
Expand Up @@ -191,7 +191,7 @@ public boolean execute(NodeCommunication nodeCommunication, RemoteNodeStatuses s
nodeCommunication.setFailCount(0);
}
status.setComplete(true);
save(nodeCommunication);
save(nodeCommunication, false);
}
return !failed;
}
Expand Down
Expand Up @@ -224,11 +224,16 @@ public static File createSnapshot(ISymmetricEngine engine) {
extract(export, new File(tmpDir, "sym_trigger_hist.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_TRIGGER_HIST));

if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
engine.getNodeCommunicationService().persistToTableForSnapshot();
engine.getClusterService().persistToTableForSnapshot();
}

extract(export, new File(tmpDir, "sym_lock.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_LOCK));

extract(export, new File(tmpDir, "sym_node_communication.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_COMMUNICATION));
TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_COMMUNICATION));

extract(export, 10000, "order by create_time desc", new File(tmpDir, "sym_outgoing_batch.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH));
Expand Down
Expand Up @@ -56,5 +56,7 @@ public interface IClusterService {
public void clearInfiniteLock(String action);

public boolean isInfiniteLocked(String action);

public void persistToTableForSnapshot();

}
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -33,8 +32,6 @@ public interface INodeCommunicationService {

public List<NodeCommunication> list(CommunicationType communicationType);

public void save(NodeCommunication nodeCommunication);

public NodeCommunication find(String nodeId, String channelId, CommunicationType communicationType);

public boolean execute(NodeCommunication nodeCommunication, RemoteNodeStatuses statuses, INodeCommunicationExecutor executor);
Expand All @@ -43,11 +40,14 @@ public interface INodeCommunicationService {

public void stop();

public void updateBatchToSendCounts(String nodeId, Map<String, Integer> batchesCountToQueues);

public Map<String, Integer> parseQueueToBatchCounts(String channelToBatchCountsString);

public void persistToTableForSnapshot();

public interface INodeCommunicationExecutor {
public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status);
}

public void updateBatchToSendCounts(String nodeId, Map<String, Integer> batchesCountToQueues);

public Map<String, Integer> parseQueueToBatchCounts(String channelToBatchCountsString);
}
Expand Up @@ -22,11 +22,14 @@

import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_PUSH;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_SCAN;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_SHARED;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_TRACKER;
import static org.jumpmind.symmetric.service.ClusterConstants.FILE_SYNC_SCAN;
import static org.jumpmind.symmetric.service.ClusterConstants.HEARTBEAT;
import static org.jumpmind.symmetric.service.ClusterConstants.INITIAL_LOAD_EXTRACT;
import static org.jumpmind.symmetric.service.ClusterConstants.MONITOR;
import static org.jumpmind.symmetric.service.ClusterConstants.OFFLINE_PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.OFFLINE_PUSH;
import static org.jumpmind.symmetric.service.ClusterConstants.PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_DATA_GAPS;
import static org.jumpmind.symmetric.service.ClusterConstants.PURGE_INCOMING;
Expand All @@ -41,10 +44,8 @@
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED;
import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG;
import static org.jumpmind.symmetric.service.ClusterConstants.OFFLINE_PULL;
import static org.jumpmind.symmetric.service.ClusterConstants.OFFLINE_PUSH;
import static org.jumpmind.symmetric.service.ClusterConstants.MONITOR;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -104,6 +105,20 @@ public void init() {
}
}
}

@Override
public synchronized void persistToTableForSnapshot() {
sqlTemplate.update(getSql("deleteSql"));
Collection<Lock> values = lockCache.values();
for (Lock lock : values) {
insertLock(lock);
}
}

protected void insertLock(Lock lock) {
sqlTemplate.update(getSql("insertCompleteLockSql"), lock.getLockAction(), lock.getLockType(), lock.getLockingServerId(), lock.getLockTime(), lock.getSharedCount(), lock.isSharedEnable() ? 1 : 0, lock.getLastLockTime(), lock.getLastLockingServerId());
}


protected void initLockTable(final String action) {
initLockTable(action, TYPE_CLUSTER);
Expand Down
Expand Up @@ -28,6 +28,8 @@ public class ClusterServiceSqlMap extends AbstractSqlMap {

public ClusterServiceSqlMap(IDatabasePlatform platform, Map<String, String> replacementTokens) {
super(platform, replacementTokens);

putSql("deleteSql", "delete from $(lock)");

putSql("acquireClusterLockSql",
"update $(lock) set locking_server_id=?, lock_time=? " +
Expand Down Expand Up @@ -72,6 +74,9 @@ public ClusterServiceSqlMap(IDatabasePlatform platform, Map<String, String> repl
"where locking_server_id=?");

putSql("insertLockSql", "insert into $(lock) (lock_action, lock_type) values(?,?)");

putSql("insertCompleteLockSql",
"insert into $(lock) (lock_action, lock_type, locking_server_id, lock_time, shared_count, shared_enable, last_lock_time, last_locking_server_id) values(?,?,?,?,?,?,?,?)");

putSql("findLocksSql",
"select lock_action, lock_type, locking_server_id, lock_time, shared_count, shared_enable, " +
Expand Down
Expand Up @@ -117,6 +117,18 @@ private final void initialize() {
}
}
}

@Override
public synchronized void persistToTableForSnapshot() {
sqlTemplate.update(getSql("deleteSql"));
Collection<Map<String, NodeCommunication>> values = lockCache.values();
for (Map<String, NodeCommunication> map : values) {
Collection<NodeCommunication> nodeCommies = map.values();
for (NodeCommunication nodeCommunication : nodeCommies) {
save(nodeCommunication, true);
}
}
}

public NodeCommunication find(String nodeId, String queue, CommunicationType communicationType) {
NodeCommunication lock = null;
Expand All @@ -133,7 +145,7 @@ public NodeCommunication find(String nodeId, String queue, CommunicationType com
lock.setNodeId(nodeId);
lock.setCommunicationType(communicationType);
lock.setQueue(queue);
save(lock);
save(lock, false);
}
return lock;
}
Expand Down Expand Up @@ -192,7 +204,7 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
comm.setNodeId(nodeToCommunicateWith.getNodeId());
comm.setQueue(nodeToCommunicateWith.getQueue());
comm.setCommunicationType(communicationType);
save(comm);
save(comm, false);
communicationRows.add(comm);
}

Expand Down Expand Up @@ -317,8 +329,8 @@ public boolean delete(NodeCommunication nodeCommunication) {
}
}

public void save(NodeCommunication nodeCommunication) {
if (clusterService.isClusteringEnabled()) {
protected void save(NodeCommunication nodeCommunication, boolean force) {
if (clusterService.isClusteringEnabled() || force) {
if (0 == sqlTemplate.update(getSql("updateNodeCommunicationSql"),
nodeCommunication.getLockTime(), nodeCommunication.getLockingServerId(),
nodeCommunication.getLastLockMillis(), nodeCommunication.getSuccessCount(),
Expand Down Expand Up @@ -547,7 +559,7 @@ protected void unlock(NodeCommunication nodeCommunication,
nodeCommunication.setFailCount(0);
}
if (clusterService.isClusteringEnabled()) {
save(nodeCommunication);
save(nodeCommunication, false);
}
unlocked = true;
if (attempts > 1) {
Expand Down Expand Up @@ -709,7 +721,7 @@ public void updateBatchToSendCounts(String nodeId, Map<String, Integer> batchesC
}

for (NodeCommunication nodeCommunication : updatedNodeCommunications) {
save(nodeCommunication);
save(nodeCommunication, false);
}
}

Expand Down
Expand Up @@ -30,6 +30,8 @@ public NodeCommunicationServiceSqlMap(IDatabasePlatform platform,
Map<String, String> replacementTokens) {
super(platform, replacementTokens);

putSql("deleteSql", "delete from $(node_communication)");

putSql("clearLocksOnRestartSql", "update $(node_communication) set lock_time=null where locking_server_id=? and lock_time is not null");

putSql("selectNodeCommunicationSql",
Expand Down

0 comments on commit 3c8da84

Please sign in to comment.