Skip to content

Commit

Permalink
[FLINK-8679][State Backends] Ensure that RocksDBKeyedBackend.getKeys(…
Browse files Browse the repository at this point in the history
…) filters keys by namespace

This closes #5518.
  • Loading branch information
sihuazhou authored and StefanRRichter committed Feb 23, 2018
1 parent 0651876 commit eeac022
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 85 deletions.
Expand Up @@ -293,6 +293,7 @@ public S get(K key, N namespace) {
public Stream<K> getKeys(N namespace) { public Stream<K> getKeys(N namespace) {
Iterable<StateEntry<K, N, S>> iterable = () -> iterator(); Iterable<StateEntry<K, N, S>> iterable = () -> iterator();
return StreamSupport.stream(iterable.spliterator(), false) return StreamSupport.stream(iterable.spliterator(), false)
.filter(entry -> entry.getNamespace().equals(namespace))
.map(entry -> entry.getKey()); .map(entry -> entry.getKey());
} }


Expand Down
Expand Up @@ -211,24 +211,52 @@ protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(


@Test @Test
public void testGetKeys() throws Exception { public void testGetKeys() throws Exception {
final int elementsToTest = 1000; final int namespace1ElementsNum = 1000;
final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test"; String fieldName = "get-keys-test";
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
try { try {
ValueState<Integer> keyedState = backend.getOrCreateKeyedState( final String ns1 = "ns1";
VoidNamespaceSerializer.INSTANCE, ValueState<Integer> keyedState1 = backend.getPartitionedState(
new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)); ns1,
((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE); StringSerializer.INSTANCE,
new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)
);

for (int key = 0; key < namespace1ElementsNum; key++) {
backend.setCurrentKey(key);
keyedState1.update(key * 2);
}

final String ns2 = "ns2";
ValueState<Integer> keyedState2 = backend.getPartitionedState(
ns2,
StringSerializer.INSTANCE,
new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)
);


for (int key = 0; key < elementsToTest; key++) { for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
backend.setCurrentKey(key); backend.setCurrentKey(key);
keyedState.update(key * 2); keyedState2.update(key * 2);
}

// valid for namespace1
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();

for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt());
}

assertFalse(actualIterator.hasNext());
} }


try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) { // valid for namespace2
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator(); PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();


for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) { for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext()); assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt()); assertEquals(expectedKey, actualIterator.nextInt());
} }
Expand Down
Expand Up @@ -95,8 +95,7 @@ protected AbstractRocksDBState(


this.keySerializationStream = new ByteArrayOutputStreamWithPos(128); this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream); this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0) this.ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
&& (namespaceSerializer.getLength() < 0);
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down Expand Up @@ -158,63 +157,8 @@ protected void writeKeyWithGroupAndNamespace(
Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");


keySerializationStream.reset(); keySerializationStream.reset();
writeKeyGroup(keyGroup, keySerializationDataOutputView); RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
writeKey(key, keySerializationStream, keySerializationDataOutputView); RocksDBKeySerializationUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
}

private void writeKeyGroup(
int keyGroup,
DataOutputView keySerializationDateDataOutputView) throws IOException {
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
}
}

private void writeKey(
K key,
ByteArrayOutputStreamWithPos keySerializationStream,
DataOutputView keySerializationDataOutputView) throws IOException {
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key, keySerializationDataOutputView);

if (ambiguousKeyPossible) {
//write size of key
writeLengthFrom(beforeWrite, keySerializationStream,
keySerializationDataOutputView);
}
}

private void writeNameSpace(
N namespace,
ByteArrayOutputStreamWithPos keySerializationStream,
DataOutputView keySerializationDataOutputView) throws IOException {
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace, keySerializationDataOutputView);

if (ambiguousKeyPossible) {
//write length of namespace
writeLengthFrom(beforeWrite, keySerializationStream,
keySerializationDataOutputView);
}
}

private static void writeLengthFrom(
int fromPosition,
ByteArrayOutputStreamWithPos keySerializationStream,
DataOutputView keySerializationDateDataOutputView) throws IOException {
int length = keySerializationStream.getPosition() - fromPosition;
writeVariableIntBytes(length, keySerializationDateDataOutputView);
}

private static void writeVariableIntBytes(
int value,
DataOutputView keySerializationDateDataOutputView)
throws IOException {
do {
keySerializationDateDataOutputView.writeByte(value);
value >>>= 8;
} while (value != 0);
} }
} }
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;

