Skip to content

Commit

Permalink
SYMMETRICDS-558 - Transaction boundaries can be broken when max_batch…
Browse files Browse the repository at this point in the history
…_size is set to a small number and batch algorithm is set to 'transactional' or 'default'
  • Loading branch information
chenson42 committed Nov 28, 2011
1 parent 10e7d24 commit fe97e0c
Showing 1 changed file with 24 additions and 15 deletions.
Expand Up @@ -147,8 +147,11 @@ public Integer doInConnection(Connection c) throws SQLException, DataAccessExcep
ps = prepareStatment(c);
rs = executeQuery(ps);

boolean moreData = true;
while (dataCount <= maxDataToRoute || lastTransactionId != null) {
fillPeekAheadQueue(peekAheadQueue, peekAheadCount, rs);
if (moreData) {
moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, rs);
}
if (lastTransactionId == null && peekAheadQueue.size() > 0) {
Data data = peekAheadQueue.remove(0);
copyToQueue(data);
Expand Down Expand Up @@ -198,28 +201,34 @@ public Integer doInConnection(Connection c) throws SQLException, DataAccessExcep
});
}

protected int fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCount, ResultSet rs)
protected boolean fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCount, ResultSet rs)
throws SQLException {
boolean moreData = true;
int toRead = peekAheadCount - peekAheadQueue.size();
int dataCount = 0;
long ts = System.currentTimeMillis();
while (rs.next() && reading && dataCount < toRead) {
if (process(rs)) {
Data data = dataService.readData(rs);
context.setLastDataIdForTransactionId(data);
peekAheadQueue.add(data);
dataCount++;
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_READ_DATA_MS);
while (reading && dataCount < toRead) {
if (rs.next()) {
if (process(rs)) {
Data data = dataService.readData(rs);
context.setLastDataIdForTransactionId(data);
peekAheadQueue.add(data);
dataCount++;
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_READ_DATA_MS);
} else {
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_REREAD_DATA_MS);
}

ts = System.currentTimeMillis();
} else {
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_REREAD_DATA_MS);
moreData = false;
break;
}

ts = System.currentTimeMillis();

}
return dataCount;
return moreData;
}

protected ResultSet executeQuery(PreparedStatement ps) throws SQLException {
Expand Down

0 comments on commit fe97e0c

Please sign in to comment.