Skip to content

Commit

Permalink
Fixed bug with routing of data the has the source_node_id set to the …
Browse files Browse the repository at this point in the history
…only node it is routed to. Fixed logging of total read time.
  • Loading branch information
chenson42 committed Feb 26, 2010
1 parent 24648f9 commit af54f46
Showing 1 changed file with 10 additions and 8 deletions.
Expand Up @@ -304,7 +304,6 @@ protected int selectDataAndRoute(Connection conn, DataRef ref, RouterContext con
long ts = System.currentTimeMillis();
if (readData(rs, dataQueue, transactionIdDataId)) {
dataCount++;
context.incrementStat(System.currentTimeMillis() - ts, STAT_READ_DATA_MS);
} else {
// don't count the event if we didn't read it
i--;
Expand All @@ -313,8 +312,8 @@ protected int selectDataAndRoute(Connection conn, DataRef ref, RouterContext con
if (hasNext) {
hasNext = rs.next();
}
context.incrementStat(System.currentTimeMillis() - ts, STAT_READ_DATA_MS);
}

// Go ahead and close the resource if we don't need it anymore.
if (!hasNext) {
JdbcUtils.closeResultSet(rs);
Expand All @@ -339,13 +338,14 @@ protected int selectDataAndRoute(Connection conn, DataRef ref, RouterContext con
if (readData(rs, dataQueue, transactionIdDataId)) {
dataCount++;
}
context.incrementStat(System.currentTimeMillis() - ts, STAT_READ_DATA_MS);

if (hasNext) {
hasNext = rs.next();
}
context.incrementStat(System.currentTimeMillis() - ts, STAT_READ_DATA_MS);
}
}

return dataCount;

} finally {
Expand Down Expand Up @@ -378,6 +378,10 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, Route
context.incrementStat(System.currentTimeMillis() - ts, STAT_DATA_ROUTER_MS);
}

if (dataMetaData.getData().getSourceNodeId() != null) {
nodeIds.remove(dataMetaData.getData().getSourceNodeId());
}

insertDataEvents(context, dataMetaData, nodeIds, triggerRouter);

}
Expand All @@ -397,8 +401,7 @@ protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData
}
long ts = System.currentTimeMillis();
for (String nodeId : nodeIds) {
if (dataMetaData.getData().getSourceNodeId() == null
|| !dataMetaData.getData().getSourceNodeId().equals(nodeId)) {

Map<String, OutgoingBatch> batches = context.getBatchesByNodes();
OutgoingBatch batch = batches.get(nodeId);
if (batch == null) {
Expand All @@ -415,8 +418,7 @@ protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData
if (batchAlgorithms.get(context.getChannel().getBatchAlgorithm()).isBatchComplete(
batch, dataMetaData, context)) {
context.setNeedsCommitted(true);
}
}
}
}
context.incrementStat(System.currentTimeMillis() - ts, STAT_INSERT_DATA_EVENTS_MS);
}
Expand Down

0 comments on commit af54f46

Please sign in to comment.