Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support local activity optimization #389

Merged
merged 13 commits into from
May 21, 2024
63 changes: 44 additions & 19 deletions docker-compose/ci-cadence-dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,47 @@ version: '3.9'

# see .env file for the default value of the environment variables
services:
elasticsearch:
container_name: elasticsearch
environment:
- cluster.routing.allocation.disk.threshold_enabled=true
- cluster.routing.allocation.disk.watermark.low=512mb
- cluster.routing.allocation.disk.watermark.high=256mb
- cluster.routing.allocation.disk.watermark.flood_stage=128mb
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms256m -Xmx256m
- xpack.security.enabled=false
image: elasticsearch:${ELASTICSEARCH_VERSION}
networks:
- testing-network
expose:
- 9200
cassandra:
image: cassandra:4.1.1
image: cassandra:3.11
ports:
- "9042:9042"
networks:
- testing-network
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
networks:
- testing-network
kafka:
image: wurstmeister/kafka:2.12-2.1.1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
healthcheck:
test: ["CMD", "cqlsh", "-u cassandra", "-p cassandra" ,"-e describe keyspaces"]
interval: 15s
timeout: 30s
retries: 10
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
networks:
- testing-network
- testing-network
cadence:
image: ubercadence/server:v1.2.5-auto-setup
image: ubercadence/server:0.24.0-auto-setup
ports:
- "8000:8000"
- "8001:8001"
Expand All @@ -30,15 +55,15 @@ services:
- "7833:7833"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "PROMETHEUS_ENDPOINT_0=0.0.0.0:8000"
- "PROMETHEUS_ENDPOINT_1=0.0.0.0:8001"
- "PROMETHEUS_ENDPOINT_2=0.0.0.0:8002"
- "PROMETHEUS_ENDPOINT_3=0.0.0.0:8003"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
- "LOG_LEVEL=debug"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development_es.yaml"
- "ENABLE_ES=true"
- "ES_SEEDS=elasticsearch"
- "ES_VERSION=v7"
- "KAFKA_SEEDS=kafka"
depends_on:
cassandra:
condition: service_healthy
- cassandra
- kafka
- elasticsearch
networks:
- testing-network
cadence-admin-tools:
Expand All @@ -47,7 +72,7 @@ services:
- cadence
environment:
- CADENCE_CLI_ADDRESS=cadence:7933
image: ubercadence/cli:v1.2.6
image: ubercadence/cli:0.24.0
networks:
- testing-network
stdin_open: true
Expand Down
4 changes: 2 additions & 2 deletions integ/any_command_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestAnyCommandCloseWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestAnyCommandCloseWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
5 changes: 3 additions & 2 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -49,7 +50,7 @@ func TestAnyCommandCombinationWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/any_timer_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestAnyTimerSignalWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestAnyTimerSignalWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -47,7 +47,7 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestAnyTimerSignalWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestBasicWorkflowTemporal(t *testing.T) {
doTestBasicWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestBasicWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()

doTestBasicWorkflow(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{
Expand All @@ -39,7 +39,7 @@ func TestBasicWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestBasicWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestBasicWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
13 changes: 9 additions & 4 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowTemporalContinueA
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -47,17 +48,21 @@ func TestConditionalForceCompleteOnInternalChannelEmptyWorkflowCadenceContinueAs
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
func doTestConditionalForceCompleteOnInternalChannelEmptyWorkflow(
t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig,
) {
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, false)
doTestConditionalForceCompleteOnChannelEmptyWorkflow(t, backendType, config, true)
}

func doTestConditionalForceCompleteOnChannelEmptyWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool) {
func doTestConditionalForceCompleteOnChannelEmptyWorkflow(
t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig, useSignalChannel bool,
) {
assertions := assert.New(t)
// start test workflow server
wfHandler := conditionalClose.NewHandler()
Expand Down
4 changes: 2 additions & 2 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCreateWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestCreateWithoutStartingState(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -46,7 +46,7 @@ func TestCreateWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestCreateWithoutStartingState(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/deadend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDeadEndWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestDeadEndWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -46,7 +46,7 @@ func TestDeadEndWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestDeadEndWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
4 changes: 2 additions & 2 deletions integ/interstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestInterStateWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestInterStateWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestInterStateWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestInterStateWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestInterStateWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestInterStateWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
5 changes: 3 additions & 2 deletions integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestLockingWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestLockingWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand All @@ -51,7 +52,7 @@ func TestLockingWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestLockingWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
17 changes: 17 additions & 0 deletions integ/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"regexp"
"testing"
"time"

Expand All @@ -20,6 +21,22 @@ func TestMain(m *testing.M) {
flag.Parse()
var err error

if len(os.Args) > 0 {
lastArg := os.Args[len(os.Args)-1]
// check if lastArg is the regex pattern of ^\QTest.*Temporal.*\E$
matched, err := regexp.MatchString(`Test.*Temporal.*`, lastArg)
if err == nil && matched {
*temporalIntegTest = true
*cadenceIntegTest = false
} else {
matched, err := regexp.MatchString(`Test.*Cadence.*`, lastArg)
if err == nil && matched {
*temporalIntegTest = false
*cadenceIntegTest = true
}
}
}

fmt.Println("*temporalIntegTest, *cadenceIntegTest", *temporalIntegTest, *cadenceIntegTest)

if *temporalIntegTest {
Expand Down
4 changes: 2 additions & 2 deletions integ/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestParallelWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestParallelWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig())
doTestParallelWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -31,7 +31,7 @@ func TestParallelWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestParallelWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig())
doTestParallelWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
8 changes: 4 additions & 4 deletions integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestPersistenceWorkflowTemporalContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, false, false, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -61,7 +61,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemo(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, false, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -71,7 +71,7 @@ func TestPersistenceWorkflowTemporalContinueAsNewWithMemoAndEncryption(t *testin
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeTemporal, true, true, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand All @@ -91,7 +91,7 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig())
doTestPersistenceWorkflow(t, service.BackendTypeCadence, false, false, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}
Expand Down
Loading
Loading