diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java index 5129b1dcc..962f1725d 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/RollbackCheckIterator.java @@ -34,7 +34,7 @@ public class RollbackCheckIterator implements SortedKeyValueIterator { private static final String TIMESTAMP_OPT = "timestampOpt"; - private SortedKeyValueIterator source; + private TimestampSkippingIterator source; private long lockTime; boolean hasTop = false; @@ -50,7 +50,7 @@ public static void setLocktime(IteratorSetting cfg, long time) { @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { - this.source = source; + this.source = new TimestampSkippingIterator(source); this.lockTime = Long.parseLong(options.get(TIMESTAMP_OPT)); } @@ -95,7 +95,8 @@ public void seek(Range range, Collection columnFamilies, boolean i long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; if (colType == ColumnConstants.TX_DONE_PREFIX) { - // do nothing if TX_DONE + source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX); + continue; } else if (colType == ColumnConstants.WRITE_PREFIX) { long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); @@ -107,6 +108,12 @@ public void seek(Range range, Collection columnFamilies, boolean i hasTop = true; return; } + + if (lockTime > timePtr) { + source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX); + continue; + } + } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { if (ts > invalidationTime) { invalidationTime = ts; @@ -117,6 +124,11 @@ public void seek(Range range, Collection columnFamilies, boolean i return; } + if (lockTime > ts) { + source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX); + continue; + } + } else if (colType == ColumnConstants.LOCK_PREFIX) { if (ts > invalidationTime) { // nothing supersedes this lock, therefore the column is locked