Skip to content

Commit 33843ab

Browse files
authored
event_rabbitmq: restore event functionality (#3738)
* event_rabbitmq: restore event functionality * event_rabbitmq: fix server reconnect
1 parent 33f99bf commit 33843ab

5 files changed

Lines changed: 274 additions & 18 deletions

File tree

modules/event_rabbitmq/event_rabbitmq.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ static evi_reply_sock* rmq_parse(str socket)
398398
st = ST_HOST;
399399
if (dupl_string(&tmp, begin, socket.s + i)) goto err;
400400
param->conn.uri.user = tmp.s;
401+
//param->conn.uri.user[tmp.len] = '\0';
401402
begin = socket.s + i + 1;
402403
param->conn.flags |= RMQ_PARAM_USER;
403404
break;
@@ -566,6 +567,7 @@ static evi_reply_sock* rmq_parse(str socket)
566567
goto err;
567568
}
568569

570+
param->conn.state = RMQS_PREINIT;
569571
param->conn.heartbeat = heartbeat;
570572
sock->params = param;
571573
sock->flags |= EVI_PARAMS | RMQ_FLAG | EVI_ASYNC_STATUS;

modules/event_rabbitmq/event_rabbitmq.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ extern struct timeval conn_timeout_tv;
7979
extern struct timeval rpc_timeout_tv;
8080

8181
int rmq_error(char const *context, amqp_rpc_reply_t x);
82-
void rmq_destroy_connection(rmq_connection_t *conn);
83-
int rmq_reconnect(rmq_connection_t *conn, int max_frames, str cid);
82+
void rmq_destroy_connection(rmq_connection_t *conn, int temporarely);
83+
int rmq_server_reconnect(rmq_connection_t *conn, int max_frames, str cid);
8484
int amqp_check_status(rmq_connection_t *conn, int r, int* retry, str cid);
85-
int rmq_basic_publish(rmq_connection_t *conn, int max_frames,
85+
int rmq_basic_server_publish(rmq_connection_t *conn, int max_frames,
8686
str *cid, amqp_bytes_t akey, amqp_bytes_t abody,
8787
amqp_basic_properties_t *props, int retries);
8888

modules/event_rabbitmq/rabbitmq_send.c

Lines changed: 259 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ void rmq_free_param(rmq_params_t *rmqp)
151151
shm_free(rmqp->routing_key.s);
152152
}
153153

