/
broker.c
154 lines (147 loc) · 5.82 KB
/
broker.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
//
// Created by zr on 23-4-9.
//
#include "tlog.h"
#include "rule_engine/adaptors/mqtt_delay_message_adaptor.h"
#include "mqtt/mqtt_broker.h"
#include "base/mqtt_cmd.h"
#include <stdio.h>
#include <dlfcn.h>
extern void mqtt_delay_message(void* broker_, tmq_str_t payload);
static tmq_str_t delay_message_routing_key = NULL;
tmq_adaptor_t* init_plugin(tmq_config_t* cfg, tmq_broker_t* broker, char* so_name, void* handle, char* name)
{
char init_sym_name[100] = {0};
sprintf(init_sym_name, "get_%s_adaptor", name);
adaptor_getter_f init = dlsym(handle, init_sym_name);
if(!init)
{
tlog_error("initializer %s not found in %s", init_sym_name, so_name);
return NULL;
}
tmq_str_t error = NULL;
tmq_adaptor_t* adaptor = NULL;
if(tmq_str_equal(name, "delay_message"))
{
tmq_str_t routing_key = get_uuid();
adaptor = init(cfg, routing_key, &error);
if(adaptor)
{
((delay_message_adaptor*)adaptor)->message_cb = mqtt_delay_message;
((delay_message_adaptor*)adaptor)->broker = broker;
delay_message_routing_key = routing_key;
}
}
else
adaptor = init(cfg, NULL, &error);
if(!adaptor)
{
if(error)
{
tlog_info("initialize %s adaptor failed: %s", name, error);
tmq_str_free(error);
}
return NULL;
}
return adaptor;
}
int main(int argc, char* argv[])
{
tlog_init("broker.log", 1024 * 1024, 10, 0, TLOG_SCREEN);
tmq_broker_t broker;
printf(" __ __ ___ ____ ______ ______\n"
" / /_ (_) ____ __ __ / |/ / / __ \\ /_ __/ /_ __/\n"
" / __/ / / / __ \\ / / / / / /|_/ / / / / / / / / / \n"
"/ /__ / / / / / / / /_/ / / / / / / /_/ / / / / / \n"
"\\__/ /_/ /_/ /_/ \\___ / /_/ /_/ \\___\\_\\ /_/ /_/ \n"
" /____/ \n");
tmq_cmd_t cmd;
tmq_cmd_init(&cmd);
tmq_cmd_add_number(&cmd, "port", "p", "server port", 0, 1883);
tmq_cmd_add_number(&cmd, "ssl-port", "s", "mqtts port", 0, 8883);
tmq_cmd_add_number(&cmd, "cluster-port", "P", "cluster port", 0, 11883);
tmq_cmd_add_string(&cmd, "config", "c", "config file path", 0, "tinymqtt.conf");
if(tmq_cmd_parse(&cmd, argc, argv) < 0)
{
tmq_cmd_destroy(&cmd);
tlog_exit();
return 0;
}
/* read tinymqtt configure file */
tmq_str_t config_path = tmq_cmd_get_string(&cmd, "config");
tmq_config_t cfg;
if(tmq_config_init(&cfg, config_path, "=") == 0)
tlog_info("read config file %s ok", config_path);
else
{
tlog_error("read config file error");
return -1;
}
tmq_str_free(config_path);
tmq_plugin_info_map plugins;
tmq_map_str_init(&plugins, tmq_plugin_handle_t, MAP_DEFAULT_CAP, MAP_DEFAULT_LOAD_FACTOR);
tmq_str_t plugins_conf_ = tmq_config_get(&cfg, "plugins");
if(plugins_conf_)
{
if(tmq_str_len(plugins_conf_) <= 2 ||
tmq_str_at(plugins_conf_, 0) != '[' ||
tmq_str_at(plugins_conf_, tmq_str_len(plugins_conf_) - 1) != ']')
tlog_info("invalid plugin configuration: %s", plugins_conf_);
else
{
tlog_info("loading mqtt adaptor plugins");
tmq_str_t plugins_conf = tmq_str_substr(plugins_conf_, 1, tmq_str_len(plugins_conf_) - 2);
str_vec plugin_names = tmq_str_split(plugins_conf, ",");
for(tmq_str_t* name = tmq_vec_begin(plugin_names); name != tmq_vec_end(plugin_names); name++)
{
tmq_str_trim(*name);
char so_name[50] = {0};
sprintf(so_name, "lib%s_plugin.so", *name);
void* handle = dlopen(so_name, RTLD_LAZY);
if(!handle)
{
tlog_warn("%s not found", so_name);
continue;
}
tlog_info("load %s success", so_name);
tmq_adaptor_t* adaptor = init_plugin(&cfg, &broker, so_name, handle, *name);
if(!adaptor)
continue;
tmq_plugin_handle_t plugin_handle = {
.adaptor = adaptor,
.so_handle = handle
};
tmq_map_str_init(&plugin_handle.adaptor_parameters, adaptor_value_type,
MAP_DEFAULT_CAP, MAP_DEFAULT_LOAD_FACTOR);
tmq_map_put(plugins, *name, plugin_handle);
}
for(tmq_str_t* name = tmq_vec_begin(plugin_names); name != tmq_vec_end(plugin_names); name++)
tmq_str_free(*name);
tmq_vec_free(plugin_names);
tmq_str_free(plugins_conf);
}
tmq_str_free(plugins_conf_);
}
if(tmq_broker_init(&broker, &cfg, &cmd, &plugins) == 0)
{
tmq_str_t delay_message_enable = tmq_config_get(&cfg, "delay_message_enable");
if(delay_message_enable && tmq_str_equal(delay_message_enable, "true"))
{
char select_rule[256] = {0};
sprintf(select_rule, "SELECT "
"'delay-message-exchange' AS {delay_message.exchange}, "
"'%s' AS {delay_message.routingKey}, "
"payload.delay AS {delay_message.delayMS},"
"payload.topic, payload.message, "
"client_id, username, qos, retain "
"FROM "
"$delay", delay_message_routing_key);
tmq_rule_engine_add_rule(&broker.rule_engine, select_rule);
}
tmq_str_free(delay_message_enable);
tmq_broker_run(&broker);
}
tmq_map_free(plugins);
tlog_exit();
return 0;
}