Gather stats in the RoutingContext. Fixed BshDataRouter to not initialize an interpreter for every event.
chenson42 committed Aug 13, 2009
1 parent 0a864fc commit 7073c09
Showing 10 changed files with 104 additions and 27 deletions.
@@ -26,6 +26,7 @@
import org.jdom.Element;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
+ import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.route.IDataRouter;
import org.jumpmind.symmetric.route.IRouterContext;

@@ -36,7 +37,7 @@
*/
public class XmlPublisherDataRouter extends AbstractXmlPublisherExtensionPoint implements IDataRouter {

- public void completeBatch(IRouterContext context) {
+ public void completeBatch(IRouterContext context, OutgoingBatch batch) {
if (doesXmlExistToPublish(context)) {
finalizeXmlAndPublish(context);
}
@@ -31,6 +31,7 @@
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
+ import org.jumpmind.symmetric.model.OutgoingBatch;

public abstract class AbstractDataRouter implements IDataRouter {

@@ -91,7 +92,7 @@ protected Map<String, Object> getNewData(String prefix, DataMetaData dataMetaDat
protected Map<String, Object> getOldData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) {
return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedOldData());
}

protected Map<String, Object> getNullData(String prefix, DataMetaData dataMetaData) {
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Map<String, Object> data = new HashMap<String, Object>(columnNames.length);
@@ -128,6 +129,9 @@ protected Set<String> toNodeIds(Set<Node> nodes) {
/**
* Override if needed.
*/
- public void completeBatch(IRouterContext context) {
+ public void completeBatch(IRouterContext context, OutgoingBatch batch) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Completing batch " + batch.getBatchId());
+ }
}
}
@@ -23,17 +23,33 @@ public class BshDataRouter extends AbstractDataRouter {
public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
try {
- Interpreter interpreter = new Interpreter();
+ long ts = System.currentTimeMillis();
+ Interpreter interpreter = getInterpreter(context);
+ context.incrementStat(System.currentTimeMillis() - ts, "bsh.init");
HashSet<String> targetNodes = new HashSet<String>();
+ ts = System.currentTimeMillis();
bind(interpreter, dataMetaData, nodes, targetNodes);
+ context.incrementStat(System.currentTimeMillis() - ts, "bsh.bind");
+ ts = System.currentTimeMillis();
Object returnValue = interpreter.eval(dataMetaData.getTrigger().getRouterExpression());
+ context.incrementStat(System.currentTimeMillis() - ts, "bsh.eval");
return eval(returnValue, nodes, targetNodes);
} catch (EvalError e) {
logger.error("Error in data router. Routing to nobody.", e);
return Collections.emptySet();
}
}

+ protected Interpreter getInterpreter(IRouterContext context) {
+ final String KEY = String.format("%s.Interpreter", getClass().getName());
+ Interpreter interpreter = (Interpreter) context.getContextCache().get(KEY);
+ if (interpreter == null) {
+ interpreter = new Interpreter();
+ context.getContextCache().put(KEY, interpreter);
+ }
+ return interpreter;
+ }

protected Collection<String> eval(Object value, Set<Node> nodes, Set<String> targetNodes) {
if (targetNodes.size() > 0) {
return targetNodes;
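
Note: the fix above moves Interpreter construction out of the per-event path; routeToNodes now pulls a cached instance from the routing context instead of newing one up for every data event. A standalone sketch of the same caching pattern, with a plain HashMap standing in for the ICacheContext cache (class and method names here are illustrative, not from the commit):

import java.util.HashMap;
import java.util.Map;
import bsh.EvalError;
import bsh.Interpreter;

public class InterpreterCacheSketch {

    // Stand-in for context.getContextCache() in the real router.
    private final Map<String, Object> contextCache = new HashMap<String, Object>();

    // Reuse one Interpreter per routing pass instead of constructing one per event.
    protected Interpreter getInterpreter() {
        final String key = getClass().getName() + ".Interpreter";
        Interpreter interpreter = (Interpreter) contextCache.get(key);
        if (interpreter == null) {
            interpreter = new Interpreter(); // construction is the expensive step
            contextCache.put(key, interpreter);
        }
        return interpreter;
    }

    public static void main(String[] args) throws EvalError {
        InterpreterCacheSketch sketch = new InterpreterCacheSketch();
        Interpreter interpreter = sketch.getInterpreter();
        interpreter.set("x", 21);
        System.out.println(interpreter.eval("x * 2")); // prints 42
        System.out.println(interpreter == sketch.getInterpreter()); // true: same cached instance
    }
}
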
@@ -24,6 +24,7 @@

import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
+ import org.jumpmind.symmetric.model.OutgoingBatch;

public class DefaultDataRouter extends AbstractDataRouter {

@@ -32,7 +33,7 @@ public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaDa
return toNodeIds(nodes);
}

- public void completeBatch(IRouterContext context) {
+ public void completeBatch(IRouterContext context, OutgoingBatch batch) {

}

@@ -25,6 +25,7 @@
import org.jumpmind.symmetric.ext.IExtensionPoint;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
+ import org.jumpmind.symmetric.model.OutgoingBatch;

/**
* The data router is an extension point that allows the end user to target
@@ -43,6 +44,6 @@ public interface IDataRouter extends IExtensionPoint {

public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad);

- public void completeBatch(IRouterContext context);
+ public void completeBatch(IRouterContext context, OutgoingBatch batch);

}
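
This is a breaking change for routers that implement IDataRouter directly; routers extending AbstractDataRouter inherit the new debug-logging default shown earlier. A hypothetical custom router (not part of the commit; the imports assume AbstractDataRouter lives in the same org.jumpmind.symmetric.route package as IDataRouter) would adapt like this:

import java.util.Collection;
import java.util.Set;

import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.route.AbstractDataRouter;
import org.jumpmind.symmetric.route.IRouterContext;

// Hypothetical example, not part of the commit.
public class ExampleRouter extends AbstractDataRouter {

    public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData,
            Set<Node> nodes, boolean initialLoad) {
        // Same behavior as DefaultDataRouter for the purposes of this sketch.
        return toNodeIds(nodes);
    }

    // Before this commit the override was completeBatch(IRouterContext context).
    public void completeBatch(IRouterContext context, OutgoingBatch batch) {
        // The completed batch is now passed in, so per-batch bookkeeping is possible.
        context.incrementStat(1, "exampleRouter.batchesCompleted");
    }
}
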
@@ -37,4 +37,6 @@ public interface IRouterContext extends ICacheContext {

public boolean isEncountedTransactionBoundary();

+ public void incrementStat(long amount, String name);

}
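
Every call site added in this commit uses the new hook the same way: snapshot System.currentTimeMillis(), do the work, then add the elapsed delta under a stat name. A self-contained sketch of that pattern (the one-method interface below is a trimmed stand-in for IRouterContext; all names are illustrative):

public class IncrementStatSketch {

    // Trimmed stand-in for IRouterContext, keeping only the new method.
    interface StatContext {
        void incrementStat(long amount, String name);
    }

    static void timedWork(StatContext context) throws InterruptedException {
        long ts = System.currentTimeMillis();
        Thread.sleep(5); // stand-in for the routed work being measured
        context.incrementStat(System.currentTimeMillis() - ts, "exampleStat");
    }

    public static void main(String[] args) throws InterruptedException {
        timedWork(new StatContext() {
            public void incrementStat(long amount, String name) {
                System.out.println(name + " += " + amount + "ms");
            }
        });
    }
}
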
@@ -94,11 +94,11 @@ public boolean isNeedsCommitted() {
public boolean isRouted() {
return routed;
}

public Set<IDataRouter> getUsedDataRouters() {
return usedDataRouters;
}

public void addUsedDataRouter(IDataRouter dataRouter) {
this.usedDataRouters.add(dataRouter);
}
@@ -107,4 +107,6 @@ public void resetForNextData() {
this.routed = false;
this.needsCommitted = false;
}


}
@@ -2,6 +2,7 @@

import java.util.HashMap;
import java.util.Map;
+ import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,4 +55,35 @@ public boolean isEncountedTransactionBoundary() {
return this.encountedTransactionBoundary;
}

+ private final String getStatKey(String name) {
+ return String.format("Stat.%s", name);
+ }
+
+ public void incrementStat(long amount, String name) {
+ final String KEY = getStatKey(name);
+ Long val = (Long) contextCache.get(KEY);
+ if (val == null) {
+ val = 0l;
+ }
+ val += amount;
+ contextCache.put(KEY, val);
+ }
+
+ public long getStat(String name) {
+ final String KEY = getStatKey(name);
+ Long val = (Long) contextCache.get(KEY);
+ if (val == null) {
+ val = 0l;
+ }
+ return val;
+ }
+
+ public void logStats(Log logger) {
+ Set<String> keys = contextCache.keySet();
+ for (String key : keys) {
+ if (key.startsWith("Stat.")) {
+ logger.info(String.format("routing '%s' stat %s=%s", channel.getId(), key.substring(key.indexOf(".")+1), contextCache.get(key)));
+ }
+ }
+ }
}
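
The stats are kept in the same context cache as the rest of the per-pass state, namespaced by the "Stat." key prefix, so repeated increments accumulate over one channel-routing pass and logStats can later pick them out by prefix. A minimal standalone sketch of that accumulation scheme (plain HashMap in place of the context cache; not from the commit):

import java.util.HashMap;
import java.util.Map;

public class StatCacheSketch {

    private final Map<String, Object> contextCache = new HashMap<String, Object>();

    private String getStatKey(String name) {
        return "Stat." + name; // namespace stats apart from other cached state
    }

    public void incrementStat(long amount, String name) {
        Long val = (Long) contextCache.get(getStatKey(name));
        contextCache.put(getStatKey(name), (val == null ? 0L : val) + amount);
    }

    public long getStat(String name) {
        Long val = (Long) contextCache.get(getStatKey(name));
        return val == null ? 0L : val;
    }

    public static void main(String[] args) {
        StatCacheSketch context = new StatCacheSketch();
        context.incrementStat(15, "readData");
        context.incrementStat(27, "readData"); // accumulates rather than overwrites
        System.out.println(context.getStat("readData")); // prints 42
    }
}
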
@@ -62,8 +62,8 @@
import org.springframework.jdbc.support.JdbcUtils;

/**
- * This service is responsible for routing data to specific nodes and managing the batching of data to be delivered to
- * each node.
+ * This service is responsible for routing data to specific nodes and managing
+ * the batching of data to be delivered to each node.
*
* @since 2.0
*/
@@ -74,7 +74,7 @@ public class RouterService extends AbstractService implements IRouterService {
private IDataService dataService;

private IConfigurationService configurationService;

private ITriggerService triggerService;

private IOutgoingBatchService outgoingBatchService;
@@ -116,9 +116,10 @@ public void routeData() {
}

/**
- * We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is
- * a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is
- * to reduce the number of connections we are required to have.
+ * We route data channel by channel for two reasons. One is that if/when we
+ * decide to multi-thread the routing it is a simple matter of inserting a
+ * thread pool here and waiting for all channels to be processed. The other
+ * reason is to reduce the number of connections we are required to have.
*/
protected void routeDataForEachChannel(DataRef ref, Node sourceNode) {
final List<NodeChannel> channels = configurationService.getChannels();
@@ -135,7 +136,7 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept
RouterContext context = null;
try {
context = new RouterContext(sourceNode.getNodeId(), nodeChannel, dataSource);
- selectDataAndRoute(c, ref, context);
+ selectDataAndRoute(c, ref, context);
} catch (Exception ex) {
if (context != null) {
context.rollback();
@@ -152,7 +153,8 @@
} catch (SQLException e) {
logger.error(e, e);
} finally {
- context.cleanup();
+ context.logStats(logger);
+ context.cleanup();
}
}
return null;
@@ -231,8 +233,10 @@ protected void filterDisabledNodes(Set<Node> nodes) {
}

/**
- * Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary.
- * Then route each {@link Data} while continuing to keep the queue filled until the result set is entirely read.
+ * Pre-read data and fill up a queue so we can peek ahead to see if we have
+ * crossed a database transaction boundary. Then route each {@link Data}
+ * while continuing to keep the queue filled until the result set is
+ * entirely read.
*
* @param conn
* The connection to use for selecting the data.
@@ -257,13 +261,17 @@ protected void selectDataAndRoute(Connection conn, DataRef ref, RouterContext co
Map<String, Long> transactionIdDataId = new HashMap<String, Long>();
LinkedList<Data> dataQueue = new LinkedList<Data>();
for (int i = 0; i < peekAheadLength && rs.next(); i++) {
+ long ts = System.currentTimeMillis();
readData(rs, dataQueue, transactionIdDataId);
+ context.incrementStat(System.currentTimeMillis()-ts, "readData");
}

while (dataQueue.size() > 0) {
routeData(dataQueue.poll(), transactionIdDataId, context);
if (rs.next()) {
+ long ts = System.currentTimeMillis();
readData(rs, dataQueue, transactionIdDataId);
+ context.incrementStat(System.currentTimeMillis()-ts, "readData");
}
}

@@ -285,9 +293,12 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, Route
if (!context.getChannel().isIgnored()) {
IDataRouter dataRouter = getDataRouter(trigger);
context.addUsedDataRouter(dataRouter);
- Collection<String> nodeIds = dataRouter.routeToNodes(context, dataMetaData, findAvailableNodes(
- trigger, context), false);
+ long ts = System.currentTimeMillis();
+ Collection<String> nodeIds = dataRouter.routeToNodes(context, dataMetaData, findAvailableNodes(trigger,
+ context), false);
+ context.incrementStat(System.currentTimeMillis()-ts, "dataRouter");
insertDataEvents(context, dataMetaData, nodeIds);

}

if (!context.isRouted()) {
@@ -308,6 +319,7 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, Route

protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData, Collection<String> nodeIds) {
if (nodeIds != null && nodeIds.size() > 0) {
+ long ts = System.currentTimeMillis();
for (String nodeId : nodeIds) {
if (dataMetaData.getData().getSourceNodeId() == null
|| !dataMetaData.getData().getSourceNodeId().equals(nodeId)) {
@@ -319,22 +331,24 @@ protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData
}
batch.incrementDataEventCount();
context.setRouted(true);
- dataService.insertDataEvent(context.getJdbcTemplate(), dataMetaData.getData().getDataId(),
- batch.getBatchId());
+ dataService.insertDataEvent(context.getJdbcTemplate(), dataMetaData.getData().getDataId(), batch
+ .getBatchId());
if (batchAlgorithms.get(context.getChannel().getBatchAlgorithm()).isBatchComplete(batch,
dataMetaData, context)) {
completeBatch(batch, context);
}
}
}
+ context.incrementStat(System.currentTimeMillis()-ts, "insertDataEvents");
}

}

protected void completeBatch(OutgoingBatch batch, RouterContext context) {
batch.setRouterMillis(System.currentTimeMillis() - batch.getCreateTime().getTime());
Set<IDataRouter> usedRouters = context.getUsedDataRouters();
for (IDataRouter dataRouter : usedRouters) {
- dataRouter.completeBatch(context);
+ dataRouter.completeBatch(context, batch);
}
outgoingBatchService.updateOutgoingBatch(context.getJdbcTemplate(), batch);
context.getBatchesByNodes().remove(batch.getNodeId());
@@ -369,8 +383,9 @@ protected Data readData(ResultSet rs, LinkedList<Data> dataStack, Map<String, Lo
}

protected Trigger getTriggerForData(Data data) {
- Trigger trigger = triggerService.getActiveTriggersForSourceNodeGroup(parameterService
- .getString(ParameterConstants.NODE_GROUP_ID), false).get((data.getTriggerHistory().getTriggerId()));
+ Trigger trigger = triggerService.getActiveTriggersForSourceNodeGroup(
+ parameterService.getString(ParameterConstants.NODE_GROUP_ID), false).get(
+ (data.getTriggerHistory().getTriggerId()));
if (trigger == null) {
trigger = triggerService.getTriggerById(data.getTriggerHistory().getTriggerId());
if (trigger == null) {
@@ -416,7 +431,7 @@ public void setRouters(Map<String, IDataRouter> routers) {
public void setBatchAlgorithms(Map<String, IBatchAlgorithm> batchAlgorithms) {
this.batchAlgorithms = batchAlgorithms;
}

public void setTriggerService(ITriggerService triggerService) {
this.triggerService = triggerService;
}
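
The routeDataForEachChannel javadoc above retains its claim that multi-threading the routing would be "a simple matter of inserting a thread pool here and waiting for all channels to be processed". A hypothetical sketch of that future shape, purely illustrative (Channel is a stand-in, not the SymmetricDS NodeChannel API; nothing like this is in the commit):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChannelRoutingSketch {

    interface Channel {
        String getId();
    }

    void routeDataForEachChannel(List<Channel> channels) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, channels.size()));
        for (final Channel channel : channels) {
            pool.submit(new Runnable() {
                public void run() {
                    // One connection and one routing pass per channel,
                    // exactly as in the single-threaded loop above.
                    System.out.println("routing channel " + channel.getId());
                }
            });
        }
        pool.shutdown(); // then wait for all channels to be processed
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
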
@@ -254,11 +254,14 @@ public void testBshTransactionalRoutingOnUpdate() {
testChannel.setBatchAlgorithm("transactional");
getConfigurationService().saveChannel(testChannel);

+ long ts = System.currentTimeMillis();
SimpleJdbcTemplate t = new SimpleJdbcTemplate(getJdbcTemplate());
int count = t.update(String.format("update %s set ROUTING_VARCHAR=?", TEST_TABLE_1),
NODE_GROUP_NODE_3);
logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1);
logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis()-ts) + "ms");
+ ts = System.currentTimeMillis();
getRoutingService().routeData();
logger.info("Just routed " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis()-ts) + "ms");

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel);
