-
Notifications
You must be signed in to change notification settings - Fork 0
/
mediator.go
153 lines (125 loc) · 4.1 KB
/
mediator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package mediator
import (
"context"
"errors"
"fmt"
"reflect"
)
// RequestHandlerFunc is the untyped handler signature: it receives the request
// as any and returns the response as any together with an error. It is the
// type of the next step handed to PipelineBehavior.Handle.
type RequestHandlerFunc func(ctx context.Context, request any) (any, error)
// RequestHandler handles a request of type TRequest and produces a response
// of type TResponse.
type RequestHandler[TRequest any, TResponse any] interface {
	// Handle processes request and returns the response or an error.
	Handle(ctx context.Context, request TRequest) (TResponse, error)
}
// NotificationHandler reacts to a notification of type TNotification. Unlike
// a RequestHandler it produces no response, only an error.
type NotificationHandler[TNotification any] interface {
	// Handle processes notification and reports any failure.
	Handle(ctx context.Context, notification TNotification) error
}
// PipelineBehavior wraps request handling performed by Send. Behaviors work
// on untyped values because a single behavior is applied to every request
// type.
type PipelineBehavior interface {
	// Handle receives the request and the next step of the pipeline. Send
	// chains the registered behaviors so that next is either the following
	// behavior or, for the last one, the request handler itself.
	Handle(ctx context.Context, request any, next RequestHandlerFunc) (any, error)
}
// Package-level registries filled by the Register* functions and read by Send
// and Publish.
var (
	// requestHandlers holds one handler per request type.
	requestHandlers = make(map[reflect.Type]any)

	// notificationHandlers holds, per notification type, the handlers in the
	// order they were registered.
	notificationHandlers = make(map[reflect.Type][]any)

	// pipelineBehaviors holds the behaviors in the order they were registered.
	pipelineBehaviors = []PipelineBehavior{}
)
// RegisterRequestHandler registers handler as the single handler for requests
// of type TRequest.
//
// The registry key is reflect.TypeOf applied to the zero value of TRequest.
// For an interface-typed TRequest that zero value is a nil interface, so the
// key is the nil reflect.Type.
//
// It returns an error when handler is nil or when a handler is already
// registered under the same key.
func RegisterRequestHandler[TRequest any, TResponse any](
	handler RequestHandler[TRequest, TResponse],
) error {
	// Resolve the printable name through a pointer type: it stays valid even
	// when reflect.TypeOf of the zero value is nil (interface-typed TRequest),
	// and is identical to the zero value's type name for every other type.
	requestTypeName := reflect.TypeOf((*TRequest)(nil)).Elem().String()

	if handler == nil {
		return fmt.Errorf("handler for request type '%s' must not be nil", requestTypeName)
	}

	var request TRequest
	requestType := reflect.TypeOf(request)
	if _, contains := requestHandlers[requestType]; contains {
		return fmt.Errorf("handler for request type '%s' is already registered", requestTypeName)
	}

	requestHandlers[requestType] = handler
	return nil
}
// RegisterPipelineBehavior adds pipelineBehavior to the end of the list of
// registered behaviors.
func RegisterPipelineBehavior(pipelineBehavior PipelineBehavior) {
	extended := append(pipelineBehaviors, pipelineBehavior)
	pipelineBehaviors = extended
}
// RegisterNotificationHandler adds handler to the handlers registered for
// notifications of type TNotification. Several handlers may be registered for
// the same type; they are kept in registration order.
func RegisterNotificationHandler[TNotification any](handler NotificationHandler[TNotification]) {
	// The key is the type of TNotification's zero value.
	key := reflect.TypeOf(*new(TNotification))
	registered := notificationHandlers[key]
	notificationHandlers[key] = append(registered, handler)
}
// Send dispatches request to the handler registered for its type and returns
// the handler's response.
//
// The handler is looked up by the dynamic type of request. When pipeline
// behaviors are registered, the handler call is wrapped so that the first
// registered behavior is outermost and each behavior's next step is the
// following behavior, ending with the handler.
//
// An error is returned when no handler is registered for the request type,
// when the registered handler is not a RequestHandler[TRequest, TResponse],
// when a behavior passes on a request that is not a TRequest, when the
// pipeline yields a response that is not a TResponse, or when the handler or
// a behavior fails. On error the returned response is the zero TResponse.
func Send[TRequest any, TResponse any](ctx context.Context, request TRequest) (TResponse, error) {
	var response TResponse

	// Static types of the type parameters, resolved through a pointer so that
	// interface types (whose zero value is a nil interface) still have a name.
	requestParamType := reflect.TypeOf((*TRequest)(nil)).Elem()
	responseParamType := reflect.TypeOf((*TResponse)(nil)).Elem()

	handler, registered := requestHandlers[reflect.TypeOf(request)]
	if !registered {
		// %T prints the dynamic type and is safe for a nil request.
		return response, fmt.Errorf(
			"request handler for request type '%T' is not registered", request,
		)
	}

	typedRequestHandler, ok := handler.(RequestHandler[TRequest, TResponse])
	if !ok {
		return response, fmt.Errorf(
			"failed to convert handler '%T' to typed handler 'RequestHandler[%s, %s]'",
			handler,
			requestParamType,
			responseParamType,
		)
	}

	// Without behaviors there is nothing to wrap: call the handler directly.
	if len(pipelineBehaviors) == 0 {
		return typedRequestHandler.Handle(ctx, request)
	}

	// Innermost step: convert the untyped request back and call the handler.
	var next RequestHandlerFunc = func(ctx context.Context, req any) (any, error) {
		typedRequest, ok := convertOrZero[TRequest](req)
		if !ok {
			return nil, fmt.Errorf(
				"incorrect request type expected '%s' got '%T'",
				requestParamType,
				req,
			)
		}
		return typedRequestHandler.Handle(ctx, typedRequest)
	}

	// Wrap from the last registered behavior to the first, so the first one
	// ends up outermost. behavior and inner are fresh variables on every
	// iteration, so each closure keeps its own pair.
	for i := len(pipelineBehaviors) - 1; i >= 0; i-- {
		behavior, inner := pipelineBehaviors[i], next
		next = func(ctx context.Context, req any) (any, error) {
			return behavior.Handle(ctx, req, inner)
		}
	}

	untypedResponse, err := next(ctx, request)
	if err != nil {
		return response, err
	}

	typedResponse, ok := convertOrZero[TResponse](untypedResponse)
	if !ok {
		return response, fmt.Errorf(
			"failed to convert response of type '%T' to type '%s'",
			untypedResponse,
			responseParamType,
		)
	}
	return typedResponse, nil
}

