diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java index f681c551e85a4..b63fe500b6755 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -89,11 +89,12 @@ private CheckpointStreamFactory createCheckpointStreamFactory() { } } - public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) { + public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) throws IOException { createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot); } - void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot) { + void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle snapshot) + throws IOException { Collection stateHandles; if (snapshot == null) { stateHandles = Collections.emptyList(); @@ -102,6 +103,8 @@ void createAndRestoreKeyedStateBackend(int numberOfKeyGroups, KeyedStateHandle s stateHandles.add(snapshot); } env = MockEnvironment.builder().build(); + env.setCheckpointStorageAccess( + createCheckpointStorage().createCheckpointStorage(new JobID())); try { disposeKeyedStateBackend(); keyedStateBackend = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateWithKryoTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateWithKryoTestContext.java new file mode 100644 index 0000000000000..6d0dcf49bb235 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateWithKryoTestContext.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; + +import java.util.Arrays; +import java.util.Collections; + +/** Test suite for {@link TtlListState} with elements serialized by Kryo.
*/ +public class TtlListStateWithKryoTestContext + extends TtlListStateTestContextBase { + TtlListStateWithKryoTestContext() { + super(new KryoSerializer<>(NotPojoElement.class, getForceKryoSerializerConfig())); + } + + private static SerializerConfig getForceKryoSerializerConfig() { + Configuration config = new Configuration(); + config.set(PipelineOptions.FORCE_KRYO, true); + return new SerializerConfigImpl(config); + } + + @Override + NotPojoElement generateRandomElement(int i) { + return new NotPojoElement(RANDOM.nextInt(100)); + } + + @Override + void initTestValues() { + emptyValue = Collections.emptyList(); + + updateEmpty = + Arrays.asList(new NotPojoElement(5), new NotPojoElement(7), new NotPojoElement(10)); + updateUnexpired = + Arrays.asList(new NotPojoElement(8), new NotPojoElement(9), new NotPojoElement(11)); + updateExpired = Arrays.asList(new NotPojoElement(1), new NotPojoElement(4)); + + getUpdateEmpty = updateEmpty; + getUnexpired = updateUnexpired; + getUpdateExpired = updateExpired; + } + + public static class NotPojoElement { + public int value; + + public NotPojoElement(int value) { + this.value = value; + } + + @Override + public String toString() { + return "NotPojoElement{" + "value=" + value + '}'; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NotPojoElement that = (NotPojoElement) obj; + return value == that.value; + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index ca06f4b5c6a38..4adba8bb5615d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -85,6
+85,7 @@ protected abstract StateBackendTestContext createStateBackendTestContext( new TtlValueStateTestContext(), new TtlFixedLenElemListStateTestContext(), new TtlNonFixedLenElemListStateTestContext(), + new TtlListStateWithKryoTestContext(), new TtlMapStateAllEntriesTestContext(), new TtlMapStatePerElementTestContext(), new TtlMapStatePerNullElementTestContext(), diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java index a1bc9ef2f92fd..89b55b9691874 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.time.Duration; import java.util.LinkedHashMap; +import java.util.function.Supplier; /** RocksDB compaction filter utils for state with TTL. */ public class ForStDBTtlCompactFiltersManager { @@ -205,15 +206,27 @@ public void configCompactFilter( private static class ListElementFilterFactory implements FlinkCompactionFilter.ListElementFilterFactory { - private final TypeSerializer serializer; + // Handed to every filter this factory creates; see #createListElementFilter.
+ private final ThreadLocalSerializerProvider threadLocalSerializer; private ListElementFilterFactory(TypeSerializer serializer) { - this.serializer = serializer; + ClassLoader contextClassLoader = null; + try { + contextClassLoader = Thread.currentThread().getContextClassLoader(); + } catch (Throwable e) { + LOG.info("Cannot get context classloader for list state's compaction filter.", e); + } + threadLocalSerializer = + new ThreadLocalSerializerProvider<>(serializer, contextClassLoader); } @Override public FlinkCompactionFilter.ListElementFilter createListElementFilter() { - return new ListElementFilter<>(serializer); + // This method will be invoked by native code multiple times when creating compaction + // filter. And the created filter will be shared by multiple background threads. + // Make sure the serializer is thread-local and has classloader set for each thread + // correctly and individually. + return new ListElementFilter<>(threadLocalSerializer); } } @@ -231,11 +244,11 @@ public long currentTimestamp() { } private static class ListElementFilter implements FlinkCompactionFilter.ListElementFilter { - private final TypeSerializer serializer; - private DataInputDeserializer input; + private final ThreadLocalSerializerProvider threadLocalSerializer; + private final DataInputDeserializer input; - private ListElementFilter(TypeSerializer serializer) { - this.serializer = serializer; + private ListElementFilter(ThreadLocalSerializerProvider serializer) { + this.threadLocalSerializer = serializer; this.input = new DataInputDeserializer(); } @@ -243,9 +256,10 @@ private ListElementFilter(TypeSerializer serializer) { public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) { input.setBuffer(bytes); int lastElementOffset = 0; + TypeSerializer serializer = threadLocalSerializer.get(); while (input.available() > 0) { try { - long timestamp = nextElementLastAccessTimestamp(); + long timestamp = nextElementLastAccessTimestamp(serializer); if
(!TtlUtils.expired(timestamp, ttl, currentTimestamp)) { break; } @@ -258,7 +272,8 @@ public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) { return lastElementOffset; } - private long nextElementLastAccessTimestamp() throws IOException { + private long nextElementLastAccessTimestamp(TypeSerializer serializer) + throws IOException { TtlValue ttlValue = (TtlValue) serializer.deserialize(input); if (input.available() > 0) { input.skipBytesToRead(1); @@ -267,6 +282,37 @@ private long nextElementLastAccessTimestamp() throws IOException { } } + private static class ThreadLocalSerializerProvider implements Supplier> { + // Multiple background threads may share the same filter instance, so we need to make sure + // the serializer is thread-local, and every thread has its own instance with classloader. + private final ThreadLocal> threadLocalSerializer; + + public ThreadLocalSerializerProvider( + TypeSerializer serializer, ClassLoader classLoader) { + this.threadLocalSerializer = + ThreadLocal.withInitial( + () -> { + setClassloaderIfNeeded(classLoader); + return serializer.duplicate(); + }); + } + + private void setClassloaderIfNeeded(ClassLoader classLoader) { + // The classloader that should be set to the current thread when deserializing. + // The reason why we should set classloader is that the serializer may be Kryo + // serializer which needs user classloader to load user classes. + // See FLINK-16686 for more details.
+ if (classLoader != null) { + Thread.currentThread().setContextClassLoader(classLoader); + } + } + + @Override + public TypeSerializer get() { + return threadLocalSerializer.get(); + } + } + public void disposeAndClearRegisteredCompactionFactories() { for (FlinkCompactionFilterFactory factory : compactionFilterFactories.values()) { IOUtils.closeQuietly(factory); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index 2a2705cf8c019..0cd1312cc44f6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -74,6 +74,7 @@ import org.forstdb.ColumnFamilyOptions; import org.forstdb.ReadOptions; import org.forstdb.RocksDB; +import org.forstdb.RocksDBException; import org.forstdb.Snapshot; import org.forstdb.WriteOptions; import org.slf4j.Logger; @@ -944,6 +945,13 @@ public boolean isSafeToReuseKVState() { return true; } + @VisibleForTesting + public void compactState(StateDescriptor stateDesc) throws RocksDBException { + ForStOperationUtils.ForStKvStateInfo kvStateInfo = + kvStateInformation.get(stateDesc.getName()); + db.compactRange(kvStateInfo.columnFamilyHandle); + } + @Nonnegative long getWriteBatchSize() { return writeBatchSize; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStSyncTtlStateTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStSyncTtlStateTest.java new file mode 100644 index 0000000000000..659f38d14bb69 --- /dev/null +++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStSyncTtlStateTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; +import org.apache.flink.runtime.state.ttl.StateBackendTestContext; +import org.apache.flink.runtime.state.ttl.TtlStateTestBase; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend; +import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.util.FlinkRuntimeException; + +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; + +import static org.apache.flink.state.forst.ForStStateBackend.LOCAL_DIR_AS_PRIMARY_SHORTCUT; +import static 
org.assertj.core.api.Assertions.assertThat; + +/** Test suite for ForSt state TTL with the synchronous keyed state backend. */ +public class ForStSyncTtlStateTest extends TtlStateTestBase { + @TempDir private Path tempFolder; + + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return ForStSyncTtlStateTest.this.createStateBackend(); + } + + @Override + protected CheckpointStorage createCheckpointStorage() { + String checkpointPath; + try { + checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to init ForSt test state backend", e); + } + return new FileSystemCheckpointStorage(checkpointPath); + } + }; + } + + StateBackend createStateBackend() { + ForStStateBackend backend = new ForStStateBackend(); + Configuration config = new Configuration(); + config.set(ForStOptions.PRIMARY_DIRECTORY, LOCAL_DIR_AS_PRIMARY_SHORTCUT); + backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); + return backend; + } + + @Override + public boolean isSavepoint() { + return false; + } + + @TestTemplate + public void testCompactFilter() throws Exception { + testCompactFilter(false, false); + } + + @TestTemplate + public void testCompactFilterWithSnapshot() throws Exception { + testCompactFilter(true, false); + } + + @TestTemplate + public void testCompactFilterWithSnapshotAndRescalingAfterRestore() throws Exception { + testCompactFilter(true, true); + } + + @SuppressWarnings("resource") + private void testCompactFilter(boolean takeSnapshot, boolean rescaleAfterRestore) + throws Exception { + int numberOfKeyGroupsAfterRestore = StateBackendTestContext.NUMBER_OF_KEY_GROUPS; + if (rescaleAfterRestore) { + numberOfKeyGroupsAfterRestore *= 2; + } + + StateDescriptor stateDesc = + initTest( + getConfBuilder(TTL) + .setStateVisibility( +
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) + .build()); + + if (takeSnapshot) { + takeAndRestoreSnapshot(numberOfKeyGroupsAfterRestore); + } + + setTimeAndCompact(stateDesc, 0L); + + sbetc.setCurrentKey("k1"); + ctx().update(ctx().updateEmpty); + checkUnexpiredOriginalAvailable(); + + sbetc.setCurrentKey("k2"); + ctx().update(ctx().updateEmpty); + checkUnexpiredOriginalAvailable(); + + if (takeSnapshot) { + takeAndRestoreSnapshot(numberOfKeyGroupsAfterRestore); + } + + setTimeAndCompact(stateDesc, 50L); + + sbetc.setCurrentKey("k1"); + checkUnexpiredOriginalAvailable(); + assertThat(ctx().get()).withFailMessage(UNEXPIRED_AVAIL).isEqualTo(ctx().getUpdateEmpty); + + ctx().update(ctx().updateUnexpired); + checkUnexpiredOriginalAvailable(); + + sbetc.setCurrentKey("k2"); + checkUnexpiredOriginalAvailable(); + assertThat(ctx().get()).withFailMessage(UNEXPIRED_AVAIL).isEqualTo(ctx().getUpdateEmpty); + + ctx().update(ctx().updateUnexpired); + checkUnexpiredOriginalAvailable(); + + if (takeSnapshot) { + takeAndRestoreSnapshot(numberOfKeyGroupsAfterRestore); + } + + // compaction which should not touch unexpired data + // and merge list element with different expiration time + setTimeAndCompact(stateDesc, 80L); + // expire oldest data + setTimeAndCompact(stateDesc, 120L); + + sbetc.setCurrentKey("k1"); + checkUnexpiredOriginalAvailable(); + assertThat(ctx().get()) + .withFailMessage(UPDATED_UNEXPIRED_AVAIL) + .isEqualTo(ctx().getUnexpired); + + sbetc.setCurrentKey("k2"); + checkUnexpiredOriginalAvailable(); + assertThat(ctx().get()) + .withFailMessage(UPDATED_UNEXPIRED_AVAIL) + .isEqualTo(ctx().getUnexpired); + + if (takeSnapshot) { + takeAndRestoreSnapshot(numberOfKeyGroupsAfterRestore); + } + + setTimeAndCompact(stateDesc, 170L); + sbetc.setCurrentKey("k1"); + assertThat(ctx().isOriginalEmptyValue()) + .withFailMessage("Expired original state should be unavailable") + .isTrue(); +
assertThat(ctx().get()).withFailMessage(EXPIRED_UNAVAIL).isEqualTo(ctx().emptyValue); + + sbetc.setCurrentKey("k2"); + assertThat(ctx().isOriginalEmptyValue()) + .withFailMessage("Expired original state should be unavailable") + .isTrue(); + assertThat(ctx().get()) + .withFailMessage("Expired state should be unavailable") + .isEqualTo(ctx().emptyValue); + } + + protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { + // State migration is not supported yet, so this body is intentionally left empty. + } + + private void checkUnexpiredOriginalAvailable() throws Exception { + assertThat(ctx().getOriginal()) + .withFailMessage("Unexpired original state should be available") + .isNotEqualTo(ctx().emptyValue); + } + + private void setTimeAndCompact(StateDescriptor stateDesc, long ts) throws Exception { + @SuppressWarnings("resource") + ForStSyncKeyedStateBackend keyedBackend = sbetc.getKeyedStateBackend(); + timeProvider.time = ts; + keyedBackend.compactState(stateDesc); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java index 69c7f5706c6db..5f6890e7a73ca 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.time.Duration; import java.util.LinkedHashMap; +import java.util.function.Supplier; /** RocksDB compaction filter utils for state with TTL.
*/ public class RocksDbTtlCompactFiltersManager { @@ -186,15 +187,27 @@ public void configCompactFilter( private static class ListElementFilterFactory implements FlinkCompactionFilter.ListElementFilterFactory { - private final TypeSerializer serializer; + // Handed to every filter this factory creates; see #createListElementFilter. + private final ThreadLocalSerializerProvider threadLocalSerializer; private ListElementFilterFactory(TypeSerializer serializer) { - this.serializer = serializer; + ClassLoader contextClassLoader = null; + try { + contextClassLoader = Thread.currentThread().getContextClassLoader(); + } catch (Throwable e) { + LOG.info("Cannot get context classloader for list state's compaction filter.", e); + } + threadLocalSerializer = + new ThreadLocalSerializerProvider<>(serializer, contextClassLoader); } @Override public FlinkCompactionFilter.ListElementFilter createListElementFilter() { - return new ListElementFilter<>(serializer); + // This method will be invoked by native code multiple times when creating compaction + // filter. And the created filter will be shared by multiple background threads. + // Make sure the serializer is thread-local and has classloader set for each thread + // correctly and individually.
+ return new ListElementFilter<>(threadLocalSerializer); } } @@ -212,11 +225,11 @@ public long currentTimestamp() { } private static class ListElementFilter implements FlinkCompactionFilter.ListElementFilter { - private final TypeSerializer serializer; - private DataInputDeserializer input; + private final ThreadLocalSerializerProvider threadLocalSerializer; + private final DataInputDeserializer input; - private ListElementFilter(TypeSerializer serializer) { - this.serializer = serializer; + private ListElementFilter(ThreadLocalSerializerProvider serializer) { + this.threadLocalSerializer = serializer; this.input = new DataInputDeserializer(); } @@ -224,9 +237,10 @@ private ListElementFilter(TypeSerializer serializer) { public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) { input.setBuffer(bytes); int lastElementOffset = 0; + TypeSerializer serializer = threadLocalSerializer.get(); while (input.available() > 0) { try { - long timestamp = nextElementLastAccessTimestamp(); + long timestamp = nextElementLastAccessTimestamp(serializer); if (!TtlUtils.expired(timestamp, ttl, currentTimestamp)) { break; } @@ -239,7 +253,8 @@ public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) { return lastElementOffset; } - private long nextElementLastAccessTimestamp() throws IOException { + private long nextElementLastAccessTimestamp(TypeSerializer serializer) + throws IOException { TtlValue ttlValue = (TtlValue) serializer.deserialize(input); if (input.available() > 0) { input.skipBytesToRead(1); @@ -248,6 +263,37 @@ private long nextElementLastAccessTimestamp() throws IOException { } } + private static class ThreadLocalSerializerProvider implements Supplier> { + // Multiple background threads may share the same filter instance, so we need to make sure + // the serializer is thread-local, and every thread has its own instance with classloader.
+ private final ThreadLocal> threadLocalSerializer; + + public ThreadLocalSerializerProvider( + TypeSerializer serializer, ClassLoader classLoader) { + this.threadLocalSerializer = + ThreadLocal.withInitial( + () -> { + setClassloaderIfNeeded(classLoader); + return serializer.duplicate(); + }); + } + + private void setClassloaderIfNeeded(ClassLoader classLoader) { + // The classloader that should be set to the current thread when deserializing. + // The reason why we should set classloader is that the serializer may be Kryo + // serializer which needs user classloader to load user classes. + // See FLINK-16686 for more details. + if (classLoader != null) { + Thread.currentThread().setContextClassLoader(classLoader); + } + } + + @Override + public TypeSerializer get() { + return threadLocalSerializer.get(); + } + } + public void disposeAndClearRegisteredCompactionFactories() { for (FlinkCompactionFilterFactory factory : compactionFilterFactories.values()) { IOUtils.closeQuietly(factory);