-
Notifications
You must be signed in to change notification settings - Fork 1
/
command.go
58 lines (48 loc) · 1.34 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package cockroachdb
import (
"context"
"encoding/json"
"github.com/adlerhurst/eventstore/v2"
)
// command wraps an eventstore.Command together with the additional data this
// package keeps for it. Instances are obtained from and returned to
// commandPool by the functions in this file.
type command struct {
// Command is the original command as provided by the aggregate.
eventstore.Command
// payload is the JSON encoding of Command.Payload(); it is only set when
// that payload is non-nil and is cleared before the command is pooled again.
payload []byte
// aggregate is the ID of the aggregate the command was read from.
aggregate eventstore.TextSubjects
// NOTE(review): id and sequence are not assigned anywhere in the visible
// part of this file — presumably they are filled in when the command is
// persisted; confirm against the code that writes them.
id string
sequence uint32
}
// commandsFromAggregates converts the commands of all given aggregates into
// pooled *command values, keeping the order of the aggregates and of the
// commands within each aggregate.
//
// It returns the collected commands together with a close function that
// clears each command's payload and hands the command back to commandPool;
// the commands must not be used after close has been called.
//
// If converting any aggregate fails, the commands collected from the
// preceding aggregates are returned to commandPool right away and the result
// is a nil slice, a no-op close function and the error.
func commandsFromAggregates(ctx context.Context, aggregates []eventstore.Aggregate) (commands []*command, close func(), err error) {
	commands = make([]*command, 0, len(aggregates))

	// release gives every command collected so far back to the pool.
	release := func() {
		for _, cmd := range commands {
			cmd.payload = nil
			commandPool.Put(cmd)
		}
	}

	for _, aggregate := range aggregates {
		// A distinct name avoids shadowing the named result err.
		aggregateCommands, convErr := commandsFromAggregate(ctx, aggregate)
		if convErr != nil {
			// The caller only gets a no-op close on failure, so the commands
			// acquired for earlier aggregates have to be released here or
			// they would never make it back into the pool.
			release()
			return nil, func() {}, convErr
		}
		commands = append(commands, aggregateCommands...)
	}

	return commands, release, nil
}
// commandsFromAggregate builds one pooled *command for every command of the
// given aggregate, in the order returned by aggregate.Commands().
//
// Each command is taken from commandPool, linked to its source
// eventstore.Command and to aggregate.ID(), and, when the source command has
// a non-nil payload, given the JSON encoding of that payload.
//
// If a payload cannot be marshaled, the failure is logged, every command
// already taken from the pool for this aggregate is handed back, and the
// marshal error is returned together with a nil slice.
func commandsFromAggregate(ctx context.Context, aggregate eventstore.Aggregate) ([]*command, error) {
	// Read the aggregate's commands once so the slice length and the loop
	// below are guaranteed to agree.
	source := aggregate.Commands()
	commands := make([]*command, len(source))

	for i, src := range source {
		cmd := commandPool.Get()
		cmd.Command = src
		cmd.aggregate = aggregate.ID()
		commands[i] = cmd

		payload := src.Payload()
		if payload == nil {
			continue
		}

		encoded, err := json.Marshal(payload)
		if err != nil {
			logger.ErrorContext(ctx, "marshal payload failed", "cause", err, "action", cmd.Action().Join("."))
			// The caller receives only the error, so nothing else can release
			// the commands acquired so far; give them back to the pool here.
			// Entries after index i are still nil and must be skipped.
			for _, acquired := range commands[:i+1] {
				acquired.payload = nil
				commandPool.Put(acquired)
			}
			return nil, err
		}
		cmd.payload = encoded
	}

	return commands, nil
}