/* mosquitto_broker.h */
/*
Copyright (c) 2009-2014 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#ifndef MQTT3_H
#define MQTT3_H
#include <config.h>
#include <stdio.h>
#include <mosquitto_internal.h>
#include <mosquitto_plugin.h>
#include <mosquitto.h>
#include "tls_mosq.h"
#include "uthash.h"
/* When __GNUC__ is not defined, make __attribute__((...)) expand to nothing
 * so that declarations annotated with it (for example _mosquitto_log_printf()
 * in this header) still compile. */
#ifndef __GNUC__
#define __attribute__(attrib)
#endif
/* Log destinations.
 * Every destination other than NONE and ALL occupies a single distinct bit,
 * so several destinations can be OR-ed together into one int. ALL has the
 * low eight bits set. */
#define MQTT3_LOG_NONE   0
#define MQTT3_LOG_SYSLOG (1 << 0)
#define MQTT3_LOG_FILE   (1 << 1)
#define MQTT3_LOG_STDOUT (1 << 2)
#define MQTT3_LOG_STDERR (1 << 3)
#define MQTT3_LOG_TOPIC  (1 << 4)
#define MQTT3_LOG_ALL    0xFF
/* Sentinel constant with the value -2.
 * The replacement list is parenthesised so the unary minus always stays
 * attached to the literal wherever the macro is expanded.
 * NOTE(review): presumably stored in place of a socket descriptor to mark
 * websocket clients - confirm at the use sites. */
#define WEBSOCKET_CLIENT (-2)
/* Protocol selector; struct _mqtt3_listener carries one of these in its
 * protocol member. Numeric values are spelled out and match the implicit
 * numbering (0, 1, 2). */
enum mosquitto_protocol {
	mp_mqtt       = 0,
	mp_mqttsn     = 1,
	mp_websockets = 2
};
/* 64-bit unsigned identifier type; used in this header for
 * mosquitto_msg_store.db_id and mosquitto_db.last_db_id. */
typedef uint64_t dbid_t;
/* Settings and runtime state for one listener.
 * The TLS members exist only when WITH_TLS is defined and the websockets
 * members only when WITH_WEBSOCKETS is defined, so the layout of this struct
 * depends on the build configuration.
 * NOTE(review): the meaning of individual members is inferred from their
 * names and types only - confirm against the config parser and network code. */
struct _mqtt3_listener {
int fd;
char *host;
uint16_t port;
int max_connections;
char *mount_point;
/* Presumably an array of sock_count socket descriptors - TODO confirm. */
int *socks;
int sock_count;
int client_count;
enum mosquitto_protocol protocol;
bool use_username_as_clientid;
#ifdef WITH_TLS
/* TLS-related members; compiled in only for WITH_TLS builds. */
char *cafile;
char *capath;
char *certfile;
char *keyfile;
char *ciphers;
char *psk_hint;
bool require_certificate;
SSL_CTX *ssl_ctx;
char *crlfile;
bool use_identity_as_username;
char *tls_version;
#endif
#ifdef WITH_WEBSOCKETS
/* libwebsockets-related members; compiled in only for WITH_WEBSOCKETS builds. */
struct libwebsocket_context *ws_context;
char *http_dir;
struct libwebsocket_protocols *ws_protocol;
#endif
};
/* Broker-wide configuration.
 * Per the comments on the config functions declared later in this header,
 * it is initialised to defaults by mqtt3_config_init(), filled from the
 * command line by mqtt3_config_parse_args() and from config_file by
 * mqtt3_config_read(), and freed by mqtt3_config_cleanup().
 * NOTE(review): the meaning of individual options is inferred from their
 * names - confirm against the config parser. */
