Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/zeromq/erlzmq2 into robustify
Browse files Browse the repository at this point in the history
  • Loading branch information
garrett committed Jun 11, 2011
2 parents 5f5f0dc + 5442d91 commit ae447bd
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
24 changes: 20 additions & 4 deletions c_src/erlzmq_nif.c
Expand Up @@ -611,7 +611,7 @@ NIF(erlzmq_nif_recv)
}
else {
enif_mutex_unlock(socket->mutex);

ErlNifBinary binary;
enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
Expand Down Expand Up @@ -753,14 +753,20 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t,
&requests, i);
if (item->revents & ZMQ_POLLIN) {
size_t value_len = sizeof(int64_t);
int64_t flag_value = 0;

assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;

zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
r->data.recv.flags))
r->data.recv.flags) ||
(r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
zmq_getsockopt(r->data.recv.socket->socket_zmq,
ZMQ_RCVMORE, &flag_value, &value_len)) )
{
enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
Expand Down Expand Up @@ -790,14 +796,24 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);

if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
ERL_NIF_TERM flags_list;

// Should we send the multipart flag
if(flag_value == 1) {
flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
} else {
flags_list = enif_make_list(r->data.recv.env, 0);
}

enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env,
enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
enif_make_uint64(r->data.recv.env,
r->data.recv.socket->socket_index),
enif_make_resource(r->data.recv.env, r->data.recv.socket)),
enif_make_binary(r->data.recv.env, &binary)));
enif_make_binary(r->data.recv.env, &binary),
flags_list));
enif_free_env(r->data.recv.env);
r->data.recv.env = enif_alloc_env();
item->revents = 0;
Expand Down
21 changes: 17 additions & 4 deletions test/erlzmq_test.erl
Expand Up @@ -129,44 +129,57 @@ create_bound_pair(Ctx, Type1, Type2, Mode, Transport) ->
{S1, S2}.

ping_pong({S1, S2}, Msg, active) ->
ok = erlzmq:send(S1, Msg, [sndmore]),
ok = erlzmq:send(S1, Msg),
receive
{zmq, S2, Msg} ->
{zmq, S2, Msg, [rcvmore]} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
receive
{zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
{zmq, S1, Msg} ->
{zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S1, Msg),
receive
{zmq, S2, Msg} ->
{zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
{zmq, S1, Msg} ->
{zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok;

ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok = erlzmq:send(S2, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S1)),
ok = erlzmq:send(S1, Msg, [sndmore]),
ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok.

basic_tests(Transport, Type1, Type2, Mode) ->
Expand Down

0 comments on commit ae447bd

Please sign in to comment.