/**
 * Static helpers for writing and reading the parts of a serialized RocksDB state key:
 * the key-group prefix, the key, and the namespace.
 *
 * <p>Layout produced by the write methods, in call order used by the state classes:
 * key-group prefix bytes (most significant byte first), serialized key, serialized namespace.
 * When {@code ambiguousKeyPossible} is {@code true}, the key and the namespace are each
 * followed by the number of bytes they occupy, encoded as a variable number of bytes
 * (least significant byte first, at least one byte, stopping once the remaining value is zero).
 *
 * <p>The read methods mirror the write methods; readers and writers must agree on the value of
 * {@code ambiguousKeyPossible} and on the number of key-group prefix bytes.
 */
public class RocksDBKeySerializationUtils {

	/**
	 * Reads a key-group id that was written by {@link #writeKeyGroup(int, int, DataOutputView)}.
	 *
	 * @param keyGroupPrefixBytes number of bytes the key-group prefix occupies
	 * @param inputView view positioned at the first byte of the key-group prefix
	 * @return the key-group id assembled from the prefix bytes, most significant byte first
	 * @throws IOException if reading from {@code inputView} fails
	 */
	public static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws IOException {
		int keyGroup = 0;
		for (int i = 0; i < keyGroupPrefixBytes; ++i) {
			keyGroup <<= 8;
			// mask to treat the byte as unsigned before merging it in
			keyGroup |= (inputView.readByte() & 0xFF);
		}
		return keyGroup;
	}

	/**
	 * Deserializes a key and, if a length suffix was written after it, skips that suffix.
	 *
	 * @param keySerializer serializer used to deserialize the key
	 * @param inputStream stream whose position is used to measure how many bytes the key occupied;
	 *                    NOTE(review): {@code inputView} is expected to read from this stream — confirm at call sites
	 * @param inputView view the key is deserialized from
	 * @param ambiguousKeyPossible whether a length suffix follows the key
	 * @param <K> type of the key
	 * @return the deserialized key
	 * @throws IOException if deserialization or reading fails
	 */
	public static <K> K readKey(
		TypeSerializer<K> keySerializer,
		ByteArrayInputStreamWithPos inputStream,
		DataInputView inputView,
		boolean ambiguousKeyPossible) throws IOException {
		return readElement(keySerializer, inputStream, inputView, ambiguousKeyPossible);
	}

	/**
	 * Deserializes a namespace and, if a length suffix was written after it, skips that suffix.
	 *
	 * @param namespaceSerializer serializer used to deserialize the namespace
	 * @param inputStream stream whose position is used to measure how many bytes the namespace occupied;
	 *                    NOTE(review): {@code inputView} is expected to read from this stream — confirm at call sites
	 * @param inputView view the namespace is deserialized from
	 * @param ambiguousKeyPossible whether a length suffix follows the namespace
	 * @param <N> type of the namespace
	 * @return the deserialized namespace
	 * @throws IOException if deserialization or reading fails
	 */
	public static <N> N readNamespace(
		TypeSerializer<N> namespaceSerializer,
		ByteArrayInputStreamWithPos inputStream,
		DataInputView inputView,
		boolean ambiguousKeyPossible) throws IOException {
		return readElement(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible);
	}

	/**
	 * Serializes a namespace and, if requested, appends the number of bytes it occupies.
	 *
	 * @param namespace namespace to serialize
	 * @param namespaceSerializer serializer used to serialize the namespace
	 * @param keySerializationStream stream whose position is used to measure the serialized size;
	 *                               NOTE(review): {@code keySerializationDataOutputView} is expected to write
	 *                               into this stream — confirm at call sites
	 * @param keySerializationDataOutputView view the namespace (and length suffix) is written to
	 * @param ambiguousKeyPossible whether to append the length suffix
	 * @param <N> type of the namespace
	 * @throws IOException if serialization or writing fails
	 */
	public static <N> void writeNameSpace(
		N namespace,
		TypeSerializer<N> namespaceSerializer,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDataOutputView,
		boolean ambiguousKeyPossible) throws IOException {
		writeElement(
			namespace,
			namespaceSerializer,
			keySerializationStream,
			keySerializationDataOutputView,
			ambiguousKeyPossible);
	}

