This repository has been archived by the owner on Feb 20, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 502
/
messenger.h
403 lines (347 loc) · 17.5 KB
/
messenger.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
#pragma once
#include <atomic>
#include <chrono>              // NOLINT
#include <condition_variable>  // NOLINT
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <functional>
#include <map>
#include <memory>
#include <mutex>  // NOLINT
#include <optional>
#include <queue>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/dedicated_thread_owner.h"
#include "common/dedicated_thread_task.h"
#include "common/managed_pointer.h"
#include "messenger/connection_destination.h"
#include "messenger/messenger_defs.h"
// All zmq objects are only forward-declared in this header and are referred to through pointers
// (e.g. the std::unique_ptr<zmq::socket_t> members below), i.e. they are allocated on the heap.
// This is to avoid leaking zmq headers into the rest of the system: includers of this header never see zmq.
// The prediction is that the extra indirection won't hurt performance too much.
namespace zmq {
class context_t;
class socket_t;
} // namespace zmq
namespace noisepage::messenger {
// Forward declarations so that the classes below can name each other (e.g. in friend declarations and
// pointer members) before their definitions. MessengerPolledSockets and ZmqUtil are not defined in this header.
class ConnectionDestination;
class Messenger;
class MessengerPolledSockets;
class ZmqUtil;
/** An abstraction around ZeroMQ messages which explicitly have the sender specified. */
class ZmqMessage {
public:
/** @return The ID of this message. */
message_id_t GetMessageId() const { return message_id_; }
/** @return The callback to invoke on the source. */
callback_id_t GetSourceCallbackId() const { return source_cb_id_; }
/** @return The callback to invoke on the destination. */
callback_id_t GetDestinationCallbackId() const { return dest_cb_id_; }
/** @return The routing ID of this message. */
std::string_view GetRoutingId() const { return std::string_view(routing_id_); }
/** @return The message itself. */
std::string_view GetMessage() const { return message_; }
/** @return The raw payload of the message. */
std::string_view GetRawPayload() const { return std::string_view(payload_); }
private:
friend Messenger;
friend ZmqUtil;
/**
* Build a new ZmqMessage from the supplied information.
* @param message_id The ID of this message.
* @param source_cb_id The callback ID of the message on the source.
* @param dest_cb_id The callback ID of the message on the destination.
* @param routing_id The routing ID of the message sender. Roughly speaking, "who sent this message".
* @param message The contents of the message.
* @return A ZmqMessage encapsulating the given message.
*/
static ZmqMessage Build(message_id_t message_id, callback_id_t source_cb_id, callback_id_t dest_cb_id,
const std::string &routing_id, std::string_view message);
/**
* Parse the given payload into a ZmqMessage.
* @param routing_id The message's routing ID.
* @param message The message for the destination.
* @return A ZmqMessage encapsulating the given message.
*/
static ZmqMessage Parse(const std::string &routing_id, const std::string &message);
/** Construct a new ZmqMessage with the given routing ID and payload. Payload of form ID-MESSAGE. */
ZmqMessage(std::string routing_id, std::string payload);
/** The routing ID of the message. */
std::string routing_id_;
/** The payload in the message, of form ID-MESSAGE. */
std::string payload_;
/** The cached id of the message. */
message_id_t message_id_;
/** The cached callback id of the message (source). */
callback_id_t source_cb_id_;
/** The cached callback id of the message (destination). */
callback_id_t dest_cb_id_;
/** The cached actual message. */
std::string_view message_;
};
/**
 * ConnectionId is an abstraction around establishing connections.
 * It bundles the ZMQ socket of a connection with the routing ID and the name of the target it was made to.
 * All state is private; only Messenger and ZmqUtil (friends) operate on it.
 */
class ConnectionId {
 public:
  /**
   * Create a new ConnectionId that is connected to the specified target.
   *
   * @param messenger   The messenger that owns this connection ID.
   * @param target      The target to be connected to.
   * @param identity    The routing ID (name) that the connection should have.
   */
  explicit ConnectionId(common::ManagedPointer<Messenger> messenger, const ConnectionDestination &target,
                        const std::string &identity);

