/
forwarder.go
45 lines (36 loc) · 1011 Bytes
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package zmqpubsub
import (
zmq "github.com/alecthomas/gozmq"
)
// Forwarder is a zeromq forwarder device acting as a broker between
// multiple publishers and multiple subscribers.
type Forwarder struct {
	// ctx is the zeromq context obtained from GetGlobalContext.
	ctx *zmq.Context
	// frontend is the socket publishers speak to; backend is the socket
	// subscribers speak to.
	frontend, backend *zmq.Socket
	// options holds the Broker settings the forwarder was created with.
	options Broker
}
// NewForwarder builds a Forwarder from the given Broker options.
//
// It obtains the context from GetGlobalContext, creates the frontend socket
// via newSubSocketBound (options.SubscribeFilter, options.PubAddr) and the
// backend socket via newPubSocketBound (options.BufferSize, options.SubAddr).
//
// On any failure it returns a nil Forwarder together with the error from the
// failing step. If the backend socket cannot be created, the frontend socket
// opened earlier is closed before the context is closed.
func NewForwarder(options Broker) (*Forwarder, error) {
	var err error

	b := new(Forwarder)
	b.options = options

	if b.ctx, err = GetGlobalContext(); err != nil {
		return nil, err
	}

	// Publishers speak to the frontend socket
	b.frontend, err = newSubSocketBound(options.SubscribeFilter, options.PubAddr)
	if err != nil {
		// NOTE(review): this closes the context returned by
		// GetGlobalContext; confirm no other user shares it.
		b.ctx.Close()
		return nil, err
	}

	// Subscribers speak to the backend socket
	b.backend, err = newPubSocketBound(options.BufferSize, options.SubAddr)
	if err != nil {
		// Close the frontend socket before the context: otherwise the socket
		// is leaked, and ZeroMQ context termination waits for every socket
		// in the context to be closed. The close result is deliberately
		// ignored so the caller sees the original failure.
		b.frontend.Close()
		b.ctx.Close()
		return nil, err
	}

	return b, nil
}
// Run starts a zeromq FORWARDER device between the frontend and backend
// sockets and returns whatever error zmq.Device reports.
//
// NOTE(review): zmq.Device presumably blocks for the lifetime of the
// device — confirm before calling Run on a goroutine that must stay free.
func (b *Forwarder) Run() error {
	deviceErr := zmq.Device(zmq.FORWARDER, b.frontend, b.backend)
	return deviceErr
}