diff --git a/c_src/erlzmq_nif.c b/c_src/erlzmq_nif.c index 6e2eb44..208ee44 100644 --- a/c_src/erlzmq_nif.c +++ b/c_src/erlzmq_nif.c @@ -611,7 +611,7 @@ NIF(erlzmq_nif_recv) } else { enif_mutex_unlock(socket->mutex); - + ErlNifBinary binary; enif_alloc_binary(zmq_msg_size(&msg), &binary); memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); @@ -753,6 +753,9 @@ static void * polling_thread(void * handle) erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t, &requests, i); if (item->revents & ZMQ_POLLIN) { + size_t value_len = sizeof(int64_t); + int64_t flag_value = 0; + assert(r->type == ERLZMQ_THREAD_REQUEST_RECV); --count; @@ -760,7 +763,10 @@ static void * polling_thread(void * handle) zmq_msg_init(&msg); enif_mutex_lock(r->data.recv.socket->mutex); if (zmq_recv(r->data.recv.socket->socket_zmq, &msg, - r->data.recv.flags)) + r->data.recv.flags) || + (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON && + zmq_getsockopt(r->data.recv.socket->socket_zmq, + ZMQ_RCVMORE, &flag_value, &value_len)) ) { enif_mutex_unlock(r->data.recv.socket->mutex); if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) { @@ -790,14 +796,24 @@ static void * polling_thread(void * handle) zmq_msg_close(&msg); if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) { + ERL_NIF_TERM flags_list; + + // Should we send the multipart flag + if(flag_value == 1) { + flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore")); + } else { + flags_list = enif_make_list(r->data.recv.env, 0); + } + enif_send(NULL, &r->data.recv.pid, r->data.recv.env, - enif_make_tuple3(r->data.recv.env, + enif_make_tuple4(r->data.recv.env, enif_make_atom(r->data.recv.env, "zmq"), enif_make_tuple2(r->data.recv.env, enif_make_uint64(r->data.recv.env, r->data.recv.socket->socket_index), enif_make_resource(r->data.recv.env, r->data.recv.socket)), - enif_make_binary(r->data.recv.env, &binary))); + enif_make_binary(r->data.recv.env, &binary), + flags_list)); enif_free_env(r->data.recv.env); r->data.recv.env = enif_alloc_env(); item->revents = 0; diff --git a/test/erlzmq_test.erl b/test/erlzmq_test.erl index 03d4dbb..ab2df72 100644 --- a/test/erlzmq_test.erl +++ b/test/erlzmq_test.erl @@ -129,9 +129,17 @@ create_bound_pair(Ctx, Type1, Type2, Mode, Transport) -> {S1, S2}. ping_pong({S1, S2}, Msg, active) -> + ok = erlzmq:send(S1, Msg, [sndmore]), ok = erlzmq:send(S1, Msg), receive - {zmq, S2, Msg} -> + {zmq, S2, Msg, [rcvmore]} -> + ok + after + 1000 -> + ?assertMatch({ok, Msg}, timeout) + end, + receive + {zmq, S2, Msg, []} -> ok after 1000 -> @@ -139,7 +147,7 @@ ping_pong({S1, S2}, Msg, active) -> end, ok = erlzmq:send(S2, Msg), receive - {zmq, S1, Msg} -> + {zmq, S1, Msg, []} -> ok after 1000 -> @@ -147,7 +155,7 @@ ping_pong({S1, S2}, Msg, active) -> end, ok = erlzmq:send(S1, Msg), receive - {zmq, S2, Msg} -> + {zmq, S2, Msg, []} -> ok after 1000 -> @@ -155,18 +163,23 @@ ping_pong({S1, S2}, Msg, active) -> end, ok = erlzmq:send(S2, Msg), receive - {zmq, S1, Msg} -> + {zmq, S1, Msg, []} -> ok after 1000 -> ?assertMatch({ok, Msg}, timeout) end, ok; + ping_pong({S1, S2}, Msg, passive) -> ok = erlzmq:send(S1, Msg), ?assertMatch({ok, Msg}, erlzmq:recv(S2)), ok = erlzmq:send(S2, Msg), ?assertMatch({ok, Msg}, erlzmq:recv(S1)), + ok = erlzmq:send(S1, Msg, [sndmore]), + ok = erlzmq:send(S1, Msg), + ?assertMatch({ok, Msg}, erlzmq:recv(S2)), + ?assertMatch({ok, Msg}, erlzmq:recv(S2)), ok. basic_tests(Transport, Type1, Type2, Mode) ->