-
Notifications
You must be signed in to change notification settings - Fork 83
/
bus.go
297 lines (257 loc) · 6.85 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package bus implements a message bus which is a common data model and a messaging infrastructure
// to allow different modules to communicate locally or remotely.
package bus
import (
"errors"
"io"
"sync"
"time"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/run"
)
type (
payload interface{}
// MessageID the identity of a Message.
MessageID uint64
// Future represents a future result of an asynchronous publishing.
Future interface {
Get() (Message, error)
GetAll() ([]Message, error)
}
)
// Message is send on the bus to all subscribed listeners.
type Message struct {
payload payload
node string
id MessageID
batchMode bool
}
// ID outputs the MessageID of the Message.
func (m Message) ID() MessageID {
return m.id
}
// Data returns the data wrapped in the Message.
func (m Message) Data() interface{} {
return m.payload
}
// Node returns the node name of the Message.
func (m Message) Node() string {
return m.node
}
// BatchModeEnabled returns whether the Message is sent in batch mode.
func (m Message) BatchModeEnabled() bool {
return m.batchMode
}
// NewMessage returns a new Message with a MessageID and embed data.
func NewMessage(id MessageID, data interface{}) Message {
return Message{id: id, node: "local", payload: data}
}
// NewBatchMessageWithNode returns a new Message with a MessageID and NodeID and embed data.
func NewBatchMessageWithNode(id MessageID, node string, data interface{}) Message {
return Message{id: id, node: node, payload: data, batchMode: true}
}
// NewMessageWithNode returns a new Message with a MessageID and NodeID and embed data.
func NewMessageWithNode(id MessageID, node string, data interface{}) Message {
return Message{id: id, node: node, payload: data}
}
// MessageListener is the signature of functions that can handle an EventMessage.
type MessageListener interface {
Rev(message Message) Message
}
// Subscriber allow subscribing a Topic's messages.
type Subscriber interface {
Subscribe(topic Topic, listener MessageListener) error
}
// Publisher allow sending Messages to a Topic.
type Publisher interface {
Publish(topic Topic, message ...Message) (Future, error)
}
// Broadcaster allow sending Messages to a Topic and receiving the responses.
type Broadcaster interface {
Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error)
}
type channel chan event
type chType int
var (
chTypeUnidirectional chType
chTypeBidirectional chType = 1
)
// Topic is the object which messages are sent to or received from.
type Topic struct {
id string
typ chType
}
// UniTopic returns an unary Topic.
func UniTopic(id string) Topic {
return Topic{id: id, typ: chTypeUnidirectional}
}
// BiTopic returns bidirectional Topic.
func BiTopic(id string) Topic {
return Topic{id: id, typ: chTypeBidirectional}
}
// String returns the string representation of the Topic.
func (t Topic) String() string {
return t.id
}
// The Bus allows publish-subscribe-style communication between components.
type Bus struct {
topics map[Topic][]channel
closer *run.Closer
mutex sync.RWMutex
}
// NewBus returns a Bus.
func NewBus() *Bus {
b := new(Bus)
b.topics = make(map[Topic][]channel)
b.closer = run.NewCloser(0)
return b
}
var (
// ErrTopicNotExist hints the topic published doesn't exist.
ErrTopicNotExist = errors.New("the topic does not exist")
errTopicEmpty = errors.New("the topic is empty")
errListenerEmpty = errors.New("the message listener is empty")
errEmptyFuture = errors.New("can't invoke Get() on an empty future")
)
type emptyFuture struct{}
func (e *emptyFuture) Get() (Message, error) {
return Message{}, errEmptyFuture
}
func (e *emptyFuture) GetAll() ([]Message, error) {
return nil, errEmptyFuture
}
type localFuture struct {
retCh chan Message
retCount int
}
func (l *localFuture) Get() (Message, error) {
if l.retCount < 1 {
return Message{}, io.EOF
}
m, ok := <-l.retCh
if ok {
l.retCount--
return m, nil
}
return Message{}, io.EOF
}
func (l *localFuture) GetAll() ([]Message, error) {
var globalErr error
ret := make([]Message, 0, l.retCount)
for {
m, err := l.Get()
if errors.Is(err, io.EOF) {
return ret, globalErr
}
if err != nil {
globalErr = multierr.Append(globalErr, err)
continue
}
ret = append(ret, m)
}
}
type event struct {
f Future
m Message
}
// Publish sends Messages to a Topic.
func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) {
if topic.id == "" {
return nil, errTopicEmpty
}
b.mutex.RLock()
defer b.mutex.RUnlock()
cc, exit := b.topics[topic]
if !exit {
return nil, ErrTopicNotExist
}
var f Future
switch topic.typ {
case chTypeUnidirectional:
f = nil
case chTypeBidirectional:
f = &localFuture{retCount: len(message), retCh: make(chan Message)}
}
for _, each := range cc {
for _, m := range message {
go func(ch channel, message Message) {
if !b.closer.AddRunning() {
return
}
defer b.closer.Done()
select {
case <-b.closer.CloseNotify():
return
case ch <- event{
m: message,
f: f,
}:
}
}(each, m)
}
}
if f == nil {
return &emptyFuture{}, nil
}
return f, nil
}
// Subscribe adds an MessageListener to be called when a message of a Topic is posted.
func (b *Bus) Subscribe(topic Topic, listener MessageListener) error {
if topic.id == "" {
return errTopicEmpty
}
if listener == nil {
return errListenerEmpty
}
b.mutex.Lock()
defer b.mutex.Unlock()
if _, exist := b.topics[topic]; !exist {
b.topics[topic] = make([]channel, 0)
}
ch := make(channel)
list := b.topics[topic]
list = append(list, ch)
b.topics[topic] = list
go func(listener MessageListener, ch channel) {
for {
c, ok := <-ch
if ok {
ret := listener.Rev(c.m)
if c.f == nil {
continue
}
if lf, ok := c.f.(*localFuture); ok {
lf.retCh <- ret
}
} else {
break
}
}
}(listener, ch)
return nil
}
// Close a Bus until all Messages are sent to Subscribers.
func (b *Bus) Close() {
b.closer.CloseThenWait()
for _, chs := range b.topics {
for _, ch := range chs {
close(ch)
}
}
}