Permalink
Browse files

Changes to threading API

- attached and detatched thread model
- attached threads get explicit arguments (not via thread_t block)
- detached threads look like POSIX threads
- this fixed a problem of dangling pipe sockets in some cases
Also fixed zframe_dump to print newlines & tabs
  • Loading branch information...
1 parent bcd734b commit 22b5d5ba6e205a4978ba5071bb0f4293efa1a3a4 @hintjens committed Apr 6, 2011
Showing with 143 additions and 78 deletions.
  1. +2 −0 NEWS
  2. +26 −14 include/zctx.h
  3. +21 −0 notes.txt
  4. +93 −63 src/zctx.c
  5. +1 −1 src/zframe.c
View
2 NEWS
@@ -4,6 +4,8 @@ libzapi version 1.2.2 (beta), released on 2011/04/xx
Changes
-------
+* Threading API now supports attached and detached threads.
+
* In zframe class, added print, reset, strdup, streq, strhex, dup methods.
* In zmsg class, added last, wrap, unwrap, popstr, pushstr, addstr, dup
View
@@ -29,31 +29,34 @@
extern "C" {
#endif
+
// Opaque class structure
typedef struct _zctx_t zctx_t;
// @interface
-// Structure passed to threads created via this class
-typedef struct {
- zctx_t *ctx; // Context shared with parent thread
- void *pipe; // Pipe to parent thread (PAIR)
- void *arg; // Application argument
-} zthread_t;
+// Detached threads follow POSIX pthreads API
+typedef void *(zthread_detached_fn) (void *args);
+// Attached threads get context and pipe from parent
+typedef void (zthread_attached_fn) (void *args, zctx_t *ctx, void *pipe);
// Create new context, returns context object, replaces zmq_init
zctx_t *
zctx_new (void);
+// Create new shadow context, returns context object
+zctx_t *
+ zctx_shadow (zctx_t *self);
+
// Destroy context and all sockets in it, replaces zmq_term
void
zctx_destroy (zctx_t **self_p);
-
-// Raise default I/O threads from 1, for crazy heavy applications
+
+// Raise default I/O threads from 1, for crazy heavy applications
void
zctx_set_iothreads (zctx_t *self, int iothreads);
-
+
// Set msecs to flush sockets when closing them
-void
+void
zctx_set_linger (zctx_t *self, int linger);
// Create socket within this context, replaces zmq_socket
@@ -64,11 +67,20 @@ void *
void
zctx_socket_destroy (zctx_t *self, void *socket);
-// Create thread, return PAIR socket to talk to thread. The child thread
-// receives a (zthread_t *) object including a zctx, a pipe back to the
-// creating thread, and the arg passed in this call.
+// --- SHOULD MOVE TO zthread class
+// Create an attached thread. An attached thread gets a ctx and a PAIR
+// pipe back to its parent. It must monitor its pipe, and exit if the
+// pipe becomes unreadable.
void *
- zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg);
+ zctx_attach_thread (zctx_t *self, zthread_attached_fn *thread_fn,
+ void *args);
+
+// Create a detached thread. A detached thread operates autonomously
+// and is used to simulate a separate process. It gets no ctx, and no
+// pipe.
+void
+ zctx_detach_thread (zctx_t *self, zthread_detached_fn *thread_fn,
+ void *args);
// Self test of this class
int
View
@@ -0,0 +1,21 @@
+zsocket_
+
+ - subscribe
+ - unsubscribe
+ - hwm / set_
+ - swap / set_
+ - affinity / set_
+ - identity / set_
+ - rate / set_
+ - recovery_ivl / set_
+ - recovery_ivl_msec / set_
+ - mcast_loop / set_
+ - sndbuf / set_
+ - rcvbuf / set_
+ - linger / set_
+ - reconnect_ivl / set_
+ - reconnect_ivl_max / set_
+ - backlog / set_
+ - fd
+ - events
+ - type
View
@@ -226,101 +226,72 @@ zctx_socket_destroy (zctx_t *self, void *socket)
// --------------------------------------------------------------------------
-// Thread creation code, taken from ZFL's zfl_thread class and customized.
+// Thread creation code, wrapping POSIX and Win32 thread APIs
typedef struct {
- void *(*thread_fn) (void *);
- void *args;
+ // Two thread handlers, one will be set, one NULL
+ zthread_attached_fn *attached;
+ zthread_detached_fn *detached;
+ void *args; // Application arguments
+ zctx_t *ctx; // Context object if any
+ void *pipe; // Pipe, if any, back to parent
#if defined (__WINDOWS__)
- HANDLE handle;
+ HANDLE handle; // Win32 thread handle
#endif
} shim_t;
#if defined (__UNIX__)
// Thread shim for UNIX calls the real thread and cleans up afterwards.
-
void *
-s_call_thread_fn (void *args)
+s_thread_shim (void *args)
{
assert (args);
shim_t *shim = (shim_t *) args;
- shim->thread_fn (shim->args);
+ if (shim->attached)
+ shim->attached (shim->args, shim->ctx, shim->pipe);
+ else
+ shim->detached (shim->args);
- zthread_t *zthread = (zthread_t *) shim->args;
- zctx_destroy (&zthread->ctx);
- free (zthread);
+ zctx_destroy (&shim->ctx);
free (shim);
return NULL;
}
#elif defined (__WINDOWS__)
// Thread shim for Windows that wraps a POSIX-style thread handler
// and does the _endthreadex for us automatically.
-
unsigned __stdcall
-s_call_thread_fn (void *args)
+s_thread_shim (void *args)
{
assert (args);
shim_t *shim = (shim_t *) args;
- shim->thread_fn (shim->args);
+ if (shim->attached)
+ shim->attached (shim->args, shim->ctx, shim->pipe);
+ else
+ shim->detached (shim->args);
+
_endthreadex (0);
CloseHandle (shim->handle);
- zthread_t *zthread = (zthread_t *) shim->args;
- zctx_destroy (&zthread->ctx);
- free (zthread);
+ zctx_destroy (&shim->ctx);
free (shim);
return 0;
}
#endif
-
-// --------------------------------------------------------------------------
-// Create a child thread able to speak to this thread over inproc sockets.
-// The child thread receives a zthread_t structure as argument. Returns a
-// PAIR socket that is connected to the child thread. You can ignore the
-// socket if you don't need it.
-
-void *
-zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg)
+static void
+s_thread_start (shim_t *shim)
{
- // Create our end of the pipe
- void *pipe = zctx_socket_new (self, ZMQ_PAIR);
- char endpoint [64];
- int rc = snprintf (endpoint, 64, "inproc://zctx-pipe-%p", pipe);
- assert (rc < 64);
- rc = zmq_bind (pipe, endpoint);
- assert (rc == 0);
-
- // Child thread gets a zthread_t arguments block
- zthread_t *args = (zthread_t *) zmalloc (sizeof (zthread_t));
- args->arg = arg; // Application arguments
-
- // Create new zctx_t for child, and new pipe in that
- args->ctx = (zctx_t *) zmalloc (sizeof (zctx_t));
- args->ctx->context = self->context;
- args->ctx->sockets = zlist_new ();
-
- // Create child end of pipe, and connect to ours
- args->pipe = zctx_socket_new (args->ctx, ZMQ_PAIR);
- rc = zmq_connect (args->pipe, endpoint);
- assert (rc == 0);
-
- // Now start child thread, passing our arguments
- shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
- shim->thread_fn = thread_fn;
- shim->args = args;
-
#if defined (__UNIX__)
pthread_t thread;
- pthread_create (&thread, NULL, s_call_thread_fn, shim);
+ pthread_create (&thread, NULL, s_thread_shim, shim);
pthread_detach (thread);
#elif defined (__WINDOWS__)
shim->handle = (HANDLE)_beginthreadex(
NULL, // Handle is private to this process
0, // Use a default stack size for new thread
- &s_call_thread_fn, // Start real thread function via this shim
+ &s_thread_shim, // Start real thread function via this shim
shim, // Which gets the current object as argument
CREATE_SUSPENDED, // Set thread priority before starting it
NULL); // We don't use the thread ID
@@ -332,28 +303,84 @@ zctx_thread_new (zctx_t *self, void *(*thread_fn) (void *), void *arg)
// Now start thread
ResumeThread (shim->handle);
#endif
+}
+
+// --------------------------------------------------------------------------
+// Create an attached thread. An attached thread gets a ctx and a PAIR
+// pipe back to its parent. It must monitor its pipe, and exit if the
+// pipe becomes unreadable.
+
+void *
+zctx_attach_thread (zctx_t *self, zthread_attached_fn *thread_fn, void *args)
+{
+ // Create our end of the pipe
+ void *pipe = zctx_socket_new (self, ZMQ_PAIR);
+ char endpoint [64];
+ int rc = snprintf (endpoint, 64, "inproc://zctx-pipe-%p", pipe);
+ assert (rc < 64);
+ rc = zmq_bind (pipe, endpoint);
+ assert (rc == 0);
+
+ // Prepare argument shim for child thread
+ shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
+ shim->attached = thread_fn;
+ shim->args = args;
+
+ // Create shadow zctx_t object for child thread
+ shim->ctx = (zctx_t *) zmalloc (sizeof (zctx_t));
+ shim->ctx->context = self->context;
+ shim->ctx->sockets = zlist_new ();
+
+ // Connect child pipe to our pipe
+ shim->pipe = zctx_socket_new (shim->ctx, ZMQ_PAIR);
+ rc = zmq_connect (shim->pipe, endpoint);
+ assert (rc == 0);
+
+ s_thread_start (shim);
return pipe;
}
// --------------------------------------------------------------------------
+// Create a detached thread. A detached thread operates autonomously
+// and is used to simulate a separate process. It gets no ctx, and no
+// pipe.
+
+void
+zctx_detach_thread (zctx_t *self, zthread_detached_fn *thread_fn, void *args)
+{
+ // Prepare argument shim for child thread
+ shim_t *shim = (shim_t *) zmalloc (sizeof (shim_t));
+ shim->detached = thread_fn;
+ shim->args = args;
+ s_thread_start (shim);
+}
+
+
+// --------------------------------------------------------------------------
// Selftest
// @selftest
static void *
-s_test_thread (void *args_ptr)
+s_test_detached (void *args)
{
- zthread_t *args = (zthread_t *) args_ptr;
+ zctx_t *ctx = zctx_new ();
+ // Create a socket to check it'll be automatically deleted
+ void *push = zctx_socket_new (ctx, ZMQ_PUSH);
+ zctx_destroy (&ctx);
+ return NULL;
+}
- // Create a socket to check it'll be properly deleted at exit
- zctx_socket_new (args->ctx, ZMQ_PUSH);
+static void
+s_test_attached (void *args, zctx_t *ctx, void *pipe)
+{
+ // Create a socket to check it'll be automatically deleted
+ zctx_socket_new (ctx, ZMQ_PUSH);
// Wait for our parent to ping us, and pong back
- char *ping = zstr_recv (args->pipe);
- free (ping);
- zstr_send (args->pipe, "pong");
- return NULL;
+ free (zstr_recv (pipe));
+ zstr_send (pipe, "pong");
}
// @end
@@ -388,12 +415,15 @@ zctx_test (Bool verbose)
zmq_connect (s6, "tcp://127.0.0.1:5555");
// Create a child thread, check it's safely alive
- void *pipe = zctx_thread_new (ctx, s_test_thread, NULL);
+
+ void *pipe = zctx_attach_thread (ctx, s_test_attached, NULL);
zstr_send (pipe, "ping");
char *pong = zstr_recv (pipe);
assert (streq (pong, "pong"));
free (pong);
+ zctx_detach_thread (ctx, s_test_detached, NULL);
+
// Everything should be cleanly closed now
zctx_destroy (&ctx);
// @end
View
@@ -248,7 +248,7 @@ zframe_print (zframe_t *self, char *prefix)
int is_text = 1;
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
- if (data [char_nbr] < 32 || data [char_nbr] > 127)
+ if (data [char_nbr] < 9 || data [char_nbr] > 127)
is_text = 0;
fprintf (stderr, "[%03d] ", (int) size);

0 comments on commit 22b5d5b

Please sign in to comment.