Skip to content

Commit

Permalink
0001288: Investigate where node heart beat information should be disp…
Browse files Browse the repository at this point in the history
…layed in Pro.

Added the last heart beat time to the node screen.
  • Loading branch information
abrougher committed Jun 25, 2013
1 parent 6543e0c commit afc0d50
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 80 deletions.
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.service;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -61,23 +62,23 @@ public interface INodeService {
public Set<Node> findNodesThatOriginatedFromNodeId(String originalNodeId, boolean recursive);

public Collection<Node> findEnabledNodesFromNodeGroup(String nodeGroupId);

public Collection<Node> findNodesWithOpenRegistration();

public Map<String, NodeSecurity> findAllNodeSecurity(boolean useCache);

public List<NodeSecurity> findNodeSecurityWithLoadEnabled();

public List<String> findAllExternalIds();

public NodeSecurity findNodeSecurity(String nodeId);

public NodeSecurity findNodeSecurity(String nodeId, boolean createIfNotFound);

public void deleteNodeSecurity(String nodeId);
public void deleteNode(String nodeId, boolean syncChange);

public void deleteNode(String nodeId, boolean syncChange);

public String findSymmetricVersion();

public String findIdentityNodeId();
Expand All @@ -94,9 +95,9 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,
public Node findIdentity();

public Node findIdentity(boolean useCache);

public Node findIdentity(boolean useCache, boolean logSqlError);

public Node getCachedIdentity();

public boolean deleteIdentity();
Expand All @@ -114,25 +115,25 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,
public boolean isExternalIdRegistered(String nodeGroupId, String externalId);

public void save(Node node);

public void updateNodeHost(NodeHost nodeHost);

public void updateNodeHostForCurrentNode();

public void insertNodeIdentity(String nodeId);

public void insertNodeGroup(String groupId, String description);

public boolean updateNodeSecurity(NodeSecurity security);

public boolean updateNodeSecurity(ISqlTransaction transaction, NodeSecurity security);

public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy);

public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy);

public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy);

public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy);

public INodeIdCreator getNodeIdCreator();
Expand All @@ -153,7 +154,7 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,

/**
* Get the current status of this node.
*
*
* @return {@link NodeStatus}
*/
public NodeStatus getNodeStatus();
Expand All @@ -165,28 +166,30 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,
public void checkForOfflineNodes();

/**
* Find nodes that have been offline for the configured timeframe before {@link IOfflineClientListener}
* Find nodes that have been offline for the configured timeframe before {@link IOfflineClientListener}
* and {@link IOfflineServerListener} will be called
*
*
* @return list of offline nodes
*/
public List<Node> findOfflineNodes();

/**
* Find nodes that have been offline for a number of minutes
*
*
* @return list of offline nodes
* @param minutesOffline
* the number of minutes that have passed that a node has not
* checked in for until it is considered offline
*/
public List<Node> findOfflineNodes(long minutesOffline);


public Map<String, Date> findLastHeartbeats();

public List<String> findOfflineNodeIds(long minutesOffline);

public void addOfflineServerListener(IOfflineServerListener listener);

public boolean removeOfflineServerListener(IOfflineServerListener listener);

public NetworkedNode getRootNetworkedNode();
}
Expand Up @@ -166,11 +166,11 @@ public NodeSecurity findNodeSecurity(String id) {
public List<NodeHost> findNodeHosts(String nodeId) {
return sqlTemplate.query(getSql("selectNodeHostPrefixSql", "selectNodeHostByNodeIdSql"),
new NodeHostRowMapper(), nodeId);
}
}

