Add more CRDT support to Riak #354

Closed
russelldb opened this Issue Jul 24, 2013 · 14 comments


@russelldb
Basho Technologies member

CRDT support in Riak

Aim

Provide data types in Riak that make it easier for developers to reason about eventual consistency. Currently Riak
stores opaque data; we do nothing to help the developer model their problem or deal with eventual consistency. By
providing a set of data types in Riak that a developer can use to compose a complex value with well defined merge
semantics, we aim to make using Riak easier.

Idea

To expose CRDTs[1] to developers. Originally we had hoped to avoid the
current Get-Modify-Put cycle and provide an interface of simply
shipping operations to Riak. However, recent work has shown
inconsistencies in cases where the state on the vnode handling
the request is not the state observed by the user. There are more examples in
the types below, but we have decided that the familiar Get-Modify-Put
cycle is needed. When you get a CRDT value, some context information
will be returned; this must be sent back to the server along with the
operations to perform. Failure to provide the context will lead to
less correct behaviour.

We already have an example of CRDTs in Riak 1.4 with counters. If a
user wants to increment a counter they tell Riak "ayo, increment my
counter by N". Riak actually stores a CRDT in a riak object, and this
operation is just a regular PUT, with some custom merge logic on the
vnode. If a user wants the current value of a counter they do a GET on
the counter resource and get back a single integer value. This is
actually just a normal Riak GET, with the merge run by Riak. This
makes sense for a counter.

Things get more complex with Sets. When removing an element from a
Set, the context of the removal is the current state of the set. If
there is no context it is possible that a vnode with no current
state (think of a fallback spun up to handle a request) will not be
able to remove an element.

Things are more complex still with Maps (see below.)

It is the unpredictability of shipping only Operations that has led
us to require a context for each operation.

It seems likely that a lot of work will be on optimising the size of
the context.
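
To make the shape of that concrete, a fetch returns the value plus an opaque context, and the context is round-tripped with the operations. A minimal sketch (fetch_crdt/3 and update_crdt/5 are hypothetical names for illustration, not a real client API):

update_friends(Client, Bucket, Key) ->
    %% The fetch returns the value plus an opaque context describing
    %% the state the client observed.
    {ok, _Value, Ctx} = fetch_crdt(Client, Bucket, Key),
    %% The context goes back with the operations, so the co-ordinating
    %% vnode can act on the state the client actually saw.
    ok = update_crdt(Client, Bucket, Key, Ctx, [{remove, <<"Dave">>}]).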

Data Types

We plan to offer the following data types. Implementation details of each chosen type, and unsolved issues, are below.

Counter

We already support the PN-Counter.

Details

Much like a version vector: a pair of lists of two-tuples {actor(), integer()}. The first list is the P Counter; it
holds all the increments for the counter. The second list is the N Counter; it holds all the decrements. Each actor (in
Riak, a vnode) only increments its own entry in its list. The value of the counter is the sum of the P Counter minus
the sum of the N Counter.
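
A minimal sketch of the idea (illustrative only; simplified from the real riak_dt_pncounter, here using orddicts for the P and N lists):

%% State: {P, N}, orddicts of actor() -> integer(). Increments go in P,
%% decrements in N; each actor only ever grows its own entry.
new() -> {orddict:new(), orddict:new()}.

update(Actor, Amount, {P, N}) when Amount > 0 ->
    {orddict:update_counter(Actor, Amount, P), N};
update(Actor, Amount, {P, N}) when Amount < 0 ->
    {P, orddict:update_counter(Actor, -Amount, N)}.

value({P, N}) ->
    lists:sum([V || {_, V} <- P]) - lists:sum([V || {_, V} <- N]).

%% Merge takes the per-actor maximum; safe because entries only grow.
merge({P1, N1}, {P2, N2}) ->
    Max = fun(_Actor, A, B) -> max(A, B) end,
    {orddict:merge(Max, P1, P2), orddict:merge(Max, N1, N2)}.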

Issues

  • Partial failure / success (non-idempotent) - Riak may return an error, but due to a partial success the increment persists. A retry would then count that increment twice. This can only be avoided by having client side actor IDs. I suggest we just live with this failure mode.
  • Garbage - Maybe. Depends on cluster churn. Dead actors could theoretically be removed and their writes coalesced. Doing so has two difficulties:
    1. Deciding when an actor in Riak will act no more (retired actors)
    2. Consensus is required to perform the removal, or the removed data will resurface on a merge. There may be a non-consensus protocol (see the recent Handoff Counters paper[2]), but we haven't explored this at all yet.

Set

None of the sets in the literature behave as they would if their concurrent operations were linearized. Of all the sets
in the catalog we feel the OR-Set is the most useful and intuitive.

Details

In the literature the OR-Set is made up of two sets of {element(), unique()} pairs. The first set is the Add set, the
second the Remove (or tombstone) set. However, for various reasons this proves space inefficient, especially if the
same thing is added to the set often (more on that later.) Since Riak is a distributed system with well-known actors, we came
up with a more space efficient OR-Set, which I'm calling a VVORSet (naming is hard.)

The VVORSet is a dictionary of element() -> {active_bit(), vclock()} mappings. When an actor adds an element to the
set it increments its entry in the clock for that element. When an actor removes an element from the set it flips the
active_bit from 1 to 0. This is the observed remove (i.e. the actor removed all the updates for that element that
it has observed, only.) Any concurrent add will result in a vclock that is not strictly dominated by the remove clock
for the element, and on merge the element's active_bit will flip back to 1 and the clock will be the add clock.

There is actually a further small optimization for space where actor ids are stored only once in a list, and replaced by
a single integer in the clocks. There is some computational expense at update and merge to look up actors, merge actor
lists, and replace actors. See the module[3] for more details.
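
A simplified sketch of those semantics, without the actor-compression optimisation (assumes a vclock module in the style of riak_core's, with fresh/0, increment/2, merge/1 and descends/2):

%% State: orddict of element() -> {active_bit(), vclock()}.
add(Elem, Actor, Set) ->
    {_Bit, Clock} = case orddict:find(Elem, Set) of
                        {ok, Entry} -> Entry;
                        error -> {0, vclock:fresh()}
                    end,
    orddict:store(Elem, {1, vclock:increment(Actor, Clock)}, Set).

%% Observed remove: flip the bit, keep the clock as the remove clock.
remove(Elem, Set) ->
    case orddict:find(Elem, Set) of
        {ok, {_Bit, Clock}} -> orddict:store(Elem, {0, Clock}, Set);
        error -> Set
    end.

value(Set) ->
    [Elem || {Elem, {1, _Clock}} <- orddict:to_list(Set)].

%% On merge, a remove only sticks if its clock dominates the other
%% side's clock for that element; a concurrent add flips the bit back.
merge(SetA, SetB) ->
    orddict:merge(fun(_Elem, {BitA, CA}, {BitB, CB}) ->
                          Clock = vclock:merge([CA, CB]),
                          Bit = case {BitA, BitB} of
                                    {1, 1} -> 1;
                                    {0, 0} -> 0;
                                    {0, 1} -> bit(vclock:descends(CA, CB));
                                    {1, 0} -> bit(vclock:descends(CB, CA))
                                end,
                          {Bit, Clock}
                  end, SetA, SetB).

bit(true) -> 0;   %% remove clock dominates the add: stays removed
bit(false) -> 1.  %% add was concurrent, unseen by the remove: add wins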

Issues

  • Remove co-ordinated by a replica that does not contain the element - A client may get the current value for the Set of friends and see that "Dave" is present. The client may ask Riak to remove "Dave" from the set of friends, but the replica that co-ordinates the write may (for some reason (fallback?)) not contain the element "Dave". This is a failing precondition on the remove operation. What is the correct behaviour? After all, the user knows "Dave" is in the Set.
    1. We require a Get-Modify-Put cycle for Set removal, sending the whole data structure to the client as context, which the client sends back with the operations to be performed.
    2. We could just act on the co-ordinating vnode (no-op) and have the user application re-issue the request when it observes the value still present. This is the behaviour if a context is not provided. However, there are edges: technically, element additions that have not been observed by the client could be removed if those operations interleave with the client's Get-Put. Therefore using a context is most correct.
  • Garbage - Tombstones are required for the set to behave deterministically in the face of adds concurrent with removes. However, once all concurrent adds and removes have been received by all replicas, the tombstones are no longer needed. Tombstones for removed elements can make up the majority of a set's size. Imagine a set that has had 1k elements added and 999 removed. Though it has but a single element it is the same size as a set with 1k members. We are investigating a consensus protocol that allows us to remove tombstones.

LWW Register

A single value register that converges on the value with the highest timestamp.

Details

Pretty simple. Either the user supplies a timestamp or the module generates one with:

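%% Wall-clock time in microseconds since the Unix epoch.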
make_micro_epoch() ->
    {Mega, Sec, Micro} = os:timestamp(),
    (Mega * 1000000 + Sec) * 1000000 + Micro.
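
Merge then just keeps the pair with the higher timestamp (a sketch; the value-comparison tie-break is an assumption to keep replicas deterministic, not necessarily what riak_dt_lwwreg does):

merge({V1, T1}, {_V2, T2}) when T1 > T2 -> {V1, T1};
merge({_V1, T1}, {V2, T2}) when T2 > T1 -> {V2, T2};
%% Equal timestamps: break the tie deterministically on the value.
merge({V1, T}, {V2, T}) -> {max(V1, V2), T}.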

Issues

  • Logical or wall clock LWW - Would a vector clock style logical LWW make more sense? It would require the user to provide that logical clock as context rather than just the operation, but that seems reasonable.

Boolean Flags

These are one-way flags. Enable flags start off and converge to on. Disable flags start on and converge to off.
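
Their merges are trivially monotone; as a sketch:

%% Enable flag: starts off (false), converges to on (true).
merge_enable(A, B) -> A orelse B.
%% Disable flag: starts on (true), converges to off (false).
merge_disable(A, B) -> A andalso B.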

Map

A Map of {name(), crdt_type()} -> crdt() mappings: a CRDT that allows composition of complex documents made up of CRDTs. The
restriction is that the type of each field must be declared. This avoids the need for a lattice of CRDT types, which
we feel would lead to all Maps ultimately being Maps of Maps of Maps of Maps as we attempt to merge a
name -> value || name -> value2 conflict into a single Map instance containing both values (or an MVV, but why have siblings with CRDTs?)

We have a prototype (see below) that allows users to update / add / remove sub elements in the Map.

Details

The Map is implemented as a VVORSet of {name(), type()} pairs, called the Key Set, and a dictionary of
{name(), type()} -> crdt() mappings, called the Value List. name is anything stringish, and type is one of the
supported CRDT modules.

We use a VVORSet for the key set to allow concurrent update / remove of fields. The need for each update on
a field to be recorded as an add to the key set, so that it wins over a concurrent remove, is what led us to develop the VVORSet in the first place.

When a user wishes to add a field to the Map they can simply issue an update operation to that field. If the field is not present at the local replica, an empty CRDT of the declared type is created, the
operation is applied, and the value is stored. A remove results in a VVORSet remove for the key, and we remove the
value from the Value List at once. We received a draft paper from Baquero et al (after we made our Map) with a similar state based CRDT spec, which we used to develop our EQC model. There is room for discussion about tombstoning values too, as there are some oddities in concurrent update | remove, but we think providing the context overcomes most of these. See below for more details.

Issues

  • Garbage - exactly the same tombstoning issues as for the VVORSet, but with the added complexity that a Map can be arbitrarily nested, with arbitrarily many Sets / Maps as field values.
  • Concurrent Update and Remove - There is some difficulty still to be solved around concurrent remove and update. Imagine the following sequence:
    1. Increment Counter at Field F in Map M by 5 on replica X (F = [{X, 5}])
    2. Merge with replica Y
    3. Increment Counter at Field F in Map M by 3 on replica Y (F = [{X, 5}, {Y, 3}])
    4. Increment Counter at Field F in Map M by 1 on replica X (F = [{X, 6}])
    5. Remove Field F from Map M on replica X (F = null)
    6. Increment Counter at Field F in Map M by 1 on replica Y (F = [{X, 5}, {Y, 4}])
    7. Merge X and Y (F = [{X, 5}, {Y, 4}])
    What should be the correct value of the counter at this point? As designed, the concurrent update wins over the remove, but what has happened to the increment at step 4? Let me know if you think this is acceptable. The alternative, of course, is…complex… There is some debate that if you want to remove a field and then re-add it, what you really need to do is somehow reset it. In the case of a counter, rather than removing and starting again, increment by -current_value. For a set, remove all elements. Another option is that we keep per-remove tombstones, and updates concurrent with a remove are preserved. This may lead to surprising behaviour with entirely concurrent add >> remove >> add >> remove cycles on a field, but again…maybe remove is the wrong operation in that context. Please weigh in on this debate, as the Map is very valuable for modelling arbitrary compositions of CRDTs.

Garbage Collection

Here is the current thinking on GC:

What is garbage?

As mentioned in the tour de types above, garbage is different for different CRDTs. In counters it is retired actors, in Sets it is tombstone elements. In Maps it may even include tombstone values.

In Sets garbage is very easy to identify (tombstones); in counters it is harder: how do we decide what is a retired actor? The garbage in all CRDTs is a result of their constant, monotonic inflation. It is this constant growth that gives them the merge properties that they have. However, after all replicas have seen a concurrent add and remove, the tombstone in a set is useless. So some garbage can safely be removed.

This garbage cannot be removed in an ad hoc, unilateral manner, as it will simply be re-introduced on a merge. In the case of Sets this would be wasteful; in the case of counters it would lead to incorrect values.

Garbage then slows the system down over time. Reading a Set with many tombstones from disk and sending it over the network is wasteful. We would like to at least get the garbage out of the primary read-write path of an object.

A man, a plan, a (garbage) can

Some suggestions exist in the literature for garbage collection, like ORSWOTs[4] or Handoff Counters[2], but neither is a general purpose mechanism for multiple CRDT types (and [4] requires causal delivery.)

I'm going to focus on Sets for now, since (a) they have the most obvious garbage problem, (b) we can side-step the problem of what is garbage, and (c) I doubt counters will accrue much garbage.

I think that we can't remove a tombstone element while there is a chance that a concurrent add of that element exists anywhere in the cluster. In order to be sure no such add exists we would need to check every node.

Equally, I think it would be a bad idea to require every node in the cluster to be available to collect garbage from a Set (but maybe that is fine.)

My current idea is to take advantage of Joseph Blomstedt's proposed strong consistency work to write an immutable log of garbage collection epochs per key. This means that any single replica can perform a unilateral garbage collection, and all other replicas can 'catch up' when they merge and detect that they're from a different epoch. Of course this just moves the garbage off the critical path and leaves us with the problem of truncating or trimming the log. We can use a full-cluster gossip protocol for that, as truncation can happen lazily in the background as nodes become available. Details of these three protocols (GC log, catch up, GC log pruning) to follow shortly.

Multi Data Centre

For GC to work in MDC clusters of clusters we think a cluster local / cluster remote view of CRDTs is required. This
would require a pre-repl / post-repl hook of some sort that allows a CRDT to roll up its local representation prior to
replication.

Since we have implemented an actor based OR-Set, and the counters are actor based, we think this is as simple as
replacing all local actor IDs with a single ClusterId at roll-up before repl, and dropping a cluster's local ID from the
state on repl receive.

This is way over simplified and one of the things I'm still figuring out.
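
As a rough illustration for the counter case (is_local_actor/1 and the single ClusterId entry are assumptions; the receive side would drop its own ClusterId entry as described):

%% Collapse all local actor entries into one ClusterId entry before
%% replication. Summing is safe for grow-only per-actor counts, so the
%% rolled-up entry is itself monotone and can be max-merged remotely.
rollup(ClusterId, Entries) ->
    {Local, Remote} =
        lists:partition(fun({Actor, _Count}) -> is_local_actor(Actor) end,
                        Entries),
    [{ClusterId, lists:sum([C || {_, C} <- Local])} | Remote].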

API / User Interface

This is a thorny issue. It would be great to allow clients (like the Java Client) to create Objects that are based on
Maps, with fields annotated with CRDT types, and never have to deal with siblings.

We've started looking at some ways to represent the manipulation / querying of maps. Please comment on the examples:

https://github.com/metadave/gravy#update
https://gist.github.com/seancribbs/1020fb807e65dfda8d97
https://gist.github.com/russelldb/3c9613d1067e0cd71736 (this one is how the prototype works, not really an option)


[1] CRDTs - A comprehensive study of Convergent and Commutative Replicated Data Types
[2] Handoff Counters
[3] VVORSet implementation - riak_dt_vvorset
[4] ORSWOTs - An Optimized Conflict-free Replicated Set (http://arxiv.org/pdf/1210.3368.pdf)

@russelldb
Basho Technologies member

Copying initial comments from private repo:

From @metadave

@seancribbs and I are talking about a general query language for Riak, to be implemented in Erlang, that includes CRDTs:
https://gist.github.com/metadave/6024103

From @lenary

I also have ideas for a CRDT Query/Update language: https://gist.github.com/lenary/da1b98a97b43a06ecf3e

However they need some work, and after discussion with @seancribbs it seems I might take them down a completely different track. :)

@russelldb
Basho Technologies member

There are many ways to make a CRDT Map, it seems. None of them is really satisfying to me (yet), but I need to pick one and move on. If you have the time, please help.

All the Maps use something like an OR-Set for keys. All Maps map keys of {name(), crdt_type()} -> crdt(crdt_type()).

Original (Carlos Map): OR-Set for keys, dict of Key->Value

Upside: pretty small. Only the key set has tombstones.
Downside: A(update >> remove) | B(update) loses A's update on merge. This is because the remove simply drops the value at A, so B keeps all of A's updates up to when they diverged and re-introduces them to the value on merge. Add wins, but something is missing.

Remove Wins: dict of {name(), crdt_type()} -> {vclock(), active(), crdt(crdt_type()), tombstone() | undefined} mappings

Upside: the semantic is pretty clear: when you remove an element we write a tombstone. On merge, any updates concurrent with the remove are dropped. The field can be re-added. If it is re-removed, update the tombstone clock.
Downside: long partition behaviour is confusing (imagine MDC write-write clusters). If a remove of a field happens on A concurrent with updates on B, all B's updates are dropped on merge.

Add Wins: same as above, except with a list of tombstones, each a {clock(), crdt()} pair

Upside: no write is ever lost, ever, ever.
Downside: fiddly, complex implementation, lots of space overhead for tombstones, surprising behaviour in long-partitioned replicas (i.e. after A(update >> remove >> update >> remove) | B(update >> remove >> update >> remove) the field is present with the merged value of all updates)

Reset-Remove: dict of {name(), crdt_type()} -> {vclock(), active(), crdt(crdt_type())} mappings

Upside: simple-to-understand semantics; appears most correct most often during partitioned state
Downside: doesn't support every CRDT (e.g. G-Counter, G-Set, and One Way Flags are unsupported)

Works thusly: a remove first resets the crdt value (for a counter this means issuing a decrement / increment that sets the counter to zero, as sketched below; for a set, removing all elements; for a map, reset/removing all fields), then it removes the key. The value is kept as a tombstone.
If the user wants to re-add the field, it is as though it were an empty field, but any concurrent updates at other replicas keep their writes, and the values they held from the removing replica are dominated by the reset operation.
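
For a counter field the reset step works out to something like this (a sketch; value/1 and update/3 are the PN-Counter sketch functions from earlier):

%% Reset: issue an increment of -Value so the counter reads zero,
%% recorded in the normal monotone way, before the key is removed.
reset(Actor, Counter) ->
    case value(Counter) of
        0 -> Counter;
        V -> update(Actor, -V, Counter)
    end.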

Pick one!

@kuenishi

Just got curious: how can we construct combined CRDTs like {crdt_map(), crdt_map(), crdt_counter()}, where I expect each element of the tuple to be a CRDT? Theoretically it looks possible if CRDTs are not nested, like crdt_map(string(), crdt_counter()).

This is because, from the viewpoint of ease of application development, we rarely store a single-valued object with only a single counter included; usually we have a user model with multiple members included, like this:

-record(user, {
               name :: string(),
               email :: string(),
               total_count_liked :: non_neg_integer(),
               friends :: [ string() ], %% usually friends' user names
               ... }).

In this case a CRDT is applicable at total_count_liked and friends. Or is this a more advanced problem to solve in the future?

@russelldb
Basho Technologies member

@kuenishi Unless I misunderstand your question, this composition is what the Map is for. A Map is like a JSON document (for example.) Each field in the Map is a CRDT. So there may be Maps in Maps (in Maps).

Your model would be expressed as (assuming the Key in Riak is User):

{ {name, riak_dt_lwwreg} -> lwwreg,
   {email, riak_dt_lwwreg} -> lwwreg,
   {total_count_liked, riak_dt_pncounter} -> pncounter,
   {friends, riak_dt_orset} -> orset
}

and so on. The Map is used to compose CRDTs. We can only store CRDTs in the Map; the Map's field names include the type of the thing stored, so ints are pncounters, strings are Last Write Wins Registers, lists of friends are OR-Sets, and so on.
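
Hypothetically, an update against such a Map could then be expressed as a batch of field operations (the op shapes here are illustrative, not necessarily the exact riak_dt_map format):

Op = {update,
      [{update, {<<"name">>, riak_dt_lwwreg}, {assign, <<"Bob">>}},
       {update, {<<"total_count_liked">>, riak_dt_pncounter}, {increment, 1}},
       {update, {<<"friends">>, riak_dt_orset}, {add, <<"alice">>}}]}.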

Does that answer your question?

@kuenishi

Thank you, now I get it! That's what we have been wanting.

@seancribbs

@kuenishi I might mention that for the first pre-release we have decided not to allow maps-in-maps via the public API (although they are technically possible).

@radix

Hi, I'm just a curious observer learning about CRDTs, but I'm confused about something.

You mention that the literature describes OR-Set as two sets of pairs of (e, unique), but the INRIA paper actually describes it as only one set of pairs of (e, unique) (see Specification 15).

Is there another definition of OR-Set that I'm missing?

Pardon my annoying question...

@lenary

@radeex our two sets are essentially 1) a set of (elem, token) pairs for additions, and 2) a set of (elem, token) pairs for removals (copies of the ones in the addition set when something is removed). We get the exact same semantics this way, without having to track when things were added and removed, just unioning the sets on a merge. This is the simplest implementation.

I'd also point out that Spec 15 isn't a state-based OR-Set, which is what we implemented; it's an op-based OR-Set. We've avoided op-based CRDTs for various reasons, essentially feeling that state-based CRDTs give us a simpler model to program with underneath.

However, it turns out @russelldb is a genius, and has worked out how to implement a set CRDT with OR-Set semantics that tracks the causality of when something was added or removed well enough that we only need a single tracking "set", and merges can work out how to do the right thing. It's called riak_dt_orswot, and is in the develop branch of the basho/riak_dt repo.

@russelldb
Basho Technologies member

@radeex the tombstoneless set is based on this work (http://arxiv.org/pdf/1210.3368.pdf, [4] above); in fact we've been very lucky to have Carlos Baquero working with us on the Map and Set implementations.

The confusion about the OR-Set you mention is probably, as @lenary points out, the difference between a state based and an op based set. The paper at [1] says:

"We leave the corresponding state-based specification as an exercise for the reader. Since
  every add is effectively unique, a state-based implementation could be based on U-Set."

The easiest way to think of this is two U-Sets, one for adds, one for removes, and a naive implementation would do just that. In practice you can keep a unique ID per actor and increment it per add (starting to look a lot like a version vector per element), rather than store all IDs, and use a single bit to denote whether an element is present or not…and keep optimising until you end up with an ORSWOT.
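
Spelled out, that naive two-U-Set version is roughly (a sketch; make_ref/0 stands in for the unique token):

add(Elem, {Adds, Removes}) ->
    {ordsets:add_element({Elem, make_ref()}, Adds), Removes}.

%% Observed remove: tombstone exactly the add-pairs this replica has seen.
remove(Elem, {Adds, Removes}) ->
    Seen = [Pair || {E, _} = Pair <- ordsets:to_list(Adds), E =:= Elem],
    {Adds, ordsets:union(Removes, ordsets:from_list(Seen))}.

value({Adds, Removes}) ->
    lists:usort([E || {E, _} = Pair <- ordsets:to_list(Adds),
                      not ordsets:is_element(Pair, Removes)]).

%% Merge is plain union on both sets; a concurrent add's unique pair is
%% not in the remove set, so it survives, giving OR-Set semantics.
merge({A1, R1}, {A2, R2}) ->
    {ordsets:union(A1, A2), ordsets:union(R1, R2)}.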

@radix

Okay, I was wondering if it was about the op-based vs state-based implementations. Your implementation sounds pretty clever. Thanks for the explanation!

@maxsz

Hi, are you people aware of this work? http://run.unl.pt/bitstream/10362/7802/1/Sousa_2012.pdf

@russelldb
Basho Technologies member

Hi @maxsz yes! I have read parts of Valter's dissertation (though not all of it yet). I've been corresponding with Valter and Professor Nuno Preguiça (his supervisor), along with Carlos Baquero and others from the CRDT research team.

We're actually involved in SyncFree, a new 3 year research project, and hope to bring useful discoveries into Riak in the future. If you want to stay informed on that project there is a website here: https://syncfree.lip6.fr/

In fact, Valter has been looking at CRDT invariants, and I believe he is using the pre5 riak release (with CRDTs) for his research at the moment. He's been on the Riak mailing list lately.

@oleksiyk referenced this issue in nlf/riakpbc Feb 12, 2014
Merged

Riak 2.0 support #51

@jaredmorrow added this to the 2.0 milestone Mar 24, 2014
@seancribbs

I think we've addressed the RFC bits of this for 2.0, closing. Reopen with comments if you disagree.

@seancribbs closed this Mar 24, 2014
@rzezeski modified the milestone: 2.0-beta, 2.0 Mar 25, 2014
@ianclegg referenced this issue in RBMHTechnology/eventuate Sep 21, 2015
Closed

Implement an OR Shopping Cart CRDT #114
