diff --git a/docker-compose/ci-cadence-dependencies.yml b/docker-compose/ci-cadence-dependencies.yml index 3b3e2f89..b09b23e6 100644 --- a/docker-compose/ci-cadence-dependencies.yml +++ b/docker-compose/ci-cadence-dependencies.yml @@ -2,22 +2,47 @@ version: '3.9' # see .env file for the default value of the environment variables services: + elasticsearch: + container_name: elasticsearch + environment: + - cluster.routing.allocation.disk.threshold_enabled=true + - cluster.routing.allocation.disk.watermark.low=512mb + - cluster.routing.allocation.disk.watermark.high=256mb + - cluster.routing.allocation.disk.watermark.flood_stage=128mb + - discovery.type=single-node + - ES_JAVA_OPTS=-Xms256m -Xmx256m + - xpack.security.enabled=false + image: elasticsearch:${ELASTICSEARCH_VERSION} + networks: + - testing-network + expose: + - 9200 cassandra: - image: cassandra:4.1.1 + image: cassandra:3.11 ports: - "9042:9042" + networks: + - testing-network + zookeeper: + image: wurstmeister/zookeeper:3.4.6 + ports: + - "2181:2181" + networks: + - testing-network + kafka: + image: wurstmeister/kafka:2.12-2.1.1 + depends_on: + - zookeeper + ports: + - "9092:9092" environment: - - "MAX_HEAP_SIZE=256M" - - "HEAP_NEWSIZE=128M" - healthcheck: - test: ["CMD", "cqlsh", "-u cassandra", "-p cassandra" ,"-e describe keyspaces"] - interval: 15s - timeout: 30s - retries: 10 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 networks: - - testing-network + - testing-network cadence: - image: ubercadence/server:v1.2.5-auto-setup + image: ubercadence/server:0.24.0-auto-setup ports: - "8000:8000" - "8001:8001" @@ -30,15 +55,15 @@ services: - "7833:7833" environment: - "CASSANDRA_SEEDS=cassandra" - - "PROMETHEUS_ENDPOINT_0=0.0.0.0:8000" - - "PROMETHEUS_ENDPOINT_1=0.0.0.0:8001" - - "PROMETHEUS_ENDPOINT_2=0.0.0.0:8002" - - "PROMETHEUS_ENDPOINT_3=0.0.0.0:8003" - - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" - - "LOG_LEVEL=debug" + - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development_es.yaml" + - "ENABLE_ES=true" + - "ES_SEEDS=elasticsearch" + - "ES_VERSION=v7" + - "KAFKA_SEEDS=kafka" depends_on: - cassandra: - condition: service_healthy + - cassandra + - kafka + - elasticsearch networks: - testing-network cadence-admin-tools: @@ -47,7 +72,7 @@ services: - cadence environment: - CADENCE_CLI_ADDRESS=cadence:7933 - image: ubercadence/cli:v1.2.6 + image: ubercadence/cli:0.24.0 networks: - testing-network stdin_open: true diff --git a/integ/any_command_close_test.go b/integ/any_command_close_test.go index 016577f3..238200dd 100644 --- a/integ/any_command_close_test.go +++ b/integ/any_command_close_test.go @@ -19,7 +19,7 @@ func TestAnyCommandCloseWorkflowTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -31,7 +31,7 @@ func TestAnyCommandCloseWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/any_command_combination_test.go b/integ/any_command_combination_test.go index a32a80e5..975f0760 100644 --- a/integ/any_command_combination_test.go +++ b/integ/any_command_combination_test.go @@ -39,7 +39,8 @@ func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + // TODO not sure why using minimumContinueAsNewConfig(true) will fail + doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } @@ -49,7 +50,7 @@ func TestAnyCommandCombinationWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/any_timer_signal_test.go b/integ/any_timer_signal_test.go index 674945e8..ee2c2810 100644 --- a/integ/any_timer_signal_test.go +++ b/integ/any_timer_signal_test.go @@ -37,7 +37,7 @@ func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -47,7 +47,7 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/basic_test.go b/integ/basic_test.go index 7451f8bd..e34a1f7d 100644 --- a/integ/basic_test.go +++ b/integ/basic_test.go @@ -22,7 +22,7 @@ func TestBasicWorkflowTemporal(t *testing.T) { doTestBasicWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() doTestBasicWorkflow(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{ @@ -39,7 +39,7 @@ func TestBasicWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestBasicWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/conditional_close_test.go b/integ/conditional_close_test.go index 7d824ab9..8d7dc23b 100644 --- a/integ/conditional_close_test.go +++ b/integ/conditional_close_test.go @@ -37,7 +37,8 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowTemporalContinueA t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + // TODO not sure why using minimumContinueAsNewConfig(true) will fail + doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } @@ -47,17 +48,21 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } -func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { +func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow( + t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, +) { doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false) doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true) } -func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) { +func doTestConditionalForceCompleteOnChannelEmptyWorkflow( + t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool, +) { assertions := assert.New(t) // start test workflow server wfHandler := conditionalClose.NewHandler() diff --git a/integ/create_test.go b/integ/create_test.go index d28fa715..baf81c4a 100644 --- a/integ/create_test.go +++ b/integ/create_test.go @@ -36,7 +36,7 @@ func TestCreateWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -46,7 +46,7 @@ func TestCreateWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/deadend_test.go b/integ/deadend_test.go index 89bad4a7..8ecfa8bf 100644 --- a/integ/deadend_test.go +++ b/integ/deadend_test.go @@ -36,7 +36,7 @@ func TestDeadEndWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -46,7 +46,7 @@ func TestDeadEndWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/interstate_test.go b/integ/interstate_test.go index b1c54e82..ca727490 100644 --- a/integ/interstate_test.go +++ b/integ/interstate_test.go @@ -19,7 +19,7 @@ func TestInterStateWorkflowTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestInterStateWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -31,7 +31,7 @@ func TestInterStateWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestInterStateWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/locking_test.go b/integ/locking_test.go index 45e29018..d968ee94 100644 --- a/integ/locking_test.go +++ b/integ/locking_test.go @@ -31,7 +31,8 @@ func TestLockingWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + // TODO not sure why using minimumContinueAsNewConfig(true) will fail + doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } @@ -51,7 +52,7 @@ func TestLockingWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/main_test.go b/integ/main_test.go index b82f2e4b..88b66fde 100644 --- a/integ/main_test.go +++ b/integ/main_test.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "regexp" "testing" "time" @@ -20,6 +21,22 @@ func TestMain(m *testing.M) { flag.Parse() var err error + if len(os.Args) > 0 { + lastArg := os.Args[len(os.Args)-1] + // check if lastArg is the regex pattern of ^\QTest.*Temporal.*\E$ + matched, err := regexp.MatchString(`Test.*Temporal.*`, lastArg) + if err == nil && matched { + *temporalIntegTest = true + *cadenceIntegTest = false + } else { + matched, err := regexp.MatchString(`Test.*Cadence.*`, lastArg) + if err == nil && matched { + *temporalIntegTest = false + *cadenceIntegTest = true + } + } + } + fmt.Println("*temporalIntegTest, *cadenceIntegTest", *temporalIntegTest, *cadenceIntegTest) if *temporalIntegTest { diff --git a/integ/parallel_test.go b/integ/parallel_test.go index 48edbd39..6ac51aec 100644 --- a/integ/parallel_test.go +++ b/integ/parallel_test.go @@ -19,7 +19,7 @@ func TestParallelWorkflowTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestParallelWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -31,7 +31,7 @@ func TestParallelWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestParallelWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/persistence_test.go b/integ/persistence_test.go index 65b76565..1d4d3e16 100644 --- a/integ/persistence_test.go +++ b/integ/persistence_test.go @@ -51,7 +51,7 @@ func TestPersistenceWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig()) + doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -61,7 +61,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemo(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig()) + doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -71,7 +71,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemoAndEncryption(t *testin t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig()) + doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig(true)) smallWaitForFastTest() } } @@ -91,7 +91,7 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig()) + doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig(false)) smallWaitForFastTest() } } diff --git a/integ/rpc_test.go b/integ/rpc_test.go index d5ea03f8..23add52c 100644 --- a/integ/rpc_test.go +++ b/integ/rpc_test.go @@ -29,7 +29,7 @@ func TestRpcWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestRpcWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig()) + doTestRpcWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -59,7 +59,7 @@ func TestRpcWorkflowTemporalContinueAsNewWithMemo(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestRpcWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig()) + doTestRpcWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -69,7 +69,7 @@ func TestRpcWorkflowTemporalContinueAsNewWithMemoAndEncryption(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestRpcWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig()) + doTestRpcWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -89,12 +89,14 @@ func TestRpcWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestRpcWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig()) + doTestRpcWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } -func doTestRpcWorkflow(t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig) { +func doTestRpcWorkflow( + t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig, +) { assertions := assert.New(t) // start test workflow server wfHandler := rpc.NewHandler() diff --git a/integ/signal_test.go b/integ/signal_test.go index 927c3c57..8f041416 100644 --- a/integ/signal_test.go +++ b/integ/signal_test.go @@ -22,7 +22,7 @@ func TestSignalWorkflowTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestSignalWorkflow(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -34,7 +34,7 @@ func TestSignalWorkflowCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestSignalWorkflow(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/skip_start_test.go b/integ/skip_start_test.go index 5119a380..cffdc872 100644 --- a/integ/skip_start_test.go +++ b/integ/skip_start_test.go @@ -28,7 +28,7 @@ func TestSkipStartWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestSkipStartWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestSkipStartWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -48,7 +48,7 @@ func TestSkipStartWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestSkipStartWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestSkipStartWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/timer_test.go b/integ/timer_test.go index 6bb3515d..d0480acd 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -39,7 +39,7 @@ func TestTimerWorkflowTemporalContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -49,7 +49,7 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { t.Skip() } for i := 0; i < *repeatIntegTest; i++ { - doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/util.go b/integ/util.go index a32f5244..1604e4d8 100644 --- a/integ/util.go +++ b/integ/util.go @@ -194,12 +194,17 @@ func smallWaitForFastTest() { time.Sleep(du) } -func minimumContinueAsNewConfig() *iwfidl.WorkflowConfig { +func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig { return &iwfidl.WorkflowConfig{ ContinueAsNewThreshold: iwfidl.PtrInt32(1), + OptimizeActivity: iwfidl.PtrBool(optimizeActivity), } } +func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig { + return minimumContinueAsNewConfig(false) +} + func getBackendTypes() []service.BackendType { backendTypesToTest := []service.BackendType{} diff --git a/integ/wf_force_fail_test.go b/integ/wf_force_fail_test.go index 4a9f1c37..d8b0ecde 100644 --- a/integ/wf_force_fail_test.go +++ b/integ/wf_force_fail_test.go @@ -19,7 +19,7 @@ func TestWorkflowForceFailTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowForceFail(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestWorkflowForceFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestWorkflowForceFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -31,7 +31,7 @@ func TestWorkflowForceFailCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowForceFail(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestWorkflowForceFail(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestWorkflowForceFail(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_state_api_fail_test.go b/integ/wf_state_api_fail_test.go index 43f19aec..84e04071 100644 --- a/integ/wf_state_api_fail_test.go +++ b/integ/wf_state_api_fail_test.go @@ -20,7 +20,7 @@ func TestStateApiFailTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiFail(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestStateApiFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestStateApiFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -32,7 +32,7 @@ func TestStateApiFailCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiFail(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestStateApiFail(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestStateApiFail(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_state_api_timeout_test.go b/integ/wf_state_api_timeout_test.go index 6d6f5f22..5e76cbf4 100644 --- a/integ/wf_state_api_timeout_test.go +++ b/integ/wf_state_api_timeout_test.go @@ -20,7 +20,7 @@ func TestStateApiTimeoutTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiTimeout(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestStateApiTimeout(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestStateApiTimeout(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -32,7 +32,7 @@ func TestStateApiTimeoutCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiTimeout(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestStateApiTimeout(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestStateApiTimeout(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_state_execute_api_fail_and_proceed_test.go b/integ/wf_state_execute_api_fail_and_proceed_test.go index f36b35a4..81b7ddce 100644 --- a/integ/wf_state_execute_api_fail_and_proceed_test.go +++ b/integ/wf_state_execute_api_fail_and_proceed_test.go @@ -20,7 +20,7 @@ func TestStateExecuteApiFailAndProceedTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateExecuteApiFailAndProceed(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestStateExecuteApiFailAndProceed(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestStateExecuteApiFailAndProceed(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -32,7 +32,7 @@ func TestStateExecuteApiFailAndProceedCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateExecuteApiFailAndProceed(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestStateExecuteApiFailAndProceed(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestStateExecuteApiFailAndProceed(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_state_wait_until_api_fail_and_proceed_test.go b/integ/wf_state_wait_until_api_fail_and_proceed_test.go index b8ac4a8f..9913d2c3 100644 --- a/integ/wf_state_wait_until_api_fail_and_proceed_test.go +++ b/integ/wf_state_wait_until_api_fail_and_proceed_test.go @@ -20,7 +20,7 @@ func TestStateApiFailAndProceedTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiFailAndProceed(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestStateApiFailAndProceed(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestStateApiFailAndProceed(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -32,7 +32,7 @@ func TestStateApiFailAndProceedCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestStateApiFailAndProceed(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestStateApiFailAndProceed(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestStateApiFailAndProceed(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_stop_test.go b/integ/wf_stop_test.go index 0aec8b21..dd2c8586 100644 --- a/integ/wf_stop_test.go +++ b/integ/wf_stop_test.go @@ -20,17 +20,17 @@ func TestWorkflowCanceledTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowCanceled(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestWorkflowCanceled(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestWorkflowCanceled(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() doTestWorkflowTerminated(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestWorkflowTerminated(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestWorkflowTerminated(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() doTestWorkflowFail(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestWorkflowFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestWorkflowFail(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -42,17 +42,17 @@ func TestWorkflowCanceledCadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowCanceled(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestWorkflowCanceled(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestWorkflowCanceled(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() doTestWorkflowTerminated(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestWorkflowTerminated(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestWorkflowTerminated(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() doTestWorkflowFail(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestWorkflowFail(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestWorkflowFail(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/integ/wf_timeout_test.go b/integ/wf_timeout_test.go index bc636e06..6d2ff28b 100644 --- a/integ/wf_timeout_test.go +++ b/integ/wf_timeout_test.go @@ -19,7 +19,7 @@ func TestWorkflowTimeoutTemporal(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowTimeout(t, service.BackendTypeTemporal, nil) smallWaitForFastTest() - doTestWorkflowTimeout(t, service.BackendTypeTemporal, minimumContinueAsNewConfig()) + doTestWorkflowTimeout(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } @@ -31,7 +31,7 @@ func TestWorkflowTimeoutadence(t *testing.T) { for i := 0; i < *repeatIntegTest; i++ { doTestWorkflowTimeout(t, service.BackendTypeCadence, nil) smallWaitForFastTest() - doTestWorkflowTimeout(t, service.BackendTypeCadence, minimumContinueAsNewConfig()) + doTestWorkflowTimeout(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) smallWaitForFastTest() } } diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index c27ec189..43abf5cb 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -3,11 +3,6 @@ package interpreter import ( "context" "fmt" - "io/ioutil" - "net/http" - "os" - "time" - "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" @@ -15,10 +10,15 @@ import ( "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" + "io" + "net/http" + "os" ) // StateStart is Deprecated, will be removed in next release -func StateStart(ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput) (*iwfidl.WorkflowStateStartResponse, error) { +func StateStart( + ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, +) (*iwfidl.WorkflowStateStartResponse, error) { return StateApiWaitUntil(ctx, backendType, input) } @@ -62,18 +62,14 @@ func StateDecide( ctx context.Context, backendType service.BackendType, input service.StateDecideActivityInput, - shouldSendSignalOnCompletion bool, - timeout time.Duration, ) (*iwfidl.WorkflowStateDecideResponse, error) { - return StateApiExecute(ctx, backendType, input, shouldSendSignalOnCompletion, timeout) + return StateApiExecute(ctx, backendType, input) } func StateApiExecute( ctx context.Context, backendType service.BackendType, input service.StateDecideActivityInput, - _ bool, // no used anymore, keep for compatibility - _ time.Duration, // no used anymore, keep for compatibility ) (*iwfidl.WorkflowStateDecideResponse, error) { provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) @@ -144,7 +140,7 @@ func composeHttpError(provider ActivityProvider, err error, httpResp *http.Respo responseBody := "None" var statusCode int if httpResp != nil { - body, err := ioutil.ReadAll(httpResp.Body) + body, err := io.ReadAll(httpResp.Body) if err != nil { responseBody = "cannot read body from http response" } else { @@ -193,7 +189,9 @@ func checkCommandRequestFromWaitUntilResponse(resp *iwfidl.WorkflowStateStartRes return nil } -func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error) { +func DumpWorkflowInternal( + ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest, +) (*iwfidl.WorkflowDumpResponse, error) { provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("DumpWorkflowInternal", "input", req) @@ -217,7 +215,8 @@ func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType, } func InvokeWorkerRpc( - ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, + ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, + req iwfidl.WorkflowRpcRequest, ) (*InvokeRpcActivityOutput, error) { provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 0cd69f3a..40b3ac39 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -181,7 +181,14 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToStartTimeout: unlimited, RetryPolicy: retry.ConvertCadenceActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx2) + + // support local activity optimization + wfCtx3 := workflow.WithLocalActivityOptions(wfCtx2, workflow.LocalActivityOptions{ + // set the LA timeout to 7s to make sure the workflow will not need a heartbeat + ScheduleToCloseTimeout: time.Second * 7, + RetryPolicy: retry.ConvertCadenceActivityRetryPolicy(options.RetryPolicy), + }) + return interpreter.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -202,16 +209,25 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e } func (w *workflowProvider) ExecuteActivity( + valuePtr interface{}, optimizeByLocalActivity bool, ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, -) (future interpreter.Future) { +) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } - f := workflow.ExecuteActivity(wfCtx, activity, args...) - return &futureImpl{ - future: f, + if optimizeByLocalActivity { + f := workflow.ExecuteLocalActivity(wfCtx, activity, args...) + err = f.Get(wfCtx, valuePtr) + if err != nil { + f = workflow.ExecuteActivity(wfCtx, activity, args...) + return f.Get(wfCtx, valuePtr) + } + return err } + + f := workflow.ExecuteActivity(wfCtx, activity, args...) + return f.Get(wfCtx, valuePtr) } func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { diff --git a/service/interpreter/continueAsNewer.go b/service/interpreter/continueAsNewer.go index 7318ac3f..5e3fd51c 100644 --- a/service/interpreter/continueAsNewer.go +++ b/service/interpreter/continueAsNewer.go @@ -27,7 +27,8 @@ type ContinueAsNewer struct { func NewContinueAsNewer( provider WorkflowProvider, interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, - persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, timerProcessor *TimerProcessor, + persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, + timerProcessor *TimerProcessor, ) *ContinueAsNewer { return &ContinueAsNewer{ provider: provider, @@ -44,7 +45,9 @@ func NewContinueAsNewer( } } -func LoadInternalsFromPreviousRun(ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32) (*service.ContinueAsNewDumpResponse, error) { +func LoadInternalsFromPreviousRun( + ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, +) (*service.ContinueAsNewDumpResponse, error) { activityOptions := ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ @@ -71,13 +74,13 @@ func LoadInternalsFromPreviousRun(ctx UnifiedContext, provider WorkflowProvider, pageNum := int32(0) for { var resp iwfidl.WorkflowDumpResponse - err := provider.ExecuteActivity(ctx, DumpWorkflowInternal, provider.GetBackendType(), + err := provider.ExecuteActivity(&resp, false, ctx, DumpWorkflowInternal, provider.GetBackendType(), iwfidl.WorkflowDumpRequest{ WorkflowId: workflowId, WorkflowRunId: previousRunId, PageNum: pageNum, PageSizeInBytes: pageSize, - }).Get(ctx, &resp) + }) if err != nil { return nil, err } @@ -124,8 +127,10 @@ func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) e } func (c *ContinueAsNewer) AddPotentialStateExecutionToResume( - stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue, commandRequest iwfidl.CommandRequest, - completedTimerCommands map[int]service.InternalTimerStatus, completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject, + stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue, + commandRequest iwfidl.CommandRequest, + completedTimerCommands map[int]service.InternalTimerStatus, + completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject, ) { c.StateExecutionToResumeMap[stateExecutionId] = service.StateExecutionResumeInfo{ StateExecutionId: stateExecutionId, diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces.go index d6328643..4da3144e 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces.go @@ -99,7 +99,10 @@ type WorkflowProvider interface { GetPendingThreadNames() map[string]int Await(ctx UnifiedContext, condition func() bool) error WithActivityOptions(ctx UnifiedContext, options ActivityOptions) UnifiedContext - ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future) + ExecuteActivity( + valuePtr interface{}, optimizeByLocalActivity bool, ctx UnifiedContext, activity interface{}, + args ...interface{}, + ) (err error) Now(ctx UnifiedContext) time.Time IsReplaying(ctx UnifiedContext) bool Sleep(ctx UnifiedContext, d time.Duration) (err error) diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 05f0fb0a..ebd7adf0 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -106,7 +106,7 @@ func NewSignalReceiver( } if received { continueAsNewCounter.IncSignalsReceived() - workflowConfiger.SetIfPresent(val.WorkflowConfig) + workflowConfiger.UpdateByAPI(val.WorkflowConfig) } else { // NOTE: continueAsNew will wait for all threads to complete, so we must stop this thread for continueAsNew when no more signals to process return @@ -275,7 +275,7 @@ func (sr *SignalReceiver) DrainAllUnreceivedSignals(ctx UnifiedContext) { val := iwfidl.WorkflowConfigUpdateRequest{} ok := ch.ReceiveAsync(&val) if ok { - sr.workflowConfiger.SetIfPresent(val.WorkflowConfig) + sr.workflowConfiger.UpdateByAPI(val.WorkflowConfig) } else { break } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index 890596f9..cca92913 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -222,7 +222,14 @@ func (w *workflowProvider) WithActivityOptions( StartToCloseTimeout: options.StartToCloseTimeout, RetryPolicy: retry.ConvertTemporalActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx2) + + // support local activity optimization + wfCtx3 := workflow.WithLocalActivityOptions(wfCtx2, workflow.LocalActivityOptions{ + // set the LA timeout to 7s to make sure the workflow will not need a heartbeat + ScheduleToCloseTimeout: time.Second * 7, + RetryPolicy: retry.ConvertTemporalActivityRetryPolicy(options.RetryPolicy), + }) + return interpreter.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -243,16 +250,24 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e } func (w *workflowProvider) ExecuteActivity( + valuePtr interface{}, optimizeByLocalActivity bool, ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, -) (future interpreter.Future) { +) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } - f := workflow.ExecuteActivity(wfCtx, activity, args...) - return &futureImpl{ - future: f, + if optimizeByLocalActivity { + f := workflow.ExecuteLocalActivity(wfCtx, activity, args...) + err = f.Get(wfCtx, valuePtr) + if err != nil { + f = workflow.ExecuteActivity(wfCtx, activity, args...) + return f.Get(wfCtx, valuePtr) + } + return err } + f := workflow.ExecuteActivity(wfCtx, activity, args...) + return f.Get(wfCtx, valuePtr) } func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { diff --git a/service/interpreter/workflowConfiger.go b/service/interpreter/workflowConfiger.go index 81d1bdfe..21fc3d1d 100644 --- a/service/interpreter/workflowConfiger.go +++ b/service/interpreter/workflowConfiger.go @@ -18,7 +18,11 @@ func (wc *WorkflowConfiger) Get() iwfidl.WorkflowConfig { return wc.config } -func (wc *WorkflowConfiger) SetIfPresent(config iwfidl.WorkflowConfig) { +func (wc *WorkflowConfiger) ShouldOptimizeActivity() bool { + return wc.config.GetOptimizeActivity() +} + +func (wc *WorkflowConfiger) UpdateByAPI(config iwfidl.WorkflowConfig) { if config.DisableSystemSearchAttribute != nil { wc.config.DisableSystemSearchAttribute = config.DisableSystemSearchAttribute } @@ -28,4 +32,7 @@ func (wc *WorkflowConfiger) SetIfPresent(config iwfidl.WorkflowConfig) { if config.ContinueAsNewThreshold != nil { wc.config.ContinueAsNewThreshold = config.ContinueAsNewThreshold } + if config.OptimizeActivity != nil { + wc.config.OptimizeActivity = config.OptimizeActivity + } } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 7b0940fd..13e0a802 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -91,7 +91,7 @@ func InterpreterImpl( continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } - _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, interStateChannel, basicInfo) + _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, interStateChannel, basicInfo) if err != nil { return nil, err } @@ -182,7 +182,7 @@ func InterpreterImpl( decision, stateExecStatus, err := executeState( ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, - signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, shouldSendSignalOnCompletion) + signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, workflowConfiger, shouldSendSignalOnCompletion) if err != nil { // this is the case where stateExecStatus == FailureStateExecutionStatus errToFailWf = err @@ -442,6 +442,7 @@ func executeState( timerProcessor *TimerProcessor, continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, + configer *WorkflowConfiger, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { waitUntilApi := StateStart @@ -478,7 +479,7 @@ func executeState( skipStart := compatibility.GetSkipStartApi(&options) if skipStart { return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext, - nil, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion) + nil, continueAsNewer, configer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion) } if isResumeFromContinueAsNew { @@ -501,17 +502,18 @@ func executeState( saLoadingPolicy := state.GetStateOptions().SearchAttributesLoadingPolicy doLoadingPolicy := compatibility.GetDataObjectsLoadingPolicy(state.StateOptions) - errStartApi = provider.ExecuteActivity(ctx, waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{ - IwfWorkerUrl: basicInfo.IwfWorkerUrl, - Request: iwfidl.WorkflowStateStartRequest{ - Context: executionContext, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowStateId: state.StateId, - StateInput: state.StateInput, - SearchAttributes: persistenceManager.LoadSearchAttributes(ctx, saLoadingPolicy), - DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy), - }, - }).Get(ctx, &startResponse) + errStartApi = provider.ExecuteActivity(&startResponse, configer.ShouldOptimizeActivity(), ctx, + waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{ + IwfWorkerUrl: basicInfo.IwfWorkerUrl, + Request: iwfidl.WorkflowStateStartRequest{ + Context: executionContext, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowStateId: state.StateId, + StateInput: state.StateInput, + SearchAttributes: persistenceManager.LoadSearchAttributes(ctx, saLoadingPolicy), + DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy), + }, + }) persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy) if errStartApi != nil && !shouldProceedOnStartApiError(state) { return nil, service.FailureStateExecutionStatus, convertStateApiActivityError(provider, errStartApi) @@ -697,7 +699,7 @@ func executeState( } return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext, - commandRes, continueAsNewer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion) + commandRes, continueAsNewer, configer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion) } func executeStateDecide( ctx UnifiedContext, @@ -710,6 +712,7 @@ func executeStateDecide( executionContext iwfidl.Context, commandRes *iwfidl.CommandResults, continueAsNewer *ContinueAsNewer, + configer *WorkflowConfiger, executeApi interface{}, stateExecutionLocal []iwfidl.KeyValue, shouldSendSignalOnCompletion bool, @@ -731,19 +734,20 @@ func executeStateDecide( ctx = provider.WithActivityOptions(ctx, activityOptions) var decideResponse *iwfidl.WorkflowStateDecideResponse - err = provider.ExecuteActivity(ctx, executeApi, provider.GetBackendType(), service.StateDecideActivityInput{ - IwfWorkerUrl: basicInfo.IwfWorkerUrl, - Request: iwfidl.WorkflowStateDecideRequest{ - Context: executionContext, - WorkflowType: basicInfo.IwfWorkflowType, - WorkflowStateId: state.StateId, - CommandResults: commandRes, - StateLocals: stateExecutionLocal, - SearchAttributes: persistenceManager.LoadSearchAttributes(ctx, saLoadingPolicy), - DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy), - StateInput: state.StateInput, - }, - }, false, 0).Get(ctx, &decideResponse) + err = provider.ExecuteActivity(&decideResponse, configer.ShouldOptimizeActivity(), ctx, + executeApi, provider.GetBackendType(), service.StateDecideActivityInput{ + IwfWorkerUrl: basicInfo.IwfWorkerUrl, + Request: iwfidl.WorkflowStateDecideRequest{ + Context: executionContext, + WorkflowType: basicInfo.IwfWorkflowType, + WorkflowStateId: state.StateId, + CommandResults: commandRes, + StateLocals: stateExecutionLocal, + SearchAttributes: persistenceManager.LoadSearchAttributes(ctx, saLoadingPolicy), + DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy), + StateInput: state.StateInput, + }, + }) persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy) if err == nil && shouldSendSignalOnCompletion && !provider.IsReplaying(ctx) { // NOTE: here uses NOT IsReplaying to signalWithStart, to save an activity for this operation diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index f2bd7325..e5c30521 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -13,12 +13,16 @@ type WorkflowUpdater struct { continueAsNewCounter *ContinueAsNewCounter interStateChannel *InterStateChannel stateRequestQueue *StateRequestQueue + configer *WorkflowConfiger logger UnifiedLogger basicInfo service.BasicInfo } -func NewWorkflowUpdater(ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, - continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, interStateChannel *InterStateChannel, basicInfo service.BasicInfo, +func NewWorkflowUpdater( + ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + stateRequestQueue *StateRequestQueue, + continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, + interStateChannel *InterStateChannel, basicInfo service.BasicInfo, ) (*WorkflowUpdater, error) { updater := &WorkflowUpdater{ persistenceManager: persistenceManager, @@ -26,6 +30,7 @@ func NewWorkflowUpdater(ctx UnifiedContext, provider WorkflowProvider, persisten continueAsNewCounter: continueAsNewCounter, interStateChannel: interStateChannel, stateRequestQueue: stateRequestQueue, + configer: configer, basicInfo: basicInfo, provider: provider, logger: provider.GetLogger(ctx), @@ -37,7 +42,9 @@ func NewWorkflowUpdater(ctx UnifiedContext, provider WorkflowProvider, persisten return updater, nil } -func (u *WorkflowUpdater) handler(ctx UnifiedContext, input iwfidl.WorkflowRpcRequest) (output *HandlerOutput, err error) { +func (u *WorkflowUpdater) handler( + ctx UnifiedContext, input iwfidl.WorkflowRpcRequest, +) (output *HandlerOutput, err error) { u.continueAsNewer.IncreaseInflightOperation() defer u.continueAsNewer.DecreaseInflightOperation() @@ -61,7 +68,8 @@ func (u *WorkflowUpdater) handler(ctx UnifiedContext, input iwfidl.WorkflowRpcRe } ctx = u.provider.WithActivityOptions(ctx, activityOptions) var activityOutput InvokeRpcActivityOutput - err = u.provider.ExecuteActivity(ctx, InvokeWorkerRpc, u.provider.GetBackendType(), rpcPrep, input).Get(ctx, &activityOutput) + err = u.provider.ExecuteActivity(&activityOutput, u.configer.ShouldOptimizeActivity(), ctx, + InvokeWorkerRpc, u.provider.GetBackendType(), rpcPrep, input) u.persistenceManager.UnlockPersistence(input.SearchAttributesLoadingPolicy, input.DataAttributesLoadingPolicy) if err != nil {