diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java index 440aa634e7..d36b94b6a5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java @@ -21,6 +21,7 @@ package org.jumpmind.symmetric.service; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,13 +62,13 @@ public interface INodeService { public Set findNodesThatOriginatedFromNodeId(String originalNodeId, boolean recursive); public Collection findEnabledNodesFromNodeGroup(String nodeGroupId); - + public Collection findNodesWithOpenRegistration(); public Map findAllNodeSecurity(boolean useCache); - + public List findNodeSecurityWithLoadEnabled(); - + public List findAllExternalIds(); public NodeSecurity findNodeSecurity(String nodeId); @@ -75,9 +76,9 @@ public interface INodeService { public NodeSecurity findNodeSecurity(String nodeId, boolean createIfNotFound); public void deleteNodeSecurity(String nodeId); - - public void deleteNode(String nodeId, boolean syncChange); - + + public void deleteNode(String nodeId, boolean syncChange); + public String findSymmetricVersion(); public String findIdentityNodeId(); @@ -94,9 +95,9 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, public Node findIdentity(); public Node findIdentity(boolean useCache); - + public Node findIdentity(boolean useCache, boolean logSqlError); - + public Node getCachedIdentity(); public boolean deleteIdentity(); @@ -114,9 +115,9 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, public boolean isExternalIdRegistered(String nodeGroupId, String externalId); public void save(Node node); - + public void updateNodeHost(NodeHost nodeHost); - + public void updateNodeHostForCurrentNode(); public void insertNodeIdentity(String nodeId); @@ -124,15 +125,15 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, public void insertNodeGroup(String groupId, String description); public boolean updateNodeSecurity(NodeSecurity security); - + public boolean updateNodeSecurity(ISqlTransaction transaction, NodeSecurity security); public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy); - + public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy); - + public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy); - + public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy); public INodeIdCreator getNodeIdCreator(); @@ -153,7 +154,7 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, /** * Get the current status of this node. - * + * * @return {@link NodeStatus} */ public NodeStatus getNodeStatus(); @@ -165,28 +166,30 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, public void checkForOfflineNodes(); /** - * Find nodes that have been offline for the configured timeframe before {@link IOfflineClientListener} + * Find nodes that have been offline for the configured timeframe before {@link IOfflineClientListener} * and {@link IOfflineServerListener} will be called - * + * * @return list of offline nodes */ public List findOfflineNodes(); /** * Find nodes that have been offline for a number of minutes - * + * * @return list of offline nodes * @param minutesOffline * the number of minutes that have passed that a node has not * checked in for until it is considered offline */ public List findOfflineNodes(long minutesOffline); - + + public Map findLastHeartbeats(); + public List findOfflineNodeIds(long minutesOffline); public void addOfflineServerListener(IOfflineServerListener listener); public boolean removeOfflineServerListener(IOfflineServerListener listener); - + public NetworkedNode getRootNetworkedNode(); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index c59fb261f2..b8842a6a43 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -166,11 +166,11 @@ public NodeSecurity findNodeSecurity(String id) { public List findNodeHosts(String nodeId) { return sqlTemplate.query(getSql("selectNodeHostPrefixSql", "selectNodeHostByNodeIdSql"), new NodeHostRowMapper(), nodeId); - } + } public void updateNodeHost(NodeHost nodeHost) { - Object[] params = new Object[] { + Object[] params = new Object[] { nodeHost.getIpAddress(), nodeHost.getOsUser(), nodeHost.getOsName(), nodeHost.getOsArch(), nodeHost.getOsVersion(), @@ -184,13 +184,13 @@ public void updateNodeHost(NodeHost nodeHost) { nodeHost.getHeartbeatTime(), nodeHost.getLastRestartTime(), nodeHost.getNodeId(), nodeHost.getHostName() }; - + if (sqlTemplate.update(getSql("updateNodeHostSql"), params) == 0) { sqlTemplate.update(getSql("insertNodeHostSql"), params); } - + } - + public void updateNodeHostForCurrentNode() { if (nodeHostForCurrentNode == null) { nodeHostForCurrentNode = new NodeHost(findIdentityNodeId()); @@ -251,7 +251,7 @@ public void deleteNode(String nodeId, boolean syncChange) { if (transaction != null) { transaction.rollback(); } - throw ex; + throw ex; } finally { if (!syncChange) { symmetricDialect.enableSyncTriggers(transaction); @@ -291,7 +291,7 @@ public void insertNodeGroup(String groupId, String description) { sqlTemplate.update(getSql("insertNodeGroupSql"), description, groupId); } } - + public void save(Node node) { if (!updateNode(node)) { sqlTemplate.update( @@ -331,12 +331,12 @@ protected T getFirstEntry(List list) { } return null; } - + public List findNodeSecurityWithLoadEnabled() { return sqlTemplate.query( - getSql("findNodeSecurityWithLoadEnabledSql"), new NodeSecurityRowMapper()); + getSql("findNodeSecurityWithLoadEnabledSql"), new NodeSecurityRowMapper()); } - + public Map findAllNodeSecurity(boolean useCache) { long maxSecurityCacheTime = parameterService @@ -382,9 +382,9 @@ public Node findIdentity() { } public Node findIdentity(boolean useCache) { - return findIdentity(useCache, true); + return findIdentity(useCache, true); } - + public Node findIdentity(boolean useCache, boolean logSqlError) { if (cachedNodeIdentity == null || useCache == false) { try { @@ -468,7 +468,7 @@ public NetworkedNode getRootNetworkedNode() { return null; } } - + public boolean updateNodeSecurity(NodeSecurity security) { ISqlTransaction transaction = null; try { @@ -485,7 +485,7 @@ public boolean updateNodeSecurity(NodeSecurity security) { if (transaction != null) { transaction.rollback(); } - throw ex; + throw ex; } finally { close(transaction); } @@ -513,7 +513,7 @@ public boolean updateNodeSecurity(ISqlTransaction transaction, NodeSecurity secu flushNodeAuthorizedCache(); return updated; } - + public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { try { @@ -556,12 +556,12 @@ public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, if (transaction != null) { transaction.rollback(); } - throw ex; + throw ex; } finally { close(transaction); } } - + public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { try { @@ -587,8 +587,8 @@ public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String symmetricDialect.enableSyncTriggers(transaction); } } - } - + } + public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { ISqlTransaction transaction = null; try { @@ -605,11 +605,11 @@ public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEn if (transaction != null) { transaction.rollback(); } - throw ex; - } finally { + throw ex; + } finally { close(transaction); } - } + } public boolean isExternalIdRegistered(String nodeGroupId, String externalId) { return sqlTemplate.queryForInt(getSql("isNodeRegisteredSql"), new Object[] { nodeGroupId, @@ -717,7 +717,7 @@ public List findOfflineNodes(long minutesOffline) { if (myNode != null) { long offlineNodeDetectionMillis = minutesOffline * 60 * 1000; - List list = sqlTemplate.query(getSql("findOfflineNodesSql"), new Object[] { + List list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] { myNode.getNodeId(), myNode.getNodeId()}, (int[])null); for (Row node : list) { String nodeId = node.getString("node_id"); @@ -742,9 +742,23 @@ public List findOfflineNodes(long minutesOffline) { } return offlineNodeList; - } - + + public Map findLastHeartbeats() { + Map dates = new HashMap(); + Node myNode = findIdentity(); + if (myNode != null) { + List list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] { + myNode.getNodeId(), myNode.getNodeId()}, (int[])null); + for (Row node : list) { + String nodeId = node.getString("node_id"); + Date time = node.getDateTime("heartbeat_time"); + dates.put(nodeId, time); + } + } + return dates; + } + public List findOfflineNodeIds(long minutesOffline) { List offlineNodeList = new ArrayList(); Node myNode = findIdentity(); @@ -752,7 +766,7 @@ public List findOfflineNodeIds(long minutesOffline) { if (myNode != null) { long offlineNodeDetectionMillis = minutesOffline * 60 * 1000; - List list = sqlTemplate.query(getSql("findOfflineNodesSql"), new Object[] { + List list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] { myNode.getNodeId(), myNode.getNodeId()}, (int[])null); for (Row node : list) { String nodeId = node.getString("node_id"); @@ -776,7 +790,7 @@ public List findOfflineNodeIds(long minutesOffline) { } } return offlineNodeList; - } + } public void setOfflineServerListeners(List listeners) { this.offlineServerListeners = listeners; @@ -839,7 +853,7 @@ public NodeSecurity mapRow(Row rs) { nodeSecurity.setInitialLoadTime(rs.getDateTime("initial_load_time")); nodeSecurity.setCreatedAtNodeId(rs.getString("created_at_node_id")); nodeSecurity.setRevInitialLoadEnabled(rs.getBoolean("rev_initial_load_enabled")); - nodeSecurity.setRevInitialLoadTime(rs.getDateTime("rev_initial_load_time")); + nodeSecurity.setRevInitialLoadTime(rs.getDateTime("rev_initial_load_time")); nodeSecurity.setInitialLoadId(rs.getLong("initial_load_id")); nodeSecurity.setInitialLoadCreateBy(rs.getString("initial_load_create_by")); nodeSecurity.setRevInitialLoadId(rs.getLong("rev_initial_load_id")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java index 069c22fed0..dda9dc304d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java @@ -83,7 +83,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace putSql("selectExternalIdsSql", "select distinct(external_id) from $(node) where sync_enabled=1 order by external_id asc "); - + putSql("findNodeSecurityWithLoadEnabledSql", "select node_id, node_password, registration_enabled, registration_time, " + " initial_load_enabled, initial_load_time, created_at_node_id, " @@ -91,7 +91,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace " initial_load_create_by, rev_initial_load_id, rev_initial_load_create_by " + " from $(node_security) " + " where initial_load_enabled=1 or rev_initial_load_enabled=1 "); - + putSql("findAllNodeSecuritySql", "select node_id, node_password, registration_enabled, registration_time, " + " initial_load_enabled, initial_load_time, created_at_node_id, " @@ -102,7 +102,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace putSql("deleteNodeSecuritySql", "delete from $(node_security) where node_id = ?"); putSql("deleteNodeSql", "delete from $(node) where node_id = ?"); - + putSql("deleteNodeHostSql", "delete from $(node_host) where node_id = ?"); putSql("findNodeIdentitySql", "inner join $(node_identity) i on c.node_id = " @@ -169,7 +169,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace + " total_memory_bytes=?, max_memory_bytes=?, java_version=?, java_vendor=?, symmetric_version=?, timezone_offset=?, heartbeat_time=?, " + " last_restart_time=? where node_id=? and host_name=? "); - putSql("findOfflineNodesSql", + putSql("findNodeHeartbeatsSql", "select h.node_id, max(h.heartbeat_time) as heartbeat_time, h.timezone_offset from $(node_host) h inner join $(node) n on h.node_id=n.node_id" + " where n.sync_enabled = 1 and n.node_id != ? and n.created_at_node_id = ? group by h.node_id, h.timezone_offset"); diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/MockNodeService.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/MockNodeService.java index d25d0af7b2..80b627e7c4 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/MockNodeService.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/MockNodeService.java @@ -22,6 +22,7 @@ package org.jumpmind.symmetric.service.impl; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; @@ -39,30 +40,30 @@ import org.jumpmind.symmetric.service.INodeService; public class MockNodeService implements INodeService { - + public Node getCachedIdentity() { return null; } - + public List findAllNodes() { return null; } - + public void deleteNode(String nodeId, boolean syncChange) { } - + public List findAllExternalIds() { return null; } - + public NetworkedNode getRootNetworkedNode() { return null; } - + public List findOfflineNodeIds(long minutesOffline) { return null; } - + public boolean isRegistrationServer() { return false; } @@ -70,15 +71,15 @@ public boolean isRegistrationServer() { public Set findNodesThatOriginatedFromNodeId(String originalNodeId) { return null; } - + public Set findNodesThatOriginatedFromNodeId(String originalNodeId, boolean recursive) { return null; } - + public Collection findNodesWithOpenRegistration() { return null; } - + public List findNodeHosts(String nodeId) { return null; } @@ -90,11 +91,11 @@ public Node findIdentity() { public Collection findEnabledNodesFromNodeGroup(String nodeGroupId) { return null; } - + public Map findAllNodeSecurity(boolean useCache) { return null; } - + public String findSymmetricVersion() { return null; } @@ -105,10 +106,10 @@ public NodeSecurity findNodeSecurity(String nodeId, boolean createIfNotFound) { public void save(Node node) { } - + public void updateNodeHostForCurrentNode() { } - + public void insertNodeGroup(String groupId, String description) { } @@ -166,7 +167,7 @@ public boolean isRegistrationEnabled(String nodeId) { public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { return false; } - + public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { return false; @@ -175,7 +176,7 @@ public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId, public boolean updateNode(Node node) { return false; } - + public boolean updateNodeSecurity(ISqlTransaction transaction, NodeSecurity security) { return false; } @@ -221,13 +222,13 @@ public NodeStatus getNodeStatus() { public void setNodePasswordFilter(INodePasswordFilter nodePasswordFilter) { } - public void checkForOfflineNodes() { + public void checkForOfflineNodes() { } public List findOfflineNodes() { return null; } - + public List findOfflineNodes(long minutesOffline) { return null; } @@ -236,36 +237,38 @@ public boolean deleteIdentity() { return false; } - public void deleteNodeSecurity(String nodeId) { + public void deleteNodeSecurity(String nodeId) { } - public void addOfflineServerListener(IOfflineServerListener listener) { + public void addOfflineServerListener(IOfflineServerListener listener) { } public boolean removeOfflineServerListener(IOfflineServerListener listener) { return false; } - + public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { return true; } - + public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) { return true; } - + public List findNodeSecurityWithLoadEnabled() { return null; } - + public Node findIdentity(boolean useCache, boolean logSqlError) { return null; } public void updateNodeHost(NodeHost nodeHost) { - // TODO Auto-generated method stub - - } - + } + + public Map findLastHeartbeats() { + return null; + } + } \ No newline at end of file