From 19d76b8ea3a33e99aa3c6609c1517b5d7b918f1d Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 5 Jan 2017 23:45:13 +0100 Subject: [PATCH 1/4] Explicit restore for backends --- .../state/RocksDBKeyedStateBackend.java | 67 +++----- .../streaming/state/RocksDBStateBackend.java | 37 ----- .../runtime/state/AbstractStateBackend.java | 32 +--- .../state/DefaultOperatorStateBackend.java | 151 ++++++++---------- .../flink/runtime/state/Snapshotable.java | 9 ++ .../state/filesystem/FsStateBackend.java | 21 --- .../state/heap/HeapKeyedStateBackend.java | 37 ++--- .../state/memory/MemoryStateBackend.java | 22 --- .../state/OperatorStateBackendTest.java | 11 +- .../runtime/state/StateBackendTestBase.java | 21 ++- .../streaming/runtime/tasks/StreamTask.java | 48 +++--- .../tasks/BlockingCheckpointsTest.java | 13 -- .../AbstractStreamOperatorTestHarness.java | 15 +- ...eyedOneInputStreamOperatorTestHarness.java | 37 ++--- ...eyedTwoInputStreamOperatorTestHarness.java | 33 ++-- .../streaming/runtime/StateBackendITCase.java | 15 -- 16 files changed, 196 insertions(+), 373 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 1c0a4b7be7d90..71e2c79b0bb3c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -180,51 +180,6 @@ public RocksDBKeyedStateBackend( kvStateInformation = new HashMap<>(); } - public RocksDBKeyedStateBackend( - JobID jobId, - String operatorIdentifier, - ClassLoader userCodeClassLoader, - File instanceBasePath, - DBOptions dbOptions, - ColumnFamilyOptions columnFamilyOptions, - TaskKvStateRegistry kvStateRegistry, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoreState - ) throws Exception { - - this(jobId, - operatorIdentifier, - userCodeClassLoader, - instanceBasePath, - dbOptions, - columnFamilyOptions, - kvStateRegistry, - keySerializer, - numberOfKeyGroups, - keyGroupRange); - - LOG.info("Initializing RocksDB keyed state backend from snapshot."); - - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from state handles: {}.", restoreState); - } - - try { - if (MigrationUtil.isOldSavepointKeyedState(restoreState)) { - LOG.info("Converting RocksDB state from old savepoint."); - restoreOldSavepointKeyedState(restoreState); - } else { - RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); - restoreOperation.doRestore(restoreState); - } - } catch (Exception ex) { - dispose(); - throw ex; - } - } - /** * Should only be called by one thread, and only after all accesses to the DB happened. */ @@ -631,6 +586,28 @@ private static void checkInterrupted() throws InterruptedException { } } + @Override + public void restore(Collection restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); + } + + try { + if (MigrationUtil.isOldSavepointKeyedState(restoreState)) { + LOG.info("Converting RocksDB state from old savepoint."); + restoreOldSavepointKeyedState(restoreState); + } else { + RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); + restoreOperation.doRestore(restoreState); + } + } catch (Exception ex) { + dispose(); + throw ex; + } + } + /** * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index c2e33d43a706e..1e5620f54420c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -28,13 +28,10 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; - import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; - import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; import org.slf4j.Logger; @@ -46,7 +43,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Random; import java.util.UUID; @@ -262,39 +258,6 @@ public AbstractKeyedStateBackend createKeyedStateBackend( keyGroupRange); } - @Override - public AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception { - - // first, make sure that the RocksDB JNI library is loaded - // we do this explicitly here to have better error handling - String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; - ensureRocksDBIsLoaded(tempDir); - - lazyInitializeForJob(env, operatorIdentifier); - - File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString()); - return new RocksDBKeyedStateBackend<>( - jobID, - operatorIdentifier, - env.getUserClassLoader(), - instanceBasePath, - getDbOptions(), - getColumnOptions(), - kvStateRegistry, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - restoredState); - } - // ------------------------------------------------------------------------ // Parameters // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 1b53f1a76e71f..60d035af7db61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import java.io.IOException; -import java.util.Collection; /** * A state backend defines how state is stored and snapshotted during checkpoints. @@ -58,42 +57,13 @@ public abstract AbstractKeyedStateBackend createKeyedStateBackend( TaskKvStateRegistry kvStateRegistry ) throws Exception; - /** - * Creates a new {@link AbstractKeyedStateBackend} that restores its state from the given list - * {@link KeyGroupsStateHandle KeyGroupStateHandles}. - */ - public abstract AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState, - TaskKvStateRegistry kvStateRegistry - ) throws Exception; - - /** * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator * state in checkpoint streams. */ public OperatorStateBackend createOperatorStateBackend( Environment env, - String operatorIdentifier - ) throws Exception { + String operatorIdentifier) throws Exception { return new DefaultOperatorStateBackend(env.getUserClassLoader()); } - - /** - * Creates a new {@link OperatorStateBackend} that restores its state from the given collection of - * {@link OperatorStateHandle}. - */ - public OperatorStateBackend restoreOperatorStateBackend( - Environment env, - String operatorIdentifier, - Collection restoreSnapshots - ) throws Exception { - return new DefaultOperatorStateBackend(env.getUserClassLoader(), restoreSnapshots); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index d7a10d527211d..10bb4090cc3e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -50,33 +50,16 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_"; private final Map> registeredStates; - private final Collection restoreSnapshots; private final CloseableRegistry closeStreamOnCancelRegistry; private final JavaSerializer javaSerializer; private final ClassLoader userClassloader; - /** - * Restores a OperatorStateStore (lazily) using the provided snapshots. - * - * @param restoreSnapshots snapshots that are available to restore partitionable states on request. - */ - public DefaultOperatorStateBackend( - ClassLoader userClassLoader, - Collection restoreSnapshots) throws IOException { + public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException { + this.closeStreamOnCancelRegistry = new CloseableRegistry(); this.userClassloader = Preconditions.checkNotNull(userClassLoader); this.javaSerializer = new JavaSerializer<>(); this.registeredStates = new HashMap<>(); - this.closeStreamOnCancelRegistry = new CloseableRegistry(); - this.restoreSnapshots = restoreSnapshots; - restoreState(); - } - - /** - * Creates an empty OperatorStateStore. - */ - public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException { - this(userClassLoader, null); } @SuppressWarnings("unchecked") @@ -111,69 +94,6 @@ public ListState getOperatorState(ListStateDescriptor stateDescriptor) return partitionableListState; } - private void restoreState() throws IOException { - - if (null == restoreSnapshots) { - return; - } - - for (OperatorStateHandle stateHandle : restoreSnapshots) { - - if (stateHandle == null) { - continue; - } - - FSDataInputStream in = stateHandle.openInputStream(); - closeStreamOnCancelRegistry.registerClosable(in); - - ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader(); - - try { - Thread.currentThread().setContextClassLoader(userClassloader); - OperatorBackendSerializationProxy backendSerializationProxy = - new OperatorBackendSerializationProxy(userClassloader); - - backendSerializationProxy.read(new DataInputViewStreamWrapper(in)); - - List> metaInfoList = - backendSerializationProxy.getNamedStateSerializationProxies(); - - // Recreate all PartitionableListStates from the meta info - for (OperatorBackendSerializationProxy.StateMetaInfo stateMetaInfo : metaInfoList) { - PartitionableListState listState = registeredStates.get(stateMetaInfo.getName()); - - if (null == listState) { - listState = new PartitionableListState<>( - stateMetaInfo.getName(), - stateMetaInfo.getStateSerializer()); - - registeredStates.put(listState.getName(), listState); - } else { - Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith( - stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " + - listState.getPartitionStateSerializer() + " is not compatible with " + - stateMetaInfo.getStateSerializer()); - } - } - - // Restore all the state in PartitionableListStates - for (Map.Entry nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) { - PartitionableListState stateListForName = registeredStates.get(nameToOffsets.getKey()); - - Preconditions.checkState(null != stateListForName, "Found state without " + - "corresponding meta info: " + nameToOffsets.getKey()); - - deserializeStateValues(stateListForName, in, nameToOffsets.getValue()); - } - - } finally { - Thread.currentThread().setContextClassLoader(restoreClassLoader); - closeStreamOnCancelRegistry.unregisterClosable(in); - IOUtils.closeQuietly(in); - } - } - } - private static void deserializeStateValues( PartitionableListState stateListForName, FSDataInputStream in, @@ -238,6 +158,70 @@ public RunnableFuture snapshot( } } + @Override + public void restore(Collection restoreSnapshots) throws Exception { + + if (null == restoreSnapshots) { + return; + } + + for (OperatorStateHandle stateHandle : restoreSnapshots) { + + if (stateHandle == null) { + continue; + } + + FSDataInputStream in = stateHandle.openInputStream(); + closeStreamOnCancelRegistry.registerClosable(in); + + ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader(); + + try { + Thread.currentThread().setContextClassLoader(userClassloader); + OperatorBackendSerializationProxy backendSerializationProxy = + new OperatorBackendSerializationProxy(userClassloader); + + backendSerializationProxy.read(new DataInputViewStreamWrapper(in)); + + List> metaInfoList = + backendSerializationProxy.getNamedStateSerializationProxies(); + + // Recreate all PartitionableListStates from the meta info + for (OperatorBackendSerializationProxy.StateMetaInfo stateMetaInfo : metaInfoList) { + PartitionableListState listState = registeredStates.get(stateMetaInfo.getName()); + + if (null == listState) { + listState = new PartitionableListState<>( + stateMetaInfo.getName(), + stateMetaInfo.getStateSerializer()); + + registeredStates.put(listState.getName(), listState); + } else { + Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith( + stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " + + listState.getPartitionStateSerializer() + " is not compatible with " + + stateMetaInfo.getStateSerializer()); + } + } + + // Restore all the state in PartitionableListStates + for (Map.Entry nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) { + PartitionableListState stateListForName = registeredStates.get(nameToOffsets.getKey()); + + Preconditions.checkState(null != stateListForName, "Found state without " + + "corresponding meta info: " + nameToOffsets.getKey()); + + deserializeStateValues(stateListForName, in, nameToOffsets.getValue()); + } + + } finally { + Thread.currentThread().setContextClassLoader(restoreClassLoader); + closeStreamOnCancelRegistry.unregisterClosable(in); + IOUtils.closeQuietly(in); + } + } + } + @Override public void dispose() { registeredStates.clear(); @@ -314,5 +298,4 @@ public String toString() { '}'; } } -} - +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java index 2aa282d064988..a4a6bc4b00e5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import java.util.Collection; import java.util.concurrent.RunnableFuture; /** @@ -42,4 +43,12 @@ RunnableFuture snapshot( long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception; + + /** + * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state + * handles from which the old state is read. + * + * @param state the old state to restore. + */ + void restore(Collection state) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 4e15cd51048c9..281dbb0c83462 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +35,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.Collection; /** * The file state backend is a state backend that stores the state of streaming jobs in a file system. @@ -191,25 +189,6 @@ public AbstractKeyedStateBackend createKeyedStateBackend( keyGroupRange); } - @Override - public AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception { - return new HeapKeyedStateBackend<>( - kvStateRegistry, - keySerializer, - env.getUserClassLoader(), - numberOfKeyGroups, - keyGroupRange, - restoredState); - } - @Override public String toString() { return "File State Backend @ " + basePath; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index d07901b8f176d..d461dfdf32c57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -101,28 +101,6 @@ public HeapKeyedStateBackend( LOG.info("Initializing heap keyed state backend with stream factory."); } - public HeapKeyedStateBackend( - TaskKvStateRegistry kvStateRegistry, - TypeSerializer keySerializer, - ClassLoader userCodeClassLoader, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState) throws Exception { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange); - - LOG.info("Initializing heap keyed state backend from snapshot."); - - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from state handles: {}.", restoredState); - } - - if (MigrationUtil.isOldSavepointKeyedState(restoredState)) { - restoreOldSavepointKeyedState(restoredState); - } else { - restorePartitionedState(restoredState); - } - } - // ------------------------------------------------------------------------ // state backend operations // ------------------------------------------------------------------------ @@ -251,6 +229,21 @@ public RunnableFuture snapshot( } } + @Override + public void restore(Collection restoredState) throws Exception { + LOG.info("Initializing heap keyed state backend from snapshot."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoredState); + } + + if (MigrationUtil.isOldSavepointKeyedState(restoredState)) { + restoreOldSavepointKeyedState(restoredState); + } else { + restorePartitionedState(restoredState); + } + } + private void writeStateTableForKeyGroup( DataOutputView outView, StateTable stateTable, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 33f03ad3ae0fd..58a86dfa15e1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -26,11 +26,9 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import java.io.IOException; -import java.util.Collection; /** * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no @@ -92,24 +90,4 @@ public AbstractKeyedStateBackend createKeyedStateBackend( numberOfKeyGroups, keyGroupRange); } - - @Override - public AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception { - - return new HeapKeyedStateBackend<>( - kvStateRegistry, - keySerializer, - env.getUserClassLoader(), - numberOfKeyGroups, - keyGroupRange, - restoredState); - } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 648d762272fdd..515011fdccbcf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -45,7 +45,9 @@ static Environment createMockEnvironment() { } private OperatorStateBackend createNewOperatorStateBackend() throws Exception { - return abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-operator"); + return abstractStateBackend.createOperatorStateBackend( + createMockEnvironment(), + "test-operator"); } @Test @@ -131,8 +133,11 @@ public void testSnapshotRestore() throws Exception { operatorStateBackend.dispose(); - operatorStateBackend = abstractStateBackend.restoreOperatorStateBackend( - createMockEnvironment(), "testOperator", Collections.singletonList(stateHandle)); + operatorStateBackend = abstractStateBackend.createOperatorStateBackend( + createMockEnvironment(), + "testOperator"); + + operatorStateBackend.restore(Collections.singletonList(stateHandle)); assertEquals(2, operatorStateBackend.getRegisteredStateNames().size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 5655f1cfd39f6..9bc4c539d7a5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -58,7 +58,13 @@ import java.util.concurrent.RunnableFuture; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -101,8 +107,7 @@ protected AbstractKeyedStateBackend createKeyedBackend( keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()) -; + env.getTaskKvStateRegistry()); } protected AbstractKeyedStateBackend restoreKeyedBackend(TypeSerializer keySerializer, KeyGroupsStateHandle state) throws Exception { @@ -127,15 +132,21 @@ protected AbstractKeyedStateBackend restoreKeyedBackend( KeyGroupRange keyGroupRange, List state, Environment env) throws Exception { - return getStateBackend().restoreKeyedStateBackend( + + AbstractKeyedStateBackend backend = getStateBackend().createKeyedStateBackend( env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, - state, env.getTaskKvStateRegistry()); + + if (null != state) { + backend.restore(state); + } + + return backend; } @Test diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index bd9215ab093ff..3bbc53b27757e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -741,13 +741,17 @@ public OperatorStateBackend createOperatorStateBackend( Environment env = getEnvironment(); String opId = createOperatorIdentifier(op, getConfiguration().getVertexID()); - OperatorStateBackend newBackend = restoreStateHandles == null ? - stateBackend.createOperatorStateBackend(env, opId) - : stateBackend.restoreOperatorStateBackend(env, opId, restoreStateHandles); + OperatorStateBackend operatorStateBackend = stateBackend.createOperatorStateBackend(env, opId); - cancelables.registerClosable(newBackend); + // let operator state backend participate in the operator lifecycle, i.e. make it responsive to cancelation + cancelables.registerClosable(operatorStateBackend); - return newBackend; + // restore if we have some old state + if (null != restoreStateHandles) { + operatorStateBackend.restore(restoreStateHandles); + } + + return operatorStateBackend; } public AbstractKeyedStateBackend createKeyedStateBackend( @@ -763,29 +767,23 @@ public AbstractKeyedStateBackend createKeyedStateBackend( headOperator, configuration.getVertexID()); - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend = stateBackend.restoreKeyedStateBackend( - getEnvironment(), - getEnvironment().getJobID(), - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - restoreStateHandles.getManagedKeyedState(), - getEnvironment().getTaskKvStateRegistry()); - } else { - keyedStateBackend = stateBackend.createKeyedStateBackend( - getEnvironment(), - getEnvironment().getJobID(), - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - getEnvironment().getTaskKvStateRegistry()); - } + keyedStateBackend = stateBackend.createKeyedStateBackend( + getEnvironment(), + getEnvironment().getJobID(), + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + getEnvironment().getTaskKvStateRegistry()); + // let keyed state backend participate in the operator lifecycle, i.e. make it responsive to cancelation cancelables.registerClosable(keyedStateBackend); + // restore if we have some old state + if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { + keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); + } + @SuppressWarnings("unchecked") AbstractKeyedStateBackend typedBackend = (AbstractKeyedStateBackend) keyedStateBackend; return typedBackend; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 7becbf4503ce9..492b4707e66a6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -52,7 +52,6 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -64,12 +63,10 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamFilter; import org.apache.flink.util.SerializedValue; - import org.junit.Test; import java.io.IOException; import java.net.URL; -import java.util.Collection; import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -183,16 +180,6 @@ public AbstractKeyedStateBackend createKeyedStateBackend( throw new UnsupportedOperationException(); } - - @Override - public AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, JobID jobID, String operatorIdentifier, - TypeSerializer keySerializer, int numberOfKeyGroups, - KeyGroupRange keyGroupRange, Collection restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception { - - throw new UnsupportedOperationException(); - } } private static final class LockingOutputStreamFactory implements CheckpointStreamFactory { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 346d5c373ad70..7fe4ebc9abec0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -192,12 +192,17 @@ public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Thr final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; final Collection stateHandles = (Collection) invocationOnMock.getArguments()[1]; OperatorStateBackend osb; - if (null == stateHandles) { - osb = stateBackend.createOperatorStateBackend(environment, operator.getClass().getSimpleName()); - } else { - osb = stateBackend.restoreOperatorStateBackend(environment, operator.getClass().getSimpleName(), stateHandles); - } + + osb = stateBackend.createOperatorStateBackend( + environment, + operator.getClass().getSimpleName()); + mockTask.getCancelables().registerClosable(osb); + + if (null != stateHandles) { + osb.restore(stateHandles); + } + return osb; } }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 3a47a1ded20e0..4abb6e25d722d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -100,33 +100,24 @@ public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwa final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1]; final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; - if(keyedStateBackend != null) { + if (keyedStateBackend != null) { keyedStateBackend.dispose(); } - if (restoredKeyedState == null) { - keyedStateBackend = stateBackend.createKeyedStateBackend( - mockTask.getEnvironment(), - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - mockTask.getEnvironment().getTaskKvStateRegistry()); - return keyedStateBackend; - } else { - keyedStateBackend = stateBackend.restoreKeyedStateBackend( - mockTask.getEnvironment(), - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - restoredKeyedState, - mockTask.getEnvironment().getTaskKvStateRegistry()); - restoredKeyedState = null; - return keyedStateBackend; + keyedStateBackend = stateBackend.createKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + mockTask.getEnvironment().getTaskKvStateRegistry()); + + if (restoredKeyedState != null) { + keyedStateBackend.restore(restoredKeyedState); } + + return keyedStateBackend; } }).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class)); } catch (Exception e) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 0aa91d9445291..8e76f7065f4e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -94,29 +94,18 @@ public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwa keyedStateBackend.close(); } - if (restoredKeyedState == null) { - keyedStateBackend = stateBackend.createKeyedStateBackend( - mockTask.getEnvironment(), - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - mockTask.getEnvironment().getTaskKvStateRegistry()); - return keyedStateBackend; - } else { - keyedStateBackend = stateBackend.restoreKeyedStateBackend( - mockTask.getEnvironment(), - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - restoredKeyedState, - mockTask.getEnvironment().getTaskKvStateRegistry()); - restoredKeyedState = null; - return keyedStateBackend; + keyedStateBackend = stateBackend.createKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + mockTask.getEnvironment().getTaskKvStateRegistry()); + if (restoredKeyedState != null) { + keyedStateBackend.restore(restoredKeyedState); } + return keyedStateBackend; } }).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class)); } catch (Exception e) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 963d18a7bcbcd..0e62fbbb91634 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -32,13 +32,11 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.junit.Test; import java.io.IOException; -import java.util.Collection; import static org.junit.Assert.fail; @@ -110,19 +108,6 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TaskKvStateRegistry kvStateRegistry) throws Exception { throw new SuccessException(); } - - @Override - public AbstractKeyedStateBackend restoreKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - Collection restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception { - throw new SuccessException(); - } } static final class SuccessException extends IOException { From 03e83df8b62375ff8bbc76c7441101767cde2cb5 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 11 Jan 2017 14:48:06 +0100 Subject: [PATCH 2/4] Updated unit test --- .../state/StateInitializationContextImpl.java | 66 +++++--- .../tasks/InterruptSensitiveRestoreTest.java | 148 +++++++++++++++++- 2 files changed, 182 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index c86ff6c3902fb..be59a2ad04fd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.NoSuchElementException; /** * Default implementation of {@link StateInitializationContext}. @@ -155,19 +156,21 @@ public KeyGroupStreamIterator( public boolean hasNext() { if (null != currentStateHandle && currentOffsetsIterator.hasNext()) { return true; - } else { - while (stateHandleIterator.hasNext()) { - currentStateHandle = stateHandleIterator.next(); - if (currentStateHandle.getNumberOfKeyGroups() > 0) { - currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; - return true; - } + } + + while (stateHandleIterator.hasNext()) { + currentStateHandle = stateHandleIterator.next(); + if (currentStateHandle.getNumberOfKeyGroups() > 0) { + currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); + closableRegistry.unregisterClosable(currentStream); + IOUtils.closeQuietly(currentStream); + currentStream = null; + + return true; } - return false; } + + return false; } private void openStream() throws IOException { @@ -178,6 +181,11 @@ private void openStream() throws IOException { @Override public KeyGroupStatePartitionStreamProvider next() { + + if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); + } + Tuple2 keyGroupOffset = currentOffsetsIterator.next(); try { if (null == currentStream) { @@ -220,26 +228,28 @@ public OperatorStateStreamIterator( @Override public boolean hasNext() { - if (null != currentStateHandle && offPos < offsets.length) { + + if (null != offsets && offPos < offsets.length) { return true; - } else { - while (stateHandleIterator.hasNext()) { - currentStateHandle = stateHandleIterator.next(); - long[] offsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName); - if (null != offsets && offsets.length > 0) { + } + + while (stateHandleIterator.hasNext()) { + currentStateHandle = stateHandleIterator.next(); + long[] offsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName); + if (null != offsets && offsets.length > 0) { - this.offsets = offsets; - this.offPos = 0; + this.offsets = offsets; + this.offPos = 0; - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; + closableRegistry.unregisterClosable(currentStream); + IOUtils.closeQuietly(currentStream); + currentStream = null; - return true; - } + return true; } - return false; } + + return false; } private void openStream() throws IOException { @@ -250,7 +260,13 @@ private void openStream() throws IOException { @Override public StatePartitionStreamProvider next() { + + if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); + } + long offset = offsets[offPos++]; + try { if (null == currentStream) { openStream(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index aa2492c634357..8ba64ffc5e5f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.testutils.OneShotLatch; @@ -44,8 +45,14 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -55,11 +62,11 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.util.SerializedValue; - import org.junit.Test; import java.io.EOFException; @@ -68,7 +75,9 @@ import java.net.URL; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; @@ -89,17 +98,63 @@ public class InterruptSensitiveRestoreTest { private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch(); + private static final int OPERATOR_MANAGED = 0; + private static final int OPERATOR_RAW = 1; + private static final int KEYED_MANAGED = 2; + private static final int KEYED_RAW = 3; + private static final int LEGACY = 4; + + @Test + public void testRestoreWithInterruptLegacy() throws Exception { + testRestoreWithInterrupt(LEGACY); + } + + @Test + public void testRestoreWithInterruptOperatorManaged() throws Exception { + testRestoreWithInterrupt(OPERATOR_MANAGED); + } + @Test - public void testRestoreWithInterrupt() throws Exception { + public void testRestoreWithInterruptOperatorRaw() throws Exception { + testRestoreWithInterrupt(OPERATOR_RAW); + } + + @Test + public void testRestoreWithInterruptKeyedManaged() throws Exception { + testRestoreWithInterrupt(KEYED_MANAGED); + } + + @Test + public void testRestoreWithInterruptKeyedRaw() throws Exception { + testRestoreWithInterrupt(KEYED_RAW); + } + private void testRestoreWithInterrupt(int mode) throws Exception { + + IN_RESTORE_LATCH.reset(); Configuration taskConfig = new Configuration(); StreamConfig cfg = new StreamConfig(taskConfig); cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - cfg.setStreamOperator(new StreamSource<>(new TestSource())); + switch (mode) { + case OPERATOR_MANAGED: + case OPERATOR_RAW: + cfg.setStreamOperator(new StreamSource<>(new TestSourceOperator())); + break; + case KEYED_MANAGED: + case KEYED_RAW: + cfg.setStateKeySerializer(IntSerializer.INSTANCE); + cfg.setStreamOperator(new StreamSource<>(new TestSourceKeyed())); + break; + case LEGACY: + cfg.setStreamOperator(new StreamSource<>(new TestSourceLegacy())); + break; + default: + throw new IllegalArgumentException(); + } StreamStateHandle lockingHandle = new InterruptLockingStateHandle(); - Task task = createTask(taskConfig, lockingHandle); + Task task = createTask(taskConfig, lockingHandle, mode); // start the task and wait until it is in "restore" task.startTaskThread(); @@ -124,18 +179,51 @@ public void testRestoreWithInterrupt() throws Exception { private static Task createTask( Configuration taskConfig, - StreamStateHandle state) throws IOException { + StreamStateHandle state, + int mode) throws IOException { NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - ChainedStateHandle operatorState = new ChainedStateHandle<>(Collections.singletonList(state)); + + ChainedStateHandle operatorState = null; List keyGroupStateFromBackend = Collections.emptyList(); List keyGroupStateFromStream = Collections.emptyList(); List> operatorStateBackend = Collections.emptyList(); List> operatorStateStream = Collections.emptyList(); + Map operatorStateMetadata = new HashMap<>(1); + operatorStateMetadata.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new long[]{0}); + + KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(new KeyGroupRange(0,0)); + + Collection operatorStateHandles = + Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); + + List keyGroupsStateHandles = + Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); + + switch (mode) { + case OPERATOR_MANAGED: + operatorStateBackend = Collections.singletonList(operatorStateHandles); + break; + case OPERATOR_RAW: + operatorStateStream = Collections.singletonList(operatorStateHandles); + break; + case KEYED_MANAGED: + keyGroupStateFromBackend = keyGroupsStateHandles; + break; + case KEYED_RAW: + keyGroupStateFromStream = keyGroupsStateHandles; + break; + case LEGACY: + operatorState = new ChainedStateHandle<>(Collections.singletonList(state)); + break; + default: + throw new IllegalArgumentException(); + } + TaskStateHandles taskStateHandles = new TaskStateHandles( operatorState, operatorStateBackend, @@ -258,7 +346,7 @@ public long getStateSize() { // ------------------------------------------------------------------------ - private static class TestSource implements SourceFunction, Checkpointed { + private static class TestSourceLegacy implements SourceFunction, Checkpointed { private static final long serialVersionUID = 1L; @Override @@ -280,4 +368,50 @@ public void restoreState(Serializable state) throws Exception { fail("should never be called"); } } + + private static class TestSourceOperator implements SourceFunction, CheckpointedFunction { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext ctx) throws Exception { + fail("should never be called"); + } + + @Override + public void cancel() {} + + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + fail("should never be called"); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ((StateInitializationContext)context).getRawOperatorStateInputs().iterator().next().getStream().read(); + } + } + + private static class TestSourceKeyed implements SourceFunction, CheckpointedFunction { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext ctx) throws Exception { + fail("should never be called"); + } + + @Override + public void cancel() {} + + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + fail("should never be called"); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ((StateInitializationContext)context).getRawKeyedStateInputs().iterator().next().getStream().read(); + } + } } From 8d474990b70de54a4fad866acdcce211c006e815 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 11 Jan 2017 15:35:37 +0100 Subject: [PATCH 3/4] removed unnecessary code from unit test --- .../tasks/InterruptSensitiveRestoreTest.java | 29 ++----------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 8ba64ffc5e5f8..0206cf58b2a41 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -138,12 +138,10 @@ private void testRestoreWithInterrupt(int mode) throws Exception { switch (mode) { case OPERATOR_MANAGED: case OPERATOR_RAW: - cfg.setStreamOperator(new StreamSource<>(new TestSourceOperator())); - break; case KEYED_MANAGED: case KEYED_RAW: cfg.setStateKeySerializer(IntSerializer.INSTANCE); - cfg.setStreamOperator(new StreamSource<>(new TestSourceKeyed())); + cfg.setStreamOperator(new StreamSource<>(new TestSource())); break; case LEGACY: cfg.setStreamOperator(new StreamSource<>(new TestSourceLegacy())); @@ -369,7 +367,7 @@ public void restoreState(Serializable state) throws Exception { } } - private static class TestSourceOperator implements SourceFunction, CheckpointedFunction { + private static class TestSource implements SourceFunction, CheckpointedFunction { private static final long serialVersionUID = 1L; @Override @@ -391,27 +389,4 @@ public void initializeState(FunctionInitializationContext context) throws Except ((StateInitializationContext)context).getRawOperatorStateInputs().iterator().next().getStream().read(); } } - - private static class TestSourceKeyed implements SourceFunction, CheckpointedFunction { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext ctx) throws Exception { - fail("should never be called"); - } - - @Override - public void cancel() {} - - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - fail("should never be called"); - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - ((StateInitializationContext)context).getRawKeyedStateInputs().iterator().next().getStream().read(); - } - } } From 6ce722f27db63d92382bc29a2c165c4600a3f3e9 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 11 Jan 2017 15:36:04 +0100 Subject: [PATCH 4/4] minor code deduplication --- .../state/StateInitializationContextImpl.java | 93 ++++++++++--------- 1 file changed, 50 insertions(+), 43 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index be59a2ad04fd4..46445d2cce21d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -136,35 +136,31 @@ public void close() { IOUtils.closeQuietly(closableRegistry); } - private static class KeyGroupStreamIterator implements Iterator { + private static class KeyGroupStreamIterator + extends AbstractStateStreamIterator { - private final Iterator stateHandleIterator; - private final CloseableRegistry closableRegistry; - - private KeyGroupsStateHandle currentStateHandle; - private FSDataInputStream currentStream; private Iterator> currentOffsetsIterator; public KeyGroupStreamIterator( Iterator stateHandleIterator, CloseableRegistry closableRegistry) { - this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); - this.closableRegistry = Preconditions.checkNotNull(closableRegistry); + super(stateHandleIterator, closableRegistry); } @Override public boolean hasNext() { + if (null != currentStateHandle && currentOffsetsIterator.hasNext()) { + return true; } + closeCurrentStream(); + while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); if (currentStateHandle.getNumberOfKeyGroups() > 0) { currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; return true; } @@ -173,46 +169,33 @@ public boolean hasNext() { return false; } - private void openStream() throws IOException { - FSDataInputStream stream = currentStateHandle.openInputStream(); - closableRegistry.registerClosable(stream); - currentStream = stream; - } - @Override public KeyGroupStatePartitionStreamProvider next() { if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); } Tuple2 keyGroupOffset = currentOffsetsIterator.next(); try { if (null == currentStream) { - openStream(); + openCurrentStream(); } currentStream.seek(keyGroupOffset.f1); + return new KeyGroupStatePartitionStreamProvider(currentStream, keyGroupOffset.f0); } catch (IOException ioex) { + return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0); } } - - @Override - public void remove() { - throw new UnsupportedOperationException("Read only Iterator"); - } } - private static class OperatorStateStreamIterator implements Iterator { + private static class OperatorStateStreamIterator + extends AbstractStateStreamIterator { private final String stateName; //TODO since we only support a single named state in raw, this could be dropped - - private final Iterator stateHandleIterator; - private final CloseableRegistry closableRegistry; - - private OperatorStateHandle currentStateHandle; - private FSDataInputStream currentStream; private long[] offsets; private int offPos; @@ -221,18 +204,20 @@ public OperatorStateStreamIterator( Iterator stateHandleIterator, CloseableRegistry closableRegistry) { + super(stateHandleIterator, closableRegistry); this.stateName = Preconditions.checkNotNull(stateName); - this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); - this.closableRegistry = Preconditions.checkNotNull(closableRegistry); } @Override public boolean hasNext() { if (null != offsets && offPos < offsets.length) { + return true; } + closeCurrentStream(); + while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); long[] offsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName); @@ -241,10 +226,6 @@ public boolean hasNext() { this.offsets = offsets; this.offPos = 0; - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; - return true; } } @@ -252,16 +233,11 @@ public boolean hasNext() { return false; } - private void openStream() throws IOException { - FSDataInputStream stream = currentStateHandle.openInputStream(); - closableRegistry.registerClosable(stream); - currentStream = stream; - } - @Override public StatePartitionStreamProvider next() { if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); } @@ -269,15 +245,46 @@ public StatePartitionStreamProvider next() { try { if (null == currentStream) { - openStream(); + openCurrentStream(); } currentStream.seek(offset); return new StatePartitionStreamProvider(currentStream); } catch (IOException ioex) { + return new StatePartitionStreamProvider(ioex); } } + } + + abstract static class AbstractStateStreamIterator + implements Iterator { + + protected final Iterator stateHandleIterator; + protected final CloseableRegistry closableRegistry; + + protected H currentStateHandle; + protected FSDataInputStream currentStream; + + public AbstractStateStreamIterator( + Iterator stateHandleIterator, + CloseableRegistry closableRegistry) { + + this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); + this.closableRegistry = Preconditions.checkNotNull(closableRegistry); + } + + protected void openCurrentStream() throws IOException { + FSDataInputStream stream = currentStateHandle.openInputStream(); + closableRegistry.registerClosable(stream); + currentStream = stream; + } + + protected void closeCurrentStream() { + closableRegistry.unregisterClosable(currentStream); + IOUtils.closeQuietly(currentStream); + currentStream = null; + } @Override public void remove() {