Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/replication-simulation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:
scenario:
- activeactive
- activeactive_same_wfid
- activeactive_same_wfid_signalwithstart
- activeactive_same_wfid_signalwithstart_delayed
- activeactive_cron
- activeactive_regional_failover
- activeactive_regional_failover_start_same_wfid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# This file is used as dynamicconfig override for "activeactive_same_wfid_signalwithstart" replication simulation scenario
# configured via simulation/replication/testdata/replication_simulation_activeactive_same_wfid_signalwithstart.yaml
system.writeVisibilityStoreName:
- value: "db"
system.readVisibilityStoreName:
- value: "db"
history.replicatorTaskBatchSize:
- value: 25
constraints: {}
frontend.failoverCoolDown:
- value: 5s
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
- value: 10ms
history.standbyTaskMissingEventsResendDelay:
- value: 5s
history.standbyTaskMissingEventsDiscardDelay:
- value: 10s
history.standbyClusterDelay:
- value: 10s
history.enableTransferQueueV2:
- value: true
history.enableTimerQueueV2:
- value: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# This file is used as dynamicconfig override for "activeactive_same_wfid_signalwithstart_delayed" replication simulation scenario
# configured via simulation/replication/testdata/replication_simulation_activeactive_same_wfid_signalwithstart_delayed.yaml
system.writeVisibilityStoreName:
- value: "db"
system.readVisibilityStoreName:
- value: "db"
history.replicatorTaskBatchSize:
- value: 25
constraints: {}
frontend.failoverCoolDown:
- value: 5s
history.ReplicationTaskProcessorStartWait: # default is 5s. repl task processor sleeps this much before processing received messages.
- value: 10ms
history.standbyTaskMissingEventsResendDelay:
- value: 5s
history.standbyTaskMissingEventsDiscardDelay:
- value: 10s
history.standbyClusterDelay:
- value: 10s
history.enableTransferQueueV2:
- value: true
history.enableTimerQueueV2:
- value: true
107 changes: 74 additions & 33 deletions simulation/replication/replication_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestReplicationSimulation(t *testing.T) {
startTime := time.Now().UTC()
simTypes.Logf(t, "Simulation start time: %v", startTime)
for i, op := range simCfg.Operations {
op := op
waitForOpTime(t, op, startTime)
var err error
switch op.Type {
Expand All @@ -100,9 +99,9 @@ func TestReplicationSimulation(t *testing.T) {
case simTypes.ReplicationSimulationOperationValidate:
err = validate(t, op, simCfg, sim)
case simTypes.ReplicationSimulationOperationQueryWorkflow:
err = queryWorkflow(t, op, simCfg)
err = queryWorkflow(t, op, simCfg, sim)
case simTypes.ReplicationSimulationOperationSignalWithStartWorkflow:
err = signalWithStartWorkflow(t, op, simCfg)
err = signalWithStartWorkflow(t, op, simCfg, sim)
case simTypes.ReplicationSimulationOperationMigrateDomainToActiveActive:
err = migrateDomainToActiveActive(t, op, simCfg)
case simTypes.ReplicationSimulationOperationValidateWorkflowReplication:
Expand Down Expand Up @@ -330,7 +329,7 @@ func migrateDomainToActiveActive(t *testing.T, op *simTypes.Operation, simCfg *s
return nil
}

func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.ReplicationSimulationConfig) error {
func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.ReplicationSimulationConfig, sim *simTypes.ReplicationSimulation) error {
t.Helper()

simTypes.Logf(t, "Querying workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)
Expand All @@ -344,11 +343,22 @@ func queryWorkflow(t *testing.T, op *simTypes.Operation, simCfg *simTypes.Replic
consistencyLevel = types.QueryConsistencyLevelStrong.Ptr()
}

// Prepare workflow execution - use specific RunID if provided via runIDKey
executionRequest := &types.WorkflowExecution{
WorkflowID: op.WorkflowID,
}
if op.RunIDKey != "" {
if runID, err := sim.GetRunID(op.RunIDKey); err == nil && runID != "" {
executionRequest.RunID = runID
simTypes.Logf(t, "Using stored RunID %s for query (key: %s)", runID, op.RunIDKey)
} else {
return fmt.Errorf("runIDKey %s specified but no RunID found in registry", op.RunIDKey)
}
}

queryResp, err := frontendCl.QueryWorkflow(ctx, &types.QueryWorkflowRequest{
Domain: op.Domain,
Execution: &types.WorkflowExecution{
WorkflowID: op.WorkflowID,
},
Domain: op.Domain,
Execution: executionRequest,
Query: &types.WorkflowQuery{
QueryType: op.Query,
},
Expand All @@ -373,6 +383,7 @@ func signalWithStartWorkflow(
t *testing.T,
op *simTypes.Operation,
simCfg *simTypes.ReplicationSimulationConfig,
sim *simTypes.ReplicationSimulation,
) error {
t.Helper()
simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)
Expand All @@ -398,7 +409,15 @@ func signalWithStartWorkflow(
return err
}

simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, signalResp.GetRunID())
runID := signalResp.GetRunID()
simTypes.Logf(t, "SignalWithStart workflow: %s on domain %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, runID)

// Store RunID if runIDKey is specified
if op.RunIDKey != "" {
sim.StoreRunID(op.RunIDKey, runID)
simTypes.Logf(t, "Stored RunID %s with key: %s", runID, op.RunIDKey)
}

return nil
}

Expand All @@ -415,73 +434,89 @@ func validate(

simTypes.Logf(t, "Validating workflow: %s on cluster: %s", op.WorkflowID, op.Cluster)

consistencyLevel := types.QueryConsistencyLevelEventual.Ptr()
if op.ConsistencyLevel == "strong" {
consistencyLevel = types.QueryConsistencyLevelStrong.Ptr()
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// Prepare workflow execution - use specific RunID if provided via runIDKey
execution := &types.WorkflowExecution{
executionRequest := &types.WorkflowExecution{
WorkflowID: op.WorkflowID,
}
if op.RunIDKey != "" {
if runID, err := sim.GetRunID(op.RunIDKey); err == nil && runID != "" {
execution.RunID = runID
executionRequest.RunID = runID
simTypes.Logf(t, "Using stored RunID %s for validation (key: %s)", runID, op.RunIDKey)
} else {
return fmt.Errorf("runIDKey %s specified but no RunID found in registry", op.RunIDKey)
}
}

resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).DescribeWorkflowExecution(ctx,
&types.DescribeWorkflowExecutionRequest{
Domain: op.Domain,
Execution: execution,
Domain: op.Domain,
Execution: executionRequest,
QueryConsistencyLevel: consistencyLevel,
})
if err != nil {
return err
}

workflowStatus := resp.GetWorkflowExecutionInfo().GetCloseStatus()
workflowCloseTime := resp.GetWorkflowExecutionInfo().GetCloseTime()
switch op.Want.Status {
case "completed":
// Validate workflow completed
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusCompleted || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not completed. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusCompleted || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not completed. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
case "failed":
// Validate workflow failed
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusFailed || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not failed. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusFailed || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not failed. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
case "canceled":
// Validate workflow canceled
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusCanceled || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not canceled. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusCanceled || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not canceled. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
case "terminated":
// Validate workflow terminated
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusTerminated || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not terminated. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusTerminated || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not terminated. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
case "continued-as-new":
// Validate workflow continued as new
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusContinuedAsNew || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not continued as new. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusContinuedAsNew || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not continued as new. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
case "timed-out":
// Validate workflow timed out
if resp.GetWorkflowExecutionInfo().GetCloseStatus() != types.WorkflowExecutionCloseStatusTimedOut || resp.GetWorkflowExecutionInfo().GetCloseTime() == 0 {
return fmt.Errorf("workflow %s not timed out. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowStatus != types.WorkflowExecutionCloseStatusTimedOut || workflowCloseTime == 0 {
return fmt.Errorf("workflow %s not timed out. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
default:
// Validate workflow is running
if resp.GetWorkflowExecutionInfo().GetCloseTime() != 0 {
return fmt.Errorf("workflow %s not running. status: %s, close time: %v", op.WorkflowID, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))
if workflowCloseTime != 0 {
return fmt.Errorf("workflow %s not running. status: %s, close time: %v", op.WorkflowID, workflowStatus, time.Unix(0, workflowCloseTime))
}
}

simTypes.Logf(t, "Validated workflow: %s on cluster: %s. Status: %s, CloseTime: %v", op.WorkflowID, op.Cluster, resp.GetWorkflowExecutionInfo().GetCloseStatus(), time.Unix(0, resp.GetWorkflowExecutionInfo().GetCloseTime()))

// Get history to validate the identity of the workers that started and completed the workflow.
// Some workflows start in cluster0 and complete in cluster1; this is to validate that the start and completion were handled by workers in the expected clusters.
history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID)
var runID string
if op.RunIDKey != "" {
runID, err = sim.GetRunID(op.RunIDKey)
if err != nil {
return err
}
}
history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID, runID)
if err != nil {
return err
}
Expand Down Expand Up @@ -588,17 +623,23 @@ func waitForOpTime(t *testing.T, op *simTypes.Operation, startTime time.Time) {
simTypes.Logf(t, "Operation time (t + %ds) reached: %v", int(op.At.Seconds()), startTime.Add(op.At))
}

func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID string) ([]types.HistoryEvent, error) {
func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID, runID string) ([]types.HistoryEvent, error) {
frontendCl := simCfg.MustGetFrontendClient(t, clusterName)
var nextPageToken []byte
var history []types.HistoryEvent

executionRequest := &types.WorkflowExecution{
WorkflowID: wfID,
}
if runID != "" {
executionRequest.RunID = runID
}

for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
response, err := frontendCl.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: domainName,
Execution: &types.WorkflowExecution{
WorkflowID: wfID,
},
Domain: domainName,
Execution: executionRequest,
MaximumPageSize: 1000,
NextPageToken: nextPageToken,
WaitForNewEvent: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# This file is a replication simulation scenario spec.
# It is parsed into ReplicationSimulationConfig struct.
# Replication simulation for this file can be run via ./simulation/replication/run.sh activeactive_same_wfid_signalwithstart
# Dynamic config overrides can be set via config/dynamicconfig/replication_simulation_activeactive_same_wfid_signalwithstart.yml
# When a domain is configured as active-active
# And the same workflow ID is used to SignalWithStart in multiple clusters
# Then the 'earlier' workflow should be terminated
# And the 'later' workflow should complete

clusters:
cluster0:
grpcEndpoint: "cadence-cluster0:7833"
cluster1:
grpcEndpoint: "cadence-cluster1:7833"

# primaryCluster is the cluster where domain data (e.g. domain registration) is written and from which it is replicated to the other clusters
primaryCluster: "cluster0"

domains:
test-domain-aa-conflict:
activeClustersByRegion:
region0: cluster0
region1: cluster1

operations:
- op: signal_with_start_workflow
at: 0s
workflowID: conflict-wf
workflowType: timer-activity-loop-workflow
cluster: cluster0
domain: test-domain-aa-conflict
signalName: custom-signal
signalInput: "cluster0-signal-data"
workflowExecutionStartToCloseTimeout: 65s
workflowDuration: 35s
runIDKey: cluster0-run

- op: signal_with_start_workflow
at: 2s
workflowID: conflict-wf
workflowType: timer-activity-loop-workflow
cluster: cluster1
domain: test-domain-aa-conflict
signalName: custom-signal
signalInput: "cluster1-signal-data"
workflowExecutionStartToCloseTimeout: 65s
workflowDuration: 35s
runIDKey: cluster1-run

# Query the cluster0 run to validate it was started with the correct signal
# Note that if this query is delayed it will eventually see the signal from the 'later' workflow
- op: query_workflow
at: 3s
workflowID: conflict-wf
cluster: cluster0
domain: test-domain-aa-conflict
query: latest-signal-content
runIDKey: cluster0-run
want:
queryResult: ["cluster0-signal-data"]

# Query the cluster1 run to validate it was started with the correct signal
- op: query_workflow
at: 3s
workflowID: conflict-wf
cluster: cluster1
domain: test-domain-aa-conflict
query: latest-signal-content
runIDKey: cluster1-run
want:
queryResult: ["cluster1-signal-data"]

- op: validate
at: 30s
workflowID: conflict-wf
runIDKey: cluster0-run
cluster: cluster0
domain: test-domain-aa-conflict
want:
status: terminated
startedByWorkersInCluster: cluster0
completedByWorkersInCluster: cluster0

- op: validate
at: 30s
workflowID: conflict-wf
runIDKey: cluster1-run
cluster: cluster0
domain: test-domain-aa-conflict
want:
status: running
startedByWorkersInCluster: cluster1

- op: validate
at: 70s
workflowID: conflict-wf
runIDKey: cluster1-run
cluster: cluster0
domain: test-domain-aa-conflict
want:
status: completed
startedByWorkersInCluster: cluster1
completedByWorkersInCluster: cluster1
Loading
Loading