Skip to content

Commit

Permalink
make the batch window during peek ahead batch creation much smaller.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Sep 25, 2008
1 parent 3f956ab commit a949908
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
Expand Up @@ -57,6 +57,7 @@ public class ParameterConstants {
public final static String CONCURRENT_WORKERS = "http.concurrent.workers.max";
public final static String CONCURRENT_RESERVATION_TIMEOUT = "http.concurrent.reservation.timeout.ms";

public final static String OUTGOING_BATCH_PEEK_AHEAD_BATCH_COMMIT_SIZE = "outgoing.batches.peek.ahead.batch.commit.size";
public final static String OUTGOING_BATCH_PEEK_AHEAD_WINDOW = "outgoing.batches.peek.ahead.window.after.max.size";
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "num.of.ack.retries";
Expand Down
Expand Up @@ -90,8 +90,8 @@ public void buildOutgoingBatches(final String nodeId, final NodeChannel channel)
if (channel.isSuspended()) {
logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
} else if (channel.isEnabled()) {
long dataEventCount = jdbcTemplate.queryForLong(getSql("selectEventsToBatchCountSql"),
new Object[] { 0, nodeId, channel.getId() });
long dataEventCount = jdbcTemplate.queryForLong(getSql("selectEventsToBatchCountSql"), new Object[] { 0,
nodeId, channel.getId() });

if (dataEventCount > channel.getMaxBatchSize()) {
buildOutgoingBatchesPeekAhead(nodeId, channel);
Expand All @@ -116,8 +116,8 @@ public void buildOutgoingBatches(final String nodeId, final NodeChannel channel)
history.setDataEventCount(dataEventCount);
history.setDatabaseMillis(databaseMillis);
insertOutgoingBatchHistory(history);
statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(
databaseMillis, dataEventCount);
statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(databaseMillis,
dataEventCount);
statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(dataEventCount, 1);
}
}
Expand Down Expand Up @@ -214,7 +214,9 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc

// put this in so we don't build up too many
// statements to send to the server.
if (count % 10000 == 0) {
if (count
% parameterService
.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_BATCH_COMMIT_SIZE) == 0) {
long startTime = System.currentTimeMillis();
update.executeBatch();
databaseMillis += (System.currentTimeMillis() - startTime);
Expand Down Expand Up @@ -274,9 +276,8 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchSql"),
new Object[] { nodeId, OutgoingBatch.Status.NE.toString(),
OutgoingBatch.Status.SE.toString(), OutgoingBatch.Status.ER.toString() },
new OutgoingBatchMapper());
new Object[] { nodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
OutgoingBatch.Status.ER.toString() }, new OutgoingBatchMapper());
final HashSet<String> errorChannels = new HashSet<String>();
for (OutgoingBatch batch : list) {
if (batch.getStatus().equals(OutgoingBatch.Status.ER)) {
Expand Down
7 changes: 7 additions & 0 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -144,9 +144,16 @@ https.verified.server.names=
# This is the maximum number of events that will be peeked at to look for additional transaction rows after
# the max batch size is reached. The more concurrency in your db and the longer the transaction takes the
# bigger this value might have to be.
# Note that this only kicks in if the prospective batch size is bigger than the configured max batch size.
#
# This property can be overridden in the database
outgoing.batches.peek.ahead.window.after.max.size=100

# This is the number of datae vents that will be batched and committed together while building a batch.
# Note that this only kicks in if the prospective batch size is bigger than the configured max batch size.
#
# This property can be overridden in the database
outgoing.batches.peek.ahead.batch.commit.size=10

# This instructs symmetric to attempt to skip duplicate batches that are received. Symmetric might
# be more efficient when recovering from error conditions if this is set to true, but you run the
Expand Down

0 comments on commit a949908

Please sign in to comment.