/
socket.go
56 lines (45 loc) · 1.14 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package zmqpubsub
import (
zmq "github.com/alecthomas/gozmq"
)
func newPubSocket(bufferSize int) (*zmq.Socket, error) {
ctx, err := GetGlobalContext()
if err != nil {
return nil, err
}
sock, err := ctx.NewSocket(zmq.PUB)
if err != nil {
return nil, err
}
// prevent 0mq from infinitely buffering messages
for _, hwm := range []zmq.IntSocketOption{zmq.SNDHWM, zmq.RCVHWM} {
err = sock.SetSockOptInt(hwm, bufferSize)
if err != nil {
sock.Close()
return nil, err
}
}
return sock, nil
}
func newPubSocketBound(bufferSize int, addr string) (*zmq.Socket, error) {
sock, err := newPubSocket(bufferSize)
if err != nil {
return nil, err
}
return sock, sock.Bind(addr)
}
func newSubSocketBound(subfilter string, addr string) (*zmq.Socket, error) {
ctx, err := GetGlobalContext()
if err != nil {
return nil, err
}
sock, err := ctx.NewSocket(zmq.SUB)
if err != nil {
return nil, err
}
if err = sock.Bind(addr); err != nil {
return nil, err
}
return sock, sock.SetSockOptString(zmq.SUBSCRIBE, subfilter)
}