Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -874,86 +874,98 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/client"
"github.com/dapr/durabletask-go/task"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/durabletask-go/workflow"
"github.com/dapr/go-sdk/client"
)

var stage = 0

const (
workflowComponent = "dapr"
)
var failActivityTries = 0

func main() {
registry := task.NewTaskRegistry()
r := workflow.NewRegistry()

if err := registry.AddOrchestrator(TestWorkflow); err != nil {
if err := r.AddWorkflow(TestWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")

if err := registry.AddActivity(TestActivity); err != nil {
if err := r.AddActivity(TestActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")

daprClient, err := dapr.NewClient()
if err != nil {
log.Fatalf("failed to create Dapr client: %v", err)
if err := r.AddActivity(FailActivity); err != nil {
log.Fatal(err)
}
fmt.Println("FailActivity registered")

client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
log.Fatalf("failed to start work item listener: %v", err)
wclient, err := client.NewWorkflowClient()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")

ctx, cancel := context.WithCancel(context.Background())
if err = wclient.StartWorker(ctx, r); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")

ctx := context.Background()

// Start workflow test
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
// Set the start time to the current time to not wait for the workflow to
// "start". This is useful for increasing the throughput of creating
// workflows.
// workflow.WithStartTime(time.Now())
instanceID, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", id)
fmt.Printf("workflow started with id: %v\n", instanceID)

// Pause workflow test
err = client.PurgeOrchestrationState(ctx, id)
err = wclient.SuspendWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}

respGet, err := client.FetchOrchestrationMetadata(ctx, id)
respFetch, err := wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
log.Fatalf("failed to fetch workflow: %v", err)
}
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)

if respFetch.RuntimeStatus != workflow.StatusSuspended {
log.Fatalf("workflow not paused: %s: %v", respFetch.RuntimeStatus, respFetch)
}

fmt.Printf("workflow paused\n")

// Resume workflow test
err = client.ResumeOrchestration(ctx, id, "")
err = wclient.ResumeWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)

respGet, err = client.FetchOrchestrationMetadata(ctx, id)
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)

if respFetch.RuntimeStatus != workflow.StatusRunning {
log.Fatalf("workflow not running")
}

fmt.Println("workflow resumed")

fmt.Printf("stage: %d\n", stage)

// Raise Event Test
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))

err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
Expand All @@ -964,44 +976,99 @@ func main() {

fmt.Printf("stage: %d\n", stage)

respGet, err = client.FetchOrchestrationMetadata(ctx, id)
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = wclient.WaitForWorkflowCompletion(waitCtx, instanceID)
cancel()
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}

fmt.Printf("fail activity executions: %d\n", failActivityTries)

respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
fmt.Printf("workflow status: %v\n", respFetch.String())

// Purge workflow test
err = client.PurgeOrchestrationState(ctx, id)
err = wclient.PurgeWorkflowState(ctx, instanceID)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

fmt.Printf("stage: %d\n", stage)

// Terminate workflow test
id, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)

metadata, err := wclient.WaitForWorkflowStart(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %s\n", metadata.String())

err = wclient.TerminateWorkflow(ctx, id)
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}
fmt.Println("workflow terminated")

err = wclient.PurgeWorkflowState(ctx, id)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")

cancel()

fmt.Println("workflow worker successfully shutdown")
}

func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}

if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}

if err := ctx.CallActivity(FailActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 1 * time.Second,
})).Await(nil); err == nil {
return nil, fmt.Errorf("unexpected no error executing fail activity")
}

return output, nil
}

func TestActivity(ctx task.ActivityContext) (any, error) {
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
Expand All @@ -1011,6 +1078,11 @@ func TestActivity(ctx task.ActivityContext) (any, error) {

return fmt.Sprintf("Stage: %d", stage), nil
}

func FailActivity(ctx workflow.ActivityContext) (any, error) {
failActivityTries += 1
return nil, errors.New("dummy activity error")
}
```

[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ The following example shows how to execute the activity `ActivityA` on the targe
{{% tab "Go" %}}

```go
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var output string
err := ctx.CallActivity("ActivityA",
task.WithActivityInput("my-input"),
task.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
workflow.WithActivityInput("my-input"),
workflow.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
).Await(&output)

if err != nil {
Expand Down Expand Up @@ -115,11 +115,11 @@ The following example shows how to execute the child workflow `Workflow2` on the
{{% tab "Go" %}}

```go
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var output string
err := ctx.CallSubOrchestrator("Workflow2",
task.WithSubOrchestratorInput("my-input"),
task.WithSubOrchestratorAppID("App2"), // Here we set the target app ID which will execute this child workflow.
err := ctx.CallChildWorkflow("Workflow2",
workflow.WithChildWorkflowInput("my-input"),
workflow.WithChildWorkflowAppID("App2"), // Here we set the target app ID which will execute this child workflow.
).Await(&output)

if err != nil {
Expand Down
Loading
Loading