This repository has been archived by the owner on May 5, 2019. It is now read-only.
/
easy.go
140 lines (118 loc) · 3.54 KB
/
easy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
Package easy is an easier interface to use cirello.io/supervisor. Its lifecycle
is managed through context.Context. Stop a given supervisor by cancelling its
context.
package main
import supervisor "cirello.io/supervisor/easy"
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// use cancel() to stop the supervisor
ctx = supervisor.WithContext(ctx)
supervisor.Add(ctx, func(ctx context.Context) {
// ...
})
}
*/
package easy
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"cirello.io/supervisor"
)
type ctxKey int
const supervisorName ctxKey = 0
var (
// ErrNoSupervisorAttached means that the given context has not been
// wrapped with WithContext, and thus this package cannot detect
// which supervisore you are referring to.
ErrNoSupervisorAttached = errors.New("no supervisor attached to context")
mu sync.Mutex
supervisors map[string]*supervisor.Group // map of supervisor name to supervisor.Supervisor
)
func init() {
supervisors = make(map[string]*supervisor.Group)
}
var (
// Permanent services are always restarted.
Permanent = supervisor.Permanent
// Transient services are restarted only when panic.
Transient = supervisor.Transient
// Temporary services are never restarted.
Temporary = supervisor.Temporary
)
// Add inserts supervised function to the attached supervisor, it launches
// automatically. If the context is not correctly prepared, it returns an
// ErrNoSupervisorAttached error. By default, the restart policy is Permanent.
func Add(ctx context.Context, f func(context.Context), opts ...supervisor.ServiceOption) (string, error) {
name, ok := extractName(ctx)
if !ok {
return "", ErrNoSupervisorAttached
}
mu.Lock()
svr, ok := supervisors[name]
mu.Unlock()
if !ok {
panic("supervisor not found")
}
opts = append([]supervisor.ServiceOption{Permanent}, opts...)
svcName := svr.AddFunc(f, opts...)
return svcName, nil
}
// Remove stops and removes the given service from the attached supervisor. If
// the context is not correctly prepared, it returns an ErrNoSupervisorAttached
// error
func Remove(ctx context.Context, name string) error {
name, ok := extractName(ctx)
if !ok {
return ErrNoSupervisorAttached
}
mu.Lock()
svr, ok := supervisors[name]
mu.Unlock()
if !ok {
panic("supervisor not found")
}
svr.Remove(name)
return nil
}
// WithContext takes a context and prepare it to be used by easy supervisor
// package. Internally, it creates a supervisor in group mode. In this mode,
// every time a service dies, the whole supervisor is restarted.
func WithContext(ctx context.Context, opts ...SupervisorOption) context.Context {
chosenName := fmt.Sprintf("supervisor-%d", rand.Uint64())
svr := &supervisor.Supervisor{
Name: chosenName,
MaxRestarts: supervisor.AlwaysRestart,
Log: func(interface{}) {},
}
for _, opt := range opts {
opt(svr)
}
group := &supervisor.Group{
Supervisor: svr,
}
mu.Lock()
supervisors[chosenName] = group
mu.Unlock()
wrapped := context.WithValue(ctx, supervisorName, chosenName)
go group.Serve(wrapped)
return wrapped
}
// SupervisorOption reconfigures the supervisor attached to the context.
type SupervisorOption func(*supervisor.Supervisor)
// WithLogger attaches a log function to the supervisor
func WithLogger(logger func(a ...interface{})) SupervisorOption {
return func(s *supervisor.Supervisor) {
s.Log = func(v interface{}) {
logger(v)
}
}
}
func extractName(ctx context.Context) (string, bool) {
name, ok := ctx.Value(supervisorName).(string)
return name, ok
}