/
repository.go
35 lines (29 loc) · 1.04 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package notice
import (
"context"
"time"
"github.com/altairsix/eventsource"
"github.com/altairsix/pkg/eventsourcex"
"github.com/altairsix/pkg/tracer"
"github.com/nats-io/go-nats"
"github.com/opentracing/opentracing-go/log"
)
// WithConsistentRead provides a faux consistent read. Should wrap WithNotifier to ensure that
// the NoticesSubject.{ID} is subscribed to prior to the command being executed.
//
// The returned repository first applies cmd through repo. If that fails, it
// returns 0 and the error unchanged. On success it sends the command's
// aggregate ID as a NATS request on subject and waits for a reply, bounded by
// timeout (and by ctx, from which the request context is derived).
//
// The notice round-trip is best effort: a failed or timed-out request is
// recorded on the trace segment taken from ctx but never fails the command,
// so the version returned by repo.Apply is always passed through on success.
func WithConsistentRead(repo eventsourcex.Repository, nc *nats.Conn, subject string, timeout time.Duration) eventsourcex.Repository {
	return eventsourcex.RepositoryFunc(func(ctx context.Context, cmd eventsource.Command) (int, error) {
		version, err := repo.Apply(ctx, cmd)
		if err != nil {
			return 0, err
		}

		id := cmd.AggregateID()
		segment := tracer.SegmentFromContext(ctx)
		segment.Info("eventsource.notice_published",
			log.String("subject", subject),
			log.String("id", id),
		)

		// Start the timeout right before the request so the whole budget is
		// available for the round-trip.
		child, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		// The reply payload is irrelevant; only its arrival matters. An error
		// here (timeout, cancellation, connection problem) must not undo an
		// already-applied command, so it is logged rather than returned.
		if _, err := nc.RequestWithContext(child, subject, []byte(id)); err != nil {
			segment.Info("eventsource.notice_failed",
				log.String("subject", subject),
				log.String("id", id),
				log.Error(err),
			)
		}

		return version, nil
	})
}