Skip to content
Permalink
Browse files

Fix ZMQ bugs and add DNS TCP references.

We use an extension to the ZMQ_ROUTER protocol to support sending
and receiving packets through multiple hosts.  When one binds or
connects using ZMQ the address provided just establishes the physical
link but does NOT provide a protocol connection.  The protocol requires
an additional name which is stored as the ZMQ_IDENTITY for each socket
and is set by the application.

Before the patch the address and protocol name were the same for bind
sockets and "address:random_number" for connect sockets.  This meant
that if the server used "tcp://localhost:6713" for the address and proto
id the client had to do the same.  So if the client specified
"tcp://127.0.0.1:6713" it would be able to make the TCP connection but
the ZMQ protocol would fail to send packets because the strings weren't
the same for both client and server.

As part of this patch I've added the ability to specify both a TCP address
and a protocol ID. The new string has the format:

    zmq_id|tcp://host:port

So now the server could specify "SERVER|tcp://localhost:6713" and the
client can use "SERVER|tcp://127.0.0.1:6713" and connect at both the
link and protocol level.

A connect socket uses an automatically generated ID of
"hostname:random_number".  We could extend connect sockets to take a
protocol ID but it's not needed for us at this time.

With this change we can now use all the various methods of specifying the
physical link that ZMQ supports.  So these all work for the server bind:

    SERVER|tcp://*:6713          - Listen on all interfaces
    SERVER|tcp://eth0:6713       - Listen on the eth0 interface
    SERVER|tcp://localhost:6713
    SERVER|tcp://127.0.0.1:6713

ZMQ also supports IPv6 addresses.
  • Loading branch information...
tacketar authored and PerilousApricot committed Mar 30, 2017
1 parent 02101a3 commit c66d30671f6946b6bf65d5954899aafee294a018
Showing with 103 additions and 49 deletions.
  1. +21 −0 src/gop/mq_helpers.c
  2. +1 −1 src/gop/mq_helpers.h
  3. +6 −4 src/gop/mq_portal.c
  4. +25 −12 src/gop/mq_zmq.c
  5. +31 −24 test/mq_test.c
  6. +19 −8 test/mqs_test.c
@@ -36,6 +36,27 @@

static tbx_atomic_unit32_t _id_counter = 0;

//***********************************************************************
// mq_id_bytes - Returns hte number of bytes for the remote id.
// Assumes the host string is of the form: id|tcp://address:port
// Returns the number of bytes for the ID
//***********************************************************************

int mq_id_bytes(char *host, int len)
{
int i;

if (host == NULL) return(0);

i = 0;
do {
if (host[i] == '|') break;
i++;
} while (i<len);

return(i);
}

//***********************************************************************
// mq_make_id_frame - Makes and generates and ID frame
//***********************************************************************
@@ -31,7 +31,7 @@ extern "C" {
gop_mq_frame_t *mq_make_id_frame();
int mq_num_frames(mq_msg_t *msg);
char *mq_address_to_string(mq_msg_t *address);

int mq_id_bytes(char *host, int len);

#ifdef __cplusplus
}
@@ -35,6 +35,7 @@
#include "gop.h"
#include "gop/portal.h"
#include "mq_portal.h"
#include "mq_helpers.h"
#include "thread_pool.h"

//** Poll index for connection monitoring
@@ -297,7 +298,7 @@ int mq_task_send(gop_mq_context_t *mqc, gop_mq_task_t *task)
gop_mq_portal_t *p;
gop_mq_frame_t *f;
char *host;
int size;
int size, n;

f = gop_mq_msg_first(task->msg);

@@ -306,13 +307,14 @@ int mq_task_send(gop_mq_context_t *mqc, gop_mq_task_t *task)
gop_mq_get_frame(f, (void **)&host, &size);

//** Look up the portal
n = mq_id_bytes(host, size);
apr_thread_mutex_lock(mqc->lock);
p = (gop_mq_portal_t *)(apr_hash_get(mqc->client_portals, host, size));
p = (gop_mq_portal_t *)(apr_hash_get(mqc->client_portals, host, n));
if (p == NULL) { //** New host so create the portal
FATAL_UNLESS(host != NULL);
log_printf(10, "Creating MQ_CMODE_CLIENT portal for outgoing connections host = %s size = %d\n", host, size);
log_printf(10, "Creating MQ_CMODE_CLIENT portal for outgoing connections host = %s size = %d id_bytes=%d\n", host, size, n);
p = gop_mq_portal_create(mqc, host, MQ_CMODE_CLIENT);
apr_hash_set(mqc->client_portals, p->host, APR_HASH_KEY_STRING, p);
apr_hash_set(mqc->client_portals, p->host, n, p);
}
apr_thread_mutex_unlock(mqc->lock);

