forked from bbengfort/rtreq
/
server_async.go
183 lines (150 loc) · 4.66 KB
/
server_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package rtreq
import (
"context"
"fmt"
"sync"
pb "github.com/bbengfort/rtreq/msg"
zmq "github.com/pebbe/zmq4"
"golang.org/x/sync/errgroup"
)
//===========================================================================
// Asynchronous Server Transporter
//===========================================================================
// IPCAddr is the in process communication socket for workers.
const IPCAddr = "ipc://workers.ipc"
// DefaultNWorkers is the number of workers allocated to handle clients.
const DefaultNWorkers = 16
// RouterServer responds to requests from other peers using a ROUTER socket.
// Client messages arriving on the external ROUTER socket are proxied to a
// pool of Worker goroutines over an in-process DEALER socket (see Run), so
// requests are handled in parallel without shared state between workers.
type RouterServer struct {
sync.Mutex
Transporter
inproc *zmq.Socket // ipc DEALER socket to communicate with workers
nWorkers int // number of workers to initialize
group *errgroup.Group // group to manage worker go routines
workers []*Worker // worker threads to handle requests
}
// Run the server and listen for messages. It binds the external ROUTER
// socket and an in-process DEALER socket, starts nWorkers Worker goroutines
// connected to the DEALER side, then blocks inside zmq.Proxy shuttling
// messages between clients and workers until the proxy is interrupted.
//
// NOTE(review): Close is deferred before either socket is created; if
// NewSocket or Bind fails early, the deferred Close may operate on nil
// sockets — confirm Close tolerates that.
func (s *RouterServer) Run() (err error) {
defer s.Close()
// Create the socket to talk to clients
if s.sock, err = s.context.NewSocket(zmq.ROUTER); err != nil {
return WrapError("could not create ROUTER socket", err)
}
// Bind the client socket to the external address
if err = s.sock.Bind(s.addr); err != nil {
return WrapError("could not bind '%s'", err, s.addr)
}
status("bound async server to %s with ROUTER socket\n", s.addr)
// Create the socket to talk to workers
if s.inproc, err = s.context.NewSocket(zmq.DEALER); err != nil {
return WrapError("could not create DEALER socket", err)
}
// Bind the workers socket to an inprocess address
if err = s.inproc.Bind(IPCAddr); err != nil {
return WrapError("could not bind '%s'", err, IPCAddr)
}
// Create the workers pool: each worker gets a unique name derived from
// the server name and runs in its own goroutine managed by the errgroup.
s.workers = make([]*Worker, 0, s.nWorkers)
s.group, _ = errgroup.WithContext(context.Background())
for w := 0; w < s.nWorkers; w++ {
worker := new(Worker)
worker.Init(fmt.Sprintf("%s-%d", s.name, w+1), s.context)
s.workers = append(s.workers, worker)
s.group.Go(worker.Run)
}
info("initialized %d workers", s.nWorkers)
// Connect worker threads to clients via a queue proxy. zmq.Proxy blocks
// until the context is terminated or an error occurs; a proxy error is
// only reported if the server was not deliberately stopped.
if err = zmq.Proxy(s.sock, s.inproc, nil); err != nil {
if !s.stopped {
return WrapError("proxy interrupted", err)
}
}
// When not stopped, wait for the worker goroutines to drain and surface
// the first worker error (if any).
// NOTE(review): on a deliberate stop the workers are NOT waited on here —
// confirm Shutdown/Close provides that synchronization.
if !s.stopped {
return s.group.Wait()
}
return nil
}
// Close the socket and clean up the connections.
//
// Close is deferred at the top of Run, so it can be invoked before one or
// both sockets have been created (e.g. when NewSocket or Bind fails); nil
// sockets are therefore skipped rather than dereferenced, which previously
// caused a nil-pointer panic on early Run failures.
func (s *RouterServer) Close() (err error) {
	// Close the in-process worker socket first; a failure is logged but
	// does not prevent the client socket from also being closed.
	if s.inproc != nil {
		if err = s.inproc.Close(); err != nil {
			warn("could not close in process socket: %s", err)
		}
	}
	// Nothing further to do if the client socket was never created.
	if s.sock == nil {
		return nil
	}
	// Set linger to 0 so the connection closes immediately
	if err = s.sock.SetLinger(0); err != nil {
		return err
	}
	// Close the socket and return
	return s.sock.Close()
}
// SetWorkers specifies the number of workers, if n is 0 uses DefaultNWorkers.
//
// Non-positive values (including negatives, which previously slipped through
// and produced a server with no workers at all) also fall back to the
// default so the server always has at least one worker.
func (s *RouterServer) SetWorkers(n int) {
	if n <= 0 {
		n = DefaultNWorkers
	}
	s.nWorkers = n
}
// Shutdown the server and print the metrics out.
//
// It delegates to the embedded Transporter's Shutdown, then merges every
// worker's metrics into the server's aggregate. If path is non-empty the
// aggregated metrics are also written to that file with a "server":"router"
// annotation so results can be distinguished from other transporter types.
func (s *RouterServer) Shutdown(path string) error {
if err := s.Transporter.Shutdown(); err != nil {
return err
}
// Collect all the metrics from the per-worker counters into s.metrics.
for _, worker := range s.workers {
s.metrics.Append(worker.metrics)
}
status("%s", s.metrics)
if path != "" {
extra := map[string]interface{}{"server": "router"}
return s.metrics.Write(path, extra)
}
return nil
}
//===========================================================================
// Message Handling Workers
//===========================================================================
// Worker connects to an inprocess socket and handles client messages in
// parallel without sharing state. Workers have all the benefits of other
// transporters, but maintain local sockets (each worker owns its own REP
// socket connected to the server's in-process DEALER endpoint).
type Worker struct {
Transporter
}
// Init the worker and connect it.
//
// The worker is given a unique name, a fresh metrics collector, and a REP
// socket connected to the in-process worker address. On failure the socket
// field is left nil and a warning is logged; callers must not Run a worker
// whose socket could not be set up.
func (w *Worker) Init(name string, context *zmq.Context) {
	w.addr = IPCAddr
	w.context = context
	w.name = name
	w.metrics = new(Metrics)
	w.metrics.Init()
	var err error
	w.sock, err = context.NewSocket(zmq.REP)
	if err != nil {
		w.sock = nil
		warn("could not create worker REP socket: %s", err)
		return
	}
	if err = w.sock.Connect(w.addr); err != nil {
		// Close the socket before discarding the reference so its file
		// descriptor is not leaked (previously it was dropped unclosed).
		w.sock.Close()
		w.sock = nil
		warn("could not connect to '%s': %s", w.addr, err)
		return
	}
}
// Run the worker to listen for messages and respond to them.
//
// Messages forwarded by the router proxy are received and handled one at a
// time; the first receive error ends the loop, after which the worker's
// socket is closed and any close error returned to the errgroup.
func (w *Worker) Run() error {
	debug("starting worker %s", w.name)
	// Serve requests until recv reports an error, then clean up.
	for {
		msg, err := w.recv()
		if err != nil {
			debug("error in %s: %s", w.name, err)
			return w.Close()
		}
		w.handle(msg)
	}
}
// handle logs a message received by the worker and sends back a reply that
// identifies this worker and its running receive count.
func (w *Worker) handle(message *pb.BasicMessage) {
	info("received: %s\n", message.String())
	w.send(fmt.Sprintf("reply msg #%d from worker %s", w.nRecv, w.name))
}