A design consideration when creating bus1 was to create an IPC mechanism that is predictable and does not introduce any ambiguities not already present in the system. In particular, the order in which messages are delivered should be well-defined and should make intuitive sense, ideally without sacrificing performance or scalability.
This page sets out what requirements we place on the order of messages in bus1, and outlines (in simplified terms) how the message transaction logic is implemented. For a more technical explanation see the comments in the sources.
Seen from userspace, one may think of messages being ordered as if the whole system had one global lock, ensuring that only one message transaction could take place at any given time. In practice, there is no global lock, nor any global synchronization of any kind, but it may still be a helpful intuition. Note, however, that this intuition only explains the order of delivery, not the time of delivery. In particular, there may be a (small and bounded) delay between a message having been sent and it being ready to be received.
The desired behaviour of the system may be summarized in three concepts: consistency, causality, and locality.
The bus must not introduce any ambiguity which may lead different peers to observe the state of the system in different ways. In particular, the order of multicast messages must be consistent: If two peers multicast one message each, then any two peers receiving both messages must be guaranteed to receive them in the same order.
The bus must respect causality: If the sending or receiving of a message may have caused the sending of another message (by whatever means), then the messages must always be ordered accordingly. In other words, it must not be possible to "receive the effect before the cause". For more on this topic, see Lamport's happens-before relation.
It is worth noting that whilst the only order we are interested in is the order of message delivery, we still take side-channel communication into account for the purposes of causality. In particular, an event on one peer may cause a message to be sent from another peer: the first event triggers a signal (outside the control and knowledge of the bus) to the other peer, which in turn triggers the sending of the message.
The bus must not rely on any global synchronization. No restriction should be placed on the scalability of the system. Rather, the system is expected to have to deal with ever greater parallelism in the future. The cost of this is a somewhat more complex implementation with a somewhat higher (though still modest) overhead in the uncontended case.
In particular, each queue implements a set of operations, each of which is considered to be atomic (protected by a per-queue lock) and each of which may have a cost of at most O(log(n)) in the length of the queue. This is the only locking or other synchronization taking place: no queue lock is held for more than one queue operation, and no queue lock is held under another.
Naively, one could imagine queues simply being implemented as FIFOs. Without some sort of synchronization, however, that contradicts our requirements. For instance, both consistency and causality may be violated by multicast messages as follows:
- the delivery of one multicast could "overtake" the delivery of another (even if the destination peers were visited in the same order), hence inverting the order in which different peers receive the messages; and
- one queue may receive one instance of a multicast message and successfully send a reaction to it to a second queue, before that queue receives its instance of the original multicast message.
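The first violation can be illustrated with a toy model. This is only a sketch: the two Python lists stand in for FIFO queues, and the peers X and Y and senders A and B are hypothetical names introduced for the example.

```python
# Two plain FIFO queues, one per receiving peer (hypothetical peers X and Y).
queue_x = []
queue_y = []

# Senders A and B each multicast one message to X and then to Y, in that
# order, but nothing synchronizes the two transactions with each other.
queue_x.append("A")  # A reaches X first
queue_x.append("B")  # B reaches X second
queue_y.append("B")  # B overtakes A on the way to Y
queue_y.append("A")  # A reaches Y last

# X observes A before B, while Y observes B before A.
inconsistent = queue_x != queue_y
```

Both senders visited the destinations in the same order, yet the two receivers disagree on the order of the messages, which is exactly what the consistency requirement forbids.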
Instead of basing the order of messages on the order of delivery, the order is based on a timestamp assigned to each message, and the relative order of two messages is determined by simply comparing their timestamps. The rest of this page is dedicated to determining what timestamp a given message should be assigned.
It is worth pointing out that whilst timestamps give a total order on all messages (allowing any two messages to be compared, even if they have no relation to each other), we only guarantee that their semantics make sense for the purposes of ordering messages on a given queue. In particular, it is perfectly possible that a message received before another is sent still has a greater timestamp, as long as there is no interaction between the two transactions. The concept of timestamps is an implementation detail which is not exposed to userspace.
The implementation is based on Lamport clocks (though adapted for multicast), and whilst this particular scheme is unlikely to be unique, we have not yet found anything in the literature precisely matching the semantics we want (pointers to references would be welcome).
Each queue has a local clock, which is just a simple counter. The clock is monotonically increasing, i.e., it cannot be rewound.
When a message is sent, the clock on the local queue is ticked, which increases the value of the counter and returns the new value, which is called a timestamp. The timestamp is guaranteed to be unique for the local clock: as the clock is never rewound and ticking it advances the counter, the timestamps returned by two ticks of the same clock must be different, and the latter strictly greater than the former.
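A minimal sketch of such a clock, assuming a step of one per tick and a starting value of zero. The class name `Clock` and the method name `tick` are illustrative only and are not taken from the bus1 sources.

```python
class Clock:
    """A per-queue logical clock: a plain counter that only moves forward."""

    def __init__(self):
        self.value = 0  # assumed starting value

    def tick(self):
        """Advance the counter and return the new value as a timestamp."""
        self.value += 1
        return self.value


clock = Clock()
first = clock.tick()
second = clock.tick()
# Two ticks of the same clock never return the same timestamp,
# and the later tick returns the greater one.
```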
The timestamp obtained from the sending clock is combined with a unique identification for the clock (for instance the pointer to the queue, though any globally unique number would do), and the result is stamped on the message being sent.
The order of messages is now determined by first comparing the timestamps. If the timestamps coincide (which is possible when the messages originate at different queues), then it must mean (as will be seen below) that there is no possible causal relationship between the messages, so their order can be taken to be arbitrary. However, we still want to ensure consistency, so to resolve this ambiguity, the per-queue unique identifier is used to determine the order of the messages.
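The comparison described above amounts to a lexicographic order on the pair of timestamp and clock identifier. As a sketch, assuming small integers as identifiers (the type name `Stamp` and its field names are hypothetical):

```python
from typing import NamedTuple


class Stamp(NamedTuple):
    timestamp: int  # value returned by ticking the sending clock
    sender_id: int  # hypothetical unique identifier of the sending queue


a = Stamp(timestamp=5, sender_id=2)
b = Stamp(timestamp=5, sender_id=1)
c = Stamp(timestamp=4, sender_id=9)

# Tuples compare field by field: the timestamp decides first, and the
# identifier only breaks ties between equal timestamps.
ordered = sorted([a, b, c])
```

Here `c` sorts first because of its lower timestamp, and the tie between `a` and `b` is resolved in favour of `b`, whose identifier is lower. Every queue that holds both `a` and `b` resolves the tie the same way, which is what consistency requires.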
Synchronizing the clocks
Ticking the clock of the sending queue ensures that messages sent from the same queue are ordered accordingly. However, we also need a message sent from a queue to be ordered after any message previously received on that queue.
In order to achieve this, all clocks involved in a message transaction must be synchronized with the timestamp of the message being sent. Synchronizing a clock with a given timestamp is a no-op if the clock is ahead of the timestamp; otherwise the clock is fast-forwarded to match the timestamp.
Synchronizing the clocks ensures that any timestamp acquired from a clock for an outgoing message is greater than the timestamps of all the incoming messages currently on the queue. In particular, this gives the guarantee that causality between received and sent messages is respected.
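As a sketch, synchronization is simply taking the maximum of the clock value and the given timestamp. The class below is illustrative (the names `Clock`, `tick` and `sync` are assumptions, not bus1 identifiers) and ignores the per-queue lock that would protect each operation.

```python
class Clock:
    """A per-queue logical clock that can be ticked and synchronized."""

    def __init__(self, value=0):
        self.value = value

    def tick(self):
        """Advance the counter and return the new value as a timestamp."""
        self.value += 1
        return self.value

    def sync(self, timestamp):
        """Fast-forward to `timestamp`; a no-op if the clock is not behind it."""
        self.value = max(self.value, timestamp)


receiver = Clock(3)
receiver.sync(10)        # an incoming message stamped 10 moves the clock forward
reply = receiver.tick()  # a message sent afterwards gets a greater timestamp
receiver.sync(2)         # the clock is already ahead, so nothing changes
```

Because the receiving clock was synchronized with the incoming timestamp, the reply is stamped after the message that may have caused it.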
Note that it is crucial to first synchronize all the clocks before starting to queue the messages of a given transaction. Otherwise, a message could be dequeued on one queue and cause a side-channel signal to be sent triggering the sending of a message on another queue, before that queue has been synced with the timestamp of the original transaction.
Blocking the queue
Knowing how to compare two messages is only sufficient to decide when a message can be dequeued if you know you have all the relevant messages to compare. When dequeuing a message, the one with the lowest timestamp should be dequeued first, but without any further synchronization it may be that there are other messages with even lower timestamps that are yet to arrive at the queue.
This problem is solved by allowing a block to be installed at a given timestamp. There may be any number of blocks in place and the effect is that no message can be dequeued if it is preceded by a block. Before obtaining the timestamp of a message, we go through all the destination queues and block them at their current timestamp and collect the maximum of all these timestamps. The sending queue is then synchronized to the maximum timestamp before being ticked. Once a message is queued the corresponding block is removed from the queue.
As any block is placed after all messages already on the queue, and any message is placed after its corresponding block, we know that if a message is not preceded by any block, it is not possible that any future queued message has a lower timestamp. It therefore follows that if the first message on a queue is not blocked, it is ready to be dequeued.
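The resulting readiness rule can be sketched as a small predicate. It assumes that a block installed at timestamp t sorts after any message stamped t (since such messages were already on the queue when the block was placed) and before any message with a greater timestamp. The function name `ready` and the data layout are hypothetical.

```python
def ready(messages, blocks):
    """Return the stamp that may be dequeued now, or None if the queue must wait.

    `messages` is a list of (timestamp, sender_id) pairs already queued and
    `blocks` is a list of timestamps at which blocks are installed.
    """
    if not messages:
        return None
    head = min(messages)  # lowest timestamp, ties broken by sender_id
    if any(block < head[0] for block in blocks):
        # A block precedes the head, so a message that is still in flight
        # may end up ordered before it.
        return None
    return head


blocked = ready([(7, 1)], blocks=[5])      # a message stamped 6 may still arrive
unblocked = ready([(7, 1)], blocks=[])     # nothing in flight can precede 7
before_block = ready([(5, 1)], blocks=[5]) # queued before the block was placed
```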
In short, the algorithm works as follows:
- place a block on each of the destination queues, and compute the maximum of their current timestamps;
- synchronize the sending queue to this timestamp;
- tick the sending queue and assign this timestamp to the message being sent;
- synchronize each of the receiving clocks to the message's timestamp;
- queue the message on each destination queue and remove the corresponding block.
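The five steps above can be sketched as a single-threaded toy model. It is not the bus1 implementation: the per-queue lock is omitted, a binary heap stands in for whatever ordered structure the kernel uses, and all names (`Queue`, `send`, `block`, `commit`, `receive`) are hypothetical.

```python
import heapq
import itertools


class Queue:
    _ids = itertools.count(1)  # stand-in for a globally unique identifier

    def __init__(self):
        self.id = next(Queue._ids)
        self.clock = 0
        self.messages = []  # heap of (timestamp, sender_id, payload)
        self.blocks = []    # timestamps at which blocks are installed

    def tick(self):
        self.clock += 1
        return self.clock

    def sync(self, timestamp):
        self.clock = max(self.clock, timestamp)

    def block(self):
        """Install a block at the current clock value and return that value."""
        self.blocks.append(self.clock)
        return self.clock

    def commit(self, block_ts, stamp, payload):
        """Queue the message and remove the block installed for it."""
        self.blocks.remove(block_ts)
        heapq.heappush(self.messages, (stamp[0], stamp[1], payload))

    def receive(self):
        """Dequeue the first message unless it is preceded by a block."""
        if not self.messages:
            return None
        if any(b < self.messages[0][0] for b in self.blocks):
            return None
        return heapq.heappop(self.messages)[2]


def send(sender, destinations, payload):
    blocks = [(q, q.block()) for q in destinations]         # step 1
    sender.sync(max((ts for _, ts in blocks), default=0))   # step 2
    stamp = (sender.tick(), sender.id)                      # step 3
    for q, _ in blocks:                                     # step 4
        q.sync(stamp[0])
    for q, block_ts in blocks:                              # step 5
        q.commit(block_ts, stamp, payload)
    return stamp


a, b, c = Queue(), Queue(), Queue()
send(a, [b, c], "m1")      # multicast from a
b.receive()                # b sees "m1" ...
send(b, [c], "reply")      # ... and reacts by sending to c
received = [c.receive(), c.receive()]  # ["m1", "reply"]
```

In this run the reply from `b` is stamped after the multicast it reacts to, so `c` receives the cause before the effect.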
The cost of a message transaction is O(n log(m)), where n is the number of destinations and m is the maximum length of the receiving queues.
Without affecting the complexity, there are a couple of optimizations one could employ.
Any messages with timestamps between a block and the corresponding message are blocked unnecessarily. It would therefore be desirable to minimize this window by blocking at a timestamp as close to the corresponding message timestamp as possible. So rather than blocking the queues at their current timestamp, their clocks should first be synchronized with the best estimate we have of what the final timestamp will be.
Moreover, it is not necessary to block the destination queue in case of unicast messages, but that requires a bit more thought.
As there is no synchronization between peers apart from the mechanism of local clocks outlined above, the notion of something "happening before" something else across different peers must always be expressed in terms of clocks and timestamps (cf. Lamport).
The bus employs a system of nodes and handles, which are essentially references to nodes. Nodes may be destroyed by their owner, even though there are still handles referencing the node. The behaviour of the system may be different based on an event happening before or after a node has been destroyed, so deciding when a node destruction takes place must be done in terms of timestamps and clocks.
When a node is destroyed, a timestamp is acquired from the queue of the owning peer and assigned as the destruction timestamp of the node. This is what messages may order against if they need to.
A message sent to a node should succeed if the node is valid, and fail if it has been destroyed. When a message is sent to a node that has a destruction timestamp assigned, the message should be ordered against this timestamp: if it is ordered before the destruction it should be delivered, and if it is ordered after it should not.
A handle passed along with a message should be successfully installed in the receiver if its underlying node is still valid at the time of reception, and it should fail otherwise. Again, the destruction timestamp of the node is used: if the destruction is ordered before the message containing the handle, then the handle should be considered invalid, otherwise it should be considered valid.
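Both rules reduce to one comparison against the destruction timestamp. The sketch below assumes that stamps are (timestamp, clock identifier) pairs compared lexicographically, and that a node which has not been destroyed has no destruction stamp, represented here as `None`. The function name `node_alive_for` is hypothetical.

```python
def node_alive_for(message_stamp, destruction_stamp):
    """Return True if the message is ordered before the node's destruction.

    Stamps are (timestamp, clock_id) pairs; `destruction_stamp` is None
    while the node has not been destroyed.
    """
    if destruction_stamp is None:
        return True
    return message_stamp < destruction_stamp


destroyed_at = (5, 2)  # stamp acquired from the owner's queue on destruction

early = node_alive_for((4, 1), destroyed_at)  # ordered before: deliver or install
late = node_alive_for((6, 1), destroyed_at)   # ordered after: fail or treat as invalid
live = node_alive_for((6, 1), None)           # node was never destroyed
```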
One way to consider this is that if a peer has received the destruction notification about a given handle, it should not be possible to receive the handle again later. Conversely, if a handle to a node in the process of being destroyed is passed along, that would not cause any problems as long as the required destruction notification is guaranteed to arrive after the message, so that things are cleaned up correctly.