Skip to content
Permalink
Browse files
IGNITE-16584 Fix metastorage cursor wrong order (#675)
  • Loading branch information
SammyVimes committed Feb 21, 2022
1 parent 2cc992d commit 12e1c383781554b309fe6c2aac1dc159b6b803e2
Showing 6 changed files with 204 additions and 151 deletions.
@@ -177,7 +177,7 @@ public interface KeyValueStorage extends AutoCloseable {
Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo);

/**
* Returns cursor by entries which correspond to the given keys range and bounded by revision number..
* Returns cursor by entries which correspond to the given keys range and bounded by revision number.
*
* @param keyFrom Start key of range (inclusive).
* @param keyTo Last key of range (exclusive).
@@ -100,7 +100,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
private static final String TMP_SUFFIX = ".tmp";

/** A revision to store with system entries. */
private static final long SYSTEM_REVISION_MARKER_VALUE = -1;
private static final long SYSTEM_REVISION_MARKER_VALUE = 0;

/** Revision key. */
private static final byte[] REVISION_KEY = keyToRocksKey(
@@ -34,11 +34,11 @@
class RocksStorageUtils {
/**
* VarHandle that gives the access to the elements of a {@code byte[]} array viewed as if it were a {@code long[]} array. Byte order
* must be little endian for a correct lexicographic order comparison.
* must be big endian for a correct lexicographic order comparison.
*/
private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(
long[].class,
ByteOrder.LITTLE_ENDIAN
ByteOrder.BIG_ENDIAN
);

/**
@@ -85,7 +85,7 @@ static byte[] keyToRocksKey(long revision, byte[] key) {
}

/**
* Gets a key from a key with revision.
* Gets a key from a key with a revision.
*
* @param rocksKey Key with a revision.
* @return Key without a revision.
@@ -95,6 +95,16 @@ static byte[] rocksKeyToBytes(byte[] rocksKey) {
return Arrays.copyOfRange(rocksKey, Long.BYTES, rocksKey.length);
}

/**
 * Extracts the revision from a key that carries a revision. The revision is read as a {@code long}
 * starting at byte offset {@code 0} of the array, through {@code LONG_ARRAY_HANDLE}.
 *
 * @param rocksKey Key with a revision.
 * @return Revision.
 */
static long revisionFromRocksKey(byte[] rocksKey) {
    // The revision occupies the very beginning of the key; the user key follows it.
    final int revisionOffset = 0;

    return (long) LONG_ARRAY_HANDLE.get(rocksKey, revisionOffset);
}

/**
* Builds a value from a byte array.
*
@@ -19,12 +19,14 @@

import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.revisionFromRocksKey;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
import static org.apache.ignite.internal.rocksdb.RocksUtils.checkIterator;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import org.apache.ignite.internal.metastorage.server.Entry;
import org.apache.ignite.internal.metastorage.server.EntryEvent;
@@ -34,8 +36,6 @@
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
@@ -51,24 +51,17 @@ class WatchCursor implements Cursor<WatchEvent> {
/** Key predicate. */
private final Predicate<byte[]> predicate;

/** Iterator for this cursor. */
private final Iterator<WatchEvent> it;

/** Options for {@link #nativeIterator}. */
private final ReadOptions options = new ReadOptions().setPrefixSameAsStart(true);
private final ReadOptions options = new ReadOptions();

/** RocksDB iterator. */
private final RocksIterator nativeIterator;

/**
* Last matching revision.
*/
private long lastRetRev;
/** Current revision. */
private long currentRevision;

/**
* Next matching revision. {@code -1} means that it has not been found yet or does not exist.
*/
private long nextRetRev = -1;
/** Cached result of the last {@code hasNext} check; it is reset to {@code false} once {@code next} has consumed the element. */
private boolean currentHasNext = false;

/**
* Constructor.
@@ -80,163 +73,120 @@ class WatchCursor implements Cursor<WatchEvent> {
WatchCursor(RocksDbKeyValueStorage storage, long rev, Predicate<byte[]> predicate) {
this.storage = storage;
this.predicate = predicate;
this.lastRetRev = rev - 1;
this.nativeIterator = storage.newDataIterator(options);
this.it = createIterator();
}

/** {@inheritDoc} */
@Override
public boolean hasNext() {
return it.hasNext();
}

/** {@inheritDoc} */
@Nullable
@Override
public WatchEvent next() {
return it.next();
}
this.currentRevision = rev;

/** {@inheritDoc} */
@Override
public void close() throws Exception {
IgniteUtils.closeAll(options, nativeIterator);
this.nativeIterator = storage.newDataIterator(options);
this.nativeIterator.seek(longToBytes(rev));
}

/** {@inheritDoc} */
@NotNull
@Override
public Iterator<WatchEvent> iterator() {
return it;
}
public boolean hasNext() {
storage.lock().readLock().lock();

/**
* Creates an iterator for this cursor.
*
* @return Iterator.
*/
@NotNull
private Iterator<WatchEvent> createIterator() {
return new Iterator<>() {
/** {@inheritDoc} */
@Override
public boolean hasNext() {
storage.lock().readLock().lock();

try {
if (nextRetRev != -1) {
// Next revision is already calculated and is not -1, meaning that there is a set of keys
// matching the revision and the predicate.
return true;
}
try {
if (currentHasNext) {
return true;
}

while (true) {
long curRev = lastRetRev + 1;
if (!nativeIterator.isValid()) {
nativeIterator.refresh();

byte[] revisionPrefix = longToBytes(curRev);
nativeIterator.seek(longToBytes(currentRevision));
}

boolean empty = true;
// Check all keys to see if any one of them match the predicate.
currentHasNext = RocksUtils.find(nativeIterator, (rocksKey, value) -> {
byte[] key = rocksKeyToBytes(rocksKey);

if (!nativeIterator.isValid()) {
try {
nativeIterator.refresh();
} catch (RocksDBException e) {
throw new IgniteInternalException(e);
}
}
if (predicate.test(key)) {
// We may have jumped to the next revision if there were no matching keys in previous.
currentRevision = revisionFromRocksKey(rocksKey);

// Check all keys by the revision to see if any one of them match the predicate.
for (nativeIterator.seek(revisionPrefix); nativeIterator.isValid(); nativeIterator.next()) {
empty = false;
return true;
}

byte[] key = rocksKeyToBytes(nativeIterator.key());
return false;
});

if (predicate.test(key)) {
// Current revision matches.
nextRetRev = curRev;
return currentHasNext;
} catch (RocksDBException e) {
throw new IgniteInternalException(e);
} finally {
storage.lock().readLock().unlock();
}
}

return true;
}
}
/** {@inheritDoc} */
@Override
public WatchEvent next() {
storage.lock().readLock().lock();

checkIterator(nativeIterator);
try {
if (!hasNext()) {
throw new NoSuchElementException();
}

if (empty) {
return false;
}
List<EntryEvent> evts = new ArrayList<>();

// Go to the next revision.
lastRetRev++;
}
} finally {
storage.lock().readLock().unlock();
}
}
long lastSeenRevision = currentRevision;

/** {@inheritDoc} */
@Nullable
@Override
public WatchEvent next() {
storage.lock().readLock().lock();
// Iterate over the keys of the current revision and get all matching entries.
for (; nativeIterator.isValid(); nativeIterator.next()) {
byte[] rocksKey = nativeIterator.key();
byte[] rocksValue = nativeIterator.value();

try {
while (true) {
if (!hasNext()) {
return null;
}
long revision = revisionFromRocksKey(rocksKey);

var ref = new Object() {
boolean noItemsInRevision = true;
};
lastSeenRevision = revision;

List<EntryEvent> evts = new ArrayList<>();
if (revision > currentRevision) {
// There are no more keys for the current revision
break;
}

// Iterate over the keys of the current revision and get all matching entries.
RocksUtils.forEach(nativeIterator, (k, v) -> {
ref.noItemsInRevision = false;
byte[] key = rocksKeyToBytes(rocksKey);

byte[] key = rocksKeyToBytes(k);
if (predicate.test(key)) {
Value val = bytesToValue(rocksValue);

Value val = bytesToValue(v);
Entry newEntry;

if (predicate.test(key)) {
Entry newEntry;
if (val.tombstone()) {
newEntry = Entry.tombstone(key, revision, val.updateCounter());
} else {
newEntry = new Entry(key, val.bytes(), revision, val.updateCounter());
}

if (val.tombstone()) {
newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
} else {
newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
}
Entry oldEntry = storage.doGet(key, revision - 1, false);

Entry oldEntry = storage.doGet(key, nextRetRev - 1, false);
evts.add(new EntryEvent(oldEntry, newEntry));
}
}

evts.add(new EntryEvent(oldEntry, newEntry));
}
});
currentHasNext = false;

if (ref.noItemsInRevision) {
return null;
}
// Go to the next revision
currentRevision = lastSeenRevision > currentRevision ? lastSeenRevision : currentRevision + 1;

if (evts.isEmpty()) {
continue;
}
checkIterator(nativeIterator);

// Set the last returned revision to the current revision's value.
lastRetRev = nextRetRev;
return new WatchEvent(evts);
} finally {
storage.lock().readLock().unlock();
}
}

// Set current revision to -1, meaning that it is not found yet.
nextRetRev = -1;
/** {@inheritDoc} */
@Override
public void close() throws Exception {
IgniteUtils.closeAll(options, nativeIterator);
}

return new WatchEvent(evts);
}
} catch (RocksDBException e) {
throw new IgniteInternalException(e);
} finally {
storage.lock().readLock().unlock();
}
}
};
/** {@inheritDoc} */
@Override
public Iterator<WatchEvent> iterator() {
return this;
}
}

0 comments on commit 12e1c38

Please sign in to comment.