Skip to content

Riak Core Broadcast

Doug Rohrer edited this page Sep 20, 2016 · 2 revisions

Riak Core Broadcast

The following describe the broadcast subsystems implemented by riak_core_broadcast. It is intended for those wishing to use the subsystem as well as support and modify it.

1. Overview

riak_core_broadcast provides a generalized mechanism for ensured eventual delivery of messages to all nodes in a riak_core cluster. The development of this subsystem was driven primarily by the implementation of Cluster Metadata, however it was built for use in other applications as well. Because of this generalization, it is required the client application/subsystem provide certain guarantees in order to ensure 100% reliability when delivering messages.

Broadcasted messages are opaque to riak_core_broadcast. The only requirement is that each message must be uniquely identifiable. The storage of messages is the responsibility of the application/subsystem using riak_core_broadcast. For more details about what is required when using riak_core_broadcast see Broadcast Handlers.

To deliver messages riak_core_broadcast sends them along the paths of a directed, possibly cyclic, graph of all nodes in the cluster. In most cases, however, the graph will be a spanning tree. Only in the cases where membership changes or failures occur will a less optimal path be used. Keep reading for more details.

riak_core_broadcast provides the following guarantee regarding message delivery -- assuming the application/subsystem using it meets the necessary requirements (see Broadcast Handlers):

A broadcasted message will be delivered to all nodes *at least
once*. There is one exception: when a node the a broadcast originates on is
removed before it can deliver the payload to at least one other
node, this guarantee is not met.

Exploration of ways to mitigate the exception mentioned are being tracked in this issue.

2. Implementation

riak_core_broadcast is a hybrid of deterministic broadcast and stochastic epidemic protocols: a self-healing spanning tree. The primary path for the delivery of messages is to broadcast using the spanning tree but when links fail, they are healed by using pull-based gossip [1]. Sometimes, the spanning tree may have additional links -- making it a directed , and possibly cyclic, graph while it is being healed or when membership changes. In addition, the entire mechanism is backed by a standard anti-entropy epidemic [1]

2.1 Plumtree

The implementation is based upon "Plumtree" [2] , which specifies an algorithm for a self-healing spanning tree. The paper [2] has a clear and concise description of the algorithm so none will be recreated here.

Plumtree was chosen, primarily, because of its efficiency in the stable case while maintaining reliability. When the graph of nodes is a proper spanning tree, each node will receive the payload exactly once. In failure cases, nodes may receive the payload more than once, but the act of doing so heals the tree and removes redundant links.

2.2 Differences & Extensions

riak_core_broadcast extends what is specified in the Plumtree Protocol. The requirements of the implementation in [2] and riak_core_broadcast differ in several ways. The following describes those differences and their effect on the implementation.

2.2.1 Peer Service

In large-scale, partially connected eventual delivery protocols it is common to separate responsibilities into two parts: the peer service and the message overlay. The latter is what Plumtree implements and it is responsible for providing a reliable and hopefully efficient delivery mechanism for most messages. The former is responsible for ensuring delivery of critical data like membership, among other things not relevant to this document and may do so less efficiently at the cost of safety. The message overlay relies on the peer service and makes certain assumptions about it. These assumptions are the source of many of the differences between the implementation specified in [2] and riak_core_broadcast. The Peer Service in [2] is HyParView [3]. In riak_core it is the existing riak_core_gossip.

The assumptions made in [2] are discussed in section 3.3 of the paper. The "Connectivity" requirement of the service is not one that can be provided by Riak. Namely, a node must be able to be a singleton during a long lived partition and still ensure the delivery to all nodes. The implementation in [2] assumes in this case that the peer service will fail this node out of the overlay and that failure to deliver messages to it or sent from it is acceptable. We do not.

The paper also is focused on clusters on the order of tens of thousands of nodes, whereas Riak cluster, typically range from 5 to 100. The Peer Service for riak_core_broadcast is fully-connected -- each node is aware of every other node in the cluster and can communicate with them directly outside of riak_core_broadcast. This affects how membership is handled.

2.2.2 Historical Delivery

The reliability of Plumtree is measured in [2] by determining how many healthy nodes receive the message, where healthy nodes are defined to be nodes that have not failed while there are protocol messages related to the broadcast "in the network". The reliability of riak_core_broadcast, however, must define healthy nodes as those that have not permanently failed or have been removed from the cluster.

When considering dynamic membership, the definition of healthy nodes in [2] is extended to include joined nodes that did so while there were still protocol messages related to the broadcast "in the network". riak_core_broadcast, extends this further to include any node which may join in the future, despite protocol messages no longer being "in the network".

This requirement can be thought of as "Historical Delivery".

2.2.3 Lazy Message Queuing

Plumtree is a reliable protocol in the face of massive node failures, which was the focus of the research, however, it is not tolerant to network partitions. This is hinted at in the "Connectivity" requirement [2] of Plumtree's Peer Service and can be proven using a formal model. To address this, riak_core_broadcast extends how the set of unsent lazy (or "i have") messages is maintained and adds an additional message: "ignore i have".

The original algorithm removes the record of an "i have" message, used to indicate it needs to sent, as soon as one attempt is made to send it. This is a problem when messages are dropped by the network. riak_core_broadcast maintains this record after the attempt and periodically scans the set of them and re-sends.

The record is removed from the set when an "ignore i have" message is received. This message is sent by the peer who received the "i have" and indicates that no graft is necessary.

2.2.4 Anti-Entropy

In cases where a large percentage of nodes fail, the union of all sets of lazy links between nodes, which heal the spanning tree, will no longer contain links between all nodes. When this happens, messages may not be delivered to all healthy nodes even under the less strict definition used in [2]. Although, [2] discusses a similar failure scenario, they are not quite the same because of the Peer Service, however the differences are not worth exploring further.

Although high failure percentages are not so common in riak_core applications, it is still an unacceptable failure scenario. riak_core_broadcast uses a classic anti-entropy epidemic [1] to ensure that each node eventually checks-in with every other node in the cluster and delivers any missing messages. These are referred to as "Exchanges".

This, also, ensures that riak_core_broadcast maintains Historical Delivery.

2.2.5 Initial Peers

Since the Peer Service between the two implementations differ their generation of the set of initial eager and lazy peers for each node do as well. riak_core_broadcast uses the riak_core_util:build_tree/3. See riak_core_broadcast:init_peers/1 for more details.

2.2.6 Dynamic Membership

Since riak_core_gossip is fully-connected, unlike HyParView [3], how the system deals with membership changes has also been modified. riak_core_broadcast simply regenerates the initial peers, as if it were restarted w/ the new membership information, and allows the protocol to modify things as needed.

3. Broadcast Handlers

This section needs content

4. References