generated from dogmatiq/template-go
-
Notifications
You must be signed in to change notification settings - Fork 2
/
handler.go
116 lines (92 loc) · 3.11 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package fixtures
import (
"context"
"github.com/dogmatiq/infix/handler"
"github.com/dogmatiq/infix/parcel"
"github.com/dogmatiq/infix/persistence"
)
// HandlerStub is a test implementation of the handler.Handler interface.
//
// Its HandleMessage method calls HandleMessageFunc when that field is non-nil,
// otherwise it forwards to the embedded Handler when that is non-nil, and
// otherwise it returns nil.
type HandlerStub struct {
	// Handler is the embedded fallback that HandleMessage forwards to when
	// HandleMessageFunc is nil. HandleMessage tolerates it being nil.
	handler.Handler
	// HandleMessageFunc, if non-nil, is called by HandleMessage in preference
	// to the embedded Handler.
	HandleMessageFunc func(context.Context, handler.UnitOfWork, parcel.Parcel) error
}
// HandleMessage handles the message in p.
//
// HandleMessageFunc takes precedence when it is set; otherwise the call is
// forwarded to the embedded Handler. When neither is set the message is
// accepted without doing anything and nil is returned.
func (h *HandlerStub) HandleMessage(ctx context.Context, w handler.UnitOfWork, p parcel.Parcel) error {
	switch {
	case h.HandleMessageFunc != nil:
		return h.HandleMessageFunc(ctx, w, p)
	case h.Handler != nil:
		return h.Handler.HandleMessage(ctx, w, p)
	default:
		return nil
	}
}
// UnitOfWorkStub is a test implementation of the handler.UnitOfWork interface.
//
// Each recording method appends its argument to the matching slice below, so
// the slices reflect the calls made on the stub in call order.
type UnitOfWorkStub struct {
	// Commands holds the parcels passed to ExecuteCommand.
	Commands []parcel.Parcel
	// Events holds the parcels passed to RecordEvent.
	Events []parcel.Parcel
	// Timeouts holds the parcels passed to ScheduleTimeout.
	Timeouts []parcel.Parcel
	// Operations holds the persistence operations passed to Do.
	Operations []persistence.Operation
	// Deferred holds the functions passed to Defer. Succeed and Fail invoke
	// them in reverse order of registration.
	Deferred []handler.DeferFunc
}
// ExecuteCommand updates the unit-of-work to execute the command in p.
//
// The stub does not execute anything; it appends p to Commands so the call
// can be inspected afterwards.
func (w *UnitOfWorkStub) ExecuteCommand(p parcel.Parcel) {
	w.Commands = append(w.Commands, p)
}
// ScheduleTimeout updates the unit-of-work to schedule the timeout in p.
//
// The stub does not schedule anything; it appends p to Timeouts so the call
// can be inspected afterwards.
func (w *UnitOfWorkStub) ScheduleTimeout(p parcel.Parcel) {
	w.Timeouts = append(w.Timeouts, p)
}
// RecordEvent updates the unit-of-work to record the event in p.
//
// The stub appends p to Events so the call can be inspected afterwards.
func (w *UnitOfWorkStub) RecordEvent(p parcel.Parcel) {
	w.Events = append(w.Events, p)
}
// Do updates the unit-of-work to include op in the persistence batch.
//
// The stub appends op to Operations; nothing is persisted.
func (w *UnitOfWorkStub) Do(op persistence.Operation) {
	w.Operations = append(w.Operations, op)
}
// Defer registers fn to be called when the unit-of-work is complete.
//
// The stub only stores fn in Deferred; it is not called until Succeed or Fail
// is invoked on the stub.
func (w *UnitOfWorkStub) Defer(fn handler.DeferFunc) {
	w.Deferred = append(w.Deferred, fn)
}
// Succeed invokes the unit-of-work's deferred functions with the given result.
//
// Every function registered via Defer is called with res and a nil error.
func (w *UnitOfWorkStub) Succeed(res handler.Result) {
	w.invokeDeferred(res, nil)
}
// Fail invokes the unit-of-work's deferred functions with the given error.
//
// Every function registered via Defer is called with a zero-value
// handler.Result alongside err.
func (w *UnitOfWorkStub) Fail(err error) {
	var none handler.Result
	w.invokeDeferred(none, err)
}
// invokeDeferred calls each function in w.Deferred with res and err, starting
// with the most recently registered function and ending with the first.
func (w *UnitOfWorkStub) invokeDeferred(res handler.Result, err error) {
	// Count down from the length captured at entry; the slice itself is
	// re-read on every iteration.
	for remaining := len(w.Deferred); remaining > 0; remaining-- {
		fn := w.Deferred[remaining-1]
		fn(res, err)
	}
}
// AcknowledgerStub is a test implementation of the handler.Acknowledger
// interface.
//
// Ack and Nack each call their corresponding Func field when it is non-nil,
// otherwise they forward to the embedded Acknowledger when that is non-nil,
// and otherwise they return zero values.
type AcknowledgerStub struct {
	// Acknowledger is the embedded fallback that Ack and Nack forward to when
	// the corresponding Func field is nil. Both methods tolerate it being nil.
	handler.Acknowledger
	// AckFunc, if non-nil, is called by Ack in preference to the embedded
	// Acknowledger.
	AckFunc func(context.Context, persistence.Batch) (persistence.Result, error)
	// NackFunc, if non-nil, is called by Nack in preference to the embedded
	// Acknowledger.
	NackFunc func(context.Context, error) error
}
// Ack acknowledges the message, ensuring it is not handled again.
//
// b is the batch from the unit-of-work.
//
// AckFunc is used when set; failing that, the embedded Acknowledger is used.
// With neither configured, Ack returns a zero-value persistence.Result and a
// nil error.
func (a *AcknowledgerStub) Ack(ctx context.Context, b persistence.Batch) (persistence.Result, error) {
	if fn := a.AckFunc; fn != nil {
		return fn(ctx, b)
	}

	if next := a.Acknowledger; next != nil {
		return next.Ack(ctx, b)
	}

	return persistence.Result{}, nil
}
// Nack negatively-acknowledges the message, causing it to be retried.
//
// cause is passed through unchanged to NackFunc when it is set, or else to
// the embedded Acknowledger. With neither configured, Nack returns nil.
func (a *AcknowledgerStub) Nack(ctx context.Context, cause error) error {
	switch {
	case a.NackFunc != nil:
		return a.NackFunc(ctx, cause)
	case a.Acknowledger != nil:
		return a.Acknowledger.Nack(ctx, cause)
	default:
		return nil
	}
}