-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathworkflow.go
More file actions
88 lines (62 loc) · 2.25 KB
/
workflow.go
File metadata and controls
88 lines (62 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main
import (
"context"
"errors"
"time"
"github.com/cschleiden/go-workflows/activity"
"github.com/cschleiden/go-workflows/workflow"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Inputs is the structured argument accepted by Workflow1. In this file it is
// only written to the debug log as a whole; its fields are not read
// individually.
type Inputs struct {
	// Msg is a message string.
	Msg string
	// Times is an integer count.
	Times int
}
// Workflow1 is a sample workflow that wraps a fixed sequence of steps in
// custom tracing spans: Activity1, a one-second sleep, Subworkflow, Activity1
// again, a wait on the "test-signal" signal, and finally RetriedActivity,
// whose result is returned.
//
// msg, times and inputs are only written to the debug log. If any step
// returns an error, the workflow stops there and returns that error with a
// zero result, so a failed step is never silently treated as success.
func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int, error) {
	logger := workflow.Logger(ctx)
	logger.Debug("Entering Workflow1", "msg", msg, "times", times, "inputs", inputs)
	defer logger.Debug("Leaving Workflow1")

	tracer := workflow.Tracer(ctx)

	// All later steps use the context returned here; the span is ended when
	// the workflow function returns.
	ctx, span := tracer.Start(ctx, "Workflow1 custom span")
	defer span.End()

	_, customSpan := tracer.Start(ctx, "Workflow1 custom inner span", trace.WithAttributes(
		// Add additional
		attribute.String("msg", "hello world"),
	))
	// Do something
	customSpan.End()

	if _, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx); err != nil {
		return 0, err
	}

	if err := workflow.Sleep(ctx, time.Second*1); err != nil {
		return 0, err
	}

	if _, err := workflow.CreateSubWorkflowInstance[any](ctx, workflow.DefaultSubWorkflowOptions, Subworkflow).Get(ctx); err != nil {
		return 0, err
	}

	if _, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx); err != nil {
		return 0, err
	}

	// A dedicated variable keeps this span visibly separate from the outer
	// span that is ended by the defer above.
	_, signalSpan := workflow.Tracer(ctx).Start(ctx, "Wait-for-signal")
	// The received signal value is not used; the call is made only to wait.
	workflow.NewSignalChannel[string](ctx, "test-signal").Receive(ctx)
	signalSpan.End()

	r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, RetriedActivity, 35, 12).Get(ctx)
	return r1, err
}
// Subworkflow is a sample child workflow: it sleeps for 500ms and then runs
// Activity1 with fixed arguments, discarding the activity's result.
//
// An error from the sleep or from the activity is returned to the caller
// instead of being dropped; nil is returned when both steps succeed.
func Subworkflow(ctx workflow.Context) error {
	if err := workflow.Sleep(ctx, time.Millisecond*500); err != nil {
		return err
	}

	if _, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx); err != nil {
		return err
	}

	return nil
}
// Activity1 is a sample activity that opens two nested tracing spans, blocks
// for 300ms, and returns a + b with a nil error.
//
// Both spans are ended by deferred calls when the function returns: the inner
// span first, then the outer one, and finally the "Leaving" debug message is
// logged.
func Activity1(ctx context.Context, a, b int) (int, error) {
	const simulatedWork = 300 * time.Millisecond

	lg := activity.Logger(ctx)
	lg.Debug("Entering Activity1")
	defer lg.Debug("Leaving Activity1")

	// The outer span's context becomes the parent for the inner span below.
	ctx, outerSpan := otel.Tracer("activity").Start(ctx, "Custom Activity1 span")
	defer outerSpan.End()

	_, innerSpan := otel.Tracer("activity").Start(ctx, "Another one")
	defer innerSpan.End()

	time.Sleep(simulatedWork)

	sum := a + b
	return sum, nil
}
// RetriedActivity is a sample activity that fails while the attempt number
// reported by activity.Attempt is below 1 and succeeds afterwards.
//
// On a failing attempt it logs "Simulating failure" and returns 0 with a
// "simulated failure" error; otherwise it returns a + b and a nil error.
func RetriedActivity(ctx context.Context, a, b int) (int, error) {
	lg := activity.Logger(ctx)

	// Success path first: any attempt numbered 1 or higher just computes the sum.
	if attempt := activity.Attempt(ctx); attempt >= 1 {
		return a + b, nil
	}

	lg.Info("Simulating failure")
	return 0, errors.New("simulated failure")
}