Skip to content

Commit

Permalink
multi-row count and update for batching
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 2, 2008
1 parent 16c632a commit fecdff3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
Expand Up @@ -87,6 +87,45 @@ public void buildOutgoingBatches(final String nodeId, final List<NodeChannel> ch
@Transactional
public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) {

if (channel.isSuspended()) {
logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
} else if (channel.isEnabled()) {
long dataEventCount = jdbcTemplate.queryForLong(getSql("selectEventsToBatchCountSql"),
new Object[] { 0, nodeId, channel.getId() });

if (dataEventCount > channel.getMaxBatchSize()) {
buildOutgoingBatchesPeekAhead(nodeId, channel);
} else if (dataEventCount > 0) {
OutgoingBatch newBatch = new OutgoingBatch();
newBatch.setBatchType(BatchType.EVENTS);
newBatch.setChannelId(channel.getId());
newBatch.setNodeId(nodeId);

if (channel.isIgnored()) {
newBatch.setStatus(Status.OK);
}

long startTime = System.currentTimeMillis();
insertOutgoingBatch(newBatch);
dataEventCount = jdbcTemplate.update(getSql("updateBatchedEventsMultiSql"), new Object[] {
newBatch.getBatchId(), 1, 0, nodeId, newBatch.getChannelId() });
long databaseMillis = System.currentTimeMillis() - startTime;

OutgoingBatchHistory history = new OutgoingBatchHistory(newBatch);
history.setEndTime(new Date());
history.setDataEventCount(dataEventCount);
history.setDatabaseMillis(databaseMillis);
insertOutgoingBatchHistory(history);
statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(
databaseMillis, dataEventCount);
statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(dataEventCount, 1);
}
}
}

@Transactional
private void buildOutgoingBatchesPeekAhead(final String nodeId, final NodeChannel channel) {

final int batchSizePeekAhead = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_WINDOW);

jdbcTemplate.execute(new ConnectionCallback() {
Expand Down
12 changes: 12 additions & 0 deletions symmetric/src/main/resources/sql/outgoingbatch-service-sql.xml
Expand Up @@ -14,6 +14,12 @@
node_id=?
</value>
</entry>
<entry key="selectEventsToBatchCountSql">
<value>
select count(*) from ${sync.table.prefix}_data_event where batched = ? and node_id = ? and
channel_id = ?
</value>
</entry>
<entry key="selectEventsToBatchSql">
<value>
select transaction_id, data_id from ${sync.table.prefix}_data_event where batched = ? and
Expand All @@ -26,6 +32,12 @@
batch_type, create_time) values (null, ?, ?, ?, ?, current_timestamp)
</value>
</entry>
<entry key="updateBatchedEventsMultiSql">
<value>
update ${sync.table.prefix}_data_event set batch_id = ?, batched = ? where batched = ?
and node_id = ? and channel_id = ?
</value>
</entry>
<entry key="updateBatchedEventsSql">
<value>
update ${sync.table.prefix}_data_event set batch_id = ?, batched = ? where node_id = ? and
Expand Down

0 comments on commit fecdff3

Please sign in to comment.