struct mqtt3_config {
/* File that mqtt3_config_read() reads configuration data from. */
char *config_file;
char *acl_file;
bool allow_anonymous;
bool allow_duplicate_messages;
bool allow_zero_length_clientid;
char *auto_id_prefix;
int auto_id_prefix_len;
int autosave_interval;
bool autosave_on_changes;
char *clientid_prefixes;
bool connection_messages;
bool daemon;
/* One listener embedded by value plus a separately allocated set;
 * listeners presumably holds listener_count entries - TODO confirm. */
struct _mqtt3_listener default_listener;
struct _mqtt3_listener *listeners;
int listener_count;
/* Presumably a combination of the MQTT3_LOG_* destination values - TODO confirm. */
int log_dest;
int log_facility;
int log_type;
bool log_timestamp;
char *log_file;
FILE *log_fptr;
int message_size_limit;
char *password_file;
bool persistence;
char *persistence_location;
char *persistence_file;
char *persistence_filepath;
time_t persistent_client_expiration;
char *pid_file;
char *psk_file;
bool queue_qos0_messages;
int retry_interval;
int store_clean_interval;
int sys_interval;
bool upgrade_outgoing_qos;
char *user;
bool verbose;
#ifdef WITH_BRIDGE
/* Bridge definitions; present only in WITH_BRIDGE builds.
 * bridges presumably holds bridge_count entries - TODO confirm. */
struct _mqtt3_bridge *bridges;
int bridge_count;
#endif
char *auth_plugin;
/* Presumably the option array and count handed to the auth plugin
 * callbacks, which take the same pair of types - TODO confirm. */
struct mosquitto_auth_opt *auth_options;
int auth_option_count;
};
/* Node of a doubly linked list (prev/next) that pairs a client context with
 * a qos value. struct _mosquitto_subhier points at such a list through its
 * subs member. */
struct _mosquitto_subleaf {
struct _mosquitto_subleaf *prev;
struct _mosquitto_subleaf *next;
struct mosquitto *context;
int qos;
};
/* Node of the subscription hierarchy; struct mosquitto_db embeds one of
 * these by value as its subs member.
 * NOTE(review): children/next look like a first-child / next-sibling tree
 * and topic like the name of this level - confirm in the subscription code. */
struct _mosquitto_subhier {
struct _mosquitto_subhier *children;
struct _mosquitto_subhier *next;
/* List of _mosquitto_subleaf entries attached to this node. */
struct _mosquitto_subleaf *subs;
char *topic;
/* Stored message associated with this node, if any (pointer may presumably be NULL). */
struct mosquitto_msg_store *retained;
};
/* One stored message together with bookkeeping about where it came from.
 * hh is a uthash handle, which lets instances be kept in a uthash table;
 * struct mosquitto_db points at these through its msg_store member.
 * NOTE(review): ref_count is presumably the number of outstanding users of
 * the entry and dest_ids an array of dest_id_count strings - confirm. */
struct mosquitto_msg_store{
UT_hash_handle hh;
dbid_t db_id;
int ref_count;
char *source_id;
char **dest_ids;
int dest_id_count;
uint16_t source_mid;
struct mosquitto_message msg;
};
/* Singly linked list node (next) that refers to a mosquitto_msg_store entry
 * and adds per-entry delivery fields: message id, qos, retain and dup flags,
 * a timestamp, a direction and a state.
 * NOTE(review): presumably one node per message queued for a client -
 * confirm in the database code. */
struct mosquitto_client_msg{
struct mosquitto_client_msg *next;
struct mosquitto_msg_store *store;
uint16_t mid;
int qos;
bool retain;
time_t timestamp;
enum mosquitto_msg_direction direction;
enum mosquitto_msg_state state;
bool dup;
};
/* Username/password record, hashable through the uthash handle hh.
 * struct mosquitto_db uses this type for both its unpwd and psk_id members.
 * password_len, salt and salt_len exist only in WITH_TLS builds.
 * NOTE(review): presumably those extra members support hashed passwords -
 * confirm in the security code. */
struct _mosquitto_unpwd{
char *username;
char *password;
#ifdef WITH_TLS
unsigned int password_len;
unsigned char *salt;
unsigned int salt_len;
#endif
UT_hash_handle hh;
};
/* Singly linked list node holding one access-control entry: a topic string
 * and an access value.
 * NOTE(review): the meaning of ucount and ccount is not visible in this
 * header; presumably counts of username / client id substitutions in
 * pattern topics - confirm in the security code. */
struct _mosquitto_acl{
struct _mosquitto_acl *next;
char *topic;
int access;
int ucount;
int ccount;
};
/* Singly linked list node that ties a username to a list of _mosquitto_acl
 * entries. struct mosquitto_db points at this list through acl_list. */
