From 8777583d6549a2cf921a29dcab48e55fe35d2ba3 Mon Sep 17 00:00:00 2001 From: Brett Konold Date: Mon, 6 Apr 2020 18:23:15 -0700 Subject: [PATCH] disable transactional state restore in cases where standby or async commit are enabled --- .../org/apache/samza/config/TaskConfig.java | 10 +++++++- .../apache/samza/config/TestTaskConfig.java | 25 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index b02f6c988e..1e48045328 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -310,7 +310,15 @@ public boolean getTransactionalStateCheckpointEnabled() { } public boolean getTransactionalStateRestoreEnabled() { - return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED); + JobConfig jobConfig = new JobConfig(this); + + boolean standByEnabled = jobConfig.getStandbyTasksEnabled(); + boolean asyncCommitEnabled = getAsyncCommit(); + + // TODO remove check of standby enabled when SAMZA-2353 is completed + // TODO remove check of async commit when SAMZA-2505 is completed + // transactional state restore must remain disabled until it is supported in the above use cases + return !standByEnabled && !asyncCommitEnabled && getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED); } public boolean getTransactionalStateRetainExistingState() { diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java index f55b5a950d..f6df6f7c7d 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfig.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -333,6 +334,30 @@ public void testGetShutdownMs() { assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new MapConfig()).getShutdownMs()); } + @Test + public void testGetTransactionalStateRestoreEnabled() { + Map configMap = new HashMap<>(); + configMap.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true"); + + // standby and async commit both off; transactional state restore returned as enabled + assertTrue(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled()); + + // standby off and async commit on; transactional state restore returned as disabled + configMap.put(TaskConfig.ASYNC_COMMIT, "true"); + configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "1"); + assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled()); + + // standby on and async commit off; transactional state restore returned as disabled + configMap.put(TaskConfig.ASYNC_COMMIT, "false"); + configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2"); + assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled()); + + // standby on and async commit on; transactional state restore returned as disabled + configMap.put(TaskConfig.ASYNC_COMMIT, "true"); + configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2"); + assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled()); + } + /** * Used for testing classloading a {@link CheckpointManagerFactory}. */