Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,15 @@ public boolean getTransactionalStateCheckpointEnabled() {
}

public boolean getTransactionalStateRestoreEnabled() {
return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
JobConfig jobConfig = new JobConfig(this);

boolean standByEnabled = jobConfig.getStandbyTasksEnabled();
boolean asyncCommitEnabled = getAsyncCommit();

// TODO remove check of standby enabled when SAMZA-2353 is completed
// TODO remove check of async commit when SAMZA-2505 is completed
// transactional state restore must remain disabled until it is supported in the above use cases
return !standByEnabled && !asyncCommitEnabled && getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
}

public boolean getTransactionalStateRetainExistingState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -333,6 +334,30 @@ public void testGetShutdownMs() {
assertEquals(TaskConfig.DEFAULT_TASK_SHUTDOWN_MS, new TaskConfig(new MapConfig()).getShutdownMs());
}

@Test
public void testGetTransactionalStateRestoreEnabled() {
Map<String, String> configMap = new HashMap<>();
configMap.put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");

// standby and async commit both off; transactional state restore returned as enabled
assertTrue(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

// standby off and async commit on; transactional state restore returned as disabled
configMap.put(TaskConfig.ASYNC_COMMIT, "true");
configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "1");
assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

// standby on and async commit off; transactional state restore returned as disabled
configMap.put(TaskConfig.ASYNC_COMMIT, "false");
configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());

// standby on and async commit on; transactional state restore returned as disabled
configMap.put(TaskConfig.ASYNC_COMMIT, "true");
configMap.put(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, "2");
assertFalse(new TaskConfig(new MapConfig(configMap)).getTransactionalStateRestoreEnabled());
}

/**
* Used for testing classloading a {@link CheckpointManagerFactory}.
*/
Expand Down