The zipper::merge class provides a k-way merge across k streams. The streams need not be synchronized in (real) time but each stream must be strictly ordered by a value that may be compared across streams (e.g. a data time).
In light of the real-time nature of the streams, the merge enacts one of two policies. It may guarantee a maximum latency bound at the cost of possible message loss, or it may guarantee lossless operation at the cost of unbounded latency.
The merge maintains a min-heap priority queue of nodes. Each node correlates the following four pieces of information:
- payload: a user object
- ordering: a value by which nodes may be placed in ascending order
- identity: a value indicating the stream
- debut: the “time” at which a payload was accepted to the merge
The ordering value is sometimes called a data time though it does not necessarily have to represent a time.
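To make the node and queue description concrete, here is a minimal sketch. The names Node, OrderingGreater, MinHeap and drain_all are hypothetical and chosen for illustration; they are not claimed to match the actual zipper code, and the payload and ordering types are assumptions.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical node layout holding the four pieces of information.
struct Node {
    using Clock = std::chrono::steady_clock;
    std::string payload;       // the user object (assumed to be a string here)
    double ordering;           // value used to sort nodes, e.g. a data time
    std::size_t identity;      // which stream produced the payload
    Clock::time_point debut;   // when the merge accepted the payload
};

// std::priority_queue is a max-heap by default, so the comparison is
// reversed to keep the node with the smallest ordering value on top.
struct OrderingGreater {
    bool operator()(const Node& a, const Node& b) const {
        return a.ordering > b.ordering;
    }
};

using MinHeap = std::priority_queue<Node, std::vector<Node>, OrderingGreater>;

// Pop every node and return the ordering values in the order they leave.
std::vector<double> drain_all(MinHeap& heap) {
    std::vector<double> out;
    while (!heap.empty()) {
        // A min-heap must release values in ascending order.
        assert(out.empty() || out.back() <= heap.top().ordering);
        out.push_back(heap.top().ordering);
        heap.pop();
    }
    return out;
}
```

Nodes pushed in any order come back out in ascending ordering value, regardless of which stream they came from. This sketch ignores completeness and latency entirely; it only shows the heap mechanics.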
At any given moment of operation, a given stream is considered to be represented in the merge if at least one node from that stream is held in any location of the queue excluding the current top of the min-heap.
The state of the merge buffer is considered fully complete if all of the k streams are represented. It is considered weakly complete if at least all active streams are represented. An active stream is one that has provided data within a duration smaller than the maximum latency. Any streams that are not active are called stale.
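The completeness rules can be sketched as a small classification function. Everything here is a hypothetical illustration: StreamState, Completeness and completeness() are invented names, the per-stream bookkeeping is an assumption about how one might track representation, and a stream that has never provided data is assumed to be stale.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical per-stream bookkeeping, one entry per stream (k entries).
struct StreamState {
    std::size_t queued = 0;          // nodes held in the queue, not counting the heap top
    bool seen = false;               // has this stream ever provided data?
    Clock::time_point last_seen{};   // debut of the most recent node from this stream
};

enum class Completeness { incomplete, weak, full };

// Classify the buffer state at time `now` for a given maximum latency.
Completeness completeness(const std::vector<StreamState>& streams,
                          Clock::time_point now,
                          Clock::duration max_latency) {
    assert(max_latency > Clock::duration::zero());
    bool all_represented = true;
    bool all_active_represented = true;
    for (const auto& s : streams) {
        const bool represented = s.queued > 0;
        // Assumption: a stream never seen counts as stale.
        const bool active = s.seen && (now - s.last_seen) < max_latency;
        if (!represented) {
            all_represented = false;
            if (active) all_active_represented = false;
        }
    }
    if (all_represented) return Completeness::full;
    if (all_active_represented) return Completeness::weak;
    return Completeness::incomplete;
}
```

Under these assumptions, a buffer in which only stale streams are missing is weakly complete, while a single active stream with nothing queued makes it incomplete.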
The state of completeness is considered when the user asks to extract or drain nodes from the queue. It is when a drain is performed that either the latency bound or the lossless assurance is applied. When draining for latency bounding, the queue may drain nodes which are ordered “after” (higher ordering value) nodes from other streams which the user may subsequently try to feed. When a node fed to the merge is ordered “before” (lower ordering value) the last node that has been drained, it will be rejected. The user is invited to check the Boolean return value of feed() to determine if the node was accepted.
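The rejection rule can be illustrated with a toy merge that tracks only ordering values. ToyMerge and drain_one() are hypothetical names for this sketch; only the idea that feed() returns a Boolean comes from the text above, and the real signature may differ. Equal ordering values are assumed to be accepted.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <queue>
#include <vector>

// Toy stand-in for the merge: it stores bare ordering values only.
class ToyMerge {
public:
    // Returns false when the value is ordered before the last drained value.
    bool feed(double ordering) {
        if (last_drained_ && ordering < *last_drained_) {
            return false;  // too late: something after it has already left
        }
        heap_.push(ordering);
        return true;
    }

    // Pop the smallest value, or return nothing if the heap is empty.
    std::optional<double> drain_one() {
        if (heap_.empty()) return std::nullopt;
        const double v = heap_.top();
        heap_.pop();
        // Because feed() rejects late values, output never goes backwards.
        assert(!last_drained_ || *last_drained_ <= v);
        last_drained_ = v;
        return v;
    }

private:
    std::priority_queue<double, std::vector<double>, std::greater<double>> heap_;
    std::optional<double> last_drained_;
};
```

A caller would check the result of each feed() and count or log the rejections, which is the message loss that the latency-bounding policy trades for its bound.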
The “k” parameter is called the cardinality of the merge buffer. The current completeness is compared to cardinality in determining which elements to drain. The cardinality may be safely increased during operation of the merge to accommodate the introduction of novel input streams.
The cardinality may be reduced to accommodate removal of input
streams. However, this may lead to an over-complete condition and
thus may cause elements to be drained sooner than otherwise expected.
Once elements of the removed streams have been flushed, the merge
buffer should return to normal behavior.
The buffer may also be purged (cleared) of its contents, with the cardinality either kept or modified. Once cleared, it behaves as if it were newly constructed.
Caveat: these micro benchmarks may not be up to date with the code.
With -O2 or -O3, the stress_zipper.cpp test achieves:
    ❯ g++ -O2 -std=c++17 -o stress_zipper stress_zipper.cpp && time ./stress_zipper
    Nstream=10, Nsend=10 M, Nchunks=100
    Tot: 8.22475 s, 1.21584 MHz
    Zip: 2.7205 s, 3.6758 MHz
    ________________________________________________________
    Executed in    8.23 secs    fish           external
       usr time    8.22 secs  581.00 micros    8.22 secs
       sys time    0.00 secs  101.00 micros    0.00 secs

    ❯ g++ -O3 -std=c++17 -o stress_zipper stress_zipper.cpp && time ./stress_zipper
    Nstream=10, Nsend=10 M, Nchunks=100
    Tot: 6.49373 s, 1.53995 MHz
    Zip: 2.48493 s, 4.02425 MHz
    ________________________________________________________
    Executed in    6.50 secs    fish           external
The “Zip” part counts time to call just feed() and drain_waiting(). The rest is mostly taken up to perform the std::sort() to find the next “active” stream. This is the “application’s problem”.
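The reported rates are consistent with messages per second divided by 10^6, for example 10 M sends in 8.22475 s gives 1.21584 MHz. The sketch below shows one way such a split timing could be collected. Stopwatch and rate_mhz are hypothetical helpers written for illustration and are not taken from stress_zipper.cpp.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <utility>

// Accumulates wall-clock time spent inside selected calls only.
class Stopwatch {
public:
    using Clock = std::chrono::steady_clock;

    // Run `work` and add its duration to the running total.
    template <typename F>
    void measure(F&& work) {
        const auto start = Clock::now();
        std::forward<F>(work)();
        elapsed_ += Clock::now() - start;
        ++calls_;
    }

    double seconds() const {
        return std::chrono::duration<double>(elapsed_).count();
    }
    std::size_t calls() const { return calls_; }

private:
    Clock::duration elapsed_{};
    std::size_t calls_ = 0;
};

// Message rate in MHz: messages per second divided by one million.
double rate_mhz(std::size_t count, double seconds) {
    assert(seconds > 0.0);
    return static_cast<double>(count) / seconds / 1e6;
}
```

Wrapping only the merge calls in measure() while timing the whole loop separately would give a “Zip” figure and a “Tot” figure in the same spirit as the output above. Note that reading the clock around every call adds its own overhead, so a real benchmark may prefer to time larger batches.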
Without optimization the code runs about 10x slower, but it is useful to turn optimization off so that profiling makes more sense. Here is a look with Google’s Perftools.
    ❯ g++ -g -std=c++17 -o stress_zipper stress_zipper.cpp
    ❯ CPUPROFILE=stress-zipper.prof LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libprofiler.so.0 ./stress_zipper
    Nstream=10, Nsend=1 M, Nchunks=100
    Tot: 6.97106 s, 0.14345 MHz
    Zip: 2.98409 s, 0.335111 MHz
    PROFILE: interrupts/evictions/bytes = 696/186/67672
    ❯ google-pprof --lines --pdf ./stress_zipper stress-zipper.prof > stress-zipper.pdf
See stress-zipper.pdf for the detailed output. Note that code line numbers have changed since the profile was made. We also reduced the number of sends by 10x to get a run time similar to the optimized case (who has time to wait a whole minute to run a program?!).
The zipper branches are seen to account for about 50% of the total running time, as summarized:
- 40% in drain_waiting()
  - 19% in next() (pop’ing the heap)
  - 5% in complete() (checking if safe to drain)
  - 3% in back_inserter() (filling output)
- 8% in feed() (mostly heap insertion)
Most of the time is ultimately spent in constructing, destructing and copying the Node::payload object. If this is a problem, the caller could provide payloads as pointers or another scalar index.
Replacing Payload with size_t gives almost 3x speed up. Note that the following run sends 10x more messages than the prior optimized run.
    ❯ g++ -O2 -std=c++17 -o stress_zipper stress_zipper.cpp && time ./stress_zipper
    Nstream=10, Nsend=100 M, Nchunks=100
    Tot: 18.9862 s, 5.26698 MHz
    Zip: 8.63253 s, 11.5841 MHz
    ________________________________________________________
    Executed in   18.99 secs    fish           external
       usr time   18.98 secs    0.00 micros   18.98 secs
       sys time    0.00 secs  736.00 micros    0.00 secs
Sending an index to a Payload through the merge instead of the entire object gives about 3x speed up.
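One way to pass an index instead of the object is to keep the payloads in a side store and let only a small node travel through the heap. This is a sketch under assumptions: Payload here is a stand-in for a heavy user type, and PayloadStore, IndexNode and IndexNodeGreater are hypothetical names that do not come from the zipper code.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a user object that is expensive to copy.
struct Payload {
    std::string label;
    std::vector<double> samples;
};

// Owns the payloads; the heap only ever sees their indices.
class PayloadStore {
public:
    // Move the payload in and return the index that refers to it.
    std::size_t put(Payload p) {
        items_.push_back(std::move(p));
        return items_.size() - 1;
    }
    const Payload& get(std::size_t index) const {
        assert(index < items_.size());
        return items_[index];
    }

private:
    std::vector<Payload> items_;
};

// What actually moves through the heap: two scalars, cheap to copy.
struct IndexNode {
    double ordering;
    std::size_t index;
};

// Reversed comparison so the smallest ordering value sits on top.
struct IndexNodeGreater {
    bool operator()(const IndexNode& a, const IndexNode& b) const {
        return a.ordering > b.ordering;
    }
};

using IndexHeap =
    std::priority_queue<IndexNode, std::vector<IndexNode>, IndexNodeGreater>;
```

Heap sifting then swaps small trivially copyable nodes rather than whole payloads. This simple store never frees entries, so a long-running application would need some form of slot reuse, which is left out here.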
Back to a full Payload and the “lossy, latency bounding” drain.
    ❯ g++ -O2 -std=c++17 -o stress_lossy stress_lossy.cpp && time ./stress_lossy
    Nstream=10, Nsend=10 M, Nchunks=100, Nlost=0, Nleft=15
    Tot: 8.11915 s, 1.23166 MHz
    Zip: 5.69683 s, 1.75536 MHz
    ________________________________________________________
    Executed in    8.12 secs    fish           external
       usr time    8.11 secs  517.00 micros    8.11 secs
       sys time    0.01 secs   95.00 micros    0.01 secs
The lossy drain is currently about 2x slower than the lossless one: the Zip rate is 1.76 MHz here versus 3.68 MHz for the lossless -O2 run.