0002693: Use node cache
erilong committed Jul 27, 2016
1 parent 3c8e0dc commit d8da2c5
Showing 13 changed files with 95 additions and 13 deletions.
@@ -209,6 +209,7 @@ private ParameterConstants() {

public final static String CACHE_TIMEOUT_GROUPLETS_IN_MS = "cache.grouplets.time.ms";
public final static String CACHE_TIMEOUT_NODE_SECURITY_IN_MS = "cache.node.security.time.ms";
public final static String CACHE_TIMEOUT_NODE_IN_MS = "cache.node.time.ms";
public final static String CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS = "cache.trigger.router.time.ms";
public final static String CACHE_TIMEOUT_CHANNEL_IN_MS = "cache.channel.time.ms";
public final static String CACHE_TIMEOUT_NODE_GROUP_LINK_IN_MS = "cache.node.group.link.time.ms";
@@ -81,6 +81,9 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt
final String CTX_KEY_FLUSH_NODE_SECURITY_NEEDED = "FlushNodeSecurity."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_NEEDED = "FlushNode."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_RESTART_JOBMANAGER_NEEDED = "RestartJobManager."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

@@ -122,6 +125,7 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordJobManagerRestartNeeded(context, table, data);
recordConflictFlushNeeded(context, table);
recordNodeSecurityFlushNeeded(context, table);
recordNodeFlushNeeded(context, table);
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
@@ -202,6 +206,12 @@ private void recordNodeSecurityFlushNeeded(DataContext context, Table table) {
}
}

private void recordNodeFlushNeeded(DataContext context, Table table) {
if (matchesTable(table, TableConstants.SYM_NODE)) {
context.put(CTX_KEY_FLUSH_NODE_NEEDED, true);
}
}

private boolean isSyncTriggersNeeded(DataContext context, Table table) {
boolean autoSync = engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED) ||
context.getBatch().getBatchId() == Constants.VIRTUAL_BATCH_FOR_REGISTRATION;
@@ -349,6 +359,12 @@ public void batchCommitted(DataContext context) {
context.remove(CTX_KEY_FLUSH_NODE_SECURITY_NEEDED);
}

if (context.get(CTX_KEY_FLUSH_NODE_NEEDED) != null) {
log.info("About to refresh the cache of nodes because new configuration came through the data loader");
nodeService.flushNodeCache();
context.remove(CTX_KEY_FLUSH_NODE_NEEDED);
}

if (context.get(CTX_KEY_RESYNC_TABLE_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
@SuppressWarnings("unchecked")
@@ -42,7 +42,6 @@
import org.jumpmind.symmetric.model.NetworkedNode;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadRequestKey;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
@@ -81,6 +80,12 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement
final String CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED = "FlushNotifcations."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODES_NEEDED = "FlushNodes."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_SECURITYS_NEEDED = "FlushNodeSecuritys."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_RESTART_JOBMANAGER_NEEDED = "RestartJobManager."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

@@ -122,6 +127,13 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)
|| tableMatches(dataMetaData, TableConstants.SYM_MONITOR_EVENT)) {

if (tableMatches(dataMetaData, TableConstants.SYM_NODE)) {
routingContext.put(CTX_KEY_FLUSH_NODES_NEEDED, Boolean.TRUE);
} else if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
routingContext.put(CTX_KEY_FLUSH_NODE_SECURITYS_NEEDED, Boolean.TRUE);
}

/*
* If this is sym_node or sym_node_security determine which
* nodes it goes to.
@@ -567,6 +579,18 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to refresh the cache of notifications because new configuration came through the data router");
engine.getMonitorService().flushNotificationCache();
}

if (routingContext.get(CTX_KEY_FLUSH_NODES_NEEDED) != null) {
log.info("About to refresh the cache of nodes because new configuration came through the data router");
engine.getNodeService().flushNodeCache();
engine.getNodeService().flushNodeGroupCache();
}

if (routingContext.get(CTX_KEY_FLUSH_NODE_SECURITYS_NEEDED) != null) {
log.info("About to refresh the cache of node security because new configuration came through the data router");
engine.getNodeService().flushNodeAuthorizedCache();
}

}
}

@@ -44,6 +44,8 @@ public interface INodeService {

public Node findNode(String nodeId);

public Node findNode(String id, boolean useCache);

public String getExternalId(String nodeId);

public List<NodeHost> findNodeHosts(String nodeId);
@@ -92,6 +94,8 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,

public boolean isNodeAuthorized(String nodeId, String password);

public void flushNodeCache();

public void flushNodeAuthorizedCache();

public void flushNodeGroupCache();
@@ -586,7 +586,7 @@ protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNode
List<IDatabaseWriterErrorHandler> dynamicErrorHandlers = errorHandlers;

if (sourceNodeId != null) {
Node sourceNode = nodeService.findNode(sourceNodeId);
Node sourceNode = nodeService.findNode(sourceNodeId, true);
if (sourceNode != null) {
link = new NodeGroupLink(sourceNode.getNodeGroupId(),
parameterService.getNodeGroupId());
@@ -630,7 +630,7 @@ else if (parameterService.is(ParameterConstants.FILE_SYNC_DELETE_CTL_FILE_AFTER_
public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
INodeService nodeService = engine.getNodeService();
Node local = nodeService.findIdentity();
Node sourceNode = nodeService.findNode(nodeId);
Node sourceNode = nodeService.findNode(nodeId, true);
if (local != null && sourceNode != null) {
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(nodeId, local.getNodeId(),
@@ -69,11 +69,15 @@ public class NodeService extends AbstractService implements INodeService {

private long securityCacheTime;

private Map<String, Node> nodeCache = new HashMap<String, Node>();

private long nodeCacheTime;

private Map<String, List<Node>> sourceNodesCache = new HashMap<String, List<Node>>();

private Map<String, List<Node>> targetNodesCache = new HashMap<String, List<Node>>();

private long nodesCacheTime;
private long nodeLinkCacheTime;

private INodePasswordFilter nodePasswordFilter;

@@ -145,6 +149,23 @@ public Node findNode(String id) {
return findAllNodesAsMap().get(id);
}

public Node findNode(String id, boolean useCache) {
if (useCache) {
long cacheTimeoutInMs = parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_NODE_IN_MS);
if ((System.currentTimeMillis() - nodeCacheTime) >= cacheTimeoutInMs) {
nodeCache = findAllNodesAsMap();
nodeCacheTime = System.currentTimeMillis();
}
return nodeCache.get(id);
} else {
return findAllNodesAsMap().get(id);

woehrl01 (Contributor) commented on Jul 27, 2016:

Great to have these cached! Wouldn't it be more performant to also load only the specific node (#31)? The same applies to the findNode(String id) method: as it stands, all nodes are still loaded just to fetch a single one.

}
}
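
Not part of this commit: a rough sketch of the single-row lookup suggested in the inline comment above, following the sqlTemplate/NodeRowMapper pattern this class already uses for findNodeByExternalId. The "findNodeByIdSql" key and the method name are hypothetical; a real change would also need the matching SQL added to the service's SQL map.

    // Hypothetical, for illustration only: fetch a single node by id instead of loading the whole table.
    // "findNodeByIdSql" does not exist in this commit; it would be defined next to "findNodeByExternalIdSql".
    public Node findNodeById(String id) {
        List<Node> nodes = sqlTemplate.query(
                getSql("selectNodePrefixSql", "findNodeByIdSql"), new NodeRowMapper(), id);
        return nodes.isEmpty() ? null : nodes.get(0);
    }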

public void flushNodeCache() {
nodeCacheTime = 0;
}

public Node findNodeByExternalId(String nodeGroupId, String externalId) {
List<Node> list = sqlTemplate.query(
getSql("selectNodePrefixSql", "findNodeByExternalIdSql"), new NodeRowMapper(),
@@ -349,11 +370,11 @@ public List<Node> findSourceNodesFor(NodeGroupLinkAction eventAction) {
long cacheTimeoutInMs = parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_NODE_GROUP_LINK_IN_MS);
if (node != null) {
List<Node> list = sourceNodesCache.get(eventAction.name());
if (list == null || (System.currentTimeMillis() - nodesCacheTime) >= cacheTimeoutInMs) {
if (list == null || (System.currentTimeMillis() - nodeLinkCacheTime) >= cacheTimeoutInMs) {
list = sqlTemplate.query(getSql("selectNodePrefixSql", "findNodesWhoTargetMeSql"),
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name());
sourceNodesCache.put(eventAction.name(), list);
nodesCacheTime = System.currentTimeMillis();
nodeLinkCacheTime = System.currentTimeMillis();
}
return list;
} else {
@@ -366,11 +387,11 @@ public List<Node> findTargetNodesFor(NodeGroupLinkAction eventAction) {
long cacheTimeoutInMs = parameterService.getLong(ParameterConstants.CACHE_TIMEOUT_NODE_GROUP_LINK_IN_MS);
if (node != null) {
List<Node> list = targetNodesCache.get(eventAction.name());
if (list == null || (System.currentTimeMillis() - nodesCacheTime) >= cacheTimeoutInMs) {
if (list == null || (System.currentTimeMillis() - nodeLinkCacheTime) >= cacheTimeoutInMs) {
list = sqlTemplate.query(getSql("selectNodePrefixSql", "findNodesWhoITargetSql"),
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name());
targetNodesCache.put(eventAction.name(), list);
nodesCacheTime = System.currentTimeMillis();
nodeLinkCacheTime = System.currentTimeMillis();
}
return list;
} else {
@@ -1160,6 +1160,13 @@ parameter.reload.timeout.ms=600000
# Tags: other
cache.node.security.time.ms=600000

# This is the amount of time node entries will be cached before re-reading
# them from the database.
#
# DatabaseOverridable: true
# Tags: other
cache.node.time.ms=600000

# This is the amount of time node group links entries will be cached before re-reading them from the database.
#
# DatabaseOverridable: true
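
The new cache.node.time.ms entry above pairs with the findNode(id, useCache) overload and flushNodeCache() added in this commit: cached lookups are served from memory until the timeout elapses or the cache is flushed. A minimal usage sketch under those assumptions; the helper class, method name, and nodeId value are illustrative, and the import paths follow the usual org.jumpmind.symmetric packages:

    import org.jumpmind.symmetric.model.Node;
    import org.jumpmind.symmetric.service.INodeService;

    public class NodeCacheUsageExample {
        // Illustrative only: shows how the cached lookup, the timeout, and the flush interact.
        public static void demonstrate(INodeService nodeService, String nodeId) {
            // Cached lookup: the in-memory node map is refreshed only when it is older than cache.node.time.ms.
            Node cached = nodeService.findNode(nodeId, true);
            // Passing false bypasses the timeout check and reloads the nodes from the database.
            Node fresh = nodeService.findNode(nodeId, false);
            // Flushing zeroes the cache timestamp, so the next cached lookup re-reads the node table.
            nodeService.flushNodeCache();
            System.out.println(cached != null && fresh != null ? "node " + nodeId + " found" : "node " + nodeId + " not found");
        }
    }
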
@@ -306,4 +306,13 @@ public String getExternalId(String nodeId) {
return nodeId;
}

@Override
public void flushNodeCache() {
}

@Override
public Node findNode(String id, boolean useCache) {
return null;
}

}
@@ -85,7 +85,7 @@ public void after(HttpServletRequest req, HttpServletResponse res) throws IOExce

protected AuthenticationStatus getAuthenticationStatus(String nodeId, String securityToken) {
AuthenticationStatus retVal = AuthenticationStatus.ACCEPTED;
Node node = nodeService.findNode(nodeId);
Node node = nodeService.findNode(nodeId, true);
if (node == null) {
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
} else if (!syncEnabled(node)) {
@@ -64,8 +64,8 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc
ProcessType.FILE_SYNC_PULL_HANDLER));
try {
engine.getFileSyncService().sendFiles(processInfo,
engine.getNodeService().findNode(nodeId), outgoingTransport);
Node targetNode = engine.getNodeService().findNode(nodeId);
engine.getNodeService().findNode(nodeId, true), outgoingTransport);
Node targetNode = engine.getNodeService().findNode(nodeId, true);

if (processInfo.getBatchCount() == 0 && targetNode.isVersionGreaterThanOrEqualTo(3,8,0)) {
ServletUtils.sendError(res, HttpServletResponse.SC_NO_CONTENT,
@@ -124,7 +124,7 @@ public void pull(String nodeId, String remoteHost, String remoteAddress,
nodeService.findIdentityNodeId(), map.getThreadChannel(), nodeId, ProcessType.PULL_HANDLER));

try {
Node targetNode = nodeService.findNode(nodeId);
Node targetNode = nodeService.findNode(nodeId, true);
List<OutgoingBatch> batchList = dataExtractorService.extract(processInfo, targetNode,
map.getThreadChannel(), outgoingTransport);
logDataReceivedFromPush(targetNode, batchList);
@@ -79,7 +79,7 @@ protected void push(String sourceNodeId, InputStream inputStream, OutputStream o
protected void push(String sourceNodeId, String channelId, InputStream inputStream, OutputStream outputStream) throws IOException {
long ts = System.currentTimeMillis();
try {
Node sourceNode = nodeService.findNode(sourceNodeId);
Node sourceNode = nodeService.findNode(sourceNodeId, true);
dataLoaderService.loadDataFromPush(sourceNode, channelId, inputStream, outputStream);
} finally {
statisticManager.incrementNodesPushed(1);