154-
void rmq_destroy_connection(rmq_connection_t *conn)
154+
void rmq_destroy_connection(rmq_connection_t *conn, int temporarely)
155155
{
156156
switch (conn->state)
157157
{
@@ -165,12 +165,16 @@ void rmq_destroy_connection(rmq_connection_t *conn)
165165
if (amqp_destroy_connection(conn->conn) < 0)
166166
LM_ERR("cannot destroy connection\n");
167167
case RMQS_OFF:
168+
case RMQS_PREINIT:
168169
break;
169170
default:
170171
LM_WARN("Unknown rmq server state %d\n", conn->state);
171172
}
172173

173-
conn->state = RMQS_OFF;
174+
if (temporarely)
175+
conn->state = RMQS_PREINIT;
176+
else
177+
conn->state = RMQS_OFF;
174178

175179
if (conn->tls_dom) {
176180
tls_api.release_domain(conn->tls_dom);
@@ -187,26 +191,274 @@ void rmq_destroy(evi_reply_sock *sock)
187191
if ((sock->flags & EVI_PARAMS) && sock->params) {
188192
rmq_free_param((rmq_params_t *)sock->params);
189193
rmq_params_t *rmqp = (rmq_params_t *)sock->params;
190-
rmq_destroy_connection(&rmqp->conn);
194+
rmq_destroy_connection(&rmqp->conn, 0);
191195
}
192196
shm_free(sock);
193197
}
194198

199+
static int rmq_reconnect(evi_reply_sock *sock)
200+
{
201+
rmq_params_t * rmqp = (rmq_params_t *)sock->params;
202+
rmq_connection_t *conn;
203+
#if defined AMQP_VERSION_v04
204+
amqp_socket_t *amqp_sock;
205+
#endif
206+
int socket;
207+
208+
if (!rmqp) {
209+
LM_ERR("not enough socket info\n");
210+
return -1;
211+
}
212+
213+
conn = &rmqp->conn;
214+
215+
switch (conn->state) {
216+
case RMQS_OFF:
217+
case RMQS_PREINIT:
218+
if (!(conn->conn = amqp_new_connection())) {
219+
LM_ERR("cannot create amqp connection!\n");
220+
return -1;
221+
}
222+
#if defined AMQP_VERSION_v04
223+
if (use_tls && (conn->uri.ssl || (conn->flags&RMQ_PARAM_TLS))) {
224+
if (!conn->tls_dom) {
225+
conn->tls_dom = tls_api.find_client_domain_name(&conn->tls_dom_name);
226+
if (!conn->tls_dom) {
227+
LM_ERR("TLS domain: %.*s not found\n",
228+
conn->tls_dom_name.len, conn->tls_dom_name.s);
229+
return -1;
230+
}
231+
}
232+
233+
amqp_sock = amqp_ssl_socket_new(conn->conn);
234+
if (!amqp_sock) {
235+
LM_ERR("cannot create AMQP TLS socket\n");
236+
return -1;
237+
}
238+
239+
#if AMQP_VERSION < AMQP_VERSION_CODE(0, 10, 0, 0)
240+
/* if amqp_ssl_socket_get_context() is not available, serialize the CA,
241+
* cert and key loading in order to prevent openssl multiprocess issues */
242+
lock_get(ssl_lock);
243+
if (amqp_ssl_socket_set_cacert(amqp_sock, conn->tls_dom->ca.s) !=
244+
AMQP_STATUS_OK) {
245+
LM_ERR("Failed to set CA certificate\n");
246+
lock_release(ssl_lock);
247+
return -1;
248+
}
249+
250+
if (amqp_ssl_socket_set_key(amqp_sock, conn->tls_dom->cert.s,
251+
conn->tls_dom->pkey.s) != AMQP_STATUS_OK) {
252+
LM_ERR("Failed to set certificate and private key\n");
253+
lock_release(ssl_lock);
254+
return -1;
255+
}
256+
lock_release(ssl_lock);
257+
#else
258+
/* point the CA, cert and key from librabbitmq's SSL_CTX to
259+
* the info loaded through the tls_mgm's SSL_CTX, in order to
260+
* prevent openssl multiprocess issues */
261+
void *ssl_ctx;
262+
ssl_ctx = amqp_ssl_socket_get_context(amqp_sock);
263+
264+
/* set CA in AMQP's SSL_CTX */
265+
openssl_api.ctx_set_cert_store(ssl_ctx,
266+
((void**)conn->tls_dom->ctx)[process_no]);
267+
268+
/* set certificate in AMQP's SSL_CTX */
269+
if (openssl_api.ctx_set_cert_chain(ssl_ctx,
270+
((void**)conn->tls_dom->ctx)[process_no]) < 0) {
271+
LM_ERR("Failed to set certificate\n");
272+
return -1;
273+
}
274+
275+
/* set private key in AMQP's SSL_CTX */
276+
if (openssl_api.ctx_set_pkey_file(ssl_ctx, conn->tls_dom->pkey.s) < 0) {
277+
LM_ERR("Failed to set private key\n");
278+
return -1;
279+
}
280+
#endif
281+
282+
#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
283+
amqp_ssl_socket_set_verify_peer(amqp_sock, conn->tls_dom->verify_cert);
284+
amqp_ssl_socket_set_verify_hostname(amqp_sock, 0);
285+
#else
286+
amqp_ssl_socket_set_verify(amqp_sock, conn->tls_dom->verify_cert);
287+
#endif
288+
289+
#if AMQP_VERSION >= AMQP_VERSION_CODE(0, 8, 0, 0)
290+
amqp_tls_version_t method_min, method_max;
291+
292+
if (conn->tls_dom->method != TLS_METHOD_UNSPEC) {
293+
switch (conn->tls_dom->method) {
294+
case TLS_USE_TLSv1:
295+
method_min = AMQP_TLSv1;
296+
break;
297+
case TLS_USE_TLSv1_2:
298+
method_min = AMQP_TLSv1_2;
299+
break;
300+
default:
301+
LM_NOTICE("Unsupported TLS minimum method for AMQP, using TLSv1\n");
302+
method_min = AMQP_TLSv1;
303+
}
304+
} else {
305+
LM_INFO("Minimum TLS method unspecified, using TLSv1\n");
306+
method_min = AMQP_TLSv1;
307+
}
308+
309+
if (conn->tls_dom->method_max != TLS_METHOD_UNSPEC) {
310+
switch (conn->tls_dom->method_max) {
311+
case TLS_USE_TLSv1:
312+
method_max = AMQP_TLSv1;
313+
break;
314+
case TLS_USE_TLSv1_2:
315+
method_max = AMQP_TLSv1_2;
316+
break;
317+
default:
318+
LM_NOTICE("Unsupported TLS maximum method for AMQP, using latest"
319+
" supported by librabbitmq\n");
320+
method_max = AMQP_TLSvLATEST;
321+
}
322+
} else {
323+
method_max = AMQP_TLSvLATEST;
324+
LM_INFO("Maximum TLS method unspecified, using latest supported by"
325+
" librabbitmq\n");
326+
}
327+
328+
if (amqp_ssl_socket_set_ssl_versions(amqp_sock, method_min, method_max) !=
329+
AMQP_STATUS_OK) {
330+
LM_ERR("Failed to set TLS method range\n");
331+
return -1;
332+
}
333+
#endif
334+
} else {
335+
amqp_sock = amqp_tcp_socket_new(conn->conn);
336+
if (!amqp_sock) {
337+
LM_ERR("cannot create AMQP socket\n");
338+
return -1;
339+
}
340+
}
341+
342+
socket = amqp_socket_open_noblock(amqp_sock, sock->address.s,
343+
sock->port, &conn_timeout_tv);
344+
if (socket < 0) {
345+
amqp_connection_close(conn->conn, AMQP_REPLY_SUCCESS);
346+
LM_ERR("cannot open AMQP socket\n");
347+
return -1;
348+
}
349+
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
350+
if (rpc_timeout_tv.tv_sec > 0 &&
351+
amqp_set_rpc_timeout(conn->conn, &rpc_timeout_tv) < 0)
352+
LM_ERR("setting RPC timeout - going blocking\n");
353+
#endif
354+
355+
#else
356+
socket = amqp_open_socket_noblock(sock->address.s, sock->port,
357+
&conn_timeout_tv);
358+
if (socket < 0) {
359+
LM_ERR("cannot open AMQP socket\n");
360+
return -1;
361+
}
362+
amqp_set_sockfd(conn->conn, socket);
363+
#endif
364+
conn->state = RMQS_INIT;
365+
/* fall through */
366+
case RMQS_INIT:
367+
if (rmq_error("Logging in", amqp_login(
368+
conn->conn,
369+
RMQ_DEFAULT_VHOST,
370+
0,
371+
RMQ_DEFAULT_MAX,
372+
conn->heartbeat,
373+
AMQP_SASL_METHOD_PLAIN,
374+
conn->uri.user ? conn->uri.user : RMQ_DEFAULT_UP,
375+
conn->uri.password ? conn->uri.password : RMQ_DEFAULT_UP)))
376+
return -2;
377+
/* all good - return success */
378+
conn->state = RMQS_CONN;
379+
/* fall through */
380+
case RMQS_CONN:
381+
/* don't use more than 1 channel */
382+
amqp_channel_open(conn->conn, RMQ_DEFAULT_CHANNEL);
383+
if (rmq_error("Opening channel", amqp_get_rpc_reply(conn->conn)))
384+
return -2;
385+
LM_INFO("[] successfully connected!\n");
386+
conn->state = RMQS_ON;
387+
/* fall through */
388+
case RMQS_ON:
389+
return 0;
390+
default:
391+
LM_WARN("Unknown rmq server state %d\n", conn->state);
392+
return -1;
393+
}
394+
395+
}
396+
397+
static int rmq_basic_publish(rmq_connection_t *conn, int max_frames,
398+
str *cid, amqp_bytes_t akey, amqp_bytes_t abody,
399+
amqp_basic_properties_t *props, int retries, rmq_send_t *rmqs)
400+
{
401+
int ret;
402+
evi_reply_sock *sock;
403+
404+
if (conn->flags & RMQF_NOPER) {
405+
props->delivery_mode = 2;
406+
props->_flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
407+
}
408+
409+
do {
410+
sock = rmqs->sock;
411+
LM_INFO("rmq_reconnect()\n");
412+
ret = rmq_reconnect(sock);
413+
414+
if (ret == -1) {
415+
if (amqp_destroy_connection(conn->conn) < 0)
416+
LM_ERR("cannot destroy connection\n");
417+
if (conn->tls_dom) {
418+
tls_api.release_domain(conn->tls_dom);
419+
conn->tls_dom = NULL;
420+
}
421+
LM_ERR("cannot connect to RabbitMQ server %s:%u\n",
422+
conn->uri.host, conn->uri.port);
423+
return ret;
424+
}
425+
if (ret == -2) {
426+
rmq_destroy_connection(conn, 1);
427+
LM_ERR("cannot connect to RabbitMQ server %s:%u\n",
428+
conn->uri.host, conn->uri.port);
429+
return ret;
430+
}
431+
432+
ret = amqp_basic_publish(conn->conn, RMQ_DEFAULT_CHANNEL, conn->exchange, akey, \
433+
(conn->flags & RMQF_MAND), (conn->flags & RMQF_IMM),
434+
props, abody);
435+
ret = amqp_check_status(conn, ret, &retries, *cid);
436+
} while (ret > 0);
437+
438+
return ret;
439+
}
440+
195441
/* sends the buffer */
196442
static int rmq_sendmsg(rmq_send_t *rmqs)
197443
{
198444
rmq_params_t * rmqp = (rmq_params_t *)rmqs->sock->params;
199445
int ret;
200-
int re_publish = 0;
446+
int re_publish = 2;
201447
amqp_basic_properties_t props;
202448

203449
if (!rmqp || !(rmqp->conn.flags & RMQF_MAND)) {
204450
LM_ERR("not enough socket info\n");
205451
return -1;;
206452
}
207453

208-
if (rmqp->conn.state == RMQS_OFF)
454+
/* FIXME:
455+
* We need a new state for un-initialised connections
456+
* Unlike server connections, this ones are not initialised at startup
457+
*/
458+
if (rmqp->conn.state == RMQS_OFF) {
459+
LM_INFO("server disconnected\n");
209460
return 0;
461+
}
210462

211463
rmqp->conn.uri.host = rmqs->sock->address.s;
212464

@@ -218,7 +470,8 @@ static int rmq_sendmsg(rmq_send_t *rmqs)
218470
amqp_cstring_bytes(rmqp->routing_key.s),
219471
amqp_cstring_bytes(rmqs->msg),
220472
((rmqp->conn.flags & RMQF_NOPER)?&props:0),
221-
re_publish);
473+
re_publish,
474+
rmqs);
222475

223476
return ret;
224477
}

0 commit comments

Comments
 (0)