34
34
public class RollbackCheckIterator implements SortedKeyValueIterator <Key , Value > {
35
35
private static final String TIMESTAMP_OPT = "timestampOpt" ;
36
36
37
- private SortedKeyValueIterator < Key , Value > source ;
37
+ private TimestampSkippingIterator source ;
38
38
private long lockTime ;
39
39
40
40
boolean hasTop = false ;
@@ -50,7 +50,7 @@ public static void setLocktime(IteratorSetting cfg, long time) {
50
50
@ Override
51
51
public void init (SortedKeyValueIterator <Key , Value > source , Map <String , String > options ,
52
52
IteratorEnvironment env ) throws IOException {
53
- this .source = source ;
53
+ this .source = new TimestampSkippingIterator ( source ) ;
54
54
this .lockTime = Long .parseLong (options .get (TIMESTAMP_OPT ));
55
55
}
56
56
@@ -95,7 +95,8 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
95
95
long ts = source .getTopKey ().getTimestamp () & ColumnConstants .TIMESTAMP_MASK ;
96
96
97
97
if (colType == ColumnConstants .TX_DONE_PREFIX ) {
98
- // do nothing if TX_DONE
98
+ source .skipToPrefix (curCol , ColumnConstants .WRITE_PREFIX );
99
+ continue ;
99
100
} else if (colType == ColumnConstants .WRITE_PREFIX ) {
100
101
long timePtr = WriteValue .getTimestamp (source .getTopValue ().get ());
101
102
@@ -107,6 +108,12 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
107
108
hasTop = true ;
108
109
return ;
109
110
}
111
+
112
+ if (lockTime > timePtr ) {
113
+ source .skipToPrefix (curCol , ColumnConstants .DEL_LOCK_PREFIX );
114
+ continue ;
115
+ }
116
+
110
117
} else if (colType == ColumnConstants .DEL_LOCK_PREFIX ) {
111
118
if (ts > invalidationTime ) {
112
119
invalidationTime = ts ;
@@ -117,6 +124,11 @@ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean i
117
124
return ;
118
125
}
119
126
127
+ if (lockTime > ts ) {
128
+ source .skipToPrefix (curCol , ColumnConstants .LOCK_PREFIX );
129
+ continue ;
130
+ }
131
+
120
132
} else if (colType == ColumnConstants .LOCK_PREFIX ) {
121
133
if (ts > invalidationTime ) {
122
134
// nothing supersedes this lock, therefore the column is locked
0 commit comments