	/**
	 * Decides whether length suffixes are needed after the key and the namespace.
	 *
	 * <p>Returns {@code true} only when both serializers report a negative length
	 * (presumably the marker for variable-length types — see {@code TypeSerializer#getLength()}).
	 *
	 * @param keySerializer serializer of the key
	 * @param namespaceSerializer serializer of the namespace
	 * @return {@code true} if both serializers report a negative length
	 */
	public static boolean isAmbiguousKeyPossible(TypeSerializer<?> keySerializer, TypeSerializer<?> namespaceSerializer) {
		return (keySerializer.getLength() < 0) && (namespaceSerializer.getLength() < 0);
	}

	/**
	 * Writes the key-group id using exactly {@code keyGroupPrefixBytes} bytes, most significant byte first.
	 *
	 * @param keyGroup key-group id to write
	 * @param keyGroupPrefixBytes number of bytes to write
	 * @param keySerializationDateDataOutputView view the prefix is written to
	 * @throws IOException if writing fails
	 */
	public static void writeKeyGroup(
		int keyGroup,
		int keyGroupPrefixBytes,
		DataOutputView keySerializationDateDataOutputView) throws IOException {
		for (int i = keyGroupPrefixBytes; --i >= 0; ) {
			// i << 3 == i * 8: shift the i-th byte into the low 8 bits
			keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
		}
	}

	/**
	 * Serializes a key and, if requested, appends the number of bytes it occupies.
	 *
	 * @param key key to serialize
	 * @param keySerializer serializer used to serialize the key
	 * @param keySerializationStream stream whose position is used to measure the serialized size;
	 *                               NOTE(review): {@code keySerializationDataOutputView} is expected to write
	 *                               into this stream — confirm at call sites
	 * @param keySerializationDataOutputView view the key (and length suffix) is written to
	 * @param ambiguousKeyPossible whether to append the length suffix
	 * @param <K> type of the key
	 * @throws IOException if serialization or writing fails
	 */
	public static <K> void writeKey(
		K key,
		TypeSerializer<K> keySerializer,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDataOutputView,
		boolean ambiguousKeyPossible) throws IOException {
		writeElement(
			key,
			keySerializer,
			keySerializationStream,
			keySerializationDataOutputView,
			ambiguousKeyPossible);
	}

	/**
	 * Shared implementation of {@link #readKey} and {@link #readNamespace}: deserializes one element and
	 * skips the trailing length suffix when one is present.
	 */
	private static <T> T readElement(
		TypeSerializer<T> serializer,
		ByteArrayInputStreamWithPos inputStream,
		DataInputView inputView,
		boolean ambiguousKeyPossible) throws IOException {
		int beforeRead = inputStream.getPosition();
		T element = serializer.deserialize(inputView);
		if (ambiguousKeyPossible) {
			// the suffix encodes exactly the number of bytes just consumed, so its own size can be derived from that
			int length = inputStream.getPosition() - beforeRead;
			readVariableIntBytes(inputView, length);
		}
		return element;
	}

	/**
	 * Shared implementation of {@link #writeKey} and {@link #writeNameSpace}: serializes one element and
	 * appends its serialized size when requested.
	 */
	private static <T> void writeElement(
		T element,
		TypeSerializer<T> serializer,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDataOutputView,
		boolean ambiguousKeyPossible) throws IOException {
		int beforeWrite = keySerializationStream.getPosition();
		serializer.serialize(element, keySerializationDataOutputView);

		if (ambiguousKeyPossible) {
			writeLengthFrom(beforeWrite, keySerializationStream, keySerializationDataOutputView);
		}
	}

	/**
	 * Skips as many bytes as {@link #writeVariableIntBytes(int, DataOutputView)} would have written for
	 * {@code value}. The bytes are consumed but not interpreted.
	 */
	private static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
		do {
			inputView.readByte();
			value >>>= 8;
		} while (value != 0);
	}

	/**
	 * Writes the number of bytes added to the stream since {@code fromPosition} as a variable-length int.
	 */
	private static void writeLengthFrom(
		int fromPosition,
		ByteArrayOutputStreamWithPos keySerializationStream,
		DataOutputView keySerializationDateDataOutputView) throws IOException {
		int length = keySerializationStream.getPosition() - fromPosition;
		writeVariableIntBytes(length, keySerializationDateDataOutputView);
	}

	/**
	 * Writes {@code value} using the fewest whole bytes that hold it (always at least one),
	 * least significant byte first.
	 */
	private static void writeVariableIntBytes(
		int value,
		DataOutputView keySerializationDateDataOutputView)
		throws IOException {
		do {
			keySerializationDateDataOutputView.writeByte(value);
			value >>>= 8;
		} while (value != 0);
	}
}

0 comments on commit eeac022

Please sign in to comment.