struct _mosquitto_acl_user{
struct _mosquitto_acl_user *next;
char *username;
struct _mosquitto_acl *acl;
};
/* Table of entry points for an authentication plugin, plus two opaque
 * pointers. plugin_init() receives the address of a user_data pointer; every
 * other callback except plugin_version() receives a user_data pointer as its
 * first argument.
 * NOTE(review): lib is presumably the handle of the loaded plugin library
 * and the callbacks presumably return MOSQ_ERR_* codes - confirm against
 * mosquitto_plugin.h. */
struct _mosquitto_auth_plugin{
void *lib;
void *user_data;
int (*plugin_version)(void);
int (*plugin_init)(void **user_data, struct mosquitto_auth_opt *auth_opts, int auth_opt_count);
int (*plugin_cleanup)(void *user_data, struct mosquitto_auth_opt *auth_opts, int auth_opt_count);
int (*security_init)(void *user_data, struct mosquitto_auth_opt *auth_opts, int auth_opt_count, bool reload);
int (*security_cleanup)(void *user_data, struct mosquitto_auth_opt *auth_opts, int auth_opt_count, bool reload);
int (*acl_check)(void *user_data, const char *clientid, const char *username, const char *topic, int access);
int (*unpwd_check)(void *user_data, const char *username, const char *password);
/* key is an output buffer whose capacity is passed as max_key_len. */
int (*psk_key_get)(void *user_data, const char *hint, const char *identity, char *key, int max_key_len);
};
/* Central broker state: the subscription hierarchy root, credential and ACL
 * tables, several collections of client contexts, the message store, a
 * pointer to the configuration and the auth plugin table.
 * subscription_count and retained_count exist only in WITH_SYS_TREE builds,
 * so the layout depends on the build configuration.
 * NOTE(review): how each contexts_* / ll_for_free collection is organised
 * (hash table or linked list) is not visible in this header - confirm. */
struct mosquitto_db{
dbid_t last_db_id;
/* Root node of the subscription hierarchy, embedded by value. */
struct _mosquitto_subhier subs;
struct _mosquitto_unpwd *unpwd;
struct _mosquitto_acl_user *acl_list;
struct _mosquitto_acl *acl_patterns;
struct _mosquitto_unpwd *psk_id;
struct mosquitto *contexts_by_id;
struct mosquitto *contexts_by_sock;
struct mosquitto *contexts_for_free;
struct mosquitto *contexts_bridge;
struct _clientid_index_hash *clientid_index_hash;
struct mosquitto_msg_store *msg_store;
int msg_store_count;
struct mqtt3_config *config;
int persistence_changes;
struct _mosquitto_auth_plugin auth_plugin;
#ifdef WITH_SYS_TREE
int subscription_count;
int retained_count;
#endif
struct mosquitto *ll_for_free;
};
/* Direction selector stored in the direction member of
 * struct _mqtt3_bridge_topic. The numeric values are fixed explicitly. */
enum mqtt3_bridge_direction {
	bd_out  = 0,
	bd_in   = 1,
	bd_both = 2
};
/* Start-type selector stored in the start_type member of
 * struct _mqtt3_bridge. The numeric values are fixed explicitly. */
enum mosquitto_bridge_start_type {
	bst_automatic = 0,
	bst_lazy      = 1,
	bst_manual    = 2,
	bst_once      = 3
};
/* One topic entry of a bridge: a topic string with a qos and direction,
 * optional local/remote prefixes, and the two prefixed forms of the topic.
 * struct _mqtt3_bridge points at these through its topics member. */
struct _mqtt3_bridge_topic{
char *topic;
int qos;
enum mqtt3_bridge_direction direction;
char *local_prefix;
char *remote_prefix;
char *local_topic; /* topic prefixed with local_prefix */
char *remote_topic; /* topic prefixed with remote_prefix */
};
/* An address string paired with a port number. struct _mqtt3_bridge points
 * at these through its addresses member.
 * NOTE(review): presumably a remote broker host name or IP - confirm. */
