/
persistentqueue.go
116 lines (94 loc) · 2.18 KB
/
persistentqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package persistentqueue
import (
"io/ioutil"
"os"
"path"
"sync"
"github.com/PagerDuty/go-pdagent/pkg/common"
"github.com/PagerDuty/go-pdagent/pkg/eventqueue"
"github.com/PagerDuty/go-pdagent/pkg/eventsapi"
"github.com/asdine/storm"
"go.uber.org/zap"
)
type EventQueue interface {
Enqueue(*eventsapi.EventContainer, chan<- eventqueue.Response) error
Shutdown()
}
type PersistentQueue struct {
DB *storm.DB
Events storm.Node
EventQueue EventQueue
path string
logger *zap.SugaredLogger
tmp bool
wg sync.WaitGroup
}
type Option func(*PersistentQueue)
func WithFile(path string) Option {
return func(q *PersistentQueue) {
q.path = path
q.tmp = false
}
}
func WithEventQueue(eq EventQueue) Option {
return func(q *PersistentQueue) {
q.EventQueue = eq
}
}
func NewPersistentQueue(options ...Option) *PersistentQueue {
logger := common.Logger.Named("PersistentQueue")
logger.Info("Creating new PersistentQueue.")
q := PersistentQueue{
EventQueue: eventqueue.NewEventQueue(),
logger: logger,
tmp: true,
}
for _, option := range options {
option(&q)
}
return &q
}
func (q *PersistentQueue) Start() error {
if q.tmp {
dbFile, err := ioutil.TempFile("", "go-pdagent.*.db")
if err != nil {
return err
}
q.path = dbFile.Name()
dbFile.Close()
} else {
if err := os.MkdirAll(path.Dir(q.path), 0744); err != nil {
return err
}
}
db, err := storm.Open(q.path)
if err != nil {
return err
}
q.DB = db
q.Events = q.DB.From("events")
var pendingEvents []Event
if err := q.Events.Find("Status", StatusPending, &pendingEvents); err != nil && err != storm.ErrNotFound {
q.logger.Error("Error querying for pending events: ", err)
return err
}
q.logger.Infof("Enqueuing %v pending events.", len(pendingEvents))
for _, e := range pendingEvents {
q.processEvent(&e)
}
return nil
}
// Stop a `PersistentQueue`, performing any necessary cleanup.
func (q *PersistentQueue) Shutdown() error {
q.logger.Info("Shutting down PersistentQueue.")
q.EventQueue.Shutdown()
q.wg.Wait()
if err := q.DB.Close(); err != nil {
return err
}
if q.tmp {
os.Remove(q.path)
}
q.logger.Info("Shut down PersistentQueue.")
return nil
}