Permalink
Browse files

Merge branch 'master' into 2-1

Conflicts:
	Makefile
  • Loading branch information...
2 parents fd99a31 + b6a6abf commit 3a36833c89f9709b9ce0b67352a686fa3de1a7d9 @yrashk yrashk committed Jun 3, 2011
Showing with 52 additions and 29 deletions.
  1. +1 −1 Makefile
  2. +27 −2 c_src/erlzmq_nif.c
  3. +1 −3 c_src/vector.h
  4. +1 −1 include/erlzmq.hrl
  5. +20 −20 src/erlzmq.erl
  6. +2 −2 test/erlzmq_test.erl
View
@@ -8,7 +8,7 @@ ZMQ_FLAGS=
endif
ifndef ZEROMQ_VERSION
-ZEROMQ_VERSION=v2.1.4
+ZEROMQ_VERSION=v2.1.7
endif
all: perf
View
@@ -506,6 +506,10 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ enif_mutex_unlock(socket->context->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
@@ -580,6 +584,10 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ enif_mutex_unlock(socket->context->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
@@ -636,6 +644,17 @@ NIF(erlzmq_nif_close)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ // context is gone
+ enif_mutex_lock(socket->mutex);
+ zmq_msg_close(&msg);
+ zmq_close(socket->socket_zmq);
+ enif_mutex_unlock(socket->mutex);
+ enif_mutex_destroy(socket->mutex);
+ enif_release_resource(socket);
+ enif_mutex_unlock(socket->context->mutex);
+ return enif_make_atom(env, "ok");
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -901,6 +920,11 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_TERM) {
+ enif_mutex_lock(context->mutex);
+ free(context->thread_socket_name);
+ // use this to flag context is over
+ context->thread_socket_name = NULL;
+ enif_mutex_unlock(context->mutex);
// cleanup pending requests
for (i = 1; i < vector_count(&requests); ++i) {
erlzmq_thread_request_t * r_old = vector_get(erlzmq_thread_request_t,
@@ -922,9 +946,10 @@ static void * polling_thread(void * handle)
zmq_close(thread_socket);
zmq_close(context->thread_socket);
enif_mutex_unlock(context->mutex);
- enif_mutex_destroy(context->mutex);
- free(context->thread_socket_name);
zmq_term(context->context_zmq);
+ enif_mutex_lock(context->mutex);
+ enif_mutex_unlock(context->mutex);
+ enif_mutex_destroy(context->mutex);
enif_release_resource(context);
// notify the waiting request
enif_send(NULL, &r->data.term.pid, r->data.term.env,
View
@@ -40,9 +40,7 @@
#ifndef VECTOR_H
#define VECTOR_H
-#ifndef size_t
-typedef unsigned long size_t;
-#endif
+#include <stddef.h>
typedef struct {
View
@@ -106,7 +106,7 @@
%% @type erlzmq_socket() = binary().
%% An opaque handle to an erlzmq socket.
--opaque erlzmq_socket() :: binary().
+-opaque erlzmq_socket() :: {pos_integer(), binary()}.
%% @type erlzmq_send_recv_flag() = noblock | sndmore | recvmore | {timeout, timeout()}.
%% The individual flags to use with {@link erlzmq:send/3. send/3}
View
@@ -89,8 +89,8 @@ context(Threads) when is_integer(Threads) ->
Type :: erlzmq_socket_type() |
list(erlzmq_socket_type() |
{active, boolean()})) ->
- {ok, {pos_integer(), erlzmq_socket()}} |
- erlzmq_error().
+ {ok, erlzmq_socket()} |
+ erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
socket(Context, [Type]);
socket(Context, [H | _] = L) ->
@@ -114,7 +114,7 @@ socket(Context, [H | _] = L) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_bind">zmq_bind</a>.</i>
%% @end
--spec bind(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec bind(Socket :: erlzmq_socket(),
Endpoint :: erlzmq_endpoint()) ->
ok |
erlzmq_error().
@@ -127,7 +127,7 @@ bind({I, Socket}, Endpoint)
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_connect">zmq_connect</a>.</i>
%% @end
--spec connect(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec connect(Socket :: erlzmq_socket(),
Endpoint :: erlzmq_endpoint()) ->
ok |
erlzmq_error().
@@ -136,19 +136,19 @@ connect({I, Socket}, Endpoint)
erlzmq_nif:connect(Socket, Endpoint).
%% @equiv send(Socket, Msg, [])
--spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec send(Socket :: erlzmq_socket(),
Data :: erlzmq_data()) ->
ok |
erlzmq_error().
-send(SocketTuple, Binary) when is_binary(Binary) ->
- send(SocketTuple, Binary, []).
+send(Socket, Binary) when is_binary(Binary) ->
+ send(Socket, Binary, []).
%% @doc Send a message on a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a>.</i>
%% @end
--spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec send(Socket :: erlzmq_socket(),
Data :: erlzmq_data(),
Flags :: erlzmq_send_recv_flags()) ->
ok |
@@ -168,18 +168,18 @@ send({I, Socket}, Binary, Flags)
end.
%% @equiv recv(Socket, 0)
--spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
+-spec recv(Socket :: erlzmq_socket()) ->
{ok, erlzmq_data()} |
erlzmq_error().
-recv(SocketTuple) ->
- recv(SocketTuple, []).
+recv(Socket) ->
+ recv(Socket, []).
%% @doc Receive a message from a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a>.</i>
%% @end
--spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec recv(Socket :: erlzmq_socket(),
Flags :: erlzmq_send_recv_flags()) ->
{ok, erlzmq_data()} |
erlzmq_error() |
@@ -204,13 +204,13 @@ recv({I, Socket}, Flags)
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>.</i>
%% @end
--spec setsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec setsockopt(Socket :: erlzmq_socket(),
Name :: erlzmq_sockopt(),
erlzmq_sockopt_value()) ->
ok |
erlzmq_error().
-setsockopt(SocketTuple, Name, Value) when is_list(Value) ->
- setsockopt(SocketTuple, Name, erlang:list_to_binary(Value));
+setsockopt(Socket, Name, Value) when is_list(Value) ->
+ setsockopt(Socket, Name, erlang:list_to_binary(Value));
setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
erlzmq_nif:setsockopt(Socket, option_name(Name), Value).
@@ -219,26 +219,26 @@ setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a>.</i>
%% @end
--spec getsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec getsockopt(Socket :: erlzmq_socket(),
Name :: erlzmq_sockopt()) ->
{ok, erlzmq_sockopt_value()} |
erlzmq_error().
getsockopt({I, Socket}, Name) when is_integer(I), is_atom(Name) ->
erlzmq_nif:getsockopt(Socket, option_name(Name)).
%% @equiv close(Socket, infinity)
--spec close(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
+-spec close(Socket :: erlzmq_socket()) ->
ok |
erlzmq_error().
-close(SocketTuple) ->
- close(SocketTuple, infinity).
+close(Socket) ->
+ close(Socket, infinity).
%% @doc Close the given socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_close">zmq_close</a>.</i>
%% @end
--spec close(SocketTuple :: {pos_integer(), erlzmq_socket()},
+-spec close(Socket :: erlzmq_socket(),
Timeout :: timeout()) ->
ok |
erlzmq_error().
View
@@ -83,10 +83,10 @@ shutdown_blocking_test() ->
shutdown_blocking_unblocking_test() ->
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
- erlzmq:close(S),
- V = erlzmq:term(C, 0),
+ V = erlzmq:term(C, 500),
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
+ erlzmq:close(S),
receive
{Ref, ok} ->
ok

0 comments on commit 3a36833

Please sign in to comment.