/
publisher.go
288 lines (243 loc) · 7.64 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package eventsourcex
import (
"context"
"io"
"io/ioutil"
"strings"
"time"
"github.com/altairsix/eventsource"
"github.com/altairsix/pkg/action"
"github.com/altairsix/pkg/action/heartbeat"
"github.com/altairsix/pkg/checkpoint"
"github.com/altairsix/pkg/tracer"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming"
"github.com/opentracing/opentracing-go/log"
)
const (
// DefaultCommitInterval the minimum amount of time that must pass between offset commits
DefaultCommitInterval = time.Second * 3
// DefaultPublishInterval the amount of time between checking the repository for updates
DefaultPublishInterval = time.Minute
)
// Publisher publishes the record to a event bus
type Publisher interface {
Publish(record eventsource.StreamRecord) error
}
// PublisherFunc provides a func wrapper to Publisher
type PublisherFunc func(record eventsource.StreamRecord) error
// Publish Implements the Publisher interface
func (fn PublisherFunc) Publish(record eventsource.StreamRecord) error { return fn(record) }
// WithLogPublish logs when events are published
func WithLogPublish(publisher Publisher, logger interface {
Info(string, ...log.Field)
}) PublisherFunc {
return func(record eventsource.StreamRecord) error {
logger.Info("publisher:publish", log.String("id", record.AggregateID))
return publisher.Publish(record)
}
}
// Supervisor reads events from a StreamReader and supervisor them to a handler
type Supervisor interface {
Check()
Close() error
Done() <-chan struct{}
}
type supervisor struct {
w io.Writer
ctx context.Context
cancel func()
done chan struct{}
check chan struct{}
segment tracer.Segment
r eventsource.StreamReader
h Publisher
cpKey string
cp Checkpointer
interval time.Duration
offset uint64
offsetLoaded bool
committedOffset uint64
committedAt time.Time
recordCount int
}
// Close stops the worker process
func (s *supervisor) Close() error {
s.cancel()
<-s.done
return nil
}
// Check request a check from the supervisor
func (s *supervisor) Check() {
select {
case s.check <- struct{}{}:
default:
}
}
// Done allows external tools to signal off of when the supervisor is done
func (s *supervisor) Done() <-chan struct{} {
return s.done
}
func (s *supervisor) checkOnce() {
segment, ctx := tracer.NewSegment(s.ctx, "supervisor:check_once", log.String("checkpoint-key", s.cpKey))
defer segment.Finish()
if !s.offsetLoaded {
v, err := s.cp.Load(ctx, s.cpKey)
if err != nil {
segment.LogFields(log.Error(err), log.String("text", "unable to load checkpoint key"))
return
}
s.offset = v
s.offsetLoaded = true
segment.Info("supervisor:checkpoint_loaded", log.Uint64("offset", s.offset))
}
// read events
events, err := s.r.Read(ctx, s.offset+1, s.recordCount)
if err != nil {
segment.LogFields(log.Error(err), log.String("text", "unable to read events from StreamReder"))
return
}
// publish events
for _, event := range events {
if err := s.h.Publish(event); err != nil {
segment.LogFields(log.Error(err), log.String("text", "unable to publish events"))
return
}
s.offset = event.Offset
}
// time to commit?
if now := time.Now(); now.Sub(s.committedAt) > DefaultCommitInterval {
if err := s.cp.Save(ctx, s.cpKey, s.offset); err == nil {
s.committedAt = now
s.committedOffset = s.offset
}
}
}
func (s *supervisor) listenAndPublish() {
defer close(s.done)
defer close(s.check)
defer s.segment.Finish()
s.segment.Info("supervisor:started", log.String("interval", s.interval.String()))
timer := time.NewTicker(s.interval)
defer timer.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-timer.C:
s.checkOnce()
case <-s.check:
s.checkOnce()
}
}
}
// WithPublishEvents publishes received events to nats
func WithPublishEvents(fn Publisher, nc *nats.Conn, env, boundedContext string) PublisherFunc {
rootSubject := StreamSubject(env, boundedContext) + "."
return func(event eventsource.StreamRecord) error {
if err := fn.Publish(event); err != nil {
return err
}
subject := rootSubject + event.AggregateID
go nc.Publish(subject, []byte(event.AggregateID))
return nil
}
}
// PublishStream reads from a stream and publishes
func PublishStream(ctx context.Context, h Publisher, r eventsource.StreamReader, cp Checkpointer, env, bc string) Supervisor {
cpKey := makeCheckpointKey(env, bc)
segment, _ := tracer.NewSegment(ctx, "publish_stream", log.String("checkpoint", cpKey))
child, cancel := context.WithCancel(ctx)
s := &supervisor{
w: ioutil.Discard,
ctx: child,
cancel: cancel,
done: make(chan struct{}),
check: make(chan struct{}, 1),
segment: segment,
r: r,
h: h,
cpKey: cpKey,
cp: cp,
interval: DefaultPublishInterval,
recordCount: 100,
}
go s.listenAndPublish()
return s
}
type tracingPublisher struct {
target Supervisor
segment tracer.Segment
}
// WithTraceReceiveNotices returns a Supervisor that ping when Check is invoked
func WithTraceReceiveNotices(s Supervisor, segment tracer.Segment) Supervisor {
return &tracingPublisher{
target: s,
segment: segment,
}
}
func (s *tracingPublisher) Close() error { return s.target.Close() }
func (s *tracingPublisher) Done() <-chan struct{} { return s.target.Done() }
func (s *tracingPublisher) Check() {
s.segment.Info("eventsourcex:notice_received")
s.target.Check()
}
// WithReceiveNotifications listens to nats for notices on the StreamSubject and prods the supervisor
func WithReceiveNotifications(s Supervisor, nc *nats.Conn, env, boundedContext string) Supervisor {
go func() {
subject := NoticesSubject(env, boundedContext)
fn := func(m *nats.Msg) {
s.Check()
}
var sub *nats.Subscription
for {
select {
case <-s.Done():
return
default:
}
if v, err := nc.Subscribe(subject, fn); err == nil {
sub = v
break
}
}
<-s.Done()
sub.Unsubscribe()
}()
return s
}
// PublishStan publishes events to the nats stream identified with the env and boundedContext
func PublishStan(st stan.Conn, subject string) PublisherFunc {
return func(event eventsource.StreamRecord) error {
return st.Publish(subject, event.Data)
}
}
// PublishStreamSingleton is similar to PublishStream except that there may be only one running in the environment
func PublishStreamSingleton(ctx context.Context, p Publisher, r eventsource.StreamReader, cp *checkpoint.CP, env, bc string, nc *nats.Conn) error {
segment, ctx := tracer.NewSegment(ctx, "publish_stream")
segment.SetBaggageItem("subject", StreamSubject(env, bc))
defer segment.Finish()
a := action.Action(func(ctx context.Context) error {
h := WithPublishEvents(p, nc, env, bc) // publish events to here
supervisor := PublishStream(ctx, h, r, cp, env, bc) // go!
if env == "local" { // in the local env
supervisor = WithTraceReceiveNotices(supervisor, segment) // configuration addition logging
}
supervisor = WithReceiveNotifications(supervisor, nc, env, bc) // ping the supervisor when events received
<-supervisor.Done() // wait until done
return nil
})
singleton := action.Singleton(heartbeat.Nats(nc, makeTickerSubject(env, bc)))
forever := action.Forever(time.Second * 3)
return a.Use(singleton, forever).Do(ctx)
}
func makeCheckpointKey(env, bc string) string {
return "stan:" + env + "." + bc
}
func makeTickerSubject(env, bc string, args ...string) string {
key := "ticker:" + env + "." + bc
if len(args) > 0 {
key += "." + strings.Join(args, ".")
}
return key
}