/
integration_events.go
103 lines (85 loc) · 3.17 KB
/
integration_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package handlers
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/LordMoMA/Intelli-Mall/baskets/basketspb"
"github.com/LordMoMA/Intelli-Mall/depot/depotpb"
"github.com/LordMoMA/Intelli-Mall/internal/am"
"github.com/LordMoMA/Intelli-Mall/internal/ddd"
"github.com/LordMoMA/Intelli-Mall/internal/errorsotel"
"github.com/LordMoMA/Intelli-Mall/internal/registry"
"github.com/LordMoMA/Intelli-Mall/ordering/internal/application"
"github.com/LordMoMA/Intelli-Mall/ordering/internal/application/commands"
"github.com/LordMoMA/Intelli-Mall/ordering/internal/domain"
)
type integrationHandlers[T ddd.Event] struct {
app application.App
}
var _ ddd.EventHandler[ddd.Event] = (*integrationHandlers[ddd.Event])(nil)
func NewIntegrationEventHandlers(reg registry.Registry, app application.App, mws ...am.MessageHandlerMiddleware) am.MessageHandler {
return am.NewEventHandler(reg, integrationHandlers[ddd.Event]{
app: app,
}, mws...)
}
func RegisterIntegrationEventHandlers(subscriber am.MessageSubscriber, handlers am.MessageHandler) (err error) {
_, err = subscriber.Subscribe(basketspb.BasketAggregateChannel, handlers, am.MessageFilter{
basketspb.BasketCheckedOutEvent,
}, am.GroupName("ordering-baskets"))
if err != nil {
return err
}
_, err = subscriber.Subscribe(depotpb.ShoppingListAggregateChannel, handlers, am.MessageFilter{
depotpb.ShoppingListCompletedEvent,
}, am.GroupName("ordering-depot"))
return
}
func (h integrationHandlers[T]) HandleEvent(ctx context.Context, event T) (err error) {
span := trace.SpanFromContext(ctx)
defer func(started time.Time) {
if err != nil {
span.AddEvent(
"Encountered an error handling integration event",
trace.WithAttributes(errorsotel.ErrAttrs(err)...),
)
}
span.AddEvent("Handled integration event", trace.WithAttributes(
attribute.Int64("TookMS", time.Since(started).Milliseconds()),
))
}(time.Now())
span.AddEvent("Handling integration event", trace.WithAttributes(
attribute.String("Event", event.EventName()),
))
switch event.EventName() {
case basketspb.BasketCheckedOutEvent:
return h.onBasketCheckedOut(ctx, event)
case depotpb.ShoppingListCompletedEvent:
return h.onShoppingListCompleted(ctx, event)
}
return nil
}
func (h integrationHandlers[T]) onBasketCheckedOut(ctx context.Context, event ddd.Event) error {
payload := event.Payload().(*basketspb.BasketCheckedOut)
items := make([]domain.Item, len(payload.GetItems()))
for i, item := range payload.GetItems() {
items[i] = domain.Item{
ProductID: item.GetProductId(),
StoreID: item.GetStoreId(),
StoreName: item.GetStoreName(),
ProductName: item.GetProductName(),
Price: item.GetPrice(),
Quantity: int(item.GetQuantity()),
}
}
return h.app.CreateOrder(ctx, commands.CreateOrder{
ID: payload.GetId(),
CustomerID: payload.GetCustomerId(),
PaymentID: payload.GetPaymentId(),
Items: items,
})
}
func (h integrationHandlers[T]) onShoppingListCompleted(ctx context.Context, event ddd.Event) error {
payload := event.Payload().(*depotpb.ShoppingListCompleted)
return h.app.ReadyOrder(ctx, commands.ReadyOrder{ID: payload.GetOrderId()})
}