Skip to content

Commit

Permalink
return an array of events from poll, one per socket passed into it, None
Browse files Browse the repository at this point in the history
for no data or Some <mask> otherwise
  • Loading branch information
joelreymont committed May 17, 2011
1 parent d13cfea commit 389b77f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 27 deletions.
8 changes: 4 additions & 4 deletions src/ZMQ.ml
Expand Up @@ -258,12 +258,12 @@ module Poll = struct

type t

type event_mask = In | Out | In_out
type poll_event = In | Out | In_out
type poll_socket = [`Pair|`Pub|`Sub|`Req|`Rep|`Dealer|`Router|`Pull|`Push] Socket.t
type poll_item = (poll_socket * event_mask)
type poll_mask = (poll_socket * poll_event)

external of_poll_items : poll_item array -> t = "caml_zmq_poll_of_pollitem_array"
external native_poll: t -> int -> poll_item array = "caml_zmq_poll"
external of_poll_items : poll_mask array -> t = "caml_zmq_poll_of_pollitem_array"
external native_poll: t -> int -> poll_event option array = "caml_zmq_poll"

let poll ?(timeout = -1) items = native_poll items timeout

Expand Down
18 changes: 8 additions & 10 deletions src/ZMQ.mli
Expand Up @@ -29,8 +29,8 @@ val term : context -> unit

val version : unit -> int * int * int

module Socket :
sig
module Socket : sig

type 'a t
type 'a kind

Expand Down Expand Up @@ -105,25 +105,23 @@ sig

end

module Device :
sig
module Device : sig

val streamer : [>`Pull] Socket.t -> [>`Push] Socket.t -> unit
val forwarder : [>`Sub] Socket.t -> [>`Pub] Socket.t -> unit
val queue : [>`Router] Socket.t -> [>`Dealer] Socket.t -> unit

end

module Poll :
sig
module Poll : sig

type t

type event_mask = In | Out | In_out
type poll_event = In | Out | In_out
type poll_socket = [`Pair|`Pub|`Sub|`Req|`Rep|`Dealer|`Router|`Pull|`Push] Socket.t
type poll_item = (poll_socket * event_mask)
type poll_mask = (poll_socket * poll_event)

val of_poll_items : poll_item array -> t
val poll : ?timeout: int -> t -> poll_item array
val of_poll_items : poll_mask array -> t
val poll : ?timeout: int -> t -> poll_event option array

end
25 changes: 12 additions & 13 deletions src/poll.c
Expand Up @@ -76,7 +76,7 @@ short CAML_ZMQ_Mask_val (value mask) {

CAMLprim value caml_zmq_poll(value poll, value timeout) {
CAMLparam2 (poll, timeout);
CAMLlocal2 (poll_itemarray, curr_elem);
CAMLlocal2 (events, some);
int n = CAML_ZMQ_Poll_val(poll)->num_elems;
zmq_pollitem_t *items = CAML_ZMQ_Poll_val(poll)->poll_items;
int tm = Int_val(timeout);
Expand All @@ -87,21 +87,20 @@ CAMLprim value caml_zmq_poll(value poll, value timeout) {

caml_zmq_raise_if(num_event_sockets == -1);
if(num_event_sockets == 0) { /* It's invalid to allocate a zero sized array */
poll_itemarray = Atom(0);
events = Atom(0);
} else {
poll_itemarray = caml_alloc_tuple(num_event_sockets);
int i, j;
for(i = 0, j = 0; i < num_event_sockets; i++) {
while(!((items[j].revents | ZMQ_POLLIN) || (items[j].revents | ZMQ_POLLOUT))) {
j++;
events = caml_alloc(num_event_sockets, 0);
int i;
for(i = 0; i < num_event_sockets; i++) {
if (!((items[i].revents | ZMQ_POLLIN) || (items[i].revents | ZMQ_POLLOUT))) {
Store_field(events, i, Val_int(0)); /* None */
} else {
some = caml_alloc(1, 0);
Store_field(some, 0, CAML_ZMQ_Val_mask(items[i].revents));
Store_field(events, i, some);
}
curr_elem = caml_alloc_tuple(2);
Store_field(curr_elem, 0, caml_zmq_copy_socket(items[j].socket));
Store_field(curr_elem, 1, CAML_ZMQ_Val_mask(items[j].revents));
Store_field(poll_itemarray, i, curr_elem);
j++;
}
}

CAMLreturn (poll_itemarray);
CAMLreturn (events);
}

0 comments on commit 389b77f

Please sign in to comment.