Skip to content

Commit

Permalink
Support local activity optimization (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed May 21, 2024
1 parent 7321d1f commit bb39b2f
Show file tree
Hide file tree
Showing 34 changed files with 254 additions and 141 deletions.
63 changes: 44 additions & 19 deletions docker-compose/ci-cadence-dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,47 @@ version: '3.9'

# see .env file for the default value of the environment variables
services:
elasticsearch:
container_name: elasticsearch
environment:
- cluster.routing.allocation.disk.threshold_enabled=true
- cluster.routing.allocation.disk.watermark.low=512mb
- cluster.routing.allocation.disk.watermark.high=256mb
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms256m -Xmx256m
- xpack.security.enabled=false
image: elasticsearch:${ELASTICSEARCH_VERSION}
networks:
- testing-network
expose:
- 9200
cassandra:
image: cassandra:4.1.1
image: cassandra:3.11
ports:
- "9042:9042"
networks:
- testing-network
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
networks:
- testing-network
kafka:
image: wurstmeister/kafka:2.12-2.1.1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
healthcheck:
test: ["CMD", "cqlsh", "-u cassandra", "-p cassandra" ,"-e describe keyspaces"]
interval: 15s
timeout: 30s
retries: 10
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
- testing-network
- testing-network
cadence:
image: ubercadence/server:v1.2.5-auto-setup
image: ubercadence/server:0.24.0-auto-setup
ports:
- "8000:8000"
- "8001:8001"
Expand All @@ -30,15 +55,15 @@ services:
- "7833:7833"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "PROMETHEUS_ENDPOINT_0=0.0.0.0:8000"
- "PROMETHEUS_ENDPOINT_1=0.0.0.0:8001"
- "PROMETHEUS_ENDPOINT_2=0.0.0.0:8002"
- "PROMETHEUS_ENDPOINT_3=0.0.0.0:8003"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
- "LOG_LEVEL=debug"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development_es.yaml"
- "ENABLE_ES=true"
- "ES_SEEDS=elasticsearch"
- "ES_VERSION=v7"
- "KAFKA_SEEDS=kafka"
depends_on:
cassandra:
condition: service_healthy
- cassandra
- kafka
- elasticsearch
networks:
- testing-network
cadence-admin-tools:
Expand All @@ -47,7 +72,7 @@ services:
- cadence
environment:
- CADENCE_CLI_ADDRESS=cadence:7933
image: ubercadence/cli:v1.2.6
image: ubercadence/cli:0.24.0
networks:
- testing-network
stdin_open: true
Expand Down
4 changes: 2 additions & 2 deletions integ/any_command_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestAnyCommandCloseWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestAnyCommandCloseWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
5 changes: 3 additions & 2 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -49,7 +50,7 @@ func TestAnyCommandCombinationWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/any_timer_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -47,7 +47,7 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestBasicWorkflowTemporal(t *testing.T) {
doTestBasicWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()

doTestBasicWorkflow(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{
Expand All @@ -39,7 +39,7 @@ func TestBasicWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestBasicWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
13 changes: 9 additions & 4 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowTemporalContinueA
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -47,17 +48,21 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(
t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig,
) {
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false)
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true)
}

func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) {
func doTestConditionalForceCompleteOnChannelEmptyWorkflow(
t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool,
) {
assertions := assert.New(t)
// start test workflow server
wfHandler := conditionalClose.NewHandler()
Expand Down
4 changes: 2 additions & 2 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCreateWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -46,7 +46,7 @@ func TestCreateWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/deadend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDeadEndWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -46,7 +46,7 @@ func TestDeadEndWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/interstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestInterStateWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestInterStateWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestInterStateWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestInterStateWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
5 changes: 3 additions & 2 deletions integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestLockingWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -51,7 +52,7 @@ func TestLockingWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
17 changes: 17 additions & 0 deletions integ/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"regexp"
"testing"
"time"

Expand All @@ -20,6 +21,22 @@ func TestMain(m *testing.M) {
flag.Parse()
var err error

if len(os.Args) > 0 {
lastArg := os.Args[len(os.Args)-1]
// check if lastArg is the regex pattern of ^\QTest.*Temporal.*\E$
matched, err := regexp.MatchString(`Test.*Temporal.*`, lastArg)
if err == nil && matched {
*temporalIntegTest = true
*cadenceIntegTest = false
} else {
matched, err := regexp.MatchString(`Test.*Cadence.*`, lastArg)
if err == nil && matched {
*temporalIntegTest = false
*cadenceIntegTest = true
}
}
}

fmt.Println("*temporalIntegTest, *cadenceIntegTest", *temporalIntegTest, *cadenceIntegTest)

if *temporalIntegTest {
Expand Down
4 changes: 2 additions & 2 deletions integ/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestParallelWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestParallelWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestParallelWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestParallelWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
8 changes: 4 additions & 4 deletions integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestPersistenceWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -61,7 +61,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemo(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -71,7 +71,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemoAndEncryption(t *testin
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -91,7 +91,7 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
Loading

0 comments on commit bb39b2f

Please sign in to comment.