-
Notifications
You must be signed in to change notification settings - Fork 46
/
worker.go
117 lines (94 loc) · 3.51 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package worker
import (
"context"
"fmt"
"sync"
"github.com/benbjohnson/clock"
"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/internal/signals"
internal "github.com/cschleiden/go-workflows/internal/worker"
workflowinternal "github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/registry"
"github.com/cschleiden/go-workflows/workflow"
)
type Worker struct {
backend backend.Backend
done chan struct{}
wg *sync.WaitGroup
registry *registry.Registry
workflowWorker *internal.Worker[backend.WorkflowTask, workflowinternal.ExecutionResult]
activityWorker *internal.Worker[backend.ActivityTask, history.Event]
}
func New(backend backend.Backend, options *Options) *Worker {
if options == nil {
options = &DefaultOptions
} else {
if options.WorkflowExecutorCacheSize == 0 {
options.WorkflowExecutorCacheSize = DefaultOptions.WorkflowExecutorCacheSize
}
if options.WorkflowExecutorCacheTTL == 0 {
options.WorkflowExecutorCacheTTL = DefaultOptions.WorkflowExecutorCacheTTL
}
}
registry := registry.New()
// Register internal activities
if err := registry.RegisterActivity(&signals.Activities{Signaler: client.New(backend)}); err != nil {
panic(fmt.Errorf("registering internal activities: %w", err))
}
return &Worker{
backend: backend,
done: make(chan struct{}),
wg: &sync.WaitGroup{},
workflowWorker: internal.NewWorkflowWorker(backend, registry, internal.WorkflowWorkerOptions{
WorkerOptions: internal.WorkerOptions{
Pollers: options.WorkflowPollers,
PollingInterval: options.WorkflowPollingInterval,
MaxParallelTasks: options.MaxParallelWorkflowTasks,
HeartbeatInterval: options.WorkflowHeartbeatInterval,
},
WorkflowExecutorCache: options.WorkflowExecutorCache,
WorkflowExecutorCacheSize: options.WorkflowExecutorCacheSize,
WorkflowExecutorCacheTTL: options.WorkflowExecutorCacheTTL,
}),
activityWorker: internal.NewActivityWorker(backend, registry, clock.New(), internal.WorkerOptions{
Pollers: options.ActivityPollers,
PollingInterval: options.ActivityPollingInterval,
MaxParallelTasks: options.MaxParallelActivityTasks,
HeartbeatInterval: options.ActivityHeartbeatInterval,
}),
registry: registry,
}
}
// Start starts the worker.
//
// To stop the worker, cancel the context passed to Start. To wait for completion of the active
// tasks, call `WaitForCompletion`.
func (w *Worker) Start(ctx context.Context) error {
if err := w.workflowWorker.Start(ctx); err != nil {
return fmt.Errorf("starting workflow worker: %w", err)
}
if err := w.activityWorker.Start(ctx); err != nil {
return fmt.Errorf("starting activity worker: %w", err)
}
return nil
}
// WaitForCompletion waits for all active tasks to complete.
func (w *Worker) WaitForCompletion() error {
if err := w.workflowWorker.WaitForCompletion(); err != nil {
return err
}
if err := w.activityWorker.WaitForCompletion(); err != nil {
return err
}
return nil
}
// RegisterWorkflow registers a workflow with the worker's registry.
func (w *Worker) RegisterWorkflow(wf workflow.Workflow, opts ...registry.RegisterOption) error {
return w.registry.RegisterWorkflow(wf, opts...)
}
// RegisterActivity registers an activity with the worker's registry.
func (w *Worker) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error {
return w.registry.RegisterActivity(a, opts...)
}