/
work.go
82 lines (64 loc) · 1.79 KB
/
work.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package minion
/* this implementation is heavily inspired by
https://github.com/riverqueue/river
I had built something similar; then I saw their announcement
and it filled in the gaps for me.
*/
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/dashotv/minion/database"
)
// Job bundles a *database.Model with the typed arguments of payload type T.
// Because the model is embedded, its exported fields and methods are
// promoted onto Job.
type Job[T Payload] struct {
	*database.Model
	// Args are the arguments for the job. wrappedWorker.Unmarshal fills this
	// field by JSON-decoding the model's Args value.
	Args T
}
// Payload is the type constraint for job arguments: any type used as the
// Args of a Job must implement it.
type Payload interface {
	// Kind returns a string identifying the kind of this payload.
	// NOTE(review): how the value is consumed is not visible in this file;
	// presumably it ties a job to its worker — confirm against the caller.
	Kind() string
}
// WorkerDefaults is a helper struct that supplies default implementations
// for the optional parts of the Worker interface (currently only Timeout).
// Embed this struct in your worker to get the default behavior; the worker
// must still provide its own Work method to satisfy Worker.
//
// Timeout is declared on *WorkerDefaults, so when WorkerDefaults is embedded
// by value only a pointer to the embedding worker has Timeout in its method
// set.
type WorkerDefaults[T Payload] struct{}
// Timeout is the default Worker.Timeout implementation. It ignores the job
// it is given and always returns 0. Declare a Timeout method on your own
// worker to give its jobs a specific timeout; when 0 is returned the default
// timeout is meant to apply (that handling lives outside this file).
func (w *WorkerDefaults[T]) Timeout(*Job[T]) time.Duration {
	const useDefault time.Duration = 0
	return useDefault
}
// Worker is the contract every job handler for payload type T must satisfy.
type Worker[T Payload] interface {
	// Work executes the given job under ctx and reports any failure as a
	// non-nil error.
	Work(ctx context.Context, job *Job[T]) error

	// Timeout returns the timeout to apply to the given job. Embedding
	// WorkerDefaults provides an implementation that returns 0.
	Timeout(job *Job[T]) time.Duration
}
// wrapped is the non-generic view of a worker that has been paired with a
// single job. It hides the payload type parameter so that code which does
// not know T can still drive the job.
type wrapped interface {
	// Work runs the job under ctx and returns the worker's error, if any.
	Work(ctx context.Context) error

	// Timeout returns the timeout the worker reports for the job.
	Timeout() time.Duration

	// Unmarshal decodes the job's stored arguments into their typed form.
	Unmarshal() error
}
// wrappedWorker binds a Worker[T] to one job's data and implements the
// non-generic wrapped interface on top of it.
type wrappedWorker[T Payload] struct {
	// job is the typed job handed to the worker. Create leaves it nil;
	// Unmarshal builds it from data.
	job *Job[T]
	// data is the model this wrapper was created for; its Args value is the
	// JSON source that Unmarshal decodes.
	data *database.Model
	// worker is the user-supplied implementation that Work and Timeout
	// delegate to.
	worker Worker[T]
}
// Work hands the wrapper's job to the underlying worker and returns the
// worker's result unchanged. The job is whatever Unmarshal last built; it is
// nil if Unmarshal has not been called.
func (w *wrappedWorker[T]) Work(ctx context.Context) error {
	worker, job := w.worker, w.job
	return worker.Work(ctx, job)
}
// Timeout asks the underlying worker for the timeout of the wrapper's job
// and returns that value unchanged. The job is whatever Unmarshal last
// built; it is nil if Unmarshal has not been called.
func (w *wrappedWorker[T]) Timeout() time.Duration {
	worker, job := w.worker, w.job
	return worker.Timeout(job)
}
// Unmarshal builds the typed job for this wrapper: it creates a Job[T]
// around the wrapper's model and JSON-decodes the model's Args value into
// Job.Args. It must run before Work or Timeout, which pass the resulting job
// to the worker.
//
// It returns an error when the wrapper has no model (instead of panicking on
// a nil dereference) or when the arguments are not valid JSON for T. Decode
// errors are wrapped with %w, so errors.Is and errors.As still reach the
// underlying encoding/json error.
func (w *wrappedWorker[T]) Unmarshal() error {
	if w.data == nil {
		return errors.New("minion: cannot unmarshal job args: job data is nil")
	}

	// As before, the job is installed prior to decoding, so the wrapper's
	// state after a decode failure is unchanged from the previous behavior.
	w.job = &Job[T]{Model: w.data}
	if err := json.Unmarshal([]byte(w.data.Args), &w.job.Args); err != nil {
		return fmt.Errorf("minion: unmarshaling job args into %T: %w", w.job.Args, err)
	}
	return nil
}
// factory creates a wrapped worker for a job model without exposing the
// worker's payload type parameter to the caller.
type factory interface {
	// Create returns a wrapped worker bound to data.
	Create(data *database.Model) wrapped
}
// workerFactory is the factory implementation for a single Worker[T].
type workerFactory[T Payload] struct {
	// worker is shared by every wrapper this factory creates.
	worker Worker[T]
}
// Create returns a new wrappedWorker that pairs the factory's worker with
// data. The wrapper's typed job is not built here; that happens when
// Unmarshal is called on the result.
func (f *workerFactory[T]) Create(data *database.Model) wrapped {
	ww := new(wrappedWorker[T])
	ww.data = data
	ww.worker = f.worker
	return ww
}