Skip to content
Browse files

- support zeromq2 and zeromq3 apis

  • Loading branch information...
1 parent e348e11 commit d6b0c20edb80ff7d8ca2b8e47e46097463908953 @jordansissel jordansissel committed
Showing with 28 additions and 4 deletions.
  1. +5 −2 emitter.c
  2. +5 −2 harvester.c
  3. +18 −0 zmq_compat.h
View
7 emitter.c
@@ -13,6 +13,7 @@
#include <sys/resource.h>
+#include "zmq_compat.h"
#include "sleepdefs.h"
void *emitter(void *arg) {
@@ -29,7 +30,8 @@ void *emitter(void *arg) {
void *socket = zmq_socket(config->zmq, ZMQ_PULL);
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 100;
- zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
+ //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
+ zmq_compat_set_recvhwm(socket, hwm);
rc = zmq_bind(socket, config->zmq_endpoint);
insist(rc != -1, "zmq_bind(%s) failed: %s", config->zmq_endpoint,
zmq_strerror(errno));
@@ -90,7 +92,8 @@ void *emitter(void *arg) {
}
/* poll successful, read a message */
- rc = zmq_recv(socket, &message, 0);
+ //rc = zmq_recv(socket, &message, 0);
+ rc = zmq_compat_recvmsg(socket, &message, 0);
insist(rc == 0 /*|| errno == EAGAIN */,
"zmq_recv(%s) failed (returned %d): %s",
config->zmq_endpoint, rc, zmq_strerror(errno));
View
7 harvester.c
@@ -16,6 +16,7 @@
#include "insist.h"
#include "sleepdefs.h"
#include "flog.h"
+#include "zmq_compat.h"
#ifdef __MACH__
/* OS X is dumb, or I am dumb, or we are both dumb. I don't know anymore,
@@ -85,7 +86,8 @@ void *harvest(void *arg) {
insist(socket != NULL, "zmq_socket() failed: %s", strerror(errno));
int64_t hwm = 100;
- zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
+ //zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm));
+ zmq_compat_set_sendhwm(socket, hwm);
/* Wait for the zmq endpoint to be up (wait for connect to succeed) */
struct backoff sleeper;
@@ -150,7 +152,8 @@ void *harvest(void *arg) {
zmq_msg_init_data(&event, str_data(serialized), str_length(serialized),
my_str_free, serialized);
flog_if_slow(stdout, 0.250, {
- rc = zmq_send(socket, &event, 0);
+ //rc = zmq_send(socket, &event, 0);
+ rc = zmq_compat_sendmsg(socket, &event, 0);
}, "zmq_send (harvesting file '%s')", config->path);
insist(rc == 0, "zmq_send(event) failed: %s", zmq_strerror(rc));
zmq_msg_close(&event);
View
18 zmq_compat.h
@@ -0,0 +1,18 @@
+#ifndef _ZMQ_COMPAT_H_
+#define _ZMQ_COMPAT_H_
+
+# if ZMQ_VERSION_MAJOR == 2 /* zeromq 2 */
+# define zmq_compat_set_sendhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm))
+# define zmq_compat_set_recvhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_HWM, &hwm, sizeof(hwm))
+# define zmq_compat_recvmsg(socket, message, flags) zmq_recv(socket, message, flags)
+# define zmq_compat_sendmsg(socket, message, flags) zmq_send(socket, message, flags)
+# elif ZMQ_VERSION_MAJOR == 3 /* zeromq 3 */
+# define zmq_compat_set_sendhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_SNDHWM, &hwm, sizeof(hwm))
+# define zmq_compat_set_recvhwm(socket, hwm) zmq_setsockopt(socket, ZMQ_RCVHWM, &hwm, sizeof(hwm))
+# define zmq_compat_recvmsg(socket, message, flags) zmq_recvmsg(socket, message, flags)
+# define zmq_compat_sendmsg(socket, message, flags) zmq_sendmsg(socket, message, flags)
+# else
+# error "Unsupported zeromq version " ## ZMQ_VERSION_MAJOR
+# endif
+
+#endif /* _ZMQ_COMPAT_H_ */

0 comments on commit d6b0c20

Please sign in to comment.
Something went wrong with that request. Please try again.