Skip to content

Commit

Permalink
fixes #949 Used TimestampSkipping iterator in RollbackCheckIterator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
llvieira authored and keith-turner committed Oct 30, 2017
1 parent 955c86f commit 202fe08
Showing 1 changed file with 15 additions and 3 deletions.
Expand Up @@ -34,7 +34,7 @@
public class RollbackCheckIterator implements SortedKeyValueIterator<Key, Value> { public class RollbackCheckIterator implements SortedKeyValueIterator<Key, Value> {
private static final String TIMESTAMP_OPT = "timestampOpt"; private static final String TIMESTAMP_OPT = "timestampOpt";


private SortedKeyValueIterator<Key, Value> source; private TimestampSkippingIterator source;
private long lockTime; private long lockTime;


boolean hasTop = false; boolean hasTop = false;
Expand All @@ -50,7 +50,7 @@ public static void setLocktime(IteratorSetting cfg, long time) {
@Override @Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
IteratorEnvironment env) throws IOException { IteratorEnvironment env) throws IOException {
this.source = source; this.source = new TimestampSkippingIterator(source);
this.lockTime = Long.parseLong(options.get(TIMESTAMP_OPT)); this.lockTime = Long.parseLong(options.get(TIMESTAMP_OPT));
} }


Expand Down Expand Up @@ -95,7 +95,8 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK; long ts = source.getTopKey().getTimestamp() & ColumnConstants.TIMESTAMP_MASK;


if (colType == ColumnConstants.TX_DONE_PREFIX) { if (colType == ColumnConstants.TX_DONE_PREFIX) {
// do nothing if TX_DONE source.skipToPrefix(curCol, ColumnConstants.WRITE_PREFIX);
continue;
} else if (colType == ColumnConstants.WRITE_PREFIX) { } else if (colType == ColumnConstants.WRITE_PREFIX) {
long timePtr = WriteValue.getTimestamp(source.getTopValue().get()); long timePtr = WriteValue.getTimestamp(source.getTopValue().get());


Expand All @@ -107,6 +108,12 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
hasTop = true; hasTop = true;
return; return;
} }

if (lockTime > timePtr) {
source.skipToPrefix(curCol, ColumnConstants.DEL_LOCK_PREFIX);
continue;
}

} else if (colType == ColumnConstants.DEL_LOCK_PREFIX) { } else if (colType == ColumnConstants.DEL_LOCK_PREFIX) {
if (ts > invalidationTime) { if (ts > invalidationTime) {
invalidationTime = ts; invalidationTime = ts;
Expand All @@ -117,6 +124,11 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
return; return;
} }


if (lockTime > ts) {
source.skipToPrefix(curCol, ColumnConstants.LOCK_PREFIX);
continue;
}

} else if (colType == ColumnConstants.LOCK_PREFIX) { } else if (colType == ColumnConstants.LOCK_PREFIX) {
if (ts > invalidationTime) { if (ts > invalidationTime) {
// nothing supersedes this lock, therefore the column is locked // nothing supersedes this lock, therefore the column is locked
Expand Down

0 comments on commit 202fe08

Please sign in to comment.