forked from sei-protocol/sei-tendermint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription.go
92 lines (76 loc) · 2.69 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package pubsub
import (
"context"
"errors"
"github.com/google/uuid"
abci "github.com/badrootd/sei-tendermint/abci/types"
"github.com/badrootd/sei-tendermint/internal/libs/queue"
"github.com/badrootd/sei-tendermint/types"
)
var (
// ErrUnsubscribed is returned by Next when the client has unsubscribed.
ErrUnsubscribed = errors.New("subscription removed by client")
// ErrTerminated is returned by Next when the subscription was terminated by
// the publisher.
ErrTerminated = errors.New("subscription terminated by publisher")
)
// A Subscription represents a client subscription for a particular query.
type Subscription struct {
id string
queue *queue.Queue // open until the subscription ends
stopErr error // after queue is closed, the reason why
}
// newSubscription returns a new subscription with the given queue capacity.
func newSubscription(quota, limit int) (*Subscription, error) {
queue, err := queue.New(queue.Options{
SoftQuota: quota,
HardLimit: limit,
})
if err != nil {
return nil, err
}
return &Subscription{
id: uuid.NewString(),
queue: queue,
}, nil
}
// Next blocks until a message is available, ctx ends, or the subscription
// ends. Next returns ErrUnsubscribed if s was unsubscribed, ErrTerminated if
// s was terminated by the publisher, or a context error if ctx ended without a
// message being available.
func (s *Subscription) Next(ctx context.Context) (Message, error) {
next, err := s.queue.Wait(ctx)
if errors.Is(err, queue.ErrQueueClosed) {
return Message{}, s.stopErr
} else if err != nil {
return Message{}, err
}
return next.(Message), nil
}
// ID returns the unique subscription identifier for s.
func (s *Subscription) ID() string { return s.id }
// publish transmits msg to the subscriber. It reports a queue error if the
// queue cannot accept any further messages.
func (s *Subscription) publish(msg Message) error { return s.queue.Add(msg) }
// stop terminates the subscription with the given error reason.
func (s *Subscription) stop(err error) {
if err == nil {
panic("nil stop error")
}
s.stopErr = err
s.queue.Close()
}
// Message glues data and events together.
type Message struct {
subID string
data types.EventData
events []abci.Event
}
// SubscriptionID returns the unique identifier for the subscription
// that produced this message.
func (msg Message) SubscriptionID() string { return msg.subID }
// Data returns an original data published.
func (msg Message) Data() types.EventData { return msg.data }
func (msg Message) LegacyData() types.LegacyEventData { return msg.data.ToLegacy() }
// Events returns events, which matched the client's query.
func (msg Message) Events() []abci.Event { return msg.events }