/
event_handler_recorder.go
176 lines (143 loc) · 5.65 KB
/
event_handler_recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package eventerrors
import (
	"context"
	"errors"
	"os"
	"time"

	"github.com/AltScore/gothic/v2/pkg/ids"
	"github.com/AltScore/gothic/v2/pkg/xcontext"
	"github.com/AltScore/gothic/v2/pkg/xerrors"
	eh "github.com/looplab/eventhorizon"
	"github.com/looplab/eventhorizon/repo/mongodb"
	"github.com/looplab/eventhorizon/uuid"
	"github.com/totemcaf/gollections/ptrs"
	"go.mongodb.org/mongo-driver/mongo"
	"go.uber.org/zap"
)
// RetriableError is a marker interface used to identify an error that can be retried.
// The wrapping event handler in this file returns such an error to its caller
// (instead of recording it) when IsRetriable reports true.
type RetriableError interface {
// IsRetriable reports whether the failed operation may be retried.
IsRetriable() bool
}
// EventError is the record persisted when an event handler returns a
// non-retriable error. It keeps the failing event together with the error
// message and the user/tenant found in the handling context.
type EventError struct {
Id ids.Id `bson:"_id"` // Identifier of the error record, also returned by EntityID
CreatedAt time.Time `bson:"createdAt"` // Filled from the timestamp of the failing event when the record is built
UserId *ids.Id `bson:"userId"` // The user that caused the error, if present
Tenant *string `bson:"tenant"` // The tenant that caused the error, if present
Err string `bson:"error"` // The message of the error returned by the event handler
Event eh.Event `bson:"event"` // The event that caused the error
Host string `bson:"host"` // The machine name
}
// Compile-time check that *EventError satisfies eh.Entity.
var _ eh.Entity = (*EventError)(nil)
// EntityID implements the EntityID method of the eh.Entity interface.
// The entity identifier is the Id of the error record.
func (e EventError) EntityID() uuid.UUID {
	entityID := e.Id
	return entityID
}
// EventHandlerErrorRecorder is a wrapper around an event bus that catches the errors
// returned by the event handlers registered through it and records them so they
// can be processed later.
type EventHandlerErrorRecorder struct {
logger *zap.Logger // logger used to report handler errors and persistence failures
target eh.EventBus // the wrapped event bus every EventBus call is delegated to
store eh.WriteRepo // repository where the EventError records are saved
}
// NewEventHandlerErrorRecorder builds an EventHandlerErrorRecorder around target.
//
// The mongo client is used to create the repository that stores the recorded
// errors in the given database and collection. The logger is named after the
// handler type of target. All arguments are checked with
// xerrors.EnsureNotEmpty before being used.
//
// It returns the error reported while creating the repository, if any.
func NewEventHandlerErrorRecorder(logger *zap.Logger, client *mongo.Client, databaseName, collectionName string, target eh.EventBus) (*EventHandlerErrorRecorder, error) {
	// Validate every dependency up front, before any of them is dereferenced.
	xerrors.EnsureNotEmpty(logger, "logger")
	xerrors.EnsureNotEmpty(client, "client")
	xerrors.EnsureNotEmpty(databaseName, "databaseName")
	xerrors.EnsureNotEmpty(collectionName, "collectionName")
	xerrors.EnsureNotEmpty(target, "target")

	recorderName := target.HandlerType().String() + " recorder"
	namedLogger := logger.Named(recorderName)
	namedLogger.Info(
		"creating event handler error recorder",
		zap.String("databaseName", databaseName),
		zap.String("collectionName", collectionName),
	)

	repo, err := mongodb.NewRepoWithClient(client, databaseName, collectionName)
	if err != nil {
		return nil, err
	}

	recorder := &EventHandlerErrorRecorder{
		logger: namedLogger,
		target: target,
		store:  repo,
	}
	return recorder, nil
}
// Compile-time check that *EventHandlerErrorRecorder satisfies eh.EventBus.
var _ eh.EventBus = (*EventHandlerErrorRecorder)(nil)
// AddHandler implements the AddHandler method of the EventBus interface.
// The handler is wrapped before being registered in the target bus, so the
// errors it returns go through the recorder.
func (e *EventHandlerErrorRecorder) AddHandler(ctx context.Context, matcher eh.EventMatcher, handler eh.EventHandler) error {
	recording := e.wrap(handler)
	return e.target.AddHandler(ctx, matcher, recording)
}
// HandlerType implements the HandlerType method of the EventBus interface.
// The recorder reports the handler type of the bus it wraps.
func (e *EventHandlerErrorRecorder) HandlerType() eh.EventHandlerType {
	handlerType := e.target.HandlerType()
	return handlerType
}
// HandleEvent implements the HandleEvent method of the EventBus interface.
// The event is passed unchanged to the wrapped bus and its result is returned.
func (e *EventHandlerErrorRecorder) HandleEvent(ctx context.Context, event eh.Event) error {
	err := e.target.HandleEvent(ctx, event)
	return err
}
// Errors implements the Errors method of the EventBus interface.
// It exposes the error channel of the wrapped bus.
func (e *EventHandlerErrorRecorder) Errors() <-chan error {
	errCh := e.target.Errors()
	return errCh
}
// Close implements the Close method of the EventBus interface.
// It closes the wrapped bus and returns the error it reports, if any.
func (e *EventHandlerErrorRecorder) Close() error {
	if err := e.target.Close(); err != nil {
		return err
	}
	return nil
}
// wrap decorates handler with a wrapper bound to this recorder, so that the
// errors handler returns are caught.
func (e *EventHandlerErrorRecorder) wrap(handler eh.EventHandler) eh.EventHandler {
	wrapped := wrapper{
		recorder: e,
		handler:  handler,
	}
	return wrapped
}
// persistError persists the error in the database for later processing.
//
// The stored record contains the error message, the failing event, the current
// user and tenant found in ctx (when present) and the name of the machine that
// observed the failure.
//
// It returns the error reported by the store when the record cannot be saved,
// or nil on success. err must not be nil.
func (e *EventHandlerErrorRecorder) persistError(ctx context.Context, event eh.Event, err error) error {
	userID, tenant := e.getUserAndTenant(ctx)

	// The host name is diagnostic information only: failing to resolve it must
	// not prevent the error from being recorded, so the field is left empty.
	host, hostErr := os.Hostname()
	if hostErr != nil {
		e.logger.Warn("could not resolve host name for event error", zap.Error(hostErr))
	}

	eventError := EventError{
		Id: ids.New(),
		// NOTE(review): this is the timestamp of the event, not the moment the
		// error was recorded — confirm that is the intended meaning of CreatedAt.
		CreatedAt: event.Timestamp(),
		UserId:    userID,
		Tenant:    tenant,
		Err:       err.Error(),
		Event:     event,
		Host:      host,
	}

	if saveErr := e.store.Save(ctx, &eventError); saveErr != nil {
		e.logger.Error("could not save event error", zap.Error(saveErr))
		// TODO send error in error channel
		return saveErr
	}

	return nil
}
// getUserAndTenant extracts the user id and the tenant from ctx.
//
// When a user is available in ctx, both its id and its tenant are returned.
// Otherwise only the tenant stored in ctx is returned, if there is one.
// A nil pointer means the corresponding value was not found.
func (e *EventHandlerErrorRecorder) getUserAndTenant(ctx context.Context) (*ids.Id, *string) {
	user, err := xcontext.GetUser(ctx)
	if err != nil {
		// No user in context: fall back to a tenant-only lookup.
		tenant, ok := xcontext.GetTenant(ctx)
		if !ok {
			return nil, nil
		}
		return nil, &tenant
	}

	userID := user.Id()
	return &userID, ptrs.Ptr(user.Tenant())
}
// wrapper decorates an EventHandler to catch the errors returned by the target
// event handler and hand them to the recorder.
type wrapper struct {
handler eh.EventHandler // the wrapped (target) event handler
recorder *EventHandlerErrorRecorder // recorder used to log and persist the handler errors
}
// Compile-time check that *wrapper satisfies eh.EventHandler.
var _ eh.EventHandler = (*wrapper)(nil)
// HandlerType implements the HandlerType method of the eh.EventHandler
// interface by reporting the type of the wrapped handler.
func (w wrapper) HandlerType() eh.EventHandlerType {
	handlerType := w.handler.HandlerType()
	return handlerType
}
// HandleEvent implements the HandleEvent method of the eh.EventHandler interface.
//
// The event is passed to the wrapped handler. The outcome depends on the error
// the handler returns:
//   - nil: nil is returned.
//   - an error that is, or wraps, a RetriableError reporting true: the error is
//     returned unchanged so the event is seen as not handled.
//   - any other error: it is logged and persisted, then nil is returned so the
//     event is seen as handled. If persisting fails, the persistence error is
//     returned instead.
func (w wrapper) HandleEvent(ctx context.Context, event eh.Event) error {
	err := w.handler.HandleEvent(ctx, event)
	if err == nil {
		return nil
	}

	// errors.As walks the whole error chain, so a retriable error that was
	// wrapped with additional context (e.g. fmt.Errorf with %w) is still found.
	var retriable RetriableError
	if errors.As(err, &retriable) && retriable.IsRetriable() {
		// Error is retriable, so we return it to indicate that the event was not handled.
		return err
	}

	w.recorder.logger.Error(
		"error in event handler",
		zap.Error(err),
		zap.String("event", event.EventType().String()),
		zap.String("agg_type", event.AggregateType().String()),
		zap.String("agg_id", event.AggregateID().String()),
		zap.Int("agg_version", event.Version()),
		zap.String("handler", w.handler.HandlerType().String()),
	)

	if persistErr := w.recorder.persistError(ctx, event, err); persistErr != nil {
		// Failed to persist the error, so we return it to indicate that the event was not handled.
		return persistErr
	}

	// Error is not retriable, so we return nil to indicate that the event was handled.
	return nil
}