// convertOrZero converts value to T. A nil value converts to the zero T when
// T is an interface type, because a nil interface loses its static type once
// it is passed through any; for every other T a nil value is a mismatch.
func convertOrZero[T any](value any) (T, bool) {
	if typed, ok := value.(T); ok {
		return typed, true
	}
	var zero T
	isInterface := reflect.TypeOf((*T)(nil)).Elem().Kind() == reflect.Interface
	return zero, value == nil && isInterface
}
// Publish delivers notification to every handler registered for its type.
//
// Handlers are looked up by the dynamic type of notification and invoked in
// registration order. A failing handler does not stop delivery to the
// remaining ones; all failures are combined with errors.Join in the order
// they occurred. A registered handler that is not a
// NotificationHandler[TNotification] is reported as an error and skipped.
// Publish returns nil when there are no handlers or none of them failed.
func Publish[TNotification any](ctx context.Context, notification TNotification) error {
	handlers := notificationHandlers[reflect.TypeOf(notification)]

	var errs []error
	for _, handler := range handlers {
		typedHandler, ok := handler.(NotificationHandler[TNotification])
		if !ok {
			// Calling Handle on the failed assertion's nil result would panic.
			// %T and the pointer-derived type name are both safe for nil.
			errs = append(errs, fmt.Errorf(
				"failed to convert handler '%T' to typed handler 'NotificationHandler[%s]'",
				handler,
				reflect.TypeOf((*TNotification)(nil)).Elem(),
			))
			continue
		}

		if err := typedHandler.Handle(ctx, notification); err != nil {
			errs = append(errs, fmt.Errorf(
				"failed to execute notification handler '%s' with error: %w",
				typeName(typedHandler),
				err,
			))
		}
	}

	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}
// typeName returns the name of obj's dynamic type as produced by
// reflect.Type.String. For a nil interface value, for which reflect.TypeOf
// returns nil, it returns "<nil>" instead of panicking.
func typeName(obj any) string {
	t := reflect.TypeOf(obj)
	if t == nil {
		return "<nil>"
	}
	return t.String()
}