  /**
   * Defined out-of-line: zmq::socket_t is only forward-declared in this header, and the std::unique_ptr member
   * needs the complete type at the point where the destructor is defined.
   */
  ~ConnectionId();

 private:
  friend ZmqUtil;
  friend Messenger;

  /** The ZMQ socket. */
  std::unique_ptr<zmq::socket_t> socket_;
  /** The ZMQ socket routing ID. */
  std::string routing_id_;
  /** The target that was connected to. */
  std::string target_name_;
};
/**
 * ConnectionRouter represents a new endpoint opened up on the Messenger to listen for connections.
 * It bundles the ZMQ socket of that endpoint with the callback and the identity that were supplied for it.
 */
class ConnectionRouter {
 public:
  /**
   * Create a new ConnectionRouter that listens for incoming connections.
   *
   * @param messenger   The messenger that owns this connection router.
   * @param target      The target to listen on.
   * @param identity    The routing ID (name) that the router should have.
   * @param callback    The server loop for this connection router.
   */
  explicit ConnectionRouter(common::ManagedPointer<Messenger> messenger, const ConnectionDestination &target,
                            const std::string &identity, CallbackFn callback);

  /**
   * Defined out-of-line: zmq::socket_t is only forward-declared in this header, and the std::unique_ptr member
   * needs the complete type at the point where the destructor is defined.
   */
  ~ConnectionRouter();

  /** @return The routing ID (name) of this connection router. The reference is valid while the router is alive. */
  const std::string &GetIdentity() const { return this->identity_; }

 private:
  friend Messenger;

