Skip to content

Commit def2aed

Browse files
committed
[FLINK-9804][state] Fix KeyedStateBackend.getKeys() for RocksDBMapState.
This closes #6306.
1 parent f1ac0f2 commit def2aed

File tree

2 files changed

+85
-13
lines changed

2 files changed

+85
-13
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3840,6 +3840,69 @@ public String fold(String acc, Integer value) throws Exception {
38403840
}
38413841
}
38423842

3843+
@Test
3844+
public void testMapStateGetKeys() throws Exception {
3845+
final int namespace1ElementsNum = 1000;
3846+
final int namespace2ElementsNum = 1000;
3847+
String fieldName = "get-keys-test";
3848+
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
3849+
try {
3850+
final String ns1 = "ns1";
3851+
MapState<String, Integer> keyedState1 = backend.getPartitionedState(
3852+
ns1,
3853+
StringSerializer.INSTANCE,
3854+
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
3855+
);
3856+
3857+
for (int key = 0; key < namespace1ElementsNum; key++) {
3858+
backend.setCurrentKey(key);
3859+
keyedState1.put("he", key * 2);
3860+
keyedState1.put("ho", key * 2);
3861+
}
3862+
3863+
final String ns2 = "ns2";
3864+
MapState<String, Integer> keyedState2 = backend.getPartitionedState(
3865+
ns2,
3866+
StringSerializer.INSTANCE,
3867+
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
3868+
);
3869+
3870+
for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
3871+
backend.setCurrentKey(key);
3872+
keyedState2.put("he", key * 2);
3873+
keyedState2.put("ho", key * 2);
3874+
}
3875+
3876+
// valid for namespace1
3877+
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
3878+
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
3879+
3880+
for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
3881+
assertTrue(actualIterator.hasNext());
3882+
assertEquals(expectedKey, actualIterator.nextInt());
3883+
}
3884+
3885+
assertFalse(actualIterator.hasNext());
3886+
}
3887+
3888+
// valid for namespace2
3889+
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
3890+
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
3891+
3892+
for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
3893+
assertTrue(actualIterator.hasNext());
3894+
assertEquals(expectedKey, actualIterator.nextInt());
3895+
}
3896+
3897+
assertFalse(actualIterator.hasNext());
3898+
}
3899+
}
3900+
finally {
3901+
IOUtils.closeQuietly(backend);
3902+
backend.dispose();
3903+
}
3904+
}
3905+
38433906
@Test
38443907
public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
38453908

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,6 +1641,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
16411641
private final byte[] namespaceBytes;
16421642
private final boolean ambiguousKeyPossible;
16431643
private K nextKey;
1644+
private K previousKey;
16441645

16451646
RocksIteratorForKeysWrapper(
16461647
RocksIteratorWrapper iterator,
@@ -1655,6 +1656,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
16551656
this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
16561657
this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
16571658
this.nextKey = null;
1659+
this.previousKey = null;
16581660
this.ambiguousKeyPossible = ambiguousKeyPossible;
16591661
}
16601662

@@ -1664,15 +1666,22 @@ public boolean hasNext() {
16641666
while (nextKey == null && iterator.isValid()) {
16651667

16661668
byte[] key = iterator.key();
1667-
if (isMatchingNameSpace(key)) {
1668-
ByteArrayInputStreamWithPos inputStream =
1669-
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
1670-
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
1671-
K value = RocksDBKeySerializationUtils.readKey(
1672-
keySerializer,
1673-
inputStream,
1674-
dataInput,
1675-
ambiguousKeyPossible);
1669+
1670+
ByteArrayInputStreamWithPos inputStream =
1671+
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
1672+
1673+
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
1674+
1675+
K value = RocksDBKeySerializationUtils.readKey(
1676+
keySerializer,
1677+
inputStream,
1678+
dataInput,
1679+
ambiguousKeyPossible);
1680+
1681+
int namespaceByteStartPos = inputStream.getPosition();
1682+
1683+
if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
1684+
previousKey = value;
16761685
nextKey = value;
16771686
}
16781687
iterator.next();
@@ -1694,12 +1703,12 @@ public K next() {
16941703
return tmpKey;
16951704
}
16961705

1697-
private boolean isMatchingNameSpace(@Nonnull byte[] key) {
1706+
private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
16981707
final int namespaceBytesLength = namespaceBytes.length;
1699-
final int basicLength = namespaceBytesLength + keyGroupPrefixBytes;
1708+
final int basicLength = namespaceBytesLength + beginPos;
17001709
if (key.length >= basicLength) {
1701-
for (int i = 1; i <= namespaceBytesLength; ++i) {
1702-
if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
1710+
for (int i = 0; i < namespaceBytesLength; ++i) {
1711+
if (key[beginPos + i] != namespaceBytes[i]) {
17031712
return false;
17041713
}
17051714
}

0 commit comments

Comments
 (0)