forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 14
/
crdt_broadcaster.go
67 lines (61 loc) · 1.45 KB
/
crdt_broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package s3x
import (
"context"
pb "github.com/RTradeLtd/TxPB/v3/go"
)
// crdtBroadcaster implements crdt.Broadcaster using a pb.PubSubAPIClient.
// Payloads received from the pubsub stream are handed to callers through
// the next channel; see Next.
type crdtBroadcaster struct {
// topic is the pubsub topic used for both subscribing and publishing.
topic string
// client is the pubsub stream used to send requests and receive messages.
client pb.PubSubAPI_PubSubClient
// next carries message payloads from the receive goroutine to Next.
next chan []byte
// err holds the error that ended the receive goroutine. It is written
// before next is closed and must only be read once next is closed.
err error
}
// newCrdtBroadcaster builds a crdtBroadcaster that subscribes to topic on a
// pubsub stream opened through api.
//
// A background goroutine reads from the stream and forwards every message
// payload to Next. The goroutine exits when the stream returns an error or
// when ctx is done, so ctx must be cancelled after use to release resources.
//
// An error is returned if the stream cannot be opened or if the subscribe
// request cannot be sent.
func newCrdtBroadcaster(ctx context.Context, api pb.PubSubAPIClient, topic string) (*crdtBroadcaster, error) {
	client, err := api.PubSub(ctx)
	if err != nil {
		return nil, err
	}
	if err := client.Send(&pb.PubSubRequest{
		RequestType: pb.PSREQTYPE_PS_SUBSCRIBE,
		Topics:      []string{topic},
	}); err != nil {
		return nil, err
	}
	next := make(chan []byte)
	b := &crdtBroadcaster{
		topic:  topic,
		client: client,
		next:   next,
	}
	go func() {
		// Closing next is the signal for Next to report b.err, so every
		// return path below assigns b.err before this deferred close runs.
		defer close(next)
		for {
			resp, err := client.Recv()
			if err != nil {
				b.err = err
				return
			}
			for _, m := range resp.GetMessage() {
				// next is unbuffered: without the ctx.Done case this send
				// would block forever once the consumer stops calling
				// Next, leaking the goroutine even after cancellation.
				select {
				case next <- m.GetData():
				case <-ctx.Done():
					b.err = ctx.Err()
					return
				}
			}
		}
	}()
	return b, nil
}
// Broadcast publishes data to the other replicas by sending a publish
// request for the broadcaster's topic over the pubsub stream. It returns
// whatever error the stream's Send reports.
func (b *crdtBroadcaster) Broadcast(data []byte) error {
	publish := &pb.PubSubRequest{
		RequestType: pb.PSREQTYPE_PS_PUBLISH,
		Topics:      []string{b.topic},
		Data:        data,
	}
	return b.client.Send(publish)
}
// Next blocks until the next payload received from the network is
// available and returns it. Once the payload channel has been closed it
// returns a nil payload together with the error recorded by the receiver.
func (b *crdtBroadcaster) Next() ([]byte, error) {
	if payload, open := <-b.next; open {
		return payload, nil
	}
	// The channel is closed, so b.err is safe to read.
	return nil, b.err
}