@@ -35,6 +35,7 @@
#include <unistd.h>

#include "mq_portal.h"
#include "mq_helpers.h"

//*************************************************************
// Native routines
@@ -56,21 +57,26 @@ int zero_native_bind(gop_mq_socket_t *socket, const char *format, ...)
int err, n;
char id[256];

va_start(args, format);
snprintf(id, 255, format, args);
n = mq_id_bytes(id, strlen(id));
if (socket->type != MQ_PAIR) {
va_start(args, format);
snprintf(id, 255, format, args);
err = zmq_setsockopt(socket->arg, ZMQ_IDENTITY, id, strlen(id));
err = zmq_setsockopt(socket->arg, ZMQ_IDENTITY, id, n);
if (err != 0) {
log_printf(0, "ERROR setting socket identity! id=%s err=%d errno=%d\n", id, err, errno);
return(-1);
}
log_printf(0, "id=%s err=%d\n", id, err);
} else {
id[0] = 0;
}

snprintf(id, 255, format, args);
err = zmq_bind(socket->arg, id);
if (id[n] == '|') {
n++;
} else {
n = 0;
}

err = zmq_bind(socket->arg, &(id[n]));
n = errno;
va_end(args);

@@ -84,7 +90,7 @@ log_printf(0, "id=%s err=%d\n", id, err);
int zero_native_connect(gop_mq_socket_t *socket, const char *format, ...)
{
va_list args;
int err;
int err, n;
char buf[255], id[256];

if (socket->type != MQ_PAIR) {
@@ -95,7 +101,7 @@ int zero_native_connect(gop_mq_socket_t *socket, const char *format, ...)
va_start(args, format);
//** Set the ID
if (socket->type != MQ_PAIR) {
snprintf(buf, 255, format, args);
gethostname(buf, sizeof(buf));
snprintf(id, 255, "%s:" I64T , buf, tbx_random_get_int64(1, 1000000));
err = zmq_setsockopt(socket->arg, ZMQ_IDENTITY, id, strlen(id));
if (err != 0) {
@@ -106,7 +112,13 @@ int zero_native_connect(gop_mq_socket_t *socket, const char *format, ...)
}

snprintf(id, 255, format, args);
err = zmq_connect(socket->arg, id);
n = mq_id_bytes(id, strlen(id));
if (id[n] == '|') {
n++;
} else {
n = 0;
}
err = zmq_connect(socket->arg, &(id[n]));
va_end(args);

return(err);
@@ -121,7 +133,7 @@ int zero_native_disconnect(gop_mq_socket_t *socket, const char *format, ...)
char id[256];

va_start(args, format);

snprintf(id, 255, format, args);
err = zmq_disconnect(socket->arg, id);
va_end(args);
@@ -148,7 +160,7 @@ int zero_native_monitor(gop_mq_socket_t *socket, char *address, int events)
int zero_native_send(gop_mq_socket_t *socket, mq_msg_t *msg, int flags)
{
gop_mq_frame_t *f, *fn;
int n, loop, bytes;
int n, loop, bytes, len;

int count = 0;

@@ -161,9 +173,10 @@ int zero_native_send(gop_mq_socket_t *socket, mq_msg_t *msg, int flags)
}

while ((fn = gop_mq_msg_next(msg)) != NULL) {
len = (count > 0) ? f->len : mq_id_bytes(f->data, f->len); //** 1st frame we need to tweak the address
loop = 0;
do {
bytes = zmq_send(socket->arg, f->data, f->len, ZMQ_SNDMORE);
bytes = zmq_send(socket->arg, f->data, len, ZMQ_SNDMORE);
if (bytes == -1) {
if (errno == EHOSTUNREACH) {
usleep(100);
@@ -56,7 +56,9 @@ tbx_stack_t *deferred_pending = NULL;
gop_mq_command_stats_t server_stats;
gop_mq_portal_t *server_portal = NULL;

char *host = "tcp://127.0.0.1:6714";
char *server_host = "SERVER|tcp://127.0.0.1:6714";
char *client_host = "SERVER|tcp://localhost:6714";

mq_pipe_t control_efd[2];
mq_pipe_t server_efd[2];
int shutdown_everything = 0;
@@ -66,7 +68,7 @@ tbx_atomic_unit32_t ping_count = 0;
// pack_msg - Packs a message for sending
//***************************************************************************

mq_msg_t *pack_msg(int dotrack, test_data_t *td)
mq_msg_t *pack_msg(int dotrack, test_data_t *td, char *host)
{
mq_msg_t *msg;

@@ -179,9 +181,9 @@ int client_direct()
sleep(1); //** Wait for the server to start up and bind to the port
ctx = gop_mq_socket_context_new();
sock = gop_mq_socket_new(ctx, MQ_TRACE_ROUTER);
err = gop_mq_connect(sock, host);
err = gop_mq_connect(sock, client_host);
if (err != 0) {
log_printf(0, "ERROR: Failed connecting to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed connecting to host=%s error=%d\n", client_host, err);
status = 1;
goto fail;
}
@@ -190,7 +192,7 @@ int client_direct()

//** Compose the PING message
msg = gop_mq_msg_new();
gop_mq_msg_append_mem(msg, host, strlen(host), MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, client_host, strlen(client_host), MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, NULL, 0, MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, MQF_VERSION_KEY, MQF_VERSION_SIZE, MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, MQF_PING_KEY, MQF_PING_SIZE, MQF_MSG_KEEP_DATA);
@@ -200,7 +202,7 @@ int client_direct()
//** Send the message
err = gop_mq_send(sock, msg, 0);
if (err != 0) {
log_printf(0, "ERROR: Failed sending PING message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed sending PING message to host=%s error=%d\n", client_host, err);
status = 1;
goto fail;
}
@@ -214,14 +216,14 @@ int client_direct()
pfd.events = MQ_POLLIN;
err = gop_mq_poll(&pfd, 1, 5000); //** Wait for 5 secs
if (err != 1) {
log_printf(0, "ERROR: Failed polling for PING message response to host=%s\n", host);
log_printf(0, "ERROR: Failed polling for PING message response to host=%s\n", client_host);
status = 1;
goto fail;
}
err = gop_mq_recv(sock, msg, MQ_DONTWAIT);
// err = gop_mq_recv(sock, msg, 0);
if (err != 0) {
log_printf(0, "ERROR: Failed recving PONG response message from host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed recving PONG response message from host=%s error=%d\n", client_host, err);
status = 1;
goto fail;
}
@@ -283,7 +285,7 @@ int client_exec_ping_test(gop_mq_context_t *mqc)

//** Compose the PING message
msg = gop_mq_msg_new();
gop_mq_msg_append_mem(msg, host, strlen(host), MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, client_host, strlen(client_host), MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, NULL, 0, MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, MQF_VERSION_KEY, MQF_VERSION_SIZE, MQF_MSG_KEEP_DATA);
gop_mq_msg_append_mem(msg, MQF_EXEC_KEY, MQF_EXEC_SIZE, MQF_MSG_KEEP_DATA);
@@ -298,7 +300,7 @@ int client_exec_ping_test(gop_mq_context_t *mqc)
err = gop_waitall(gop);
gop_free(gop, OP_DESTROY);
if (err != OP_STATE_SUCCESS) {
log_printf(0, "ERROR: Failed sending PING message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed sending PING message to host=%s error=%d\n", client_host, err);
status = 1;
goto fail;
}
@@ -382,7 +384,7 @@ int client_trackexec_ping_test(gop_mq_context_t *mqc, int delay, int address_rep
td.command = CMD_PING;
td.delay = delay;
td.address_reply = address_reply;
msg = pack_msg(1, &td);
msg = pack_msg(1, &td, client_host);

gop = gop_mq_op_new(mqc, msg, client_response_pong, NULL, NULL, dt);
gop_set_private(gop, &td);
@@ -391,7 +393,7 @@ int client_trackexec_ping_test(gop_mq_context_t *mqc, int delay, int address_rep
err = gop_waitall(gop);
gop_free(gop, OP_DESTROY);
if (err != success_value) {
log_printf(0, "ERROR: Recving PONG message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Recving PONG message to host=%s error=%d\n", client_host, err);
status = 1;
goto fail;
}
@@ -443,7 +445,7 @@ void generate_tasks(gop_mq_context_t *mqc, gop_opque_t *q, int count, test_data_
tbx_type_malloc(td, test_data_t, 1);
*td = *td_base;

msg = pack_msg(1, td);
msg = pack_msg(1, td, client_host);
gop = gop_mq_op_new(mqc, msg, client_response_pong, td, free, td->dt);
td->id = gop_id(gop);
gop_set_private(gop, td);
@@ -797,7 +799,7 @@ int server_handle_deferred(gop_mq_socket_t *sock)
}

if (err != 0) {
log_printf(0, "ERROR: Failed sending deferred message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed sending deferred message to host=%s error=%d\n", client_host, err);
}

free(defer);
@@ -858,7 +860,7 @@ int proc_trackexec_ping(gop_mq_portal_t *p, gop_mq_socket_t *sock, mq_msg_t *msg

//** Make the trackaddress response if needed
log_printf(5, "address_reply=%d\n", td->address_reply);
track_response = (td->address_reply == 1) ? gop_mq_msg_trackaddress(host, msg, pid, 1) : NULL;
track_response = (td->address_reply == 1) ? gop_mq_msg_trackaddress(server_host, msg, pid, 1) : NULL;

if (td->delay == 0) {
gop_mq_get_frame(pid, (void **)&data, &size);;
@@ -876,7 +878,7 @@ int proc_trackexec_ping(gop_mq_portal_t *p, gop_mq_socket_t *sock, mq_msg_t *msg
}

if (err != 0) {
log_printf(0, "ERROR: Failed sending PONG message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed sending PONG message to host=%s error=%d\n", client_host, err);
}

} else if (td->delay > 0) {
@@ -909,7 +911,7 @@ int proc_trackexec_ping(gop_mq_portal_t *p, gop_mq_socket_t *sock, mq_msg_t *msg


if (err != 0) {
log_printf(0, "ERROR: Failed sending TRACKADDRESS message to host=%s error=%d\n", host, err);
log_printf(0, "ERROR: Failed sending TRACKADDRESS message to host=%s error=%d\n", client_host, err);
}
}

@@ -992,9 +994,9 @@ void *server_test_raw_socket()

ctx = gop_mq_socket_context_new();
sock = gop_mq_socket_new(ctx, MQ_TRACE_ROUTER);
err = gop_mq_bind(sock, host);
err = gop_mq_bind(sock, server_host);
if (err != 0) {
log_printf(0, "ERROR: Failed connecting to host=%s error=%d errno=%d\n", host, err, errno);
log_printf(0, "ERROR: Failed connecting to host=%s error=%d errno=%d\n", server_host, err, errno);
goto fail;
}

@@ -1100,7 +1102,7 @@ void server_test_mq_loop()
mqc = server_make_context();

//** Make the server portal
server_portal = gop_mq_portal_create(mqc, host, MQ_CMODE_SERVER);
server_portal = gop_mq_portal_create(mqc, server_host, MQ_CMODE_SERVER);
table = gop_mq_portal_command_table(server_portal);
gop_mq_command_set(table, MQF_PING_KEY, MQF_PING_SIZE, NULL, cb_ping);

@@ -1219,7 +1221,8 @@ int main(int argc, char **argv)

if (argc < 2) {
printf("mq_test [-d log_level] [-log logfile] [-host url]\n");
printf(" -host url Defaults to %s\n", host);
printf(" -server_host url Defaults to %s\n", server_host);
printf(" -client_host url Defaults to %s\n", client_host);
return(0);
}

@@ -1237,9 +1240,13 @@ int main(int argc, char **argv)
i++;
logfile = argv[i];
i++;
} else if (strcmp(argv[i], "-host") == 0) { //** Alternate host
} else if (strcmp(argv[i], "-server_host") == 0) { //** Alternate host
i++;
server_host = argv[i];
i++;
} else if (strcmp(argv[i], "-client_host") == 0) { //** Alternate host
i++;
host = argv[i];
client_host = argv[i];
i++;
} else if (strcmp(argv[i], "-h") == 0) { //** Print help
printf("mq_test [-d log_level]\n");
@@ -1251,7 +1258,7 @@ int main(int argc, char **argv)
tbx_set_log_level(dlevel);

printf("log_level=%d\n", _log_level);
printf("host=%s\n", host);
printf("server_host=%s client_host=%s\n", server_host, client_host);

gop_init_opque_system();

0 comments on commit c66d306

Please sign in to comment.
You can’t perform that action at this time.