  /** The ZMQ socket. */
  std::unique_ptr<zmq::socket_t> socket_;
  /** The callback to be invoked on all messages received. */
  CallbackFn callback_;
  /** The identity of this router. */
  std::string identity_;
};
/**
* Messenger handles all the network aspects of sending and receiving messages.
* Logic based on the messages is is deferred to MessengerLogic.
*
* @see messenger.cpp for a crash course on ZeroMQ, the current backing implementation.
*/
class Messenger : public common::DedicatedThreadTask {
private:
/**
* A ConnectionRouter that should be added. Unfortunately, ZeroMQ is not thread safe and this must be done from
* the main serverloop thread for poll to function correctly.
*/
struct RouterToBeAdded {
router_id_t router_id_; ///< The router ID to be assigned.
ConnectionDestination target_; ///< The destination to listen on.
std::string identity_; ///< The identity to listen as.
CallbackFn callback_; ///< The callback to invoke on all messages that are received on this destination.
};
/**
* A ConnectionId that should be established. Unfortunately, ZeroMQ is not thread safe and this must be done from
* the main serverloop thread for sending/receiving messages to function correctly.
*/
struct ConnectionToBeAdded {
connection_id_t connection_id_; ///< The connection ID to be assigned.
ConnectionDestination target_; ///< The target to be connected to.
};
public:
/** Builtin callbacks useful for testing. */
enum class BuiltinCallback : uint8_t { NOOP = 0, ECHO, ACK, NUM_BUILTIN_CALLBACKS };
/** @return The callback ID for the builtin callback. */
static callback_id_t GetBuiltinCallback(BuiltinCallback cb) { return callback_id_t{static_cast<uint64_t>(cb)}; }
/** @return The default TCP endpoint for a Messenger on the given port. */
static ConnectionDestination GetEndpointTCP(std::string target_name, uint16_t port);
/** @return The default IPC endpoint for a Messenger on the given port. */
static ConnectionDestination GetEndpointIPC(std::string target_name, uint16_t port);
/** @return The default INPROC endpoint for a Messenger on the given port. */
static ConnectionDestination GetEndpointINPROC(std::string target_name, uint16_t port);
/**
* Create a new Messenger, listening to the default endpoints on the given port.
* @param port The port that determines the default endpoints.
* @param identity The identity that this Messenger instance is known by. See warning!
*
* @warning Identity must be unique across all instances of Messengers.
*/
explicit Messenger(uint16_t port, std::string identity);
/** An explicit destructor is necessary because of the unique_ptr around a forward-declared type. */
~Messenger() override;
/** Run the main server loop, which dispatches messages received to the MessengerLogic layer. */
void RunTask() override;
/** Terminate the Messenger. */
void Terminate() override;
/**
* Listen for new connections on the specified target destination. Blocks until the listen is ready.
*
* @warning TODO(WAN): figure out what bad things happen if you give it a ConnectionDestination that is
* already in use. I don't think this is a problem that is likely to occur because all our
* destinations are known at compile time and we don't have too many right now, but I should
* fix this at some point. I am reluctant to add a set of destinations just for this though..
*
* @param target The destination to listen on for new connections.
* @param identity The identity to listen as.
* @param callback The server loop for all messages received.
* @return The ID of the router that was created.
*/
router_id_t ListenForConnection(const ConnectionDestination &target, const std::string &identity,
CallbackFn callback);
/**
* Connect to the specified target destination.
*
* @param target The destination to be connected to. Note that target_name is meaningless here.
* @return The ID of the connection that was created.
*
* @warning The default behavior of ZMQ sockets is to allow connections to any target and to queue messages.
* Obtaining a connection_id does NOT guarantee that a successful connection has been made.
* If this is necessary, test the connection explicitly by sending a message through.
* Currently, connection_id is setup so that messages are only queued for successful connections.
* But note that it should not be necessary as your code should handle the case of the target
* going away permanently anyway, in particular consider this ordering of events:
* MakeConnection(target) success -> target dies -> SendMessage(target, ...)
*/
connection_id_t MakeConnection(const ConnectionDestination &target);
/**
* Send a message through the specified connection id.
*
* @warning Remember that ConnectionId can only be used from the same thread that created it!
*
* @param connection_id The connection to send the message over.
* @param message The message to be sent.
* @param callback The callback function to be invoked locally on the response. Can be nullptr.
* @param remote_cb_id The callback function to be invoked remotely on the destination to handle this message.
* For example, used for invoking preregistered functions or messages sent in response.
* To invoke preregistered functions, use static_cast<uint8_t>(Messenger::BuiltinCallback).
*/
void SendMessage(connection_id_t connection_id, const std::string &message, CallbackFn callback,
callback_id_t remote_cb_id);
/**
* Send a message through the specified connection router.
*
* @warning A ConnectionRouter only knows about destinations that are directly connected to it.
*
* @param router_id The connection router to send the message over.
* @param recv_id The routing ID of the destination.
* @param message The message to be sent.
* @param callback The callback function to be invoked on the response. Can be nullptr.
* @param remote_cb_id The callback function to be invoked remotely on the destination to handle this message.
* For example, used for invoking preregistered functions or messages sent in response.
* To invoke preregistered functions, use static_cast<uint8_t>(Messenger::BuiltinCallback).
*/
void SendMessage(router_id_t router_id, const std::string &recv_id, const std::string &message, CallbackFn callback,
callback_id_t remote_cb_id);
private:
friend ConnectionId;
friend ConnectionRouter;
static constexpr const char *MESSENGER_DEFAULT_TCP = "*";
static constexpr const char *MESSENGER_DEFAULT_IPC = "./noisepage-ipc-{}";
static constexpr const char *MESSENGER_DEFAULT_INPROC = "noisepage-inproc-{}";
static constexpr const std::chrono::milliseconds MESSENGER_RESEND_TIMER = std::chrono::milliseconds(6666);
static constexpr const std::chrono::milliseconds MESSENGER_POLL_TIMER = std::chrono::milliseconds(250);
/** The maximum timeout that a send or recv operation is allowed to block for. TODO(WAN): 30, really? */
static constexpr const std::chrono::milliseconds MESSENGER_SNDRCV_TIMEOUT = std::chrono::seconds(30);
/** @return The next callback ID to be used when sending messages. */
callback_id_t GetNextSendCallbackId();
/** The main server loop. */
void ServerLoop();
/** Add listening points. */
void ServerLoopAddRouters();
/** Make new connections to other listening points. */
void ServerLoopMakeConnections();
/** Send all queued messages. */
void ServerLoopSendMessages();
/** Receive and process any outstanding messages. */
void ServerLoopRecvAndProcessMessages();
struct PendingMessage {
common::ManagedPointer<zmq::socket_t> zmq_socket_;
std::string destination_id_;
ZmqMessage msg_;
bool is_router_socket_;
std::time_t last_send_time_;
};
/**
* Processes messages.
* Responsible for special callback functions specified by message ID.
* Also responsible for invoking the callbacks that were passed in with a SendMessage.
* Note that these callbacks are distinct from the ServerLoop callbacks common to custom Messenger endpoints, such as
* replication and the model server messenger.
*
* TODO(WAN): Unfortunately, this is also a limitation where the SendMessage() callback must always execute before
* the server loop gets a chance to handle the message.
*/
void ProcessMessage(const ZmqMessage &msg);
/**
* Track the messages that have been seen so far by a given replica, additionally returning whether a message is
* being seen for the first time. Note that this is necessary to simulate idempotence in the Messenger, which tries
* to periodically resend messages that have not been acknowledged as part of bolting guaranteed delivery onto ZeroMQ.
*
* @param replica The replica sending the message.
* @param message_id The ID of the message.
* @return True if the message is being seen for the first time. False if seen before.
*/
bool UpdateMessagesSeen(const std::string &replica, message_id_t message_id);
/** The port that is used for all default endpoints. */
const uint16_t port_;
/** The identity that this instance of the Messenger is known by. */
const std::string identity_;
std::unique_ptr<zmq::context_t> zmq_ctx_;
std::unique_ptr<zmq::socket_t> zmq_default_socket_;
std::unique_ptr<MessengerPolledSockets> polled_sockets_;
std::unordered_map<callback_id_t, CallbackFn> callbacks_;
std::vector<RouterToBeAdded> routers_to_be_added_;
std::mutex routers_add_mutex_;
std::condition_variable routers_add_cvar_;
std::unordered_map<router_id_t, std::unique_ptr<ConnectionRouter>> routers_;
std::vector<ConnectionToBeAdded> connections_to_be_added_;
std::mutex connections_add_mutex_;
std::condition_variable connections_add_cvar_;
std::unordered_map<connection_id_t, std::unique_ptr<ConnectionId>> connections_;
std::map<message_id_t, PendingMessage> pending_messages_;
std::mutex pending_messages_mutex_;
std::unordered_map<std::string, std::unordered_set<message_id_t>> seen_messages_complement_;
std::unordered_map<std::string, message_id_t> seen_messages_max_;
std::mutex callbacks_mutex_;
bool is_messenger_running_ = false;
/** The message ID that gets automatically prefixed to messages. */
std::atomic<message_id_t> next_message_id_{0};
/** The source callback ID that gets automatically prefixed to messages. */
std::atomic<callback_id_t> next_callback_id_{static_cast<uint8_t>(BuiltinCallback::NUM_BUILTIN_CALLBACKS) + 1};
/**
* The ID of the next listening router to be made from ListenForConnection().
* Note that this is always accessed while holding routers_add_mutex_.
*/
router_id_t next_router_id_{0};
/**
* The ID of the next outgoing connection to be made from MakeConnection().
* Note that this is always accessed while holding connections_add_mutex_.
*/
connection_id_t next_connection_id_{0};
};
/**
 * MessengerManager is the entry point to the Messenger system.
 * It is responsible for two things: instantiating the Messenger, and then registering that Messenger with the
 * DedicatedThreadRegistry.
 */
class MessengerManager : public common::DedicatedThreadOwner {
 public:
  /**
   * Create and run a new Messenger (which is a DedicatedThreadTask) on the specified thread registry.
   *
   * @param thread_registry The registry in which the Messenger will be registered.
   * @param port            The port on which the Messenger will listen by default.
   * @param identity        The name that this Messenger will be known by. Must be unique across all instances!
   */
  explicit MessengerManager(common::ManagedPointer<common::DedicatedThreadRegistry> thread_registry, uint16_t port,
                            const std::string &identity);

  /** @return The Messenger being managed (a non-owning pointer). */
  common::ManagedPointer<Messenger> GetMessenger() const { return this->messenger_; }

 private:
  /** The Messenger being managed. */
  common::ManagedPointer<Messenger> messenger_;
};
} // namespace noisepage::messenger