struct bridge_address{
char *address;
int port;
};
/* Definition and state of one bridge.
 * The tls_* members exist only when WITH_TLS is defined, and the two PSK
 * members additionally require REAL_WITH_TLS_PSK, so the layout depends on
 * the build configuration.
 * NOTE(review): the meaning of individual members is inferred from their
 * names - confirm against the bridge and config code. */
struct _mqtt3_bridge{
char *name;
/* Presumably address_count entries, with cur_address an index into
 * them - TODO confirm. */
struct bridge_address *addresses;
int cur_address;
int address_count;
time_t primary_retry;
bool round_robin;
int keepalive;
bool clean_session;
/* Presumably topic_count entries - TODO confirm. */
struct _mqtt3_bridge_topic *topics;
int topic_count;
bool topic_remapping;
time_t restart_t;
char *remote_clientid;
char *remote_username;
char *remote_password;
char *local_clientid;
char *local_username;
char *local_password;
bool notifications;
char *notification_topic;
enum mosquitto_bridge_start_type start_type;
int idle_timeout;
int restart_timeout;
int threshold;
bool lazy_reconnect;
bool try_private;
bool try_private_accepted;
#ifdef WITH_TLS
char *tls_cafile;
char *tls_capath;
char *tls_certfile;
char *tls_keyfile;
bool tls_insecure;
char *tls_version;
# ifdef REAL_WITH_TLS_PSK
char *tls_psk_identity;
char *tls_psk;
# endif
#endif
};
/* Helper structs that exist only in WITH_WEBSOCKETS builds.
 * NOTE(review): presumably handed to libwebsockets as user data - confirm
 * in the websockets code. */
#ifdef WITH_WEBSOCKETS
/* Wraps a single http_dir string. */
struct libws_mqtt_hack {
char *http_dir;
};
/* Wraps a single pointer to a client context. */
struct libws_mqtt_data {
struct mosquitto *mosq;
};
#endif
#include <net_mosq.h>
/* ============================================================
* Main functions
* ============================================================ */
/* Broker main loop entry point. Takes the database, an array of
 * listensock_count listening sockets and a listener_max value.
 * NOTE(review): the exact meaning of listener_max and of the return value
 * is not visible in this header - confirm in the implementation. */
int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max);
/* Returns a pointer to a struct mosquitto_db.
 * NOTE(review): presumably the broker's single database instance - confirm. */
struct mosquitto_db *_mosquitto_get_db(void);
/* ============================================================
* Config functions
* ============================================================ */
/* Initialise config struct to default values. */
void mqtt3_config_init(struct mqtt3_config *config);
/* Parse command line options into config.
 * argc/argv are the usual program arguments.
 * NOTE(review): return convention not shown here; presumably 0 on success
 * like mqtt3_config_read() - confirm. */
int mqtt3_config_parse_args(struct mqtt3_config *config, int argc, char *argv[]);
/* Read configuration data from config->config_file into config.
 * If reload is true, don't process config options that shouldn't be reloaded (listeners etc)
 * Returns 0 on success, 1 if there is a configuration error or if a file cannot be opened.
 */
int mqtt3_config_read(struct mqtt3_config *config, bool reload);
/* Free all config data. */
void mqtt3_config_cleanup(struct mqtt3_config *config);
/* ============================================================
* Server send functions
* ============================================================ */
/* Send a CONNACK to the given client context, carrying the ack and result
 * values.
 * NOTE(review): encoding of ack/result and the return convention are not
 * visible in this header - confirm in the implementation. */
int _mosquitto_send_connack(struct mosquitto *context, int ack, int result);
/* Send a SUBACK for message id mid to the given client context; payload
 * points at payloadlen bytes.
 * NOTE(review): presumably one granted-qos byte per subscription - confirm. */
int _mosquitto_send_suback(struct mosquitto *context, uint16_t mid, uint32_t payloadlen, const void *payload);
/* ============================================================
* Network functions
* ============================================================ */
/* Accept a connection on the listening socket listensock.
 * NOTE(review): whether the return value is the new socket or a status
 * code is not visible in this header - confirm. */
int mqtt3_socket_accept(struct mosquitto_db *db, int listensock);
/* Start listening according to the given listener definition.
 * NOTE(review): presumably fills listener->socks / sock_count - confirm. */
