From 8a7a76859b74d9ec499ba9d0ef49bddd137501e1 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Tue, 14 Mar 2017 14:25:57 +0100 Subject: [PATCH] [FLINK-6048] [checkpoint] Implement asynchronous snapshots for DefaultOperatorStateBackend --- .../state/RocksDBKeyedStateBackend.java | 2 +- .../streaming/state/RocksDBStateBackend.java | 17 +- .../state/RocksDBStateBackendTest.java | 2 +- .../AbstractAsyncSnapshotIOCallable.java | 109 +++++++ .../runtime/state/AbstractStateBackend.java | 7 +- .../state/DefaultOperatorStateBackend.java | 220 ++++++++++--- .../runtime/state/OperatorStateHandle.java | 6 +- .../state/filesystem/FsStateBackend.java | 13 + .../state/heap/HeapKeyedStateBackend.java | 107 +++---- .../state/memory/MemoryStateBackend.java | 13 + .../BlockerCheckpointStreamFactory.java | 112 +++++++ .../state/OperatorStateBackendTest.java | 301 ++++++++++++++++-- .../tasks/BlockingCheckpointsTest.java | 10 + .../streaming/runtime/StateBackendITCase.java | 9 + 14 files changed, 787 insertions(+), 141 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 91d947ecc481a..199a5a4a0ef2d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -421,7 +421,6 @@ public void writeDBSnapshot() throws IOException, InterruptedException { */ public void closeCheckpointStream() throws IOException { if (outStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(outStream); snapshotResultStateHandle = closeSnapshotStreamAndGetHandle(); } else { snapshotResultStateHandle = null; @@ -592,6 +591,7 @@ private void writeKVStateData() throws IOException, InterruptedException { } private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException { + stateBackend.cancelStreamRegistry.unregisterClosable(outStream); StreamStateHandle stateHandle = outStream.closeAndGetHandle(); outStream = null; return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null; diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 16b08693177c1..80c9a291ddef5 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -26,15 +26,15 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; - import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -426,6 +426,19 @@ public ColumnFamilyOptions getColumnOptions() { return opt; } + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier) throws Exception { + + //the default for RocksDB; eventually there can be a operator state backend based on RocksDB, too. + final boolean asyncSnapshots = true; + return new DefaultOperatorStateBackend( + env.getUserClassLoader(), + env.getExecutionConfig(), + asyncSnapshots); + } + @Override public String toString() { return "RocksDB State Backend {" + diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index d95a9b41d9857..b5f18a4f30c45 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -38,7 +39,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java new file mode 100644 index 0000000000000..1aaa473c3a359 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractAsyncSnapshotIOCallable.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.commons.io.IOUtils; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Abstract base class for async IO operations of snapshots against a + * {@link java.util.zip.CheckedOutputStream}. This includes participating in lifecycle management + * through a {@link CloseableRegistry}. + */ +public abstract class AbstractAsyncSnapshotIOCallable + extends AbstractAsyncIOCallable { + + protected final long checkpointId; + protected final long timestamp; + + protected final CheckpointStreamFactory streamFactory; + protected final CloseableRegistry closeStreamOnCancelRegistry; + protected final AtomicBoolean open; + + public AbstractAsyncSnapshotIOCallable( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory, + CloseableRegistry closeStreamOnCancelRegistry) { + + this.streamFactory = Preconditions.checkNotNull(streamFactory); + this.closeStreamOnCancelRegistry = Preconditions.checkNotNull(closeStreamOnCancelRegistry); + this.checkpointId = checkpointId; + this.timestamp = timestamp; + this.open = new AtomicBoolean(false); + } + + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { + if (checkStreamClosedAndDoTransitionToOpen()) { + CheckpointStreamFactory.CheckpointStateOutputStream stream = + streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); + try { + closeStreamOnCancelRegistry.registerClosable(stream); + return stream; + } catch (Exception ex) { + open.set(false); + throw ex; + } + } else { + throw new IOException("Async snapshot: a checkpoint stream was already opened."); + } + } + + @Override + public void done(boolean canceled) { + if (checkStreamOpenAndDoTransitionToClose()) { + CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); + if (stream != null) { + closeStreamOnCancelRegistry.unregisterClosable(stream); + IOUtils.closeQuietly(stream); + } + } + } + + protected boolean checkStreamClosedAndDoTransitionToOpen() { + return open.compareAndSet(false, true); + } + + protected boolean checkStreamOpenAndDoTransitionToClose() { + return open.compareAndSet(true, false); + } + + protected StreamStateHandle closeStreamAndGetStateHandle() throws IOException { + if (checkStreamOpenAndDoTransitionToClose()) { + final CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); + try { + return stream.closeAndGetHandle(); + } finally { + closeStreamOnCancelRegistry.unregisterClosable(stream); + } + } else { + throw new IOException("Checkpoint stream already closed."); + } + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 74025bf2f8409..1594e2e73edc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -91,12 +91,9 @@ public abstract AbstractKeyedStateBackend createKeyedStateBackend( TaskKvStateRegistry kvStateRegistry) throws IOException; @Override - public OperatorStateBackend createOperatorStateBackend( + public abstract OperatorStateBackend createOperatorStateBackend( Environment env, - String operatorIdentifier) throws Exception { - - return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig()); - } + String operatorIdentifier) throws Exception; // ------------------------------------------------------------------------ // Loading the state backend from a configuration diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index eb3ba01e6b1f0..e7ed26f67fd11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -20,10 +20,10 @@ import org.apache.commons.io.IOUtils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -31,8 +31,12 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -50,21 +54,54 @@ @Internal public class DefaultOperatorStateBackend implements OperatorStateBackend { - /** The default namespace for state in cases where no state name is provided */ + private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class); + + /** + * The default namespace for state in cases where no state name is provided + */ public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_"; - + + /** + * Map for all registered operator states. Maps state name -> state + */ private final Map> registeredStates; + + /** + * CloseableRegistry to participate in the tasks lifecycle. + */ private final CloseableRegistry closeStreamOnCancelRegistry; + + /** + * Default serializer. Only used for the default operator state. + */ private final JavaSerializer javaSerializer; + + /** + * The user code classloader. + */ private final ClassLoader userClassloader; + + /** + * The execution configuration. + */ private final ExecutionConfig executionConfig; - public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig) throws IOException { + /** + * Flag to de/activate asynchronous snapshots. + */ + private final boolean asynchronousSnapshots; + + public DefaultOperatorStateBackend( + ClassLoader userClassLoader, + ExecutionConfig executionConfig, + boolean asynchronousSnapshots) throws IOException { + this.closeStreamOnCancelRegistry = new CloseableRegistry(); this.userClassloader = Preconditions.checkNotNull(userClassLoader); this.executionConfig = executionConfig; this.javaSerializer = new JavaSerializer<>(); this.registeredStates = new HashMap<>(); + this.asynchronousSnapshots = asynchronousSnapshots; } public ExecutionConfig getExecutionConfig() { @@ -131,59 +168,109 @@ public ListState getSerializableListState(String sta @Override public RunnableFuture snapshot( - long checkpointId, - long timestamp, - CheckpointStreamFactory streamFactory, - CheckpointOptions checkpointOptions) throws Exception { + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory, + final CheckpointOptions checkpointOptions) throws Exception { + + final long syncStartTime = System.currentTimeMillis(); if (registeredStates.isEmpty()) { return DoneFuture.nullValue(); } - List> metaInfoList = - new ArrayList<>(registeredStates.size()); - - for (Map.Entry> entry : registeredStates.entrySet()) { - PartitionableListState state = entry.getValue(); - OperatorBackendSerializationProxy.StateMetaInfo metaInfo = - new OperatorBackendSerializationProxy.StateMetaInfo<>( - state.getName(), - state.getPartitionStateSerializer(), - state.getAssignmentMode()); - metaInfoList.add(metaInfo); + final Map> registeredStatesDeepCopies = + new HashMap<>(registeredStates.size()); + + // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing + for (Map.Entry> entry : this.registeredStates.entrySet()) { + + PartitionableListState listState = entry.getValue(); + if (null != listState) { + listState = listState.deepCopy(); + } + registeredStatesDeepCopies.put(entry.getKey(), listState); } - Map writtenStatesMetaData = new HashMap<>(registeredStates.size()); + // implementation of the async IO operation, based on FutureTask + final AbstractAsyncSnapshotIOCallable ioCallable = + new AbstractAsyncSnapshotIOCallable( + checkpointId, + timestamp, + streamFactory, + closeStreamOnCancelRegistry) { + + @Override + public OperatorStateHandle performOperation() throws Exception { + long asyncStartTime = System.currentTimeMillis(); + + final Map writtenStatesMetaData = + new HashMap<>(registeredStatesDeepCopies.size()); + + List> metaInfoList = + new ArrayList<>(registeredStatesDeepCopies.size()); + + for (Map.Entry> entry : + registeredStatesDeepCopies.entrySet()) { + + PartitionableListState state = entry.getValue(); + OperatorBackendSerializationProxy.StateMetaInfo metaInfo = + new OperatorBackendSerializationProxy.StateMetaInfo<>( + state.getName(), + state.getPartitionStateSerializer(), + state.getAssignmentMode()); + metaInfoList.add(metaInfo); + } - CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory. - createCheckpointStateOutputStream(checkpointId, timestamp); + CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle(); + DataOutputView dov = new DataOutputViewStreamWrapper(out); - try { - closeStreamOnCancelRegistry.registerClosable(out); + OperatorBackendSerializationProxy backendSerializationProxy = + new OperatorBackendSerializationProxy(metaInfoList); - DataOutputView dov = new DataOutputViewStreamWrapper(out); + backendSerializationProxy.write(dov); + + dov.writeInt(registeredStatesDeepCopies.size()); - OperatorBackendSerializationProxy backendSerializationProxy = - new OperatorBackendSerializationProxy(metaInfoList); + for (Map.Entry> entry : + registeredStatesDeepCopies.entrySet()) { - backendSerializationProxy.write(dov); + PartitionableListState value = entry.getValue(); + long[] partitionOffsets = value.write(out); + OperatorStateHandle.Mode mode = value.getAssignmentMode(); + writtenStatesMetaData.put( + entry.getKey(), + new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); + } - dov.writeInt(registeredStates.size()); - for (Map.Entry> entry : registeredStates.entrySet()) { + StreamStateHandle stateHandle = closeStreamAndGetStateHandle(); - PartitionableListState value = entry.getValue(); - long[] partitionOffsets = value.write(out); - OperatorStateHandle.Mode mode = value.getAssignmentMode(); - writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); - } + if (asynchronousSnapshots) { + LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); + } + + if (stateHandle == null) { + return null; + } - OperatorStateHandle handle = new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle()); + OperatorStateHandle operatorStateHandle = + new OperatorStateHandle(writtenStatesMetaData, stateHandle); - return new DoneFuture<>(handle); - } finally { - closeStreamOnCancelRegistry.unregisterClosable(out); - out.close(); + return operatorStateHandle; + } + }; + + AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); + + if (!asynchronousSnapshots) { + task.run(); } + + LOG.info("DefaultOperatorStateBackend snapshot (" + streamFactory + ", synchronous part) in thread " + + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms."); + + return task; } @Override @@ -253,22 +340,67 @@ public void restore(Collection restoreSnapshots) throws Exc } } + /** + * + * Implementation of operator list state. + * + * @param the type of an operator state partition. + */ static final class PartitionableListState implements ListState { + /** + * The name of the state, as registered by the user + */ private final String name; + + /** + * The type serializer for the elements in the state list + */ private final TypeSerializer partitionStateSerializer; + + /** + * The mode how elements in this state are assigned to tasks during restore + */ private final OperatorStateHandle.Mode assignmentMode; - private final List internalList; + + /** + * The internal list the holds the elements of the state + */ + private final ArrayList internalList; + + /** + * A serializer that allows to perfom deep copies of internalList + */ + private final ArrayListSerializer internalListCopySerializer; public PartitionableListState( String name, TypeSerializer partitionStateSerializer, OperatorStateHandle.Mode assignmentMode) { + this(name, partitionStateSerializer, assignmentMode, new ArrayList()); + } + + private PartitionableListState( + String name, + TypeSerializer partitionStateSerializer, + OperatorStateHandle.Mode assignmentMode, + ArrayList internalList) { + this.name = Preconditions.checkNotNull(name); this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer); this.assignmentMode = Preconditions.checkNotNull(assignmentMode); - this.internalList = new ArrayList<>(); + this.internalList = Preconditions.checkNotNull(internalList); + this.internalListCopySerializer = new ArrayListSerializer<>(partitionStateSerializer); + } + + private PartitionableListState(PartitionableListState toCopy) { + + this( + toCopy.name, + toCopy.partitionStateSerializer.duplicate(), + toCopy.assignmentMode, + toCopy.internalListCopySerializer.copy(toCopy.internalList)); } public String getName() { @@ -287,6 +419,10 @@ public List getInternalList() { return internalList; } + public PartitionableListState deepCopy() { + return new PartitionableListState<>(this); + } + @Override public void clear() { internalList.clear(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java index c59fbadc1f9fd..7c493383ec2b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java @@ -32,8 +32,12 @@ */ public class OperatorStateHandle implements StreamStateHandle { + /** + * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore. + */ public enum Mode { - SPLIT_DISTRIBUTE, BROADCAST + SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. + BROADCAST // The operator state partitions are broadcasted to all task. } private static final long serialVersionUID = 35876522969227335L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index e27712caa67a7..e320bf3140de7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -27,7 +27,9 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -309,6 +311,17 @@ public AbstractKeyedStateBackend createKeyedStateBackend( env.getExecutionConfig()); } + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier) throws Exception { + + return new DefaultOperatorStateBackend( + env.getUserClassLoader(), + env.getExecutionConfig(), + asynchronousSnapshots); + } + @Override public String toString() { return "File State Backend @ " + basePath; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 9247ffac5327c..38817cdb4bd57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -37,10 +37,10 @@ import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.runtime.state.KvStateSnapshot; import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; -import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; -import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; +import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.ArrayListSerializer; @@ -72,7 +72,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.RunnableFuture; -import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize state to @@ -269,78 +268,50 @@ public RunnableFuture snapshot( //--------------------------------------------------- this becomes the end of sync part // implementation of the async IO operation, based on FutureTask - final AbstractAsyncIOCallable ioCallable = - new AbstractAsyncIOCallable() { - - AtomicBoolean open = new AtomicBoolean(false); - - @Override - public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { - if (open.compareAndSet(false, true)) { - CheckpointStreamFactory.CheckpointStateOutputStream stream = - streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - try { - cancelStreamRegistry.registerClosable(stream); - return stream; - } catch (Exception ex) { - open.set(false); - throw ex; - } - } else { - throw new IOException("Operation already opened."); + final AbstractAsyncSnapshotIOCallable ioCallable = + new AbstractAsyncSnapshotIOCallable( + checkpointId, + timestamp, + streamFactory, + cancelStreamRegistry) { + + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long asyncStartTime = System.currentTimeMillis(); + CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); + serializationProxy.write(outView); + + long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; + + for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { + int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); + keyGroupRangeOffsets[keyGroupPos] = stream.getPos(); + outView.writeInt(keyGroupId); + + for (Map.Entry> kvState : stateTables.entrySet()) { + outView.writeShort(kVStateToId.get(kvState.getKey())); + cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId); } } - @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long asyncStartTime = System.currentTimeMillis(); - CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); - DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); - serializationProxy.write(outView); - - long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()]; - - for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) { - int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos); - keyGroupRangeOffsets[keyGroupPos] = stream.getPos(); - outView.writeInt(keyGroupId); - - for (Map.Entry> kvState : stateTables.entrySet()) { - outView.writeShort(kVStateToId.get(kvState.getKey())); - cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId); - } - } - - if (open.compareAndSet(true, false)) { - StreamStateHandle streamStateHandle = stream.closeAndGetHandle(); - KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); - final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle); + final StreamStateHandle streamStateHandle = closeStreamAndGetStateHandle(); - if (asynchronousSnapshots) { - LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); - } - - return keyGroupsStateHandle; - } else { - throw new IOException("Checkpoint stream already closed."); - } + if (asynchronousSnapshots) { + LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime)); } - @Override - public void done(boolean canceled) { - if (open.compareAndSet(true, false)) { - CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); - if (null != stream) { - cancelStreamRegistry.unregisterClosable(stream); - IOUtils.closeQuietly(stream); - } - } - for (StateTableSnapshot snapshot : cowStateStableSnapshots.values()) { - snapshot.release(); - } + if (streamStateHandle == null) { + return null; } - }; + + KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets); + final KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle); + + return keyGroupsStateHandle; + } + }; AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index f0bac1b17c56c..7ed1dea17a13f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -25,7 +25,9 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import java.io.IOException; @@ -88,6 +90,17 @@ public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) { this.asynchronousSnapshots = asynchronousSnapshots; } + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier) throws Exception { + + return new DefaultOperatorStateBackend( + env.getUserClassLoader(), + env.getExecutionConfig(), + asynchronousSnapshots); + } + @Override public String toString() { return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java new file mode 100644 index 0000000000000..6f892e2053980 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import java.io.IOException; + +/** + * A {@link CheckpointStreamFactory} for tests that creates streams that block on a latch to test concurrency in + * checkpointing. + */ +public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { + + private final int maxSize; + private int afterNumberInvocations; + private OneShotLatch blocker; + private OneShotLatch waiter; + + MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; + + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { + return lastCreatedStream; + } + + public BlockerCheckpointStreamFactory(int maxSize) { + this.maxSize = maxSize; + } + + public void setAfterNumberInvocations(int afterNumberInvocations) { + this.afterNumberInvocations = afterNumberInvocations; + } + + public void setBlockerLatch(OneShotLatch latch) { + this.blocker = latch; + } + + public void setWaiterLatch(OneShotLatch latch) { + this.waiter = latch; + } + + @Override + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + waiter.trigger(); + this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { + + private int afterNInvocations = afterNumberInvocations; + private final OneShotLatch streamBlocker = blocker; + private final OneShotLatch streamWaiter = waiter; + + @Override + public void write(int b) throws IOException { + + if (afterNInvocations > 0) { + --afterNInvocations; + } + + if (0 == afterNInvocations && null != streamBlocker) { + try { + streamBlocker.await(); + } catch (InterruptedException ignored) { + } + } + try { + super.write(b); + } catch (IOException ex) { + if (null != streamWaiter) { + streamWaiter.trigger(); + } + throw ex; + } + + if (0 == afterNInvocations && null != streamWaiter) { + streamWaiter.trigger(); + } + } + + @Override + public void close() { + super.close(); + if (null != streamWaiter) { + streamWaiter.trigger(); + } + } + }; + + return lastCreatedStream; + } + + @Override + public void close() throws Exception { + + } +} \ No newline at end of file diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 13a6307921588..c04ed8cf39562 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -21,22 +21,30 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; - import org.apache.flink.util.FutureUtil; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.io.Serializable; import java.io.File; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -75,7 +83,7 @@ public void testRegisterStatesWithoutTypeSerializer() throws Exception { final ExecutionConfig cfg = new ExecutionConfig(); cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class); - final OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg); + final OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg, false); ListStateDescriptor stateDescriptor = new ListStateDescriptor<>("test", File.class); ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", String.class); @@ -108,7 +116,7 @@ public void testRegisterStatesWithoutTypeSerializer() throws Exception { @Test public void testRegisterStates() throws Exception { final OperatorStateBackend operatorStateBackend = - new DefaultOperatorStateBackend(classLoader, new ExecutionConfig()); + new DefaultOperatorStateBackend(classLoader, new ExecutionConfig(), false); ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); @@ -117,19 +125,19 @@ public void testRegisterStates() throws Exception { assertNotNull(listState1); assertEquals(1, operatorStateBackend.getRegisteredStateNames().size()); Iterator it = listState1.get().iterator(); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); listState1.add(42); listState1.add(4711); it = listState1.get().iterator(); assertEquals(42, it.next()); assertEquals(4711, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); ListState listState2 = operatorStateBackend.getListState(stateDescriptor2); assertNotNull(listState2); assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); listState2.add(7); listState2.add(13); listState2.add(23); @@ -138,12 +146,12 @@ public void testRegisterStates() throws Exception { assertEquals(7, it.next()); assertEquals(13, it.next()); assertEquals(23, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3); assertNotNull(listState3); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); listState3.add(17); listState3.add(3); listState3.add(123); @@ -152,7 +160,7 @@ public void testRegisterStates() throws Exception { assertEquals(17, it.next()); assertEquals(3, it.next()); assertEquals(123, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); ListState listState1b = operatorStateBackend.getListState(stateDescriptor1); assertNotNull(listState1b); @@ -161,19 +169,19 @@ public void testRegisterStates() throws Exception { assertEquals(42, it.next()); assertEquals(4711, it.next()); assertEquals(123, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); it = listState1.get().iterator(); assertEquals(42, it.next()); assertEquals(4711, it.next()); assertEquals(123, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); it = listState1b.get().iterator(); assertEquals(42, it.next()); assertEquals(4711, it.next()); assertEquals(123, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); try { operatorStateBackend.getUnionListState(stateDescriptor2); @@ -208,12 +216,10 @@ public void testSnapshotEmpty() throws Exception { } @Test - public void testSnapshotRestore() throws Exception { + public void testSnapshotRestoreSync() throws Exception { AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); - OperatorStateBackend operatorStateBackend = - abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name"); - + OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name"); ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); @@ -234,8 +240,9 @@ public void testSnapshotRestore() throws Exception { listState3.add(20); CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); - OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet( - operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); + RunnableFuture runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); try { @@ -259,26 +266,278 @@ public void testSnapshotRestore() throws Exception { Iterator it = listState1.get().iterator(); assertEquals(42, it.next()); assertEquals(4711, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); it = listState2.get().iterator(); assertEquals(7, it.next()); assertEquals(13, it.next()); assertEquals(23, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); it = listState3.get().iterator(); assertEquals(17, it.next()); assertEquals(18, it.next()); assertEquals(19, it.next()); assertEquals(20, it.next()); - assertTrue(!it.hasNext()); + assertFalse(it.hasNext()); + + operatorStateBackend.close(); + operatorStateBackend.dispose(); + } finally { + stateHandle.discardState(); + } + } + + @Test + public void testSnapshotRestoreAsync() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = + new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true); + + ListStateDescriptor stateDescriptor1 = + new ListStateDescriptor<>("test1", new JavaSerializer()); + ListStateDescriptor stateDescriptor2 = + new ListStateDescriptor<>("test2", new JavaSerializer()); + ListStateDescriptor stateDescriptor3 = + new ListStateDescriptor<>("test3", new JavaSerializer()); + ListState listState1 = operatorStateBackend.getListState(stateDescriptor1); + ListState listState2 = operatorStateBackend.getListState(stateDescriptor2); + ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3); + + listState1.add(MutableType.of(42)); + listState1.add(MutableType.of(4711)); + + listState2.add(MutableType.of(7)); + listState2.add(MutableType.of(13)); + listState2.add(MutableType.of(23)); + + listState3.add(MutableType.of(17)); + listState3.add(MutableType.of(18)); + listState3.add(MutableType.of(19)); + listState3.add(MutableType.of(20)); + + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + + OneShotLatch waiterLatch = new OneShotLatch(); + OneShotLatch blockerLatch = new OneShotLatch(); + + streamFactory.setWaiterLatch(waiterLatch); + streamFactory.setBlockerLatch(blockerLatch); + + RunnableFuture runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + + executorService.submit(runnableFuture); + + // wait until the async checkpoint is in the write code, then continue + waiterLatch.await(); + + // do some mutations to the state, to test if our snapshot will NOT reflect them + + listState1.add(MutableType.of(77)); + + int n = 0; + + for (MutableType mutableType : listState2.get()) { + if (++n == 2) { + // allow the write code to continue, so that we could do changes while state is written in parallel. + blockerLatch.trigger(); + } + mutableType.setValue(mutableType.getValue() + 10); + } + + listState3.clear(); + + operatorStateBackend.getListState( + new ListStateDescriptor<>("test4", new JavaSerializer())); + + // run the snapshot + OperatorStateHandle stateHandle = runnableFuture.get(); + + try { + + operatorStateBackend.close(); + operatorStateBackend.dispose(); + + AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + + //TODO this is temporarily casted to test already functionality that we do not yet expose through public API + operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend( + createMockEnvironment(), + "testOperator"); + + operatorStateBackend.restore(Collections.singletonList(stateHandle)); + + assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); + + listState1 = operatorStateBackend.getListState(stateDescriptor1); + listState2 = operatorStateBackend.getListState(stateDescriptor2); + listState3 = operatorStateBackend.getUnionListState(stateDescriptor3); + + assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); + + Iterator it = listState1.get().iterator(); + assertEquals(42, it.next().value); + assertEquals(4711, it.next().value); + assertFalse(it.hasNext()); + + it = listState2.get().iterator(); + assertEquals(7, it.next().value); + assertEquals(13, it.next().value); + assertEquals(23, it.next().value); + assertFalse(it.hasNext()); + + it = listState3.get().iterator(); + assertEquals(17, it.next().value); + assertEquals(18, it.next().value); + assertEquals(19, it.next().value); + assertEquals(20, it.next().value); + assertFalse(it.hasNext()); operatorStateBackend.close(); operatorStateBackend.dispose(); } finally { stateHandle.discardState(); } + + executorService.shutdown(); + } + + @Test + public void testSnapshotAsyncClose() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = + new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true); + + ListStateDescriptor stateDescriptor1 = + new ListStateDescriptor<>("test1", new JavaSerializer()); + + ListState listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); + + + listState1.add(MutableType.of(42)); + listState1.add(MutableType.of(4711)); + + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + + OneShotLatch waiterLatch = new OneShotLatch(); + OneShotLatch blockerLatch = new OneShotLatch(); + + streamFactory.setWaiterLatch(waiterLatch); + streamFactory.setBlockerLatch(blockerLatch); + + RunnableFuture runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + + executorService.submit(runnableFuture); + + // wait until the async checkpoint is in the write code, then continue + waiterLatch.await(); + + operatorStateBackend.close(); + + blockerLatch.trigger(); + + try { + runnableFuture.get(60, TimeUnit.SECONDS); + Assert.fail(); + } catch (ExecutionException eex) { + Assert.assertTrue(eex.getCause() instanceof IOException); + } + } + + @Test + public void testSnapshotAsyncCancel() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = + new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true); + + ListStateDescriptor stateDescriptor1 = + new ListStateDescriptor<>("test1", new JavaSerializer()); + + ListState listState1 = operatorStateBackend.getOperatorState(stateDescriptor1); + + + listState1.add(MutableType.of(42)); + listState1.add(MutableType.of(4711)); + + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + + OneShotLatch waiterLatch = new OneShotLatch(); + OneShotLatch blockerLatch = new OneShotLatch(); + + streamFactory.setWaiterLatch(waiterLatch); + streamFactory.setBlockerLatch(blockerLatch); + + RunnableFuture runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + + executorService.submit(runnableFuture); + + // wait until the async checkpoint is in the write code, then continue + waiterLatch.await(); + + runnableFuture.cancel(true); + + blockerLatch.trigger(); + + try { + runnableFuture.get(60, TimeUnit.SECONDS); + Assert.fail(); + } catch (CancellationException ignore) { + + } + } + + static final class MutableType implements Serializable { + + private static final long serialVersionUID = 1L; + + private int value; + + public MutableType() { + this(0); + } + + public MutableType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + MutableType that = (MutableType) o; + + return value == that.value; + } + + @Override + public int hashCode() { + return value; + } + + static MutableType of(int value) { + return new MutableType(value); + } } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index e266ea1d053f6..bf5be7925c47a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -53,7 +53,9 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -187,6 +189,14 @@ public AbstractKeyedStateBackend createKeyedStateBackend( throw new UnsupportedOperationException(); } + + @Override + public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception { + return new DefaultOperatorStateBackend( + getClass().getClassLoader(), + new ExecutionConfig(), + true); + } } private static final class LockingOutputStreamFactory implements CheckpointStreamFactory { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 4677242bb67c8..ce342c0b62f7d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Test; @@ -112,6 +113,14 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TaskKvStateRegistry kvStateRegistry) throws IOException { throw new SuccessException(); } + + @Override + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier) throws Exception { + + throw new UnsupportedOperationException(); + } } static final class SuccessException extends IOException {