@@ -89,11 +89,12 @@ private CheckpointStreamFactory createCheckpointStreamFactory() {
}
}

public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) throws IOException {
createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
}

void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot) {
void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot)
throws IOException {
Collection<KeyedStateHandle> stateHandles;
if (snapshot == null) {
stateHandles = Collections.emptyList();
@@ -102,6 +103,8 @@ void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle s
stateHandles.add(snapshot);
}
env = MockEnvironment.builder().build();
env.setCheckpointStorageAccess(
createCheckpointStorage().createCheckpointStorage(new JobID()));
try {
disposeKeyedStateBackend();
keyedStateBackend =
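Note on the hunk above: the new `throws IOException` clauses fall out of wiring a checkpoint storage access into the mock environment, since `CheckpointStorage#createCheckpointStorage(JobID)` is declared to throw `IOException`. A minimal sketch of the same wiring, assuming `JobManagerCheckpointStorage` as the storage implementation (the harness's own `createCheckpointStorage()` is not shown in this diff, so the class and helper names below are illustrative):

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;

import java.io.IOException;

/** Illustrative helper; not part of the PR. */
final class CheckpointStorageTestSetup {
    static MockEnvironment createEnvWithCheckpointStorage() throws IOException {
        MockEnvironment env = MockEnvironment.builder().build();
        // createCheckpointStorage(JobID) declares IOException, which is why the
        // harness methods above now propagate it.
        CheckpointStorageAccess access =
                new JobManagerCheckpointStorage().createCheckpointStorage(new JobID());
        env.setCheckpointStorageAccess(access);
        return env;
    }
}
```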
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.util.Arrays;
import java.util.Collections;

/** Test suite for {@link TtlListState} with elements serialized by Kryo. */
public class TtlListStateWithKryoTestContext
extends TtlListStateTestContextBase<TtlListStateWithKryoTestContext.NotPojoElement> {
TtlListStateWithKryoTestContext() {
super(new KryoSerializer<>(NotPojoElement.class, getForceKryoSerializerConfig()));
}

private static SerializerConfig getForceKryoSerializerConfig() {
Configuration config = new Configuration();
config.set(PipelineOptions.FORCE_KRYO, true);
return new SerializerConfigImpl(config);
}

@Override
NotPojoElement generateRandomElement(int i) {
return new NotPojoElement(RANDOM.nextInt(100));
}

@Override
void initTestValues() {
emptyValue = Collections.emptyList();

updateEmpty =
Arrays.asList(new NotPojoElement(5), new NotPojoElement(7), new NotPojoElement(10));
updateUnexpired =
Arrays.asList(new NotPojoElement(8), new NotPojoElement(9), new NotPojoElement(11));
updateExpired = Arrays.asList(new NotPojoElement(1), new NotPojoElement(4));

getUpdateEmpty = updateEmpty;
getUnexpired = updateUnexpired;
getUpdateExpired = updateExpired;
}

public static class NotPojoElement {
public int value;

public NotPojoElement(int value) {
this.value = value;
}

@Override
public String toString() {
return "NotPojoElement{" + "value=" + value + '}';
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
NotPojoElement that = (NotPojoElement) obj;
return value == that.value;
}

@Override
public int hashCode() {
return Integer.hashCode(value);
}
}
}
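For context, the new test context forces Kryo via `pipeline.force-kryo` so the TTL list elements hit the `KryoSerializer` path that FLINK-16686 is about. Below is a hedged, self-contained sketch of the serializer round trip this exercises; `Element` stands in for `NotPojoElement`, and the class and `main` wrapper are only illustrative:

```java
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

final class KryoRoundTripSketch {
    /** Stand-in for NotPojoElement: no default constructor, so not a Flink POJO. */
    static final class Element {
        public int value;

        Element(int value) {
            this.value = value;
        }
    }

    public static void main(String[] args) throws IOException {
        // Same forced-Kryo configuration as getForceKryoSerializerConfig() above.
        Configuration config = new Configuration();
        config.set(PipelineOptions.FORCE_KRYO, true);

        KryoSerializer<Element> serializer =
                new KryoSerializer<>(Element.class, new SerializerConfigImpl(config));

        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(new Element(42), out);

        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        // Kryo resolves Element through the thread's context classloader, which is
        // exactly what breaks on native compaction threads (FLINK-16686).
        Element copy = serializer.deserialize(in);
        System.out.println(copy.value);
    }
}
```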
@@ -85,6 +85,7 @@ protected abstract StateBackendTestContext createStateBackendTestContext(
new TtlValueStateTestContext(),
new TtlFixedLenElemListStateTestContext(),
new TtlNonFixedLenElemListStateTestContext(),
new TtlListStateWithKryoTestContext(),
new TtlMapStateAllEntriesTestContext(),
new TtlMapStatePerElementTestContext(),
new TtlMapStatePerNullElementTestContext(),
@@ -48,6 +48,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.function.Supplier;

/** RocksDB compaction filter utils for state with TTL. */
public class ForStDBTtlCompactFiltersManager {
@@ -205,15 +206,27 @@ public void configCompactFilter(

private static class ListElementFilterFactory<T>
implements FlinkCompactionFilter.ListElementFilterFactory {
private final TypeSerializer<T> serializer;
// See #createListElementFilter().
private final ThreadLocalSerializerProvider<T> threadLocalSerializer;

private ListElementFilterFactory(TypeSerializer<T> serializer) {
this.serializer = serializer;
ClassLoader contextClassLoader = null;
try {
contextClassLoader = Thread.currentThread().getContextClassLoader();
} catch (Throwable e) {
LOG.info("Cannot get context classloader for list state's compaction filter.", e);
}
threadLocalSerializer =
new ThreadLocalSerializerProvider<>(serializer, contextClassLoader);
}

@Override
public FlinkCompactionFilter.ListElementFilter createListElementFilter() {
return new ListElementFilter<>(serializer);
// This method is invoked by native code multiple times when creating compaction
// filters, and the created filters are shared by multiple background threads.
// Make sure the serializer is thread-local and that the classloader is set
// correctly and individually for each thread.
return new ListElementFilter<>(threadLocalSerializer);
}
}

@@ -231,21 +244,22 @@ public long currentTimestamp() {
}

private static class ListElementFilter<T> implements FlinkCompactionFilter.ListElementFilter {
private final TypeSerializer<T> serializer;
private DataInputDeserializer input;
private final ThreadLocalSerializerProvider<T> threadLocalSerializer;
private final DataInputDeserializer input;

private ListElementFilter(TypeSerializer<T> serializer) {
this.serializer = serializer;
private ListElementFilter(ThreadLocalSerializerProvider<T> serializer) {
this.threadLocalSerializer = serializer;
this.input = new DataInputDeserializer();
}

@Override
public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
input.setBuffer(bytes);
int lastElementOffset = 0;
TypeSerializer<T> serializer = threadLocalSerializer.get();
while (input.available() > 0) {
try {
long timestamp = nextElementLastAccessTimestamp();
long timestamp = nextElementLastAccessTimestamp(serializer);
if (!TtlUtils.expired(timestamp, ttl, currentTimestamp)) {
break;
}
@@ -258,7 +272,8 @@ public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
return lastElementOffset;
}

private long nextElementLastAccessTimestamp() throws IOException {
private long nextElementLastAccessTimestamp(TypeSerializer<T> serializer)
throws IOException {
TtlValue<?> ttlValue = (TtlValue<?>) serializer.deserialize(input);
if (input.available() > 0) {
input.skipBytesToRead(1);
@@ -267,6 +282,37 @@ private long nextElementLastAccessTimestamp() throws IOException {
}
}

private static class ThreadLocalSerializerProvider<T> implements Supplier<TypeSerializer<T>> {
// Multiple background threads may share the same filter instance, so the serializer
// must be thread-local: every thread gets its own instance with its classloader set.
private final ThreadLocal<TypeSerializer<T>> threadLocalSerializer;

public ThreadLocalSerializerProvider(
TypeSerializer<T> serializer, ClassLoader classLoader) {
this.threadLocalSerializer =
ThreadLocal.withInitial(
() -> {
setClassloaderIfNeeded(classLoader);
return serializer.duplicate();
});
}

private void setClassloaderIfNeeded(ClassLoader classLoader) {
// Set the given classloader on the current thread before deserializing.
// This is needed because the serializer may be a Kryo serializer, which
// requires the user classloader to load user classes.
// See FLINK-16686 for more details.
if (classLoader != null) {
Thread.currentThread().setContextClassLoader(classLoader);
}
}

@Override
public TypeSerializer<T> get() {
return threadLocalSerializer.get();
}
}

public void disposeAndClearRegisteredCompactionFactories() {
for (FlinkCompactionFilterFactory factory : compactionFilterFactories.values()) {
IOUtils.closeQuietly(factory);
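The core idea of `ThreadLocalSerializerProvider` above: `TypeSerializer` instances are not guaranteed to be thread-safe, and the filter is called from RocksDB/ForSt background threads that never had Flink's user classloader installed, so each thread lazily gets its own `duplicate()` of the serializer with the captured classloader set first. A small stand-alone demo of that pattern (using `KryoSerializer` only so that `duplicate()` yields distinct instances; all names here are illustrative, not the PR's API):

```java
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;

final class ThreadLocalDuplicationDemo {
    public static void main(String[] args) throws InterruptedException {
        TypeSerializer<String> prototype =
                new KryoSerializer<>(String.class, new SerializerConfigImpl(new Configuration()));
        ClassLoader userClassLoader = ThreadLocalDuplicationDemo.class.getClassLoader();

        ThreadLocal<TypeSerializer<String>> perThread =
                ThreadLocal.withInitial(
                        () -> {
                            // Threads spawned by native code may carry no usable context
                            // classloader; Kryo needs the user classloader (FLINK-16686).
                            Thread.currentThread().setContextClassLoader(userClassLoader);
                            // duplicate() returns an independent instance, so stateful
                            // serializers are never shared across threads.
                            return prototype.duplicate();
                        });

        Runnable task =
                () ->
                        System.out.println(
                                Thread.currentThread().getName()
                                        + " uses serializer instance "
                                        + System.identityHashCode(perThread.get()));

        Thread t1 = new Thread(task, "compaction-bg-1");
        Thread t2 = new Thread(task, "compaction-bg-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```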
@@ -74,6 +74,7 @@
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;
import org.forstdb.RocksDBException;
import org.forstdb.Snapshot;
import org.forstdb.WriteOptions;
import org.slf4j.Logger;
@@ -944,6 +945,13 @@ public boolean isSafeToReuseKVState() {
return true;
}

@VisibleForTesting
public void compactState(StateDescriptor<?, ?> stateDesc) throws RocksDBException {
ForStOperationUtils.ForStKvStateInfo kvStateInfo =
kvStateInformation.get(stateDesc.getName());
db.compactRange(kvStateInfo.columnFamilyHandle);
}

@Nonnegative
long getWriteBatchSize() {
return writeBatchSize;
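The `@VisibleForTesting` `compactState(...)` hook lets a test force a full compaction of the column family backing a single state, which is what actually pushes stored entries through the TTL compaction filter (and thus through the new thread-local serializer path). A hedged sketch of how a test might call it; the backend surface is abstracted here because the concrete backend class is not visible in this diff:

```java
import org.apache.flink.api.common.state.StateDescriptor;
import org.forstdb.RocksDBException;

/** Illustrative only; the interface merely mirrors the method added above. */
final class CompactStateUsageSketch {
    interface CompactableBackend {
        void compactState(StateDescriptor<?, ?> stateDesc) throws RocksDBException;
    }

    static void forceTtlCompaction(CompactableBackend backend, StateDescriptor<?, ?> desc)
            throws RocksDBException {
        // Compacting the column family behind `desc` runs every stored entry through
        // the TTL compaction filter, dropping expired list elements on disk.
        backend.compactState(desc);
    }
}
```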