Skip to content

Commit

Permalink
incomplete messages can be stored in ypipe
Browse files Browse the repository at this point in the history
  • Loading branch information
sustrik committed May 19, 2010
1 parent f40ce4e commit 89783c3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 24 deletions.
10 changes: 4 additions & 6 deletions src/pipe.cpp
Expand Up @@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
return false;
}

pipe->write (*msg_);
pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
return true;
Expand All @@ -172,11 +172,9 @@ void zmq::writer_t::rollback ()
{
zmq_msg_t msg;

// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
if (!(msg.flags & ZMQ_MSG_MORE)) {
pipe->write (msg);
break;
}
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
msgs_written--;
}
Expand Down Expand Up @@ -206,7 +204,7 @@ void zmq::writer_t::term ()
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg);
pipe->write (msg, false);
pipe->flush ();
}

Expand Down
46 changes: 28 additions & 18 deletions src/ypipe.hpp
Expand Up @@ -31,7 +31,7 @@ namespace zmq
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
// N is granularity of the pipe, i.e. how many messages are needed to
// N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.

template <typename T, int N> class ypipe_t
Expand All @@ -46,7 +46,7 @@ namespace zmq

// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back ();
r = w = f = &queue.back ();
c.set (&queue.back ());
}

Expand All @@ -59,54 +59,61 @@ namespace zmq
#pragma message disable(UNINIT)
#endif

// Write an item to the pipe. Don't flush it yet.
inline void write (const T &value_)
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();

// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
}

#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif

// Pop an unflushed message from the pipe. Returns true is such
// message exists, false otherwise.
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite (T *value_)
{
if (w == &queue.back ())
if (f == &queue.back ())
return false;
queue.unpush ();
*value_ = queue.back ();
return true;
}

// Flush the messages into the pipe. Returns false if the reader
// thread is sleeping. In that case, caller is obliged to wake the
// reader up before using the pipe again.
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
inline bool flush ()
{
// If there are no un-flushed items, do nothing.
if (w == &queue.back ())
if (w == f)
return true;

// Try to set 'c' to 'back'
if (c.cas (w, &queue.back ()) != w) {
// Try to set 'c' to 'f'.
if (c.cas (w, f) != w) {

// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set (&queue.back ());
w = &queue.back ();
c.set (f);
w = f;
return false;
}

// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to the end of the queue.
w = &queue.back ();
// the 'first un-flushed item' pointer to 'f'.
w = f;
return true;
}

Expand All @@ -125,7 +132,7 @@ namespace zmq

// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when messages
// it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front () == r || !r)
return false;
Expand Down Expand Up @@ -165,6 +172,9 @@ namespace zmq
// exclusively by reader thread.
T *r;

// Points to the first item to be flushed in the future.
T *f;

// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
Expand Down

0 comments on commit 89783c3

Please sign in to comment.