Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3840,6 +3840,69 @@ public String fold(String acc, Integer value) throws Exception {
}
}

@Test
public void testMapStateGetKeys() throws Exception {
final int namespace1ElementsNum = 1000;
final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
try {
final String ns1 = "ns1";
MapState<String, Integer> keyedState1 = backend.getPartitionedState(
ns1,
StringSerializer.INSTANCE,
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
);

for (int key = 0; key < namespace1ElementsNum; key++) {
backend.setCurrentKey(key);
keyedState1.put("he", key * 2);
keyedState1.put("ho", key * 2);
}

final String ns2 = "ns2";
MapState<String, Integer> keyedState2 = backend.getPartitionedState(
ns2,
StringSerializer.INSTANCE,
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
);

for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
backend.setCurrentKey(key);
keyedState2.put("he", key * 2);
keyedState2.put("ho", key * 2);
}

// valid for namespace1
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();

for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt());
}

assertFalse(actualIterator.hasNext());
}

// valid for namespace2
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();

for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt());
}

assertFalse(actualIterator.hasNext());
}
}
finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}

@Test
public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
private final byte[] namespaceBytes;
private final boolean ambiguousKeyPossible;
private K nextKey;
private K preKey;

RocksIteratorForKeysWrapper(
RocksIteratorWrapper iterator,
Expand All @@ -1666,6 +1667,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
this.nextKey = null;
this.preKey = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call this previousKey, just to be more clear.

this.ambiguousKeyPossible = ambiguousKeyPossible;
}

Expand All @@ -1675,15 +1677,22 @@ public boolean hasNext() {
while (nextKey == null && iterator.isValid()) {

byte[] key = iterator.key();
if (isMatchingNameSpace(key)) {
ByteArrayInputStreamWithPos inputStream =
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
K value = RocksDBKeySerializationUtils.readKey(
keySerializer,
inputStream,
dataInput,
ambiguousKeyPossible);

ByteArrayInputStreamWithPos inputStream =
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);

DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);

K value = RocksDBKeySerializationUtils.readKey(
keySerializer,
inputStream,
dataInput,
ambiguousKeyPossible);

int namespaceByteStartPos = inputStream.getPosition();

if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(preKey, value)) {
preKey = value;
nextKey = value;
}
iterator.next();
Expand All @@ -1705,12 +1714,12 @@ public K next() {
return tmpKey;
}

private boolean isMatchingNameSpace(@Nonnull byte[] key) {
private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
final int namespaceBytesLength = namespaceBytes.length;
final int basicLength = namespaceBytesLength + keyGroupPrefixBytes;
final int basicLength = namespaceBytesLength + beginPos;
if (key.length >= basicLength) {
for (int i = 1; i <= namespaceBytesLength; ++i) {
if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
for (int i = 0; i < namespaceBytesLength; ++i) {
if (key[beginPos + i] != namespaceBytes[i]) {
return false;
}
}
Expand Down