/
operation.go
160 lines (131 loc) · 3.6 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package torch
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"github.com/256dpi/fire/axe"
"github.com/256dpi/fire/coal"
"github.com/256dpi/fire/stick"
)
// Context holds context information for a reactor operation. It embeds the
// parent context.Context and carries the model, the pending update document
// and the handles a processor may need while handling the operation.
type Context struct {
	// The parent context.
	context.Context

	// The operated model.
	Model coal.Model

	// The final update document. Change records entries in it, keyed first by
	// the given operator and then by the given key.
	Update bson.M

	// Whether the operation is executed synchronously.
	Sync bool

	// A flag that may be set by the handler to indicate that the operation has
	// not yet been fully processed and the handler should be called again
	// sometime later. If a synchronous operation is deferred, it will always be
	// retried asynchronously.
	Defer bool

	// The executed operation.
	Operation *Operation

	// The executed check if available.
	Check *Check

	// The executed computation if available.
	Computation *Computation

	// The function used to report progress during a computation.
	Progress func(factor float64) error

	// The reactor.
	Reactor *Reactor

	// The store.
	Store *coal.Store

	// The queue.
	Queue *axe.Queue

	// The async context.
	AsyncContext *axe.Context
}
// Change will record a change to the update document by storing val under key
// in the sub-document registered for op (presumably a MongoDB update operator
// such as "$set"; confirm against the callers). The update document and the
// sub-document for op are created on demand, so Change is safe to call on a
// Context whose Update field has not been initialized.
//
// Change panics if the update document already holds a non-nil value for op
// that is not a bson.M.
func (c *Context) Change(op, key string, val any) {
	// create the update document lazily, writing to a nil map would panic
	if c.Update == nil {
		c.Update = bson.M{}
	}

	// start a new sub-document for the first change of this operator
	if c.Update[op] == nil {
		c.Update[op] = bson.M{key: val}
		return
	}

	// otherwise extend the existing sub-document
	c.Update[op].(bson.M)[key] = val
}
// Operation defines a reactor operation. Fields documented with a default are
// filled in by Validate when they are left at their zero value.
type Operation struct {
	// A unique name.
	Name string

	// The model.
	Model coal.Model

	// The query used to find potential models to process.
	Query func() bson.M

	// The filter function that decides whether a model should be processed.
	Filter func(model coal.Model) bool

	// The function called to process a model.
	Processor func(ctx *Context) error

	// The operation is executed synchronously during the modifier callback and
	// when checked directly.
	Sync bool

	// The maximum number of models loaded during a single scan.
	//
	// Default: 100.
	ScanBatch int

	// The time after which an asynchronous operation fails (lifetime).
	//
	// Default: 5m.
	ProcessLifetime time.Duration

	// The time after which an asynchronous operation is retried (timeout).
	//
	// Default: 10m.
	ProcessTimeout time.Duration

	// The maximum delay up to which a process may be deferred. Beyond this
	// limit, the process is aborted and may be picked up by the next scan
	// depending on the configured query.
	//
	// Default: 1m.
	MaxDeferDelay time.Duration

	// The tag name used to track the number of outstanding operations.
	//
	// Default: "torch/Reactor/<Name>".
	TagName string

	// The tag expiry time.
	//
	// Default: 24h.
	TagExpiry time.Duration
}
// Validate fills in the default for every tuning field that is still at its
// zero value and then runs the stick.IsNotZero rule against the Name, Model
// and Processor fields. The receiver is modified in place.
func (o *Operation) Validate() error {
	// apply duration defaults
	durations := []struct {
		field    *time.Duration
		fallback time.Duration
	}{
		{&o.ProcessLifetime, 5 * time.Minute},
		{&o.ProcessTimeout, 10 * time.Minute},
		{&o.MaxDeferDelay, time.Minute},
		{&o.TagExpiry, 24 * time.Hour},
	}
	for _, d := range durations {
		if *d.field == 0 {
			*d.field = d.fallback
		}
	}

	// apply remaining defaults
	if o.ScanBatch == 0 {
		o.ScanBatch = 100
	}
	if o.TagName == "" {
		o.TagName = "torch/Reactor/" + o.Name
	}

	// check required fields in declaration order
	required := []string{"Name", "Model", "Processor"}
	rules := func(v *stick.Validator) {
		for _, name := range required {
			v.Value(name, false, stick.IsNotZero)
		}
	}

	return stick.Validate(o, rules)
}
// Registry is a collection of known operations. It embeds a
// *stick.Registry[*Operation], so the methods of that registry are available
// directly on Registry.
type Registry struct {
	*stick.Registry[*Operation]
}
// NewRegistry wraps the provided operations in a registry that is indexed by
// operation name. Each operation's Validate method is handed to the underlying
// stick registry as its validator.
func NewRegistry(operations ...*Operation) *Registry {
	// validator delegating to the operation itself
	validate := func(op *Operation) error {
		return op.Validate()
	}

	// index by name
	byName := func(op *Operation) string {
		return op.Name
	}

	inner := stick.NewRegistry(operations, validate, byName)

	return &Registry{Registry: inner}
}