-
Notifications
You must be signed in to change notification settings - Fork 19
/
client.go
126 lines (111 loc) · 3.03 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Package zmq reference is taken from https://github.com/joakimofv/go-bitcoindclient which is a
// go wrapper around official zmq package https://github.com/pebbe/zmq4
package zmq
import (
"errors"
"sync"
"sync/atomic"
"github.com/babylonchain/vigilante/types"
"github.com/btcsuite/btcd/rpcclient"
"github.com/pebbe/zmq4"
)
var (
ErrSubscribeDisabled = errors.New("subscribe disabled (ZmqEndpoint was not set)")
ErrSubscribeExited = errors.New("subscription backend has exited")
ErrSubscriptionAlreadyActive = errors.New("active subscription already exists")
)
// Client is a client that provides methods for interacting with zmq4.
// Must be created with New and destroyed with Close.
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
rpcClient *rpcclient.Client
closed int32 // Set atomically.
wg sync.WaitGroup
quit chan struct{}
zmqEndpoint string
blockEventChan chan *types.BlockEvent
// ZMQ subscription related things.
zctx *zmq4.Context
zsub *zmq4.Socket
subs subscriptions
// subs.zfront --> zback is used like a channel to send messages to the zmqHandler goroutine.
// Have to use zmq4 sockets in place of native channels for communication from
// other functions to the goroutine, since it is constantly waiting on the zsub socket,
// it can't select on a channel at the same time but can poll on multiple sockets.
zback *zmq4.Socket
}
// New returns an initiated client, or an error.
func New(zmqEndpoint string, blockEventChan chan *types.BlockEvent, rpcClient *rpcclient.Client) (*Client, error) {
var (
zctx *zmq4.Context
zsub *zmq4.Socket
zback *zmq4.Socket
err error
c = &Client{
quit: make(chan struct{}),
rpcClient: rpcClient,
zmqEndpoint: zmqEndpoint,
}
)
// ZMQ Subscribe.
zctx, err = zmq4.NewContext()
if err != nil {
return nil, err
}
zsub, err = zctx.NewSocket(zmq4.SUB)
if err != nil {
return nil, err
}
if err = zsub.Connect(zmqEndpoint); err != nil {
return nil, err
}
zback, err = zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err = zback.Bind("inproc://channel"); err != nil {
return nil, err
}
zfront, err := zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err = zfront.Connect("inproc://channel"); err != nil {
return nil, err
}
c.zctx = zctx
c.zsub = zsub
c.subs.exited = make(chan struct{})
c.subs.zfront = zfront
c.zback = zback
c.blockEventChan = blockEventChan
c.wg.Add(1)
go c.zmqHandler()
return c, nil
}
// Close terminates the client and releases resources.
func (c *Client) Close() (err error) {
if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
return errors.New("client already closed")
}
if c.zctx != nil {
c.zctx.SetRetryAfterEINTR(false)
c.subs.Lock()
select {
case <-c.subs.exited:
default:
if _, err = c.subs.zfront.SendMessage("term"); err != nil {
return err
}
}
c.subs.Unlock()
<-c.subs.exited
err = c.zctx.Term()
if err != nil {
return err
}
}
close(c.quit)
c.wg.Wait()
return nil
}