From e61c4ffde1c2d0cf18147a9163a508ebd10ba21f Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 14 May 2018 11:14:47 +0200 Subject: [PATCH 1/3] [FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch --- .../configuration/CheckpointingOptions.java | 6 +-- .../runtime/state/LocalRecoveryConfig.java | 52 +++---------------- .../TaskExecutorLocalStateStoresManager.java | 12 ++--- .../state/heap/HeapKeyedStateBackend.java | 3 +- .../taskexecutor/TaskManagerServices.java | 8 +-- .../TaskManagerServicesConfiguration.java | 17 +++--- ...skExecutorLocalStateStoresManagerTest.java | 16 +++--- .../state/TaskLocalStateStoreImplTest.java | 3 +- .../state/TaskStateManagerImplTest.java | 6 +-- .../state/TestLocalRecoveryConfig.java | 2 +- .../NetworkBufferCalculationTest.java | 3 +- .../taskexecutor/TaskExecutorITCase.java | 3 +- .../taskexecutor/TaskExecutorTest.java | 27 +++++----- ...kManagerComponentsStartupShutdownTest.java | 3 +- .../state/RocksDBKeyedStateBackend.java | 17 ++---- .../StreamOperatorSnapshotRestoreTest.java | 4 +- .../tasks/LocalStateForwardingTest.java | 4 +- .../AbstractLocalRecoveryITCase.java | 11 ++-- .../LocalRecoveryHeapITCase.java | 5 +- .../LocalRecoveryRocksDBFullITCase.java | 5 +- ...LocalRecoveryRocksDBIncrementalITCase.java | 5 +- .../ResumeCheckpointManuallyITCase.java | 8 +-- 22 files changed, 71 insertions(+), 149 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index c6af7dd894bcb..596e8dd65467a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -67,11 +67,11 @@ public class CheckpointingOptions { " this option."); /** - * This option configures local recovery for this state backend. + * This option configures local recovery for this state backend. By default, local recovery is deactivated. */ - public static final ConfigOption LOCAL_RECOVERY = ConfigOptions + public static final ConfigOption LOCAL_RECOVERY = ConfigOptions .key("state.backend.local-recovery") - .defaultValue("DISABLED"); + .defaultValue(false); /** * The config parameter defining the root directories for storing file-based state for local recovery. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java index c97fa0b57d450..fc15f5d8f600a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.state; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; - -import org.slf4j.LoggerFactory; - import javax.annotation.Nonnull; /** @@ -31,57 +26,22 @@ */ public class LocalRecoveryConfig { - /** - * Enum over modes of local recovery: - *

    - *
  • DISABLED: disables local recovery. - *
  • ENABLE_FILE_BASED: enables local recovery in a variant that is based on local files. - *
- */ - public enum LocalRecoveryMode { - DISABLED, - ENABLE_FILE_BASED; - - /** - * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED - * if no configuration value is specified or parsing the value resulted in an exception. - * - * @param configuration the configuration that specifies the value for the local recovery mode. - * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was - * configured or the specified mode could not be parsed. - */ - @Nonnull - public static LocalRecoveryMode fromConfig(@Nonnull Configuration configuration) { - String localRecoveryConfString = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY); - try { - return LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString); - } catch (IllegalArgumentException ex) { - LoggerFactory.getLogger(LocalRecoveryConfig.class).warn( - "Exception while parsing configuration of local recovery mode. Local recovery will be disabled.", - ex); - return LocalRecoveryConfig.LocalRecoveryMode.DISABLED; - } - } - } - /** The local recovery mode. */ - @Nonnull - private final LocalRecoveryMode localRecoveryMode; + private final boolean localRecoveryEnabled; /** Encapsulates the root directories and the subtask-specific path. */ @Nonnull private final LocalRecoveryDirectoryProvider localStateDirectories; public LocalRecoveryConfig( - @Nonnull LocalRecoveryMode localRecoveryMode, + boolean localRecoveryEnabled, @Nonnull LocalRecoveryDirectoryProvider directoryProvider) { - this.localRecoveryMode = localRecoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; this.localStateDirectories = directoryProvider; } - @Nonnull - public LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + public boolean isLocalRecoveryEnabled() { + return localRecoveryEnabled; } @Nonnull @@ -92,7 +52,7 @@ public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() { @Override public String toString() { return "LocalRecoveryConfig{" + - "localRecoveryMode=" + localRecoveryMode + + "localRecoveryMode=" + localRecoveryEnabled + ", localStateDirectories=" + localStateDirectories + '}'; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java index 518ad81f2fbc8..4919f8094816f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java @@ -54,7 +54,7 @@ public class TaskExecutorLocalStateStoresManager { private final Map> taskStateStoresByAllocationID; /** The configured mode for local recovery on this task manager. */ - private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + private final boolean localRecoveryEnabled; /** This is the root directory for all local state of this task manager / executor. */ private final File[] localStateRootDirectories; @@ -71,12 +71,12 @@ public class TaskExecutorLocalStateStoresManager { private boolean closed; public TaskExecutorLocalStateStoresManager( - @Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, + boolean localRecoveryEnabled, @Nonnull File[] localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException { this.taskStateStoresByAllocationID = new HashMap<>(); - this.localRecoveryMode = localRecoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; this.localStateRootDirectories = localStateRootDirectories; this.discardExecutor = discardExecutor; this.lock = new Object(); @@ -140,7 +140,7 @@ public TaskLocalStateStore localStateStoreForSubtask( subtaskIndex); LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(localRecoveryMode, directoryProvider); + new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider); taskLocalStateStore = new TaskLocalStateStoreImpl( jobId, @@ -217,8 +217,8 @@ public void shutdown() { } @VisibleForTesting - public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + boolean isLocalRecoveryEnabled() { + return localRecoveryEnabled; } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index a02edb0f7d289..ab91ee175ed3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -622,8 +622,7 @@ public RunnableFuture> performSnapshot( final SupplierWithException checkpointStreamSupplier = - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals( - localRecoveryConfig.getLocalRecoveryMode()) ? + localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index ad5e22d33f0ac..bac132488c8b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -255,7 +254,6 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode(); final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); @@ -265,8 +263,10 @@ public static TaskManagerServices fromConfiguration( stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT); } - final TaskExecutorLocalStateStoresManager taskStateManager = - new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor); + final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( + taskManagerServicesConfiguration.isLocalRecovery(), + stateRootDirectoryFiles, + taskIOExecutor); return new TaskManagerServices( taskManagerLocation, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index b80320c3717a7..84326d2ee530f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; import org.apache.flink.util.NetUtils; @@ -78,13 +78,13 @@ public class TaskManagerServicesConfiguration { private final long timerServiceShutdownTimeout; - private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode; + private final boolean localRecovery; public TaskManagerServicesConfiguration( InetAddress taskManagerAddress, String[] tmpDirPaths, String[] localRecoveryStateRootDirectories, - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode, + boolean localRecovery, NetworkEnvironmentConfiguration networkConfig, QueryableStateConfiguration queryableStateConfig, int numberOfSlots, @@ -97,7 +97,7 @@ public TaskManagerServicesConfiguration( this.taskManagerAddress = checkNotNull(taskManagerAddress); this.tmpDirPaths = checkNotNull(tmpDirPaths); this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories); - this.localRecoveryMode = checkNotNull(localRecoveryMode); + this.localRecovery = checkNotNull(localRecovery); this.networkConfig = checkNotNull(networkConfig); this.queryableStateConfig = checkNotNull(queryableStateConfig); this.numberOfSlots = checkNotNull(numberOfSlots); @@ -128,8 +128,8 @@ public String[] getLocalRecoveryStateRootDirectories() { return localRecoveryStateRootDirectories; } - public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() { - return localRecoveryMode; + public boolean isLocalRecovery() { + return localRecovery; } public NetworkEnvironmentConfiguration getNetworkConfig() { @@ -209,8 +209,9 @@ public static TaskManagerServicesConfiguration fromConfiguration( localStateRootDir = tmpDirs; } - LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = - LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration); + boolean localRecoveryMode = configuration.getBoolean( + CheckpointingOptions.LOCAL_RECOVERY.key(), + CheckpointingOptions.LOCAL_RECOVERY.defaultValue()); final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration( configuration, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 2e9b107d557e9..97539bdb0f43f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -62,7 +62,7 @@ public void testCreationFromConfig() throws Exception { config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString); // test configuration of the local state mode - config.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED"); + config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true); final ResourceID tmResourceID = ResourceID.generate(); @@ -88,9 +88,7 @@ public void testCreationFromConfig() throws Exception { } // verify local recovery mode - Assert.assertEquals( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, - taskStateManager.getLocalRecoveryMode()); + Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled()); Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT); for (File rootDirectory : rootDirectories) { @@ -130,9 +128,7 @@ public void testCreationFromConfigDefault() throws Exception { localStateRootDirectories[i]); } - Assert.assertEquals( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, - taskStateManager.getLocalRecoveryMode()); + Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled()); } /** @@ -150,7 +146,7 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception { File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()}; TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, + true, rootDirs, Executors.directExecutor()); @@ -187,8 +183,8 @@ public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception { // test that local recovery mode is forwarded to the created store Assert.assertEquals( - storesManager.getLocalRecoveryMode(), - taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode()); + storesManager.isLocalRecoveryEnabled(), + taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled()); Assert.assertTrue(testFile.exists()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 618320ebc4f39..75317834b183a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -64,8 +64,7 @@ public void before() throws Exception { LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider); + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider); this.taskLocalStateStore = new TaskLocalStateStoreImpl( jobID, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java index f58f3f442ab25..71038c177214c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java @@ -193,7 +193,7 @@ public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws I new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0); LocalRecoveryConfig localRecoveryConfig = - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider); + new LocalRecoveryConfig(true, directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor); @@ -220,8 +220,8 @@ public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws I } Assert.assertEquals( - localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(), - localRecoveryConfFromTaskStateManager.getLocalRecoveryMode()); + localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled(), + localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled()); } finally { tmpFolder.delete(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java index 7801720140294..58affc5d768d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java @@ -28,7 +28,7 @@ public class TestLocalRecoveryConfig { private static final LocalRecoveryDirectoryProvider INSTANCE = new TestDummyLocalDirectoryProvider(); public static LocalRecoveryConfig disabled() { - return new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE); + return new LocalRecoveryConfig(false, INSTANCE); } public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index 2116c2f36c8a0..e88f9da5e7c0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.testutils.category.LegacyAndNew; import org.apache.flink.util.TestLogger; @@ -102,7 +101,7 @@ private static TaskManagerServicesConfiguration getTmConfig( InetAddress.getLoopbackAddress(), new String[] {}, new String[] {}, - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, networkConfig, QueryableStateConfiguration.disabled(), 1, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 885d99f616cd9..a740bff713bfd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -138,7 +137,7 @@ public void testSlotAllocation() throws Exception { new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")}; final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, taskExecutorLocalStateRootDirs, rpcService.getExecutor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 465619e278e64..7dedb9ab86515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -59,9 +59,9 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; @@ -75,7 +75,6 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -236,7 +235,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId))); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -325,7 +324,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -460,7 +459,7 @@ public HeartbeatManagerImpl answer(InvocationOnMock invocation ); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -545,7 +544,7 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -608,7 +607,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -729,7 +728,7 @@ public void testTaskSubmission() throws Exception { when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -832,7 +831,7 @@ public void testJobLeaderDetection() throws Exception { final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -942,7 +941,7 @@ public void testSlotAcceptance() throws Exception { rpc.registerGateway(jobManagerAddress, jobMasterGateway); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1074,7 +1073,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception { final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1193,7 +1192,7 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception { final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1267,7 +1266,7 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception { rpc.registerGateway(rmAddress, rmGateway); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); @@ -1323,7 +1322,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception { timerService); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, new File[]{tmp.newFolder()}, Executors.directExecutor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 581d8edcd1018..828993012b8f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -46,7 +46,6 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -162,7 +161,7 @@ public void testComponentsStartupShutdown() throws Exception { network.start(); TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager( - LocalRecoveryConfig.LocalRecoveryMode.DISABLED, + false, ioManager.getSpillingDirectories(), Executors.directExecutor()); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 90d0fc69e5b43..0ec2ef0fd1f52 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1663,9 +1663,8 @@ public RunnableFuture> performSnapshot( final SupplierWithException supplier = - isWithLocalRecovery( - checkpointOptions.getCheckpointType(), - localRecoveryConfig.getLocalRecoveryMode()) ? + localRecoveryConfig.isLocalRecoveryEnabled() && + (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, @@ -1745,14 +1744,6 @@ public SnapshotResult performOperation() throws Exception { primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); return AsyncStoppableTaskWithCallback.from(ioCallable); } - - private boolean isWithLocalRecovery( - CheckpointType checkpointType, - LocalRecoveryConfig.LocalRecoveryMode recoveryMode) { - // we use local recovery when it is activated and we are not taking a savepoint. - return LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == recoveryMode - && CheckpointType.SAVEPOINT != checkpointType; - } } private class IncrementalSnapshotStrategy implements SnapshotStrategy> { @@ -1792,7 +1783,7 @@ public RunnableFuture> performSnapshot( SnapshotDirectory snapshotDirectory; - if (LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode()) { + if (localRecoveryConfig.isLocalRecoveryEnabled()) { // create a "permanent" snapshot directory for local recovery. LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); @@ -2299,7 +2290,7 @@ private SnapshotResult materializeMetaData() throws Exception CheckpointStreamWithResultProvider streamWithResultProvider = - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode() ? + localRecoveryConfig.isLocalRecoveryEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream( checkpointId, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index b2b568e05365e..6d011a3baa1c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -146,9 +146,7 @@ private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Ex new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx); LocalRecoveryConfig localRecoveryConfig = - mode != ONLY_JM_RECOVERY ? - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider) : - new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider); + new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider); MockEnvironment mockEnvironment = new MockEnvironment( jobID, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index e35f97c25a186..bc864a2e5d168 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -177,9 +177,7 @@ public void acknowledgeCheckpoint( jobVertexID, subtaskIdx); - LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig( - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, - directoryProvider); + LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(true, directoryProvider); TaskLocalStateStore taskLocalStateStore = new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, executor) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java index 13040c965549f..4e454d75b2034 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java @@ -28,7 +28,6 @@ import java.io.IOException; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum; /** @@ -40,14 +39,14 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger { private final StateBackendEnum backendEnum; - private final LocalRecoveryMode recoveryMode; + private final boolean localRecoveryEnabled; @Rule public TestName testName = new TestName(); - AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, LocalRecoveryMode recoveryMode) { + AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) { this.backendEnum = backendEnum; - this.recoveryMode = recoveryMode; + this.localRecoveryEnabled = localRecoveryEnabled; } @Test @@ -64,9 +63,9 @@ protected StateBackendEnum getStateBackend() { protected Configuration createClusterConfig() throws IOException { Configuration config = super.createClusterConfig(); - config.setString( + config.setBoolean( CheckpointingOptions.LOCAL_RECOVERY, - recoveryMode.toString()); + localRecoveryEnabled); return config; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java index 2c0c2943c3774..6749366c460cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; /** @@ -26,8 +25,6 @@ */ public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryHeapITCase() { - super( - FILE_ASYNC, - ENABLE_FILE_BASED); + super(FILE_ASYNC, true); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java index 16bbbfc19a681..2d12ae24c255e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; /** @@ -26,8 +25,6 @@ */ public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryRocksDBFullITCase() { - super( - ROCKSDB_FULLY_ASYNC, - ENABLE_FILE_BASED); + super(ROCKSDB_FULLY_ASYNC, true); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java index fa8e13971d888..718d4a3dd653e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.checkpointing; -import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED; import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; /** @@ -26,8 +25,6 @@ */ public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase { public LocalRecoveryRocksDBIncrementalITCase() { - super( - ROCKSDB_INCREMENTAL_ZK, - ENABLE_FILE_BASED); + super(ROCKSDB_INCREMENTAL_ZK, true); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index 6f5bbacdc4381..aebaa63edab27 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -253,12 +252,7 @@ private void testExternalizedCheckpoints( config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - - if (localRecovery) { - config.setString( - CheckpointingOptions.LOCAL_RECOVERY, - LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString()); - } + config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery); // ZooKeeper recovery mode? if (zooKeeperQuorum != null) { From e1cf28fcda7858bcc6e292aadb61093173d27020 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 14 May 2018 15:09:50 +0200 Subject: [PATCH 2/3] test fixes --- .../api/operators/AbstractStreamOperatorTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 904ff64025c14..f0195a27a24e6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -495,7 +495,7 @@ public void testSnapshotMethod() throws Exception { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L)); whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); @@ -573,7 +573,7 @@ public void testFailingBackendSnapshotMethod() throws Exception { RunnableFuture> futureKeyedStateHandle = mock(RunnableFuture.class); RunnableFuture> futureOperatorStateHandle = mock(RunnableFuture.class); - StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp)); when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); @@ -582,7 +582,6 @@ public void testFailingBackendSnapshotMethod() throws Exception { whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult); - CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); StreamTask> containingTask = mock(StreamTask.class); when(containingTask.getCancelables()).thenReturn(closeableRegistry); @@ -600,7 +599,7 @@ public void testFailingBackendSnapshotMethod() throws Exception { when(operatorStateBackend.snapshot( eq(checkpointId), eq(timestamp), - eq(streamFactory), + any(CheckpointStreamFactory.class), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle); AbstractKeyedStateBackend keyedStateBackend = mock(AbstractKeyedStateBackend.class); From eaa523aba11aad58257243b3307e1893f5daf617 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 14 May 2018 15:16:56 +0200 Subject: [PATCH 3/3] added regenerated docs --- docs/_includes/generated/checkpointing_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html index 8b4233f5e89e1..2894c1684c105 100644 --- a/docs/_includes/generated/checkpointing_configuration.html +++ b/docs/_includes/generated/checkpointing_configuration.html @@ -29,7 +29,7 @@
state.backend.local-recovery
- "DISABLED" + false