forked from kubemq-io/kubemq-go
/
events_store_client.go
133 lines (122 loc) · 3.09 KB
/
events_store_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package kubemq
import (
"context"
"fmt"
)
type EventsStoreClient struct {
client *Client
}
type EventsStoreSubscription struct {
Channel string
Group string
ClientId string
SubscriptionType SubscriptionOption
}
func (es *EventsStoreSubscription) Complete(opts *Options) *EventsStoreSubscription {
if es.ClientId == "" {
es.ClientId = opts.clientId
}
return es
}
func (es *EventsStoreSubscription) Validate() error {
if es.Channel == "" {
return fmt.Errorf("events store subscription must have a channel")
}
if es.ClientId == "" {
return fmt.Errorf("events store subscription must have a clientId")
}
if es.SubscriptionType == nil {
return fmt.Errorf("events store subscription must have a subscription type")
}
return nil
}
func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error) {
client, err := NewClient(ctx, op...)
if err != nil {
return nil, err
}
return &EventsStoreClient{
client: client,
}, nil
}
func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error) {
if err:=es.isClientReady();err!=nil{
return nil,err
}
message.transport = es.client.transport
return es.client.SetEventStore(message).Send(ctx)
}
func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error) {
if err:=es.isClientReady();err!=nil{
return nil,err
}
if onResult == nil {
return nil, fmt.Errorf("events stream result callback function is required")
}
errCh := make(chan error, 1)
eventsCh := make(chan *EventStore, 1)
sendFunc := func(msg *EventStore) error {
select {
case eventsCh <- msg:
return nil
case <-ctx.Done():
return fmt.Errorf("context canceled during events message sending")
}
}
eventsResultCh := make(chan *EventStoreResult, 1)
go es.client.StreamEventsStore(ctx, eventsCh, eventsResultCh, errCh)
go func() {
for {
select {
case result := <-eventsResultCh:
onResult(result, nil)
case err := <-errCh:
onResult(nil, err)
case <-ctx.Done():
return
}
}
}()
return sendFunc, nil
}
func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error {
if err:=es.isClientReady();err!=nil{
return err
}
if onEvent == nil {
return fmt.Errorf("events store subscription callback function is required")
}
if err := request.Complete(es.client.opts).Validate(); err != nil {
return err
}
errCh := make(chan error, 1)
eventsCh, err := es.client.SubscribeToEventsStoreWithRequest(ctx, request, errCh)
if err != nil {
return err
}
go func() {
for {
select {
case msg := <-eventsCh:
onEvent(msg, nil)
case err := <-errCh:
onEvent(nil, err)
case <-ctx.Done():
return
}
}
}()
return nil
}
func (es *EventsStoreClient) Close() error {
if err:=es.isClientReady();err!=nil{
return err
}
return es.client.Close()
}
func (es *EventsStoreClient) isClientReady() error {
if es.client==nil {
return fmt.Errorf("client is not initialized")
}
return nil
}