int mqtt3_socket_listen(struct _mqtt3_listener *listener);
/* Write the address associated with sock into buf, whose capacity is len.
 * NOTE(review): whether this is the peer or local address, and the return
 * convention, are not visible here - confirm. */
int _mosquitto_socket_get_address(int sock, char *buf, int len);
/* ============================================================
* Read handling functions
* ============================================================ */
/* Handlers for incoming packets. Each takes the database and the client
 * context the packet belongs to.
 * NOTE(review): presumably mqtt3_packet_handle() dispatches to the
 * per-command handlers below and all return MOSQ_ERR_* codes - confirm in
 * the implementation. */
int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_connack(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_disconnect(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_subscribe(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_handle_unsubscribe(struct mosquitto_db *db, struct mosquitto *context);
/* ============================================================
* Database handling
* ============================================================ */
/* NOTE(review): unless a comment below says otherwise, the return
 * conventions of these functions are not visible in this header; presumably
 * MOSQ_ERR_* codes - confirm in the implementation. */
/* Open / close the database; open also receives the configuration. */
int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db);
int mqtt3_db_close(struct mosquitto_db *db);
/* Backup and restore are declared only in WITH_PERSISTENCE builds. */
#ifdef WITH_PERSISTENCE
int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown);
int mqtt3_db_restore(struct mosquitto_db *db);
#endif
/* Set the inflight and queued limits.
 * NOTE(review): presumably per-client message limits - confirm. */
void mqtt3_db_limits_set(int inflight, int queued);
/* Return the number of in-flight messages in count. */
int mqtt3_db_message_count(int *count);
/* Per-client message operations; a message is identified by its message id
 * (mid) together with a direction. */
int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored);
int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir);
int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state);
int mqtt3_db_message_write(struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto *context);
/* Queueing helpers: the "easy" variant takes a raw topic and payload, the
 * other takes an existing mosquitto_msg_store entry. */
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
/* Store a message; the resulting entry is handed back through *stored.
 * NOTE(review): the role of store_id (presumably an id to reuse when
 * restoring) is not visible here - confirm. */
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
/* Look up a stored message by mid for the given client; result via *stored. */
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
/* Check all messages waiting on a client reply and resend if timeout has been exceeded. */
int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout);
int mqtt3_db_message_reconnect_reset(struct mosquitto *context);
/* Queue retained messages for subscription sub at sub_qos to the client.
 * NOTE(review): behaviour inferred from the name - confirm. */
int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos);
/* Maintenance entry points: store cleaning, $SYS-style statistics update
 * (interval / start_time) and vacuum.
 * NOTE(review): behaviour inferred from the names - confirm. */
void mqtt3_db_store_clean(struct mosquitto_db *db);
void mqtt3_db_sys_update(struct mosquitto_db *db, int interval, time_t start_time);
void mqtt3_db_vacuum(void);
/* ============================================================
* Subscription functions
* ============================================================ */
/* Operations on a subscription hierarchy rooted at root.
 * NOTE(review): return conventions are not visible in this header;
 * presumably MOSQ_ERR_* codes - confirm. */
/* Add / remove the subscription string sub for the given client. */
int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, struct _mosquitto_subhier *root);
int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct _mosquitto_subhier *root);
/* Search the hierarchy for topic on behalf of a stored message.
 * NOTE(review): presumably queues the message to every matching
 * subscriber - confirm. */
int mqtt3_sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *root, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored);
/* Print the hierarchy starting at root; level is the current depth
 * (presumably for indentation - confirm). */
void mqtt3_sub_tree_print(struct _mosquitto_subhier *root, int level);
/* NOTE(review): presumably removes all subscriptions of a clean-session
 * client - confirm. */
int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context);
/* ============================================================
* Context functions
* ============================================================ */
/* Create a client context for socket sock; returns a pointer to it.
 * NOTE(review): presumably returns NULL on failure - confirm. */
struct mosquitto *mqtt3_context_init(struct mosquitto_db *db, int sock);
/* Clean up a client context; do_free selects whether the context itself is
 * also released.
 * NOTE(review): exact ownership rules are not visible here - confirm. */
