Skip to content

Commit

Permalink
0004219: Routing should use transaction to insert multiple sym_outgoing_batch rows
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 31, 2019
1 parent 90d69d2 commit 82027ad
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 47 deletions.
Expand Up @@ -52,6 +52,9 @@ public class ChannelRouterContext extends SimpleRouterContext {
public static final String STAT_ENQUEUE_EOD_MS = "data.enqueue.eod.time.ms";
public static final String STAT_DATA_EVENTS_INSERTED = "data.events.insert.count";
public static final String STAT_DATA_ROUTED_COUNT = "data.routed.count";
public static final String STAT_INSERT_BATCHES_MS = "batches.insert.time.ms";
public static final String STAT_BATCHES_INSERTED = "batches.insert.count";
public static final String STAT_UPDATE_BATCHES_MS = "batches.update.time.ms";
public static final String STAT_ROUTE_TOTAL_TIME = "total.time.ms";

private Map<String, OutgoingBatch> batchesByNodes = new HashMap<String, OutgoingBatch>();
Expand Down Expand Up @@ -179,8 +182,8 @@ synchronized public void logStats(Logger log, long totalTimeInMs) {
super.logStats(log, totalTimeInMs);
if (log.isDebugEnabled()) {
log.debug(channel.getChannelId() + ", startDataId=" + startDataId + ", endDataId=" + endDataId +
", dataReadCount=" + dataReadCount + ", peekAheadFillCount=" + peekAheadFillCount +
", dataGaps=" + dataGaps.toString());
", lastDataId=" + lastDataId + ", dataReadCount=" + dataReadCount + ", peekAheadFillCount=" + peekAheadFillCount +
", dataGaps=" + dataGaps.size());
}
}

Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
Expand Down Expand Up @@ -572,12 +573,11 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
engine.getDataService().insertDataEvents(context.getSqlTransaction(),
context.getDataEventList());
context.clearDataEventsList();
context.incrementStat(System.currentTimeMillis() - insertTs, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
completeBatchesAndCommit(context);
gapDetector.addDataIds(context.getDataIds());
gapDetector.setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute());
hasMaxDataRoutedOnChannel |= context.getDataIds().size() >= context.getChannel().getMaxDataToRoute();
context.incrementStat(System.currentTimeMillis() - insertTs,
ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);

if (parameterService.is(ParameterConstants.ROUTING_COLLECT_STATS_UNROUTED)) {
Data lastDataProcessed = context.getLastDataProcessed();
Expand Down Expand Up @@ -625,8 +625,10 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) {
batches.addAll(groupBatches.values());
}

long ts = System.currentTimeMillis();
completeBatches(context, batches, usedRouters);
context.commit();
context.incrementStat(System.currentTimeMillis() - ts, ChannelRouterContext.STAT_UPDATE_BATCHES_MS);

for (IDataRouter dataRouter : usedRouters) {
dataRouter.contextCommitted(context);
Expand Down Expand Up @@ -762,14 +764,12 @@ protected long selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext
engine.getDataService().insertDataEvents(
context.getSqlTransaction(), context.getDataEventList());
context.clearDataEventsList();
context.incrementStat(System.currentTimeMillis() - insertTs, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
}
if (context.isNeedsCommitted()) {
completeBatchesAndCommit(context);
}
} finally {
context.incrementStat(System.currentTimeMillis() - insertTs,
ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);

if (statsDataCount > StatisticConstants.FLUSH_SIZE_ROUTER_DATA) {
engine.getStatisticManager().incrementDataRouted(
context.getChannel().getChannelId(), statsDataCount);
Expand Down Expand Up @@ -931,6 +931,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
boolean useCommonMode = context.isProduceCommonBatches() || context.isProduceGroupBatches();
boolean firstTimeForGroup = false;
int numberOfDataEventsInserted = 0;
ISqlTransaction transaction = null;

if (nodeIds == null || nodeIds.size() == 0) {
nodeIds = new HashSet<String>(1);
Expand Down Expand Up @@ -971,53 +972,68 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
} else {
context.setLastLoadId(-1);
}

for (String nodeId : nodeIds) {
if (nodeId != null) {
OutgoingBatch batch = batches.get(nodeId);
if (batch == null) {
batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), Status.RT);
batch.setBatchId(batchIdToReuse);
batch.setCommonFlag(useCommonMode);

if (log.isDebugEnabled()) {
log.debug("About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.",
nodeId, batch.getChannelId(), batches.values());
}

if (useCommonMode && !firstTimeForGroup) {
throw new CommonBatchCollisionException("Collision detected for common batch group");
}

engine.getOutgoingBatchService().insertOutgoingBatch(batch);
processInfo.incrementBatchCount();
batches.put(nodeId, batch);

// if in reuse mode, then share the batch id
if (useCommonMode) {
batchIdToReuse = batch.getBatchId();
}
}
try {
for (String nodeId : nodeIds) {
if (nodeId != null) {
OutgoingBatch batch = batches.get(nodeId);
if (batch == null) {
batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), Status.RT);
batch.setBatchId(batchIdToReuse);
batch.setCommonFlag(useCommonMode);

if (log.isDebugEnabled()) {
log.debug("About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.",
nodeId, batch.getChannelId(), batches.values());
}

if (useCommonMode && !firstTimeForGroup) {
throw new CommonBatchCollisionException("Collision detected for common batch group");
}

batch.incrementRowCount(eventType);
batch.incrementDataRowCount();
batch.incrementTableCount(tableName);
if (transaction == null) {
transaction = sqlTemplate.startSqlTransaction();
}

if (loadId != -1) {
batch.setLoadId(loadId);
}
if (!useCommonMode || (useCommonMode && !dataEventAdded)) {
context.addDataEvent(dataMetaData.getData().getDataId(), batch.getBatchId());
numberOfDataEventsInserted++;
dataEventAdded = true;
}
if (context.isBatchComplete(batch, dataMetaData)) {
context.setNeedsCommitted(true);
engine.getOutgoingBatchService().insertOutgoingBatch(transaction, batch);
processInfo.incrementBatchCount();
context.incrementStat(1, ChannelRouterContext.STAT_BATCHES_INSERTED);
batches.put(nodeId, batch);

// if in reuse mode, then share the batch id
if (useCommonMode) {
batchIdToReuse = batch.getBatchId();
}
}

batch.incrementRowCount(eventType);
batch.incrementDataRowCount();
batch.incrementTableCount(tableName);

if (loadId != -1) {
batch.setLoadId(loadId);
}
if (!useCommonMode || (useCommonMode && !dataEventAdded)) {
context.addDataEvent(dataMetaData.getData().getDataId(), batch.getBatchId());
numberOfDataEventsInserted++;
dataEventAdded = true;
}
if (context.isBatchComplete(batch, dataMetaData)) {
context.setNeedsCommitted(true);
}
}
}
if (transaction != null) {
transaction.commit();
transaction = null;
}
} finally {
if (transaction != null) {
transaction.rollback();
}
}

context.incrementStat(System.currentTimeMillis() - ts, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
context.incrementStat(System.currentTimeMillis() - ts, ChannelRouterContext.STAT_INSERT_BATCHES_MS);
return numberOfDataEventsInserted;
}

Expand Down

0 comments on commit 82027ad

Please sign in to comment.