-
Notifications
You must be signed in to change notification settings - Fork 134
/
event_store.go
59 lines (48 loc) · 1.56 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package repository
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/armadaproject/armada/internal/common/eventutil"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/schedulers"
"github.com/armadaproject/armada/pkg/api"
)
type EventStore interface {
ReportEvents(context.Context, []*api.EventMessage) error
}
type TestEventStore struct {
ReceivedEvents []*api.EventMessage
}
func (es *TestEventStore) ReportEvents(_ context.Context, message []*api.EventMessage) error {
es.ReceivedEvents = append(es.ReceivedEvents, message...)
return nil
}
type StreamEventStore struct {
Producer pulsar.Producer
MaxAllowedMessageSize uint
}
func NewEventStore(producer pulsar.Producer, maxAllowedMessageSize uint) *StreamEventStore {
return &StreamEventStore{
Producer: producer, MaxAllowedMessageSize: maxAllowedMessageSize,
}
}
func (n *StreamEventStore) ReportEvents(ctx context.Context, apiEvents []*api.EventMessage) error {
if len(apiEvents) == 0 {
return nil
}
// Because (queue, userId, jobSetId) may differ between events,
// several sequences may be necessary.
sequences, err := eventutil.EventSequencesFromApiEvents(apiEvents)
if err != nil {
return err
}
if len(sequences) == 0 {
return nil
}
sequences = eventutil.CompactEventSequences(sequences)
sequences, err = eventutil.LimitSequencesByteSize(sequences, n.MaxAllowedMessageSize, true)
if err != nil {
return err
}
return pulsarutils.PublishSequences(ctx, n.Producer, sequences, schedulers.Legacy)
}