diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractDataToRouteReader.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractDataToRouteReader.java index be0665a4f5..1015a2980f 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractDataToRouteReader.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/AbstractDataToRouteReader.java @@ -147,8 +147,11 @@ public Integer doInConnection(Connection c) throws SQLException, DataAccessExcep ps = prepareStatment(c); rs = executeQuery(ps); + boolean moreData = true; while (dataCount <= maxDataToRoute || lastTransactionId != null) { - fillPeekAheadQueue(peekAheadQueue, peekAheadCount, rs); + if (moreData) { + moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, rs); + } if (lastTransactionId == null && peekAheadQueue.size() > 0) { Data data = peekAheadQueue.remove(0); copyToQueue(data); @@ -198,28 +201,34 @@ public Integer doInConnection(Connection c) throws SQLException, DataAccessExcep }); } - protected int fillPeekAheadQueue(List peekAheadQueue, int peekAheadCount, ResultSet rs) + protected boolean fillPeekAheadQueue(List peekAheadQueue, int peekAheadCount, ResultSet rs) throws SQLException { + boolean moreData = true; int toRead = peekAheadCount - peekAheadQueue.size(); int dataCount = 0; long ts = System.currentTimeMillis(); - while (rs.next() && reading && dataCount < toRead) { - if (process(rs)) { - Data data = dataService.readData(rs); - context.setLastDataIdForTransactionId(data); - peekAheadQueue.add(data); - dataCount++; - context.incrementStat(System.currentTimeMillis() - ts, - ChannelRouterContext.STAT_READ_DATA_MS); + while (reading && dataCount < toRead) { + if (rs.next()) { + if (process(rs)) { + Data data = dataService.readData(rs); + context.setLastDataIdForTransactionId(data); + peekAheadQueue.add(data); + dataCount++; + context.incrementStat(System.currentTimeMillis() - ts, + ChannelRouterContext.STAT_READ_DATA_MS); + } else { + context.incrementStat(System.currentTimeMillis() - ts, + ChannelRouterContext.STAT_REREAD_DATA_MS); + } + + ts = System.currentTimeMillis(); } else { - context.incrementStat(System.currentTimeMillis() - ts, - ChannelRouterContext.STAT_REREAD_DATA_MS); + moreData = false; + break; } - ts = System.currentTimeMillis(); - } - return dataCount; + return moreData; } protected ResultSet executeQuery(PreparedStatement ps) throws SQLException {