-
-
Notifications
You must be signed in to change notification settings - Fork 478
/
background.go
151 lines (127 loc) · 3.19 KB
/
background.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Package queue implements an experimental background queue for cleanup jobs.
// Beware: It's likely broken.
// We can easily close a channel which might later be written to.
// The current locking is but a poor workaround.
// A better implementation would create a queue object in main, pass
// it through and wait for the channel to be empty before leaving main.
// Will do that later.
package queue
import (
"context"
"fmt"
"time"
"github.com/gopasspw/gopass/internal/out"
"github.com/gopasspw/gopass/pkg/debug"
)
// contextKey is a private type for the context keys defined in this package,
// so they cannot collide with keys defined by other packages.
type contextKey int

// ctxKeyQueue is the context key under which the queue is stored.
const ctxKeyQueue contextKey = iota
// Queuer is the behaviour shared by the real background queue and its
// no-op stand-in.
type Queuer interface {
	// Add hands a task to the queue and returns a task to the caller.
	Add(t Task) Task
	// Close shuts the queue down; the context bounds how long to wait.
	Close(ctx context.Context) error
	// Idle waits up to maxWait for the queue to have no pending tasks.
	Idle(maxWait time.Duration) error
}
// WithQueue returns a copy of ctx that carries the queue q. Passing a nil
// queue disables queuing for the returned context.
func WithQueue(ctx context.Context, q *Queue) context.Context {
	withQueue := context.WithValue(ctx, ctxKeyQueue, q)

	return withQueue
}
// GetQueue looks up the queue stored in ctx. If none is stored, or the stored
// queue is nil, a no-op implementation is returned instead, so the result can
// always be used without a nil check.
func GetQueue(ctx context.Context) Queuer {
	q, ok := ctx.Value(ctxKeyQueue).(*Queue)
	if !ok || q == nil {
		return &noop{}
	}

	return q
}
// noop is a queue stand-in that never runs or stores anything.
type noop struct{}

// Add does not enqueue anything; it hands the task straight back.
func (*noop) Add(t Task) Task {
	return t
}

// Close has nothing to shut down and always succeeds.
func (*noop) Close(context.Context) error {
	return nil
}

// Idle has nothing to wait for and always succeeds.
func (*noop) Idle(time.Duration) error {
	return nil
}
// Task is a unit of background work. It receives a context and returns a
// context together with an error; the returned context may be nil.
type Task func(ctx context.Context) (next context.Context, err error)
// Queue is a background processing unit that runs its tasks one at a time.
type Queue struct {
	// work carries the tasks waiting to be processed.
	work chan Task
	// done receives a value once the worker loop has finished.
	done chan struct{}
}
// New builds a queue and starts its worker goroutine, which receives ctx.
// Up to 1024 tasks can be buffered before enqueuing blocks.
func New(ctx context.Context) *Queue {
	q := new(Queue)
	q.work = make(chan Task, 1024)
	q.done = make(chan struct{}, 1)

	go q.run(ctx)

	return q
}
// run is the worker loop. It executes tasks from the work channel one after
// another until that channel is closed and drained, then signals completion
// on the done channel.
func (q *Queue) run(ctx context.Context) {
	for {
		task, open := <-q.work
		if !open {
			break
		}

		next, err := task(ctx)
		if err != nil {
			// A failing task is reported but does not stop the queue.
			out.Errorf(ctx, "Task failed: %s", err)
		}

		// A non-nil returned context carries information for the tasks that
		// follow, so it replaces the queue's own context from here on. (Every
		// task can see two contexts: this one, and the one captured by the
		// function that created the task.)
		if next != nil {
			ctx = next
		}

		debug.Log("Task done")
	}

	debug.Log("all tasks done")
	q.done <- struct{}{}
}
// Add sends t to the work channel for background execution. The send blocks
// while the channel cannot accept another task, and panics if the channel has
// already been closed by Close.
//
// The returned task does no work: it returns the context it is given and a
// nil error.
func (q *Queue) Add(t Task) Task {
	q.work <- t
	debug.Log("enqueued task")

	var passthrough Task = func(ctx context.Context) (context.Context, error) {
		return ctx, nil
	}

	return passthrough
}
// Idle blocks until the work channel is observed to be empty or maxWait has
// elapsed, whichever happens first.
//
// The channel length is checked once immediately and then every 20ms. Idle
// returns nil as soon as no tasks are buffered, and an error if maxWait
// expires first. An empty buffer only means that no task is waiting to be
// picked up; a task the worker has already received may still be running.
//
// Polling happens on the calling goroutine, so nothing keeps running after
// Idle returns.
func (q *Queue) Idle(maxWait time.Duration) error {
	// Fast path: nothing is queued, so there is no need to wait for a tick.
	if len(q.work) < 1 {
		return nil
	}

	timeout := time.NewTimer(maxWait)
	defer timeout.Stop()

	tick := time.NewTicker(20 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			if len(q.work) < 1 {
				return nil
			}
		case <-timeout.C:
			return fmt.Errorf("timed out waiting for empty queue")
		}
	}
}
// Close closes the work channel and then waits until either the worker
// signals that every remaining task has been processed (returns nil) or ctx
// is done (returns ctx.Err()).
//
// It must be called only once, at shutdown: closing the channel a second
// time panics.
func (q *Queue) Close(ctx context.Context) error {
	close(q.work)

	select {
	case <-ctx.Done():
		debug.Log("context canceled")

		return ctx.Err()
	case <-q.done:
		return nil
	}
}