-
Notifications
You must be signed in to change notification settings - Fork 13
/
socket.go
139 lines (115 loc) · 3.38 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright (c) 2014-2017 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package zmqutil
import (
"github.com/bitmark-inc/bitmarkd/util"
"github.com/bitmark-inc/logger"
zmq "github.com/pebbe/zmq4"
"time"
)
// heartbeat settings applied to every server socket
const (
	// interval passed to SetHeartbeatIvl
	heartbeatInterval = 15 * time.Second

	// timeout passed to SetHeartbeatTimeout
	heartbeatTimeout = time.Minute

	// time-to-live passed to SetHeartbeatTtl
	heartbeatTTL = 2 * time.Minute
)
// NewSignalPair returns a pair of connected push/pull sockets
// for shutdown signalling.
//
// The PUSH socket is bound to the endpoint named by signal and the
// PULL socket is connected to that same endpoint.  Both sockets have
// their linger period set to zero.
//
// The sockets are returned in the order (push, pull).  On any
// failure every socket created so far is closed and the result is
// (nil, nil, err).
func NewSignalPair(signal string) (*zmq.Socket, *zmq.Socket, error) {

	// send half of signalling channel
	push, err := zmq.NewSocket(zmq.PUSH)
	if nil != err {
		return nil, nil, err
	}
	// a failure to set linger is reported rather than ignored, so
	// the caller never receives a socket with an unexpected linger
	if err = push.SetLinger(0); nil != err {
		push.Close()
		return nil, nil, err
	}
	if err = push.Bind(signal); nil != err {
		push.Close()
		return nil, nil, err
	}

	// receive half of signalling channel
	pull, err := zmq.NewSocket(zmq.PULL)
	if nil != err {
		push.Close()
		return nil, nil, err
	}
	if err = pull.SetLinger(0); nil != err {
		push.Close()
		pull.Close()
		return nil, nil, err
	}
	if err = pull.Connect(signal); nil != err {
		push.Close()
		pull.Close()
		return nil, nil, err
	}

	return push, pull, nil
}
// NewBind binds a list of addresses.
//
// It creates up to 2 sockets for separate IPv4 and IPv6 traffic: a
// socket is only created (via NewServerSocket) the first time an
// address of its family is seen, and every later address of that
// family is bound to the same socket.
//
// The sockets are returned in the order (IPv4, IPv6); either one is
// nil when listen contained no address of that family.  On any
// failure all sockets created so far are closed and the result is
// (nil, nil, err).  A bind failure is also logged through log.
func NewBind(log *logger.L, socketType zmq.Type, zapDomain string, privateKey []byte, publicKey []byte, listen []*util.Connection) (*zmq.Socket, *zmq.Socket, error) {

	var socket4 *zmq.Socket // IPv4 traffic
	var socket6 *zmq.Socket // IPv6 traffic

	// if an error, close any open sockets
	// (only sockets that were actually created are closed)
	closeAll := func() {
		if nil != socket4 {
			socket4.Close()
		}
		if nil != socket6 {
			socket6.Close()
		}
	}

	// allocate IPv4 and IPv6 sockets
	for i, address := range listen {
		bindTo, v6 := address.CanonicalIPandPort("tcp://")

		// select the socket slot matching the address family
		target := &socket4
		if v6 {
			target = &socket6
		}

		// first address of this family: create its socket
		if nil == *target {
			s, err := NewServerSocket(socketType, zapDomain, privateKey, publicKey, v6)
			if nil != err {
				closeAll()
				return nil, nil, err
			}
			*target = s
		}

		if err := (*target).Bind(bindTo); nil != err {
			log.Errorf("cannot bind[%d]: %q  error: %v", i, bindTo, err)
			closeAll()
			return nil, nil, err
		}
		log.Infof("bind[%d]: %q  IPv6: %v", i, bindTo, v6)
	}

	return socket4, socket6, nil
}
// NewServerSocket creates a socket suitable for a server side
// connection.
//
// The socket is configured as a CURVE server using privateKey as its
// secret key, zapDomain as its ZAP domain and publicKey as its
// identity; v6 sets the socket's IPv6 option.  Any client key is
// accepted for zapDomain (zmq.CURVE_ALLOW_ANY).
//
// If the socket cannot be created, or any of the CURVE / ZAP /
// identity / IPv6 options cannot be set, the socket is closed and
// (nil, err) is returned.
func NewServerSocket(socketType zmq.Type, zapDomain string, privateKey []byte, publicKey []byte, v6 bool) (*zmq.Socket, error) {

	socket, err := zmq.NewSocket(socketType)
	if nil != err {
		return nil, err
	}

	// allow any client to connect
	//zmq.AuthAllow(zapDomain, "127.0.0.1/8")
	//zmq.AuthAllow(zapDomain, "::1")
	zmq.AuthCurveAdd(zapDomain, zmq.CURVE_ALLOW_ANY)

	// mandatory options, applied in order; the first failure aborts
	// so that a partially configured socket is never handed back
	required := []func() error{
		// domain is servers public key
		func() error { return socket.SetCurveServer(1) },
		//func() error { return socket.SetCurvePublickey(publicKey) },
		func() error { return socket.SetCurveSecretkey(string(privateKey)) },
		func() error { return socket.SetZapDomain(zapDomain) },
		// just use public key for identity
		func() error { return socket.SetIdentity(string(publicKey)) },
		// conditionally set IPv6 state
		func() error { return socket.SetIpv6(v6) },
	}
	for _, apply := range required {
		if err := apply(); nil != err {
			socket.Close()
			return nil, err
		}
	}

	// socket.SetSndtimeo(SEND_TIMEOUT)
	// socket.SetLinger(LINGER_TIME)
	// socket.SetRouterMandatory(0)   // discard unroutable packets
	// socket.SetRouterHandover(true) // allow quick reconnect for a given public key
	// socket.SetImmediate(false)     // queue messages sent to disconnected peer

	// heartbeat
	// These stay best-effort, exactly as before: the errors are
	// deliberately discarded.
	// NOTE(review): the zmq4 binding appears to reject these options
	// when built against ZeroMQ older than 4.2 - confirm whether such
	// builds must be supported; if not, propagate these errors too.
	_ = socket.SetHeartbeatIvl(heartbeatInterval)
	_ = socket.SetHeartbeatTimeout(heartbeatTimeout)
	_ = socket.SetHeartbeatTtl(heartbeatTTL)

	return socket, nil
}