-
Notifications
You must be signed in to change notification settings - Fork 913
/
task.go
201 lines (166 loc) · 5.04 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package tasks
import (
"context"
"errors"
"fmt"
"reflect"
"runtime/debug"
opentracing "github.com/opentracing/opentracing-go"
opentracing_ext "github.com/opentracing/opentracing-go/ext"
opentracing_log "github.com/opentracing/opentracing-go/log"
"github.com/RichardKnop/machinery/v1/log"
)
// ErrTaskPanicked ...
var ErrTaskPanicked = errors.New("Invoking task caused a panic")
// Task wraps a signature and methods used to reflect task arguments and
// return values after invoking the task
type Task struct {
TaskFunc reflect.Value
UseContext bool
Context context.Context
Args []reflect.Value
}
type signatureCtxType struct{}
var signatureCtx signatureCtxType
// SignatureFromContext gets the signature from the context
func SignatureFromContext(ctx context.Context) *Signature {
if ctx == nil {
return nil
}
v := ctx.Value(signatureCtx)
if v == nil {
return nil
}
signature, _ := v.(*Signature)
return signature
}
// NewWithSignature is the same as New but injects the signature
func NewWithSignature(taskFunc interface{}, signature *Signature) (*Task, error) {
args := signature.Args
ctx := context.Background()
ctx = context.WithValue(ctx, signatureCtx, signature)
task := &Task{
TaskFunc: reflect.ValueOf(taskFunc),
Context: ctx,
}
taskFuncType := reflect.TypeOf(taskFunc)
if taskFuncType.NumIn() > 0 {
arg0Type := taskFuncType.In(0)
if IsContextType(arg0Type) {
task.UseContext = true
}
}
if err := task.ReflectArgs(args); err != nil {
return nil, fmt.Errorf("Reflect task args error: %s", err)
}
return task, nil
}
// New tries to use reflection to convert the function and arguments
// into a reflect.Value and prepare it for invocation
func New(taskFunc interface{}, args []Arg) (*Task, error) {
task := &Task{
TaskFunc: reflect.ValueOf(taskFunc),
Context: context.Background(),
}
taskFuncType := reflect.TypeOf(taskFunc)
if taskFuncType.NumIn() > 0 {
arg0Type := taskFuncType.In(0)
if IsContextType(arg0Type) {
task.UseContext = true
}
}
if err := task.ReflectArgs(args); err != nil {
return nil, fmt.Errorf("Reflect task args error: %s", err)
}
return task, nil
}
// Call attempts to call the task with the supplied arguments.
//
// `err` is set in the return value in two cases:
// 1. The reflected function invocation panics (e.g. due to a mismatched
// argument list).
// 2. The task func itself returns a non-nil error.
func (t *Task) Call() (taskResults []*TaskResult, err error) {
// retrieve the span from the task's context and finish it as soon as this function returns
if span := opentracing.SpanFromContext(t.Context); span != nil {
defer span.Finish()
}
defer func() {
// Recover from panic and set err.
if e := recover(); e != nil {
switch e := e.(type) {
default:
err = ErrTaskPanicked
case error:
err = e
case string:
err = errors.New(e)
}
// mark the span as failed and dump the error and stack trace to the span
if span := opentracing.SpanFromContext(t.Context); span != nil {
opentracing_ext.Error.Set(span, true)
span.LogFields(
opentracing_log.Error(err),
opentracing_log.Object("stack", string(debug.Stack())),
)
}
// Print stack trace
log.ERROR.Printf("%v stack: %s", err, debug.Stack())
}
}()
args := t.Args
if t.UseContext {
ctxValue := reflect.ValueOf(t.Context)
args = append([]reflect.Value{ctxValue}, args...)
}
// Invoke the task
results := t.TaskFunc.Call(args)
// Task must return at least a value
if len(results) == 0 {
return nil, ErrTaskReturnsNoValue
}
// Last returned value
lastResult := results[len(results)-1]
// If the last returned value is not nil, it has to be of error type, if that
// is not the case, return error message, otherwise propagate the task error
// to the caller
if !lastResult.IsNil() {
// If the result implements Retriable interface, return instance of Retriable
retriableErrorInterface := reflect.TypeOf((*Retriable)(nil)).Elem()
if lastResult.Type().Implements(retriableErrorInterface) {
return nil, lastResult.Interface().(ErrRetryTaskLater)
}
// Otherwise, check that the result implements the standard error interface,
// if not, return ErrLastReturnValueMustBeError error
errorInterface := reflect.TypeOf((*error)(nil)).Elem()
if !lastResult.Type().Implements(errorInterface) {
return nil, ErrLastReturnValueMustBeError
}
// Return the standard error
return nil, lastResult.Interface().(error)
}
// Convert reflect values to task results
taskResults = make([]*TaskResult, len(results)-1)
for i := 0; i < len(results)-1; i++ {
val := results[i].Interface()
typeStr := reflect.TypeOf(val).String()
taskResults[i] = &TaskResult{
Type: typeStr,
Value: val,
}
}
return taskResults, err
}
// ReflectArgs converts []TaskArg to []reflect.Value
func (t *Task) ReflectArgs(args []Arg) error {
argValues := make([]reflect.Value, len(args))
for i, arg := range args {
argValue, err := ReflectValue(arg.Type, arg.Value)
if err != nil {
return err
}
argValues[i] = argValue
}
t.Args = argValues
return nil
}