Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ public class DemoWorkflow extends Workflow {
- The `TestWorkflow` method
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.



```go
package main

Expand All @@ -877,8 +878,11 @@ import (
"log"
"time"

"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/client"
"github.com/dapr/durabletask-go/task"
dapr "github.com/dapr/go-sdk/client"
)

var stage = 0
Expand All @@ -888,110 +892,68 @@ const (
)

func main() {
w, err := workflow.NewWorker()
if err != nil {
log.Fatal(err)
}
registry := task.NewTaskRegistry()

fmt.Println("Worker initialized")

if err := w.RegisterWorkflow(TestWorkflow); err != nil {
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")

if err := w.RegisterActivity(TestActivity); err != nil {
if err := registry.AddActivity(TestActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")

// Start workflow runner
if err := w.Start(); err != nil {
log.Fatal(err)
daprClient, err := dapr.NewClient()
if err != nil {
log.Fatalf("failed to create Dapr client: %v", err)
}
fmt.Println("runner started")

daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to intialise client: %v", err)
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
log.Fatalf("failed to start work item listener: %v", err)
}
defer daprClient.Close()

fmt.Println("runner started")

ctx := context.Background()

// Start workflow test
respStart, err := daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
fmt.Printf("workflow started with id: %v\n", id)

// Pause workflow test
err = daprClient.PauseWorkflow(ctx, &client.PauseWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = client.PurgeOrchestrationState(ctx, id)
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}

respGet, err := daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
}

fmt.Printf("workflow paused\n")
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)

// Resume workflow test
err = daprClient.ResumeWorkflow(ctx, &client.ResumeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = client.ResumeOrchestration(ctx, id, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)

respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusRunning.String() {
log.Fatalf("workflow not running")
}

fmt.Println("workflow resumed")
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)

fmt.Printf("stage: %d\n", stage)

// Raise Event Test

err = daprClient.RaiseEventWorkflow(ctx, &client.RaiseEventWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
EventName: "testEvent",
EventData: "testData",
SendRawData: false,
})

err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
Expand All @@ -1002,177 +964,44 @@ func main() {

fmt.Printf("stage: %d\n", stage)

respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)

// Purge workflow test
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
err = client.PurgeOrchestrationState(ctx, id)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil && respGet != nil {
log.Fatal("failed to purge workflow")
}

fmt.Println("workflow purged")

fmt.Printf("stage: %d\n", stage)

// Terminate workflow test
respStart, err = daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}

fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)

err = daprClient.TerminateWorkflow(ctx, &client.TerminateWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}

respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
log.Fatal("failed to terminate workflow")
}

fmt.Println("workflow terminated")

err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err == nil || respGet != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

stage = 0
fmt.Println("workflow client test")

wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}

id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}

fmt.Printf("[wfclient] started workflow with id: %s\n", id)

metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}

fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())

if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// raise event

if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}

fmt.Println("[wfclient] event raised")

// Sleep to allow the workflow to advance
time.Sleep(time.Second)

if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
}

fmt.Println("[wfclient] workflow terminated")

if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
}

fmt.Println("[wfclient] workflow purged")

// stop workflow runtime
if err := w.Shutdown(); err != nil {
log.Fatalf("failed to shutdown runtime: %v", err)
}

fmt.Println("workflow worker successfully shutdown")
}

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}

if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

return output, nil
}

func TestActivity(ctx workflow.ActivityContext) (any, error) {
func TestActivity(ctx task.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
Expand Down
Loading
Loading