/*
<!--
Copyright (c) 2016 Christoph Berger. Some rights reserved.
Use of this text is governed by a Creative Commons Attribution Non-Commercial
Share-Alike License that can be found in the LICENSE.txt file.
The source code contained in this file may import third-party source code
whose licenses are provided in the respective license files.
-->
+++
title = "Message Queues Part 2: The PubSub Protocol"
description = "How to implement the publisher-subscriber protocol with nanomsg, Mangos, and Go."
author = "Christoph Berger"
email = "chris@appliedgo.net"
date = "2016-06-02"
publishdate = "2016-06-02"
categories = ["Distributed Computing"]
tags = ["Messaging", "Mangos", "PubSub"]
articletypes = ["Tutorial"]
+++
This is the second (and, for the time being, the last) article about messaging and Mangos. After doing first steps with the Pair protocol, we now look into a slightly more complex protocol, the Publisher-Subscriber (or PubSub) protocol.
<!--more-->
The Publisher-Subscriber model in a nutshell
--------------------------------------------
### What is PubSub exactly?
PubSub is a communication topology where a single entity called Publisher produces messages that it sends out to other entities called Subscribers. Subscribers may receive everything the publisher sends, or they may subscribe to message subsets called Topics.
![PubSub topology](TopoPubSub.png)
This sounds much like a news service sending articles out to readers, and in fact many news services work this way. Whether you subscribe to an RSS feed, to one or more forums on a discussion platform, or follow someone on Twitter, in each case there is one publisher and multiple subscribers involved.
### Where can a PubSub topology be used?
Typically, PubSub addresses scenarios like these:
* Multiple observers need to act upon status changes of a single entity.
* Multiple workers shall process data from a single entity. Results are not sent back to that entity.
### Implementation variations
Subscription strategies can differ based on the system architecture.
* If the system has a **broker**, clients subscribe to the broker rather than to the server, and the broker takes care of routing the messages to the clients based on the subscribed topics.
* In **brokerless** systems, clients may send their topics to the server, which then sends each message only to the clients that have subscribed to that topic.
* Alternatively, in brokerless systems the clients may filter the messages at their end. The server then simply sends all messages to all clients. (This approach is fine in smaller, local scenarios but does not scale well.)
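
To make the server-side variant more concrete, here is a minimal in-memory sketch of a server that keeps track of subscriptions and looks up the recipients for each topic. The `router` type and its methods are hypothetical names invented for this illustration; they are not part of Mangos or nanomsg, and no networking is involved.

```go
package main

import "fmt"

// router is a hypothetical stand-in for a server that manages
// subscriptions itself (the first brokerless variant).
type router struct {
	subscribers map[string][]string // topic -> names of subscribed clients
}

func newRouter() *router {
	return &router{subscribers: make(map[string][]string)}
}

// subscribe registers a client for a topic.
func (r *router) subscribe(client, topic string) {
	r.subscribers[topic] = append(r.subscribers[topic], client)
}

// recipients returns the clients that should receive a message
// published under the given topic.
func (r *router) recipients(topic string) []string {
	return r.subscribers[topic]
}

func main() {
	r := newRouter()
	r.subscribe("C1", "Technology")
	r.subscribe("C2", "Technology")
	r.subscribe("C2", "Weather")

	for _, topic := range []string{"Technology", "Weather", "Finance"} {
		fmt.Printf("%s -> %v\n", topic, r.recipients(topic))
	}
}
```

The price of this variant is the bookkeeping: the server has to learn about every subscription and keep the table up to date when clients come and go.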
### How Mangos implements the PubSub protocol
As seen in the Pair example in the previous article, Mangos uses special, protocol-aware sockets. In a PubSub scenario, the "pub" socket just sends out its messages to all receivers (or to nirvana if no one is listening). The "sub" socket is able to filter the incoming messages by topic and only delivers the messages that match one of the subscribed topics.
The animation below shows the Mangos approach - the publisher sends all messages to all subscribers, and the subscribers filter the messages according to the topics they have subscribed to:
HYPE[PubSub](PubSub.html)
This is certainly a rather simple and robust approach, as the server does not need to manage the clients and their subscriptions; on the downside, as noted above, filtering on the client side does not scale well with the number of clients, because every message travels to every client regardless of whether that client is interested in it.
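
The client-side filtering described above can be pictured as a simple prefix check on each incoming message. The following is only a sketch of that idea using the standard library; `matchesAny` is a hypothetical helper, not Mangos code, and the real sub socket may be implemented differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// matchesAny reports whether msg starts with at least one of the
// subscribed topics. (Hypothetical helper for illustration only.)
func matchesAny(msg []byte, topics [][]byte) bool {
	for _, topic := range topics {
		if bytes.HasPrefix(msg, topic) {
			return true
		}
	}
	return false
}

func main() {
	topics := [][]byte{[]byte("Technology"), []byte("Weather")}

	fmt.Println(matchesAny([]byte("Weather|Sunny"), topics))  // true
	fmt.Println(matchesAny([]byte("Finance|Stocks"), topics)) // false

	// A prefix check also means that a shorter topic matches longer ones.
	fmt.Println(matchesAny([]byte("Technology|Go"), [][]byte{[]byte("Tech")})) // true
}
```

One consequence of matching by prefix is worth keeping in mind when choosing topic names: a subscription to "Tech" would also let "Technology" messages through.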
A PubSub example
----------------
Let's dive into coding now. We'll develop a tiny example where a couple of clients subscribe to a few topics, and the server then publishes some messages by topic.
But first, let's do the installation stuff.
### Installing Mangos and importing the packages
Like in the previous post, Mangos is installed via a simple `go get`:

    go get -u github.com/go-mangos/mangos

Now you can import Mangos into your .go file.
## The code
*/
// ### Globals and imports
package main
import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"time"
	// For this example, we need the PUBSUB protocol as well as the ipc and tcp transports.
	// Unlike the PAIR protocol, PUBSUB actually consists of two protocols, PUB and SUB.
	"github.com/go-mangos/mangos"
	"github.com/go-mangos/mangos/protocol/pub"
	"github.com/go-mangos/mangos/protocol/sub"
	"github.com/go-mangos/mangos/transport/ipc"
	"github.com/go-mangos/mangos/transport/tcp"
)
// newPublisherSocket creates a new pub socket from the passed-in URL, and starts
// listening on this socket.
func newPublisherSocket(url string) (mangos.Socket, error) {
	socket, err := pub.NewSocket()
	if err != nil {
		return nil, err
	}
	// Allow the use of either TCP or IPC.
	socket.AddTransport(ipc.NewTransport())
	socket.AddTransport(tcp.NewTransport())
	// Start listening.
	err = socket.Listen(url)
	if err != nil {
		return nil, err
	}
	return socket, nil
}
// newSubscriberSocket creates a new sub socket and dials the publisher that
// listens at the passed-in URL.
func newSubscriberSocket(url string) (mangos.Socket, error) {
	socket, err := sub.NewSocket()
	if err != nil {
		return nil, err
	}
	socket.AddTransport(ipc.NewTransport())
	socket.AddTransport(tcp.NewTransport())
	err = socket.Dial(url)
	if err != nil {
		return nil, err
	}
	return socket, nil
}
// Subscribing in nanomsg/Mangos is as simple as setting a socket option.
// The topic is a simple, plain string.
// (For a list of available socket options, see the [Mangos API documentation](https://godoc.org/github.com/go-mangos/mangos#pkg-constants).)
func subscribe(socket mangos.Socket, topic string) error {
	err := socket.SetOption(mangos.OptionSubscribe, []byte(topic))
	if err == nil {
		// A second socket option prevents clients from waiting forever when they receive no messages.
		err = socket.SetOption(mangos.OptionRecvDeadline, 10*time.Second)
	}
	return err
}
// To publish to subscribers of a specific topic, simply prepend the topic to the message.
// A pipe character (`|`) separates the topic from the message. This is only done for better
// readability. In 'real' scenarios, the receiver would just strip away the topic prefix and
// pass the rest of the message over to the next processing stage.
func publish(socket mangos.Socket, topic, message string) error {
	err := socket.Send([]byte(fmt.Sprintf("%s|%s", topic, message)))
	return err
}
// Receiving is nothing more than calling the socket's Recv() method. The magic happens
// through the socket option "OptionSubscribe" we set earlier. This option makes the
// socket ignore any message that does not start with the desired topic(s).
func receive(socket mangos.Socket) (string, error) {
	message, err := socket.Recv()
	return string(message), err
}
// Now it is time to set up the server. Besides the socket URL we also pass a list of
// topics that the server will use for sending messages.
func runServer(url string, topics []string) {
	// Create the publisher socket.
	socket, err := newPublisherSocket(url)
	if err != nil {
		log.Fatalf("Cannot listen on %s: %s\n", url, err.Error())
	}
	// Loop through the topics and send a message for each one. Repeat a couple of times.
	for i := 0; i < 5; i++ {
		for _, topic := range topics {
			time.Sleep(1 * time.Second)
			fmt.Printf("Publishing a message for topic %s\n", topic)
			err = publish(socket, topic, fmt.Sprintf("Message for %s", topic))
			if err != nil {
				log.Fatalf("Cannot publish message for topic %s: %s\n", topic, err.Error())
			}
		}
	}
}
// Client setup is also easy.
func runClient(name, url string, topics []string) {
	// First, we create a subscriber socket.
	socket, err := newSubscriberSocket(url)
	if err != nil {
		log.Fatalf("Cannot dial into %s: %s\n", url, err.Error())
	}
	// Then, we subscribe to the topics that were passed in as a parameter.
	for _, topic := range topics {
		err := subscribe(socket, topic)
		if err != nil {
			log.Fatalf("Cannot subscribe to topic %s: %s\n", topic, err.Error())
		}
	}
	// Finally, we listen for new messages and print out any that match
	// one of the topics we subscribed to. The server sends five messages per topic.
	for i := 0; i < 5*len(topics); i++ {
		message, err := receive(socket)
		if err != nil {
			log.Fatalf("Error receiving message: %s\n", err.Error())
		}
		fmt.Printf("Client %s received: %s\n", name, message)
	}
}
// Putting it all together...
func main() {
	// The socket URL.
	url := "tcp://localhost:56565"
	// Without parameters, the process starts as the server.
	if len(os.Args) == 1 {
		// First, spawn the clients.
		// We use the `Cmd` type from the `os/exec` package to spawn the clients
		// as subprocesses in a convenient way.
		client1 := exec.Command("./pubsub", "C1", "Technology")
		client1.Stdout = os.Stdout // Default is nil but we want to see what the clients say.
		client1.Stderr = os.Stderr // Same here.
		client2 := exec.Command("./pubsub", "C2", "Technology", "Weather")
		client2.Stdout = os.Stdout
		client2.Stderr = os.Stderr
		client3 := exec.Command("./pubsub", "C3", "Finance")
		client3.Stdout = os.Stdout
		client3.Stderr = os.Stderr
		fmt.Println("Starting client 1")
		err := client1.Start() // Start the command and continue without waiting for the command to finish.
		if err != nil {
			log.Fatalf("Failed starting client1: %s", err.Error())
		}
		fmt.Println("Starting client 2")
		err = client2.Start()
		if err != nil {
			log.Fatalf("Failed starting client2: %s", err.Error())
		}
		fmt.Println("Starting client 3")
		err = client3.Start()
		if err != nil {
			log.Fatalf("Failed starting client3: %s", err.Error())
		}
		// Start publishing.
		fmt.Println("Starting the server")
		runServer(url, []string{"Technology", "Weather", "Finance"})
		// Give the clients a moment to consume the last messages, then
		// wait for all commands started with Start() to finish.
		time.Sleep(1 * time.Second)
		fmt.Println("Waiting for the clients to exit")
		client1.Wait()
		client2.Wait()
		client3.Wait()
		fmt.Println("Server ends.")
	} else {
		// One or more parameters means this process is a client.
		fmt.Println(os.Args[1], "is starting.")
		runClient(os.Args[1], url, os.Args[2:])
		fmt.Println("Client", os.Args[1], "ends.")
	}
}
/*
Get this code from GitHub:

    go get -d github.com/appliedgo/pubsub
    cd $GOPATH/src/github.com/appliedgo/pubsub
    go build
    ./pubsub

(`go get -d` gets the code but does not install it into `$GOPATH/bin`. `go build` then builds the executable locally so that it does not end up among your other executables, especially if `$GOPATH/bin` is part of your `$PATH`.)
As you have seen in the code for main(), the program spawns three child processes that take over the role of the clients. If everything works fine, you should then see the publisher send 15 messages to the clients, and the clients should then grab only the messages that they have subscribed to.
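
Note that the clients print each message including its topic prefix, because `publish()` joins topic and message with a pipe character. As mentioned in the code comments, a real receiver would typically strip that prefix before processing the message. A minimal sketch of this step could look as follows; `splitTopic` is a hypothetical helper that is not part of the example program, and it assumes the `topic|message` layout used by `publish()`.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTopic separates a raw "topic|message" string at the first pipe
// character. ok is false if the raw string contains no pipe at all.
// (Hypothetical helper, assuming the format produced by publish().)
func splitTopic(raw string) (topic, body string, ok bool) {
	i := strings.IndexByte(raw, '|')
	if i < 0 {
		return "", raw, false
	}
	return raw[:i], raw[i+1:], true
}

func main() {
	topic, body, ok := splitTopic("Technology|Message for Technology")
	fmt.Printf("topic=%q body=%q ok=%v\n", topic, body, ok)
}
```

Splitting at the first pipe only keeps any further pipe characters inside the message body intact.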
For additional fun, try tweaking some parameters. For example, comment out the last `time.Sleep()` statement in main(). Or have the clients expect more messages than the server sends, and see what happens!
Have fun!
*/