Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 410 lines (347 sloc) 11.223 kB
99c5d92 @sustrik pipes added
sustrik authored
1 /*
18b9ebe @sustrik The copyrights in file headers updated.
sustrik authored
2 Copyright (c) 2007-2011 iMatix Corporation
3 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
99c5d92 @sustrik pipes added
sustrik authored
4
5 This file is part of 0MQ.
6
7 0MQ is free software; you can redistribute it and/or modify it under
b358df9 @sustrik Name of "GNU Lesser Public License" corrected.
sustrik authored
8 the terms of the GNU Lesser General Public License as published by
99c5d92 @sustrik pipes added
sustrik authored
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 0MQ is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
b358df9 @sustrik Name of "GNU Lesser Public License" corrected.
sustrik authored
15 GNU Lesser General Public License for more details.
99c5d92 @sustrik pipes added
sustrik authored
16
b358df9 @sustrik Name of "GNU Lesser Public License" corrected.
sustrik authored
17 You should have received a copy of the GNU Lesser General Public License
99c5d92 @sustrik pipes added
sustrik authored
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
21 #include <new>
22
27e2d08 @mato Restructure language bindings
mato authored
23 #include "../include/zmq.h"
cb09c69 @sustrik pipe deallocation added
sustrik authored
24
99c5d92 @sustrik pipes added
sustrik authored
25 #include "pipe.hpp"
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
26 #include "likely.hpp"
99c5d92 @sustrik pipes added
sustrik authored
27
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
28 zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
29 uint64_t lwm_) :
99c5d92 @sustrik pipes added
sustrik authored
30 object_t (parent_),
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
31 active (true),
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
32 pipe (pipe_),
33 writer (NULL),
99c5d92 @sustrik pipes added
sustrik authored
34 lwm (lwm_),
61ee6fa @hurtonm Implement flow control
hurtonm authored
35 msgs_read (0),
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
36 sink (NULL),
37 terminating (false)
38 {
39 // Note that writer is not set here. Writer will inform reader about its
40 // address once it is created (via set_writer method).
41 }
42
43 void zmq::reader_t::set_writer (writer_t *writer_)
44 {
45 zmq_assert (!writer);
46 writer = writer_;
47 }
99c5d92 @sustrik pipes added
sustrik authored
48
49 zmq::reader_t::~reader_t ()
50 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
51 // Pipe as such is owned and deallocated by reader object.
52 // The point is that reader processes the last step of terminal
53 // handshaking (term_ack).
54 zmq_assert (pipe);
55
56 // First delete all the unread messages in the pipe. We have to do it by
57 // hand because zmq_msg_t is a POD, not a class, so there's no associated
58 // destructor.
59 zmq_msg_t msg;
60 while (pipe->read (&msg))
61 zmq_msg_close (&msg);
62
63 delete pipe;
99c5d92 @sustrik pipes added
sustrik authored
64 }
65
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
66 void zmq::reader_t::set_event_sink (i_reader_events *sink_)
0d704fc MSVC warnings fixed
unknown authored
67 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
68 zmq_assert (!sink);
69 sink = sink_;
0d704fc MSVC warnings fixed
unknown authored
70 }
71
10533a5 @hurtonm pipe: check_read() should check for message delimiter
hurtonm authored
72 bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
73 {
74 unsigned char *offset = 0;
75
76 return msg_.content == (void*) (offset + ZMQ_DELIMITER);
77 }
78
f2ff2c6 @sustrik checking for available messages added to ypipe/pipe
sustrik authored
79 bool zmq::reader_t::check_read ()
80 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
81 if (!active)
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
82 return false;
83
f2ff2c6 @sustrik checking for available messages added to ypipe/pipe
sustrik authored
84 // Check if there's an item in the pipe.
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
85 if (!pipe->check_read ()) {
86 active = false;
10533a5 @hurtonm pipe: check_read() should check for message delimiter
hurtonm authored
87 return false;
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
88 }
10533a5 @hurtonm pipe: check_read() should check for message delimiter
hurtonm authored
89
90 // If the next item in the pipe is message delimiter,
91 // initiate its termination.
92 if (pipe->probe (is_delimiter)) {
7a685b0 @sustrik Clean-up of session termination process
sustrik authored
93 zmq_msg_t msg;
94 bool ok = pipe->read (&msg);
95 zmq_assert (ok);
96 if (sink)
97 sink->delimited (this);
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
98 terminate ();
10533a5 @hurtonm pipe: check_read() should check for message delimiter
hurtonm authored
99 return false;
100 }
101
102 return true;
f2ff2c6 @sustrik checking for available messages added to ypipe/pipe
sustrik authored
103 }
104
99c5d92 @sustrik pipes added
sustrik authored
105 bool zmq::reader_t::read (zmq_msg_t *msg_)
106 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
107 if (!active)
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
108 return false;
109
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
110 if (!pipe->read (msg_)) {
111 active = false;
cb09c69 @sustrik pipe deallocation added
sustrik authored
112 return false;
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
113 }
cb09c69 @sustrik pipe deallocation added
sustrik authored
114
115 // If delimiter was read, start termination process of the pipe.
116 unsigned char *offset = 0;
117 if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
7a685b0 @sustrik Clean-up of session termination process
sustrik authored
118 if (sink)
119 sink->delimited (this);
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
120 terminate ();
cb09c69 @sustrik pipe deallocation added
sustrik authored
121 return false;
122 }
99c5d92 @sustrik pipes added
sustrik authored
123
2f219d7 @sustrik ZMQ_TBC renamed to ZMQ_MORE
sustrik authored
124 if (!(msg_->flags & ZMQ_MSG_MORE))
ed291b0 @sustrik multi-part messages work with PUB/SUB
sustrik authored
125 msgs_read++;
126
61ee6fa @hurtonm Implement flow control
hurtonm authored
127 if (lwm > 0 && msgs_read % lwm == 0)
42000d2 @sustrik terminology unified: revive->activate
sustrik authored
128 send_activate_writer (writer, msgs_read);
cb09c69 @sustrik pipe deallocation added
sustrik authored
129
130 return true;
99c5d92 @sustrik pipes added
sustrik authored
131 }
132
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
133 void zmq::reader_t::terminate ()
99c5d92 @sustrik pipes added
sustrik authored
134 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
135 // If termination was already started by the peer, do nothing.
136 if (terminating)
137 return;
138
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
139 active = false;
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
140 terminating = true;
141 send_pipe_term (writer);
99c5d92 @sustrik pipes added
sustrik authored
142 }
143
42000d2 @sustrik terminology unified: revive->activate
sustrik authored
144 void zmq::reader_t::process_activate_reader ()
99c5d92 @sustrik pipes added
sustrik authored
145 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
146 // Forward the event to the sink (either socket or session).
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
147 active = true;
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
148 sink->activated (this);
99c5d92 @sustrik pipes added
sustrik authored
149 }
150
cb09c69 @sustrik pipe deallocation added
sustrik authored
151 void zmq::reader_t::process_pipe_term_ack ()
152 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
153 // At this point writer may already be deallocated.
154 // For safety's sake drop the reference to it.
155 writer = NULL;
156
157 // Notify owner about the termination.
158 zmq_assert (sink);
159 sink->terminated (this);
160
161 // Deallocate resources.
162 delete this;
cb09c69 @sustrik pipe deallocation added
sustrik authored
163 }
164
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
165 zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
fca2e8e @hurtonm Add SWAP support
hurtonm authored
166 uint64_t hwm_, int64_t swap_size_) :
99c5d92 @sustrik pipes added
sustrik authored
167 object_t (parent_),
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
168 active (true),
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
169 pipe (pipe_),
170 reader (reader_),
99c5d92 @sustrik pipes added
sustrik authored
171 hwm (hwm_),
61ee6fa @hurtonm Implement flow control
hurtonm authored
172 msgs_read (0),
173 msgs_written (0),
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
174 swap (NULL),
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
175 sink (NULL),
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
176 swapping (false),
177 pending_delimiter (false),
178 terminating (false)
cb09c69 @sustrik pipe deallocation added
sustrik authored
179 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
180 // Inform reader about the writer.
181 reader->set_writer (this);
182
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
183 // Open the swap file, if required.
fca2e8e @hurtonm Add SWAP support
hurtonm authored
184 if (swap_size_ > 0) {
d8b975f @sustrik msg_store_t renamed to swap_t
sustrik authored
185 swap = new (std::nothrow) swap_t (swap_size_);
43e8868 @sustrik Added explicit error message in case of memory exhaustion
sustrik authored
186 alloc_assert (swap);
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
187 int rc = swap->init ();
188 zmq_assert (rc == 0);
fca2e8e @hurtonm Add SWAP support
hurtonm authored
189 }
cb09c69 @sustrik pipe deallocation added
sustrik authored
190 }
191
99c5d92 @sustrik pipes added
sustrik authored
192 zmq::writer_t::~writer_t ()
193 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
194 if (swap)
195 delete swap;
99c5d92 @sustrik pipes added
sustrik authored
196 }
197
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
198 void zmq::writer_t::set_event_sink (i_writer_events *sink_)
0d704fc MSVC warnings fixed
unknown authored
199 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
200 zmq_assert (!sink);
201 sink = sink_;
0d704fc MSVC warnings fixed
unknown authored
202 }
203
27e83cc Fixes assertion on pipe.cpp:237 when swap fills up.
Mikko Koppanen authored
204 bool zmq::writer_t::check_write (zmq_msg_t *msg_)
99c5d92 @sustrik pipes added
sustrik authored
205 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
206 // We've already checked and there's no space free for the new message.
207 // There's no point in checking once again.
208 if (unlikely (!active))
61ee6fa @hurtonm Implement flow control
hurtonm authored
209 return false;
27e83cc Fixes assertion on pipe.cpp:237 when swap fills up.
Mikko Koppanen authored
210
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
211 if (unlikely (swapping)) {
27e83cc Fixes assertion on pipe.cpp:237 when swap fills up.
Mikko Koppanen authored
212 if (unlikely (!swap->fits (msg_))) {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
213 active = false;
214 return false;
215 }
216 }
217 else {
218 if (unlikely (pipe_full ())) {
219 if (swap)
220 swapping = true;
221 else {
222 active = false;
223 return false;
224 }
225 }
61ee6fa @hurtonm Implement flow control
hurtonm authored
226 }
99c5d92 @sustrik pipes added
sustrik authored
227
228 return true;
229 }
230
088a2db @sustrik ZMQII-16: Change "struct zmq_msg_t" to "zmq_msg_t" in C binding
sustrik authored
231 bool zmq::writer_t::write (zmq_msg_t *msg_)
99c5d92 @sustrik pipes added
sustrik authored
232 {
27e83cc Fixes assertion on pipe.cpp:237 when swap fills up.
Mikko Koppanen authored
233 if (unlikely (!check_write (msg_)))
61ee6fa @hurtonm Implement flow control
hurtonm authored
234 return false;
fca2e8e @hurtonm Add SWAP support
hurtonm authored
235
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
236 if (unlikely (swapping)) {
237 bool stored = swap->store (msg_);
238 zmq_assert (stored);
fca2e8e @hurtonm Add SWAP support
hurtonm authored
239 if (!(msg_->flags & ZMQ_MSG_MORE))
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
240 swap->commit ();
241 return true;
61ee6fa @hurtonm Implement flow control
hurtonm authored
242 }
243
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
244 pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
245 if (!(msg_->flags & ZMQ_MSG_MORE))
246 msgs_written++;
247
99c5d92 @sustrik pipes added
sustrik authored
248 return true;
249 }
250
e04e2cd @sustrik rollback functionality added to pipe
sustrik authored
251 void zmq::writer_t::rollback ()
252 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
253 // Remove incomplete message from the swap.
254 if (unlikely (swapping)) {
255 swap->rollback ();
256 return;
fca2e8e @hurtonm Add SWAP support
hurtonm authored
257 }
61ee6fa @hurtonm Implement flow control
hurtonm authored
258
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
259 // Remove incomplete message from the pipe.
fca2e8e @hurtonm Add SWAP support
hurtonm authored
260 zmq_msg_t msg;
61ee6fa @hurtonm Implement flow control
hurtonm authored
261 while (pipe->unwrite (&msg)) {
89783c3 @sustrik incomplete messages can be stored in ypipe
sustrik authored
262 zmq_assert (msg.flags & ZMQ_MSG_MORE);
e04e2cd @sustrik rollback functionality added to pipe
sustrik authored
263 zmq_msg_close (&msg);
61ee6fa @hurtonm Implement flow control
hurtonm authored
264 }
e04e2cd @sustrik rollback functionality added to pipe
sustrik authored
265 }
266
99c5d92 @sustrik pipes added
sustrik authored
267 void zmq::writer_t::flush ()
268 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
269 // In the swapping mode, flushing is automatically handled by swap object.
270 if (!swapping && !pipe->flush ())
42000d2 @sustrik terminology unified: revive->activate
sustrik authored
271 send_activate_reader (reader);
99c5d92 @sustrik pipes added
sustrik authored
272 }
273
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
274 void zmq::writer_t::terminate ()
cb09c69 @sustrik pipe deallocation added
sustrik authored
275 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
276 // Prevent double termination.
277 if (terminating)
278 return;
babdf48 @dhammika Fix pipe writer termination
dhammika authored
279 terminating = true;
cb09c69 @sustrik pipe deallocation added
sustrik authored
280
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
281 // Mark the pipe as not available for writing.
282 active = false;
fca2e8e @hurtonm Add SWAP support
hurtonm authored
283
3e97c0f @sustrik REQ socket implementation is layered on top of XREQ
sustrik authored
284 // Rollback any unfinished messages.
285 rollback ();
286
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
287 if (swapping) {
288 pending_delimiter = true;
289 return;
290 }
291
292 // Push delimiter into the pipe. Trick the compiler to belive that
293 // the tag is a valid pointer. Note that watermarks are not checked
294 // thus the delimiter can be written even though the pipe is full.
cb09c69 @sustrik pipe deallocation added
sustrik authored
295 zmq_msg_t msg;
296 const unsigned char *offset = 0;
297 msg.content = (void*) (offset + ZMQ_DELIMITER);
531c6af @sustrik message flags added to zmq_msg_t strcuture
sustrik authored
298 msg.flags = 0;
89783c3 @sustrik incomplete messages can be stored in ypipe
sustrik authored
299 pipe->write (msg, false);
10c28c1 @hurtonm Revive reader on pipe termination
hurtonm authored
300 flush ();
cb09c69 @sustrik pipe deallocation added
sustrik authored
301 }
302
42000d2 @sustrik terminology unified: revive->activate
sustrik authored
303 void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)
61ee6fa @hurtonm Implement flow control
hurtonm authored
304 {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
305 // Store the reader's message sequence number.
61ee6fa @hurtonm Implement flow control
hurtonm authored
306 msgs_read = msgs_read_;
fca2e8e @hurtonm Add SWAP support
hurtonm authored
307
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
308 // If we are in the swapping mode, we have some messages in the swap.
309 // Given that pipe is now ready for writing we can move part of the
310 // swap into the pipe.
311 if (swapping) {
312 zmq_msg_t msg;
313 while (!pipe_full () && !swap->empty ()) {
314 swap->fetch(&msg);
fca2e8e @hurtonm Add SWAP support
hurtonm authored
315 pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
316 if (!(msg.flags & ZMQ_MSG_MORE))
317 msgs_written++;
318 }
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
319 if (!pipe->flush ())
320 send_activate_reader (reader);
fca2e8e @hurtonm Add SWAP support
hurtonm authored
321
be159b6 @mato zmq::writer_t: Add missing test for swap
mato authored
322 // There are no more messages in the swap. We can switch into
323 // standard in-memory mode.
324 if (swap->empty ()) {
325 swapping = false;
326
327 // Push delimiter into the pipe. Trick the compiler to belive that
328 // the tag is a valid pointer. Note that watermarks are not checked
329 // thus the delimiter can be written even though the pipe is full.
330 if (pending_delimiter) {
331 zmq_msg_t msg;
332 const unsigned char *offset = 0;
333 msg.content = (void*) (offset + ZMQ_DELIMITER);
334 msg.flags = 0;
335 pipe->write (msg, false);
336 flush ();
337 return;
338 }
fca2e8e @hurtonm Add SWAP support
hurtonm authored
339 }
340 }
341
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
342 // If the writer was non-active before, let's make it active
343 // (available for writing messages to).
babdf48 @dhammika Fix pipe writer termination
dhammika authored
344 if (!active && !terminating) {
d90b407 @sustrik refactoring of pipe/swap interaction
sustrik authored
345 active = true;
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
346 zmq_assert (sink);
347 sink->activated (this);
61ee6fa @hurtonm Implement flow control
hurtonm authored
348 }
349 }
350
cb09c69 @sustrik pipe deallocation added
sustrik authored
351 void zmq::writer_t::process_pipe_term ()
352 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
353 send_pipe_term_ack (reader);
cb09c69 @sustrik pipe deallocation added
sustrik authored
354
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
355 // The above command allows reader to deallocate itself and the pipe.
356 // For safety's sake we'll drop the pointers here.
357 reader = NULL;
358 pipe = NULL;
cb09c69 @sustrik pipe deallocation added
sustrik authored
359
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
360 // Notify owner about the termination.
361 zmq_assert (sink);
362 sink->terminated (this);
61ee6fa @hurtonm Implement flow control
hurtonm authored
363
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
364 // Deallocate the resources.
365 delete this;
99c5d92 @sustrik pipes added
sustrik authored
366 }
367
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
368 bool zmq::writer_t::pipe_full ()
99c5d92 @sustrik pipes added
sustrik authored
369 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
370 return hwm > 0 && msgs_written - msgs_read == hwm;
99c5d92 @sustrik pipes added
sustrik authored
371 }
8408ae0 @sustrik LWM is computed rather than explicitly specified by user
sustrik authored
372
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
373 void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
374 uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
8408ae0 @sustrik LWM is computed rather than explicitly specified by user
sustrik authored
375 {
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
376 // First compute the low water mark. Following point should be taken
377 // into consideration:
378 //
379 // 1. LWM has to be less than HWM.
380 // 2. LWM cannot be set to very low value (such as zero) as after filling
381 // the queue it would start to refill only after all the messages are
382 // read from it and thus unnecessarily hold the progress back.
383 // 3. LWM cannot be set to very high value (such as HWM-1) as it would
384 // result in lock-step filling of the queue - if a single message is
385 // read from a full queue, writer thread is resumed to write exactly one
386 // message to the queue and go back to sleep immediately. This would
387 // result in low performance.
388 //
389 // Given the 3. it would be good to keep HWM and LWM as far apart as
390 // possible to reduce the thread switching overhead to almost zero,
391 // say HWM-LWM should be max_wm_delta.
392 //
393 // That done, we still we have to account for the cases where
394 // HWM < max_wm_delta thus driving LWM to negative numbers.
395 // Let's make LWM 1/2 of HWM in such cases.
396 uint64_t lwm = (hwm_ > max_wm_delta * 2) ?
397 hwm_ - max_wm_delta : (hwm_ + 1) / 2;
398
399 // Create all three objects pipe consists of: the pipe per se, reader and
400 // writer. The pipe will be handled by reader and writer, its never passed
401 // to the user. Reader and writer are returned to the user.
402 pipe_t *pipe = new (std::nothrow) pipe_t ();
43e8868 @sustrik Added explicit error message in case of memory exhaustion
sustrik authored
403 alloc_assert (pipe);
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
404 *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
43e8868 @sustrik Added explicit error message in case of memory exhaustion
sustrik authored
405 alloc_assert (*reader_);
05d9084 @sustrik WIP: Socket migration between threads, new zmq_close() semantics
sustrik authored
406 *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
407 hwm_, swap_size_);
43e8868 @sustrik Added explicit error message in case of memory exhaustion
sustrik authored
408 alloc_assert (*writer_);
8408ae0 @sustrik LWM is computed rather than explicitly specified by user
sustrik authored
409 }
Something went wrong with that request. Please try again.