From 7073c091810f221682a4b85fe521cb58c2abe6e3 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Thu, 13 Aug 2009 17:24:06 +0000 Subject: [PATCH] Gather stats in the RoutingContext. Fixed BshDataRouter to not initialize an interpreter for every event. --- .../integrate/XmlPublisherDataRouter.java | 3 +- .../symmetric/route/AbstractDataRouter.java | 8 ++- .../symmetric/route/BshDataRouter.java | 18 ++++++- .../symmetric/route/DefaultDataRouter.java | 3 +- .../jumpmind/symmetric/route/IDataRouter.java | 3 +- .../symmetric/route/IRouterContext.java | 2 + .../symmetric/route/RouterContext.java | 6 ++- .../symmetric/route/SimpleRouterContext.java | 32 ++++++++++++ .../symmetric/service/impl/RouterService.java | 51 ++++++++++++------- .../service/impl/RouterServiceTest.java | 5 +- 10 files changed, 104 insertions(+), 27 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/integrate/XmlPublisherDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/integrate/XmlPublisherDataRouter.java index a7ec7be6cd..dc009d5a4d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/integrate/XmlPublisherDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/integrate/XmlPublisherDataRouter.java @@ -26,6 +26,7 @@ import org.jdom.Element; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.route.IDataRouter; import org.jumpmind.symmetric.route.IRouterContext; @@ -36,7 +37,7 @@ */ public class XmlPublisherDataRouter extends AbstractXmlPublisherExtensionPoint implements IDataRouter { - public void completeBatch(IRouterContext context) { + public void completeBatch(IRouterContext context, OutgoingBatch batch) { if (doesXmlExistToPublish(context)) { finalizeXmlAndPublish(context); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java index 838016a0e6..8773f55ad5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java @@ -31,6 +31,7 @@ import org.jumpmind.symmetric.model.DataEventType; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; public abstract class AbstractDataRouter implements IDataRouter { @@ -91,7 +92,7 @@ protected Map getNewData(String prefix, DataMetaData dataMetaDat protected Map getOldData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) { return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedOldData()); } - + protected Map getNullData(String prefix, DataMetaData dataMetaData) { String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames(); Map data = new HashMap(columnNames.length); @@ -128,6 +129,9 @@ protected Set toNodeIds(Set nodes) { /** * Override if needed. */ - public void completeBatch(IRouterContext context) { + public void completeBatch(IRouterContext context, OutgoingBatch batch) { + if (logger.isDebugEnabled()) { + logger.debug("Completing batch " + batch.getBatchId()); + } } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java index e8ea77aa58..ee1f976961 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java @@ -23,10 +23,16 @@ public class BshDataRouter extends AbstractDataRouter { public Collection routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set nodes, boolean initialLoad) { try { - Interpreter interpreter = new Interpreter(); + long ts = System.currentTimeMillis(); + Interpreter interpreter = getInterpreter(context); + context.incrementStat(System.currentTimeMillis() - ts, "bsh.init"); HashSet targetNodes = new HashSet(); + ts = System.currentTimeMillis(); bind(interpreter, dataMetaData, nodes, targetNodes); + context.incrementStat(System.currentTimeMillis() - ts, "bsh.bind"); + ts = System.currentTimeMillis(); Object returnValue = interpreter.eval(dataMetaData.getTrigger().getRouterExpression()); + context.incrementStat(System.currentTimeMillis() - ts, "bsh.eval"); return eval(returnValue, nodes, targetNodes); } catch (EvalError e) { logger.error("Error in data router. Routing to nobody.", e); @@ -34,6 +40,16 @@ public Collection routeToNodes(IRouterContext context, DataMetaData data } } + protected Interpreter getInterpreter(IRouterContext context) { + final String KEY = String.format("%s.Interpreter", getClass().getName()); + Interpreter interpreter = (Interpreter) context.getContextCache().get(KEY); + if (interpreter == null) { + interpreter = new Interpreter(); + context.getContextCache().put(KEY, interpreter); + } + return interpreter; + } + protected Collection eval(Object value, Set nodes, Set targetNodes) { if (targetNodes.size() > 0) { return targetNodes; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java index 026e04a704..8c13bcd551 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java @@ -24,6 +24,7 @@ import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; public class DefaultDataRouter extends AbstractDataRouter { @@ -32,7 +33,7 @@ public Collection routeToNodes(IRouterContext routingContext, DataMetaDa return toNodeIds(nodes); } - public void completeBatch(IRouterContext context) { + public void completeBatch(IRouterContext context, OutgoingBatch batch) { } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java index dcbd87d287..04d2a0ea69 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java @@ -25,6 +25,7 @@ import org.jumpmind.symmetric.ext.IExtensionPoint; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; /** * The data router is an extension point that allows the end user to target @@ -43,6 +44,6 @@ public interface IDataRouter extends IExtensionPoint { public Collection routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set nodes, boolean initialLoad); - public void completeBatch(IRouterContext context); + public void completeBatch(IRouterContext context, OutgoingBatch batch); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IRouterContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IRouterContext.java index 8983a9d5ce..0a382d6147 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IRouterContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IRouterContext.java @@ -37,4 +37,6 @@ public interface IRouterContext extends ICacheContext { public boolean isEncountedTransactionBoundary(); + public void incrementStat(long amount, String name); + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/RouterContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/RouterContext.java index 87db376872..36633ad9d8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/RouterContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/RouterContext.java @@ -94,11 +94,11 @@ public boolean isNeedsCommitted() { public boolean isRouted() { return routed; } - + public Set getUsedDataRouters() { return usedDataRouters; } - + public void addUsedDataRouter(IDataRouter dataRouter) { this.usedDataRouters.add(dataRouter); } @@ -107,4 +107,6 @@ public void resetForNextData() { this.routed = false; this.needsCommitted = false; } + + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/SimpleRouterContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/SimpleRouterContext.java index a03ee63311..682e6d1041 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/SimpleRouterContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/SimpleRouterContext.java @@ -2,6 +2,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,4 +55,35 @@ public boolean isEncountedTransactionBoundary() { return this.encountedTransactionBoundary; } + private final String getStatKey(String name) { + return String.format("Stat.%s", name); + } + + public void incrementStat(long amount, String name) { + final String KEY = getStatKey(name); + Long val = (Long) contextCache.get(KEY); + if (val == null) { + val = 0l; + } + val += amount; + contextCache.put(KEY, val); + } + + public long getStat(String name) { + final String KEY = getStatKey(name); + Long val = (Long) contextCache.get(KEY); + if (val == null) { + val = 0l; + } + return val; + } + + public void logStats(Log logger) { + Set keys = contextCache.keySet(); + for (String key : keys) { + if (key.startsWith("Stat.")) { + logger.info(String.format("routing '%s' stat %s=%s", channel.getId(), key.substring(key.indexOf(".")+1), contextCache.get(key))); + } + } + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 53d41eb265..c8bba3c8cf 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -62,8 +62,8 @@ import org.springframework.jdbc.support.JdbcUtils; /** - * This service is responsible for routing data to specific nodes and managing the batching of data to be delivered to - * each node. + * This service is responsible for routing data to specific nodes and managing + * the batching of data to be delivered to each node. * * @since 2.0 */ @@ -74,7 +74,7 @@ public class RouterService extends AbstractService implements IRouterService { private IDataService dataService; private IConfigurationService configurationService; - + private ITriggerService triggerService; private IOutgoingBatchService outgoingBatchService; @@ -116,9 +116,10 @@ public void routeData() { } /** - * We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is - * a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is - * to reduce the number of connections we are required to have. + * We route data channel by channel for two reasons. One is that if/when we + * decide to multi-thread the routing it is a simple matter of inserting a + * thread pool here and waiting for all channels to be processed. The other + * reason is to reduce the number of connections we are required to have. */ protected void routeDataForEachChannel(DataRef ref, Node sourceNode) { final List channels = configurationService.getChannels(); @@ -135,7 +136,7 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept RouterContext context = null; try { context = new RouterContext(sourceNode.getNodeId(), nodeChannel, dataSource); - selectDataAndRoute(c, ref, context); + selectDataAndRoute(c, ref, context); } catch (Exception ex) { if (context != null) { context.rollback(); @@ -152,7 +153,8 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept } catch (SQLException e) { logger.error(e, e); } finally { - context.cleanup(); + context.logStats(logger); + context.cleanup(); } } return null; @@ -231,8 +233,10 @@ protected void filterDisabledNodes(Set nodes) { } /** - * Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary. - * Then route each {@link Data} while continuing to keep the queue filled until the result set is entirely read. + * Pre-read data and fill up a queue so we can peek ahead to see if we have + * crossed a database transaction boundary. Then route each {@link Data} + * while continuing to keep the queue filled until the result set is + * entirely read. * * @param conn * The connection to use for selecting the data. @@ -257,13 +261,17 @@ protected void selectDataAndRoute(Connection conn, DataRef ref, RouterContext co Map transactionIdDataId = new HashMap(); LinkedList dataQueue = new LinkedList(); for (int i = 0; i < peekAheadLength && rs.next(); i++) { + long ts = System.currentTimeMillis(); readData(rs, dataQueue, transactionIdDataId); + context.incrementStat(System.currentTimeMillis()-ts, "readData"); } while (dataQueue.size() > 0) { routeData(dataQueue.poll(), transactionIdDataId, context); if (rs.next()) { + long ts = System.currentTimeMillis(); readData(rs, dataQueue, transactionIdDataId); + context.incrementStat(System.currentTimeMillis()-ts, "readData"); } } @@ -285,9 +293,12 @@ protected void routeData(Data data, Map transactionIdDataId, Route if (!context.getChannel().isIgnored()) { IDataRouter dataRouter = getDataRouter(trigger); context.addUsedDataRouter(dataRouter); - Collection nodeIds = dataRouter.routeToNodes(context, dataMetaData, findAvailableNodes( - trigger, context), false); + long ts = System.currentTimeMillis(); + Collection nodeIds = dataRouter.routeToNodes(context, dataMetaData, findAvailableNodes(trigger, + context), false); + context.incrementStat(System.currentTimeMillis()-ts, "dataRouter"); insertDataEvents(context, dataMetaData, nodeIds); + } if (!context.isRouted()) { @@ -308,6 +319,7 @@ protected void routeData(Data data, Map transactionIdDataId, Route protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData, Collection nodeIds) { if (nodeIds != null && nodeIds.size() > 0) { + long ts = System.currentTimeMillis(); for (String nodeId : nodeIds) { if (dataMetaData.getData().getSourceNodeId() == null || !dataMetaData.getData().getSourceNodeId().equals(nodeId)) { @@ -319,22 +331,24 @@ protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData } batch.incrementDataEventCount(); context.setRouted(true); - dataService.insertDataEvent(context.getJdbcTemplate(), dataMetaData.getData().getDataId(), - batch.getBatchId()); + dataService.insertDataEvent(context.getJdbcTemplate(), dataMetaData.getData().getDataId(), batch + .getBatchId()); if (batchAlgorithms.get(context.getChannel().getBatchAlgorithm()).isBatchComplete(batch, dataMetaData, context)) { completeBatch(batch, context); } } } + context.incrementStat(System.currentTimeMillis()-ts, "insertDataEvents"); } + } protected void completeBatch(OutgoingBatch batch, RouterContext context) { batch.setRouterMillis(System.currentTimeMillis() - batch.getCreateTime().getTime()); Set usedRouters = context.getUsedDataRouters(); for (IDataRouter dataRouter : usedRouters) { - dataRouter.completeBatch(context); + dataRouter.completeBatch(context, batch); } outgoingBatchService.updateOutgoingBatch(context.getJdbcTemplate(), batch); context.getBatchesByNodes().remove(batch.getNodeId()); @@ -369,8 +383,9 @@ protected Data readData(ResultSet rs, LinkedList dataStack, Map routers) { public void setBatchAlgorithms(Map batchAlgorithms) { this.batchAlgorithms = batchAlgorithms; } - + public void setTriggerService(ITriggerService triggerService) { this.triggerService = triggerService; } diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java index e958799ef8..c0608a21b9 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java @@ -254,11 +254,14 @@ public void testBshTransactionalRoutingOnUpdate() { testChannel.setBatchAlgorithm("transactional"); getConfigurationService().saveChannel(testChannel); + long ts = System.currentTimeMillis(); SimpleJdbcTemplate t = new SimpleJdbcTemplate(getJdbcTemplate()); int count = t.update(String.format("update %s set ROUTING_VARCHAR=?", TEST_TABLE_1), NODE_GROUP_NODE_3); - logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1); + logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis()-ts) + "ms"); + ts = System.currentTimeMillis(); getRoutingService().routeData(); + logger.info("Just routed " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis()-ts) + "ms"); List batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); filterForChannels(batches, testChannel);