void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, bool do_free);
void mqtt3_context_disconnect(struct mosquitto_db *db, struct mosquitto *context);
/* Deferred-free pair: one adds a context to the "disused" collection, the
 * other frees everything in it.
 * NOTE(review): presumably backed by mosquitto_db.ll_for_free - confirm. */
void mosquitto__add_context_to_disused(struct mosquitto_db *db, struct mosquitto *context);
void mosquitto__free_disused_contexts(struct mosquitto_db *db);
/* ============================================================
* Logging functions
* ============================================================ */
/* Initialise logging with a level, a destinations value and a facility.
 * NOTE(review): destinations is presumably a combination of the
 * MQTT3_LOG_* values and facility a syslog facility - confirm. */
int mqtt3_log_init(int level, int destinations, int facility);
int mqtt3_log_close(void);
/* printf-style logging call. The format attribute marks fmt (argument 3) as
 * a printf format string whose variadic arguments start at argument 4, so
 * GNU-compatible compilers can type-check calls; when __GNUC__ is not
 * defined this header defines __attribute__ away. */
int _mosquitto_log_printf(struct mosquitto *mosq, int level, const char *fmt, ...) __attribute__((format(printf, 3, 4)));
/* ============================================================
* Bridge functions
* ============================================================ */
/* Bridge entry points; declared only in WITH_BRIDGE builds.
 * NOTE(review): behaviour and return conventions are inferred from the
 * names only - confirm in the bridge implementation. */
#ifdef WITH_BRIDGE
int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge);
int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context);
void mqtt3_bridge_packet_cleanup(struct mosquitto *context);
#endif
/* ============================================================
* Security related functions
* ============================================================ */
/* Security entry points. Six of the functions below also have a *_default
 * counterpart with an identical parameter list.
 * NOTE(review): presumably the plain functions dispatch to the callbacks in
 * mosquitto_db.auth_plugin and the *_default ones are the built-in
 * implementations; return values are presumably MOSQ_ERR_* codes - confirm
 * in the security code. */
int mosquitto_security_module_init(struct mosquitto_db *db);
int mosquitto_security_module_cleanup(struct mosquitto_db *db);
int mosquitto_security_init(struct mosquitto_db *db, bool reload);
int mosquitto_security_apply(struct mosquitto_db *db);
int mosquitto_security_cleanup(struct mosquitto_db *db, bool reload);
int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int access);
int mosquitto_unpwd_check(struct mosquitto_db *db, const char *username, const char *password);
/* key is an output buffer whose capacity is passed as max_key_len. */
int mosquitto_psk_key_get(struct mosquitto_db *db, const char *hint, const char *identity, char *key, int max_key_len);
/* *_default counterparts. */
int mosquitto_security_init_default(struct mosquitto_db *db, bool reload);
int mosquitto_security_apply_default(struct mosquitto_db *db);
int mosquitto_security_cleanup_default(struct mosquitto_db *db, bool reload);
int mosquitto_acl_check_default(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int access);
int mosquitto_unpwd_check_default(struct mosquitto_db *db, const char *username, const char *password);
int mosquitto_psk_key_get_default(struct mosquitto_db *db, const char *hint, const char *identity, char *key, int max_key_len);
/* ============================================================
* Windows service related functions
* ============================================================ */
/* Service entry points; declared only when WIN32 or __CYGWIN__ is defined.
 * NOTE(review): presumably install, remove and run the broker as a Windows
 * service - confirm in the service implementation. */
#if defined(WIN32) || defined(__CYGWIN__)
void service_install(void);
void service_uninstall(void);
void service_run(void);
#endif
/* ============================================================
* Websockets related functions
* ============================================================ */
/* Declared only in WITH_WEBSOCKETS builds. Takes a listener definition and
 * returns a libwebsocket context pointer.
 * NOTE(review): presumably returns NULL on failure and the result is stored
 * in listener->ws_context - confirm. */
#ifdef WITH_WEBSOCKETS
struct libwebsocket_context *mosq_websockets_init(struct _mqtt3_listener *listener);
#endif
/* Disconnect the given client context.
 * NOTE(review): behaviour inferred from the name; how it differs from
 * mqtt3_context_disconnect() is not visible in this header - confirm. */
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context);
#endif