public void updateNodeHost(NodeHost nodeHost) {

Object[] params = new Object[] {
Object[] params = new Object[] {
nodeHost.getIpAddress(),
nodeHost.getOsUser(), nodeHost.getOsName(),
nodeHost.getOsArch(), nodeHost.getOsVersion(),
Expand All @@ -184,13 +184,13 @@ public void updateNodeHost(NodeHost nodeHost) {
nodeHost.getHeartbeatTime(),
nodeHost.getLastRestartTime(), nodeHost.getNodeId(),
nodeHost.getHostName() };

if (sqlTemplate.update(getSql("updateNodeHostSql"), params) == 0) {
sqlTemplate.update(getSql("insertNodeHostSql"), params);
}

}

public void updateNodeHostForCurrentNode() {
if (nodeHostForCurrentNode == null) {
nodeHostForCurrentNode = new NodeHost(findIdentityNodeId());
Expand Down Expand Up @@ -251,7 +251,7 @@ public void deleteNode(String nodeId, boolean syncChange) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
throw ex;
} finally {
if (!syncChange) {
symmetricDialect.enableSyncTriggers(transaction);
Expand Down Expand Up @@ -291,7 +291,7 @@ public void insertNodeGroup(String groupId, String description) {
sqlTemplate.update(getSql("insertNodeGroupSql"), description, groupId);
}
}

public void save(Node node) {
if (!updateNode(node)) {
sqlTemplate.update(
Expand Down Expand Up @@ -331,12 +331,12 @@ protected <T> T getFirstEntry(List<T> list) {
}
return null;
}

public List<NodeSecurity> findNodeSecurityWithLoadEnabled() {
return sqlTemplate.query(
getSql("findNodeSecurityWithLoadEnabledSql"), new NodeSecurityRowMapper());
getSql("findNodeSecurityWithLoadEnabledSql"), new NodeSecurityRowMapper());
}


public Map<String, NodeSecurity> findAllNodeSecurity(boolean useCache) {
long maxSecurityCacheTime = parameterService
Expand Down Expand Up @@ -382,9 +382,9 @@ public Node findIdentity() {
}

public Node findIdentity(boolean useCache) {
return findIdentity(useCache, true);
return findIdentity(useCache, true);
}

public Node findIdentity(boolean useCache, boolean logSqlError) {
if (cachedNodeIdentity == null || useCache == false) {
try {
Expand Down Expand Up @@ -468,7 +468,7 @@ public NetworkedNode getRootNetworkedNode() {
return null;
}
}

public boolean updateNodeSecurity(NodeSecurity security) {
ISqlTransaction transaction = null;
try {
Expand All @@ -485,7 +485,7 @@ public boolean updateNodeSecurity(NodeSecurity security) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
throw ex;
} finally {
close(transaction);
}
Expand Down Expand Up @@ -513,7 +513,7 @@ public boolean updateNodeSecurity(ISqlTransaction transaction, NodeSecurity secu
flushNodeAuthorizedCache();
return updated;
}

public boolean setInitialLoadEnabled(ISqlTransaction transaction, String nodeId,
boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) {
try {
Expand Down Expand Up @@ -556,12 +556,12 @@ public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled,
if (transaction != null) {
transaction.rollback();
}
throw ex;
throw ex;
} finally {
close(transaction);
}
}

public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String nodeId,
boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) {
try {
Expand All @@ -587,8 +587,8 @@ public boolean setReverseInitialLoadEnabled(ISqlTransaction transaction, String
symmetricDialect.enableSyncTriggers(transaction);
}
}
}
}

public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEnabled, boolean syncChange, long loadId, String createBy) {
ISqlTransaction transaction = null;
try {
Expand All @@ -605,11 +605,11 @@ public boolean setReverseInitialLoadEnabled(String nodeId, boolean initialLoadEn
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
throw ex;
} finally {
close(transaction);
}
}
}

public boolean isExternalIdRegistered(String nodeGroupId, String externalId) {
return sqlTemplate.queryForInt(getSql("isNodeRegisteredSql"), new Object[] { nodeGroupId,
Expand Down Expand Up @@ -717,7 +717,7 @@ public List<Node> findOfflineNodes(long minutesOffline) {
if (myNode != null) {
long offlineNodeDetectionMillis = minutesOffline * 60 * 1000;

List<Row> list = sqlTemplate.query(getSql("findOfflineNodesSql"), new Object[] {
List<Row> list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] {
myNode.getNodeId(), myNode.getNodeId()}, (int[])null);
for (Row node : list) {
String nodeId = node.getString("node_id");
Expand All @@ -742,17 +742,31 @@ public List<Node> findOfflineNodes(long minutesOffline) {
}

return offlineNodeList;

}


public Map<String, Date> findLastHeartbeats() {
Map<String, Date> dates = new HashMap<String, Date>();
Node myNode = findIdentity();
if (myNode != null) {
List<Row> list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] {
myNode.getNodeId(), myNode.getNodeId()}, (int[])null);
for (Row node : list) {
String nodeId = node.getString("node_id");
Date time = node.getDateTime("heartbeat_time");
dates.put(nodeId, time);
}
}
return dates;
}

public List<String> findOfflineNodeIds(long minutesOffline) {
List<String> offlineNodeList = new ArrayList<String>();
Node myNode = findIdentity();

if (myNode != null) {
long offlineNodeDetectionMillis = minutesOffline * 60 * 1000;

List<Row> list = sqlTemplate.query(getSql("findOfflineNodesSql"), new Object[] {
List<Row> list = sqlTemplate.query(getSql("findNodeHeartbeatsSql"), new Object[] {
myNode.getNodeId(), myNode.getNodeId()}, (int[])null);
for (Row node : list) {
String nodeId = node.getString("node_id");
Expand All @@ -776,7 +790,7 @@ public List<String> findOfflineNodeIds(long minutesOffline) {
}
}
return offlineNodeList;
}
}

public void setOfflineServerListeners(List<IOfflineServerListener> listeners) {
this.offlineServerListeners = listeners;
Expand Down Expand Up @@ -839,7 +853,7 @@ public NodeSecurity mapRow(Row rs) {
nodeSecurity.setInitialLoadTime(rs.getDateTime("initial_load_time"));
nodeSecurity.setCreatedAtNodeId(rs.getString("created_at_node_id"));
nodeSecurity.setRevInitialLoadEnabled(rs.getBoolean("rev_initial_load_enabled"));
nodeSecurity.setRevInitialLoadTime(rs.getDateTime("rev_initial_load_time"));
nodeSecurity.setRevInitialLoadTime(rs.getDateTime("rev_initial_load_time"));
nodeSecurity.setInitialLoadId(rs.getLong("initial_load_id"));
nodeSecurity.setInitialLoadCreateBy(rs.getString("initial_load_create_by"));
nodeSecurity.setRevInitialLoadId(rs.getLong("rev_initial_load_id"));
Expand Down
Expand Up @@ -83,15 +83,15 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectExternalIdsSql",
"select distinct(external_id) from $(node) where sync_enabled=1 order by external_id asc ");

putSql("findNodeSecurityWithLoadEnabledSql",
"select node_id, node_password, registration_enabled, registration_time, "
+ " initial_load_enabled, initial_load_time, created_at_node_id, "
+ " rev_initial_load_enabled, rev_initial_load_time, initial_load_id, " +
" initial_load_create_by, rev_initial_load_id, rev_initial_load_create_by " +
" from $(node_security) "
+ " where initial_load_enabled=1 or rev_initial_load_enabled=1 ");

putSql("findAllNodeSecuritySql",
"select node_id, node_password, registration_enabled, registration_time, "
+ " initial_load_enabled, initial_load_time, created_at_node_id, "
Expand All @@ -102,7 +102,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
putSql("deleteNodeSecuritySql", "delete from $(node_security) where node_id = ?");

putSql("deleteNodeSql", "delete from $(node) where node_id = ?");

putSql("deleteNodeHostSql", "delete from $(node_host) where node_id = ?");

putSql("findNodeIdentitySql", "inner join $(node_identity) i on c.node_id = "
Expand Down Expand Up @@ -169,7 +169,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " total_memory_bytes=?, max_memory_bytes=?, java_version=?, java_vendor=?, symmetric_version=?, timezone_offset=?, heartbeat_time=?, "
+ " last_restart_time=? where node_id=? and host_name=? ");

putSql("findOfflineNodesSql",
putSql("findNodeHeartbeatsSql",
"select h.node_id, max(h.heartbeat_time) as heartbeat_time, h.timezone_offset from $(node_host) h inner join $(node) n on h.node_id=n.node_id"
+ " where n.sync_enabled = 1 and n.node_id != ? and n.created_at_node_id = ? group by h.node_id, h.timezone_offset");

Expand Down

0 comments on commit afc0d50

Please sign in to comment.