Skip to content

Commit

Permalink
Merge pull request #7 from derekkraan/improve_user_friendliness
Browse files Browse the repository at this point in the history
Introduce `DeltaCrdt` module and improve documentation
  • Loading branch information
derekkraan committed Nov 6, 2018
2 parents bddab2f + 89fbbc3 commit 2d400ea
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 128 deletions.
41 changes: 15 additions & 26 deletions README.md
@@ -1,15 +1,13 @@
# DeltaCrdt

DeltaCrdt implements some Delta CRDTs in Elixir.
DeltaCrdt implements some Delta CRDTs in Elixir. There is an [introductory blog post]() and the official documentation on [hexdocs.pm](https://hexdocs.pm/delta_crdt) is also very good.

CRDTs currently offered include:
- Add Wins Last Write Wins Map
- Add Wins Set
- Observed Remove Map

Please open an issue or a pull request if you'd like to see any additional Delta CRDTs included.

The following papers have been partially implemented in this library:
The following papers have used to implement this library:
- [`Delta State Replicated Data Types – Almeida et al. 2016`](https://arxiv.org/pdf/1603.01529.pdf)
- [`Efficient Synchronization of State-based CRDTs – Enes et al. 2018`](https://arxiv.org/pdf/1803.02750.pdf)

Expand All @@ -20,42 +18,33 @@ Documentation can be found on [hexdocs.pm](https://hexdocs.pm/delta_crdt).
Here's a short example to illustrate adding an entry to a map:

```elixir
alias DeltaCrdt.{CausalCrdt, AWLWWMap}

# start 2 GenServers to wrap a CRDT.
{:ok, crdt} = CausalCrdt.start_link(AWLWWMap)
{:ok, crdt2} = CausalCrdt.start_link(AWLWWMap)
# start 2 Delta CRDTs
{:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
{:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)

# make them aware of each other
send(crdt, {:add_neighbours, [crdt2]})
send(crdt2, {:add_neighbours, [crdt]})

# do an operation on the CRDT
GenServer.cast(crdt, {:operation, {:add, ["CRDT", "is magic"]}})
DeltaCrdt.add_neighbours(crdt1, [crdt2])

# force sending intervals to neighbours
send(crdt, :ship_interval_or_state_to_all)
# show the initial value
DeltaCrdt.read(crdt1)
%{}

# wait a few ms to give it time to replicate
Process.sleep(50)
# add a key/value in crdt1
DeltaCrdt.mutate(crdt1, :add, ["CRDT", "is magic!"])

# read the CRDT
CausalCrdt.read(crdt2)
#=> %{"CRDT" => "is magic"}
# read it after it has been replicated to crdt2
DeltaCrdt.read(crdt2)
%{"CRDT" => "is magic!"}
```

## TODOs

- implement join decomposition to further reduce back-propagation.

## Installation

The package can be installed by adding `delta_crdt` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:delta_crdt, "~> 0.1.11"}
{:delta_crdt, "~> 0.2.0"}
]
end
```
142 changes: 142 additions & 0 deletions lib/delta_crdt.ex
@@ -0,0 +1,142 @@
defmodule DeltaCrdt do
@moduledoc """
Start and interact with the Delta CRDTs provided by this library.
A CRDT is a conflict-free replicated data-type. That is to say, it is a distributed data structure that automatically resolves conflicts in a way that is consistent across all replicas of the data. In other words, your distributed data is guaranteed to eventually converge globally.
Normal CRDTs (otherwise called "state CRDTs") require transmission of the entire CRDT state with every change. This clearly doesn't scale, but there has been exciting research in the last few years into "Delta CRDTs", CRDTs that only transmit their deltas. This has enabled a whole new scale of applications for CRDTs, and it's also what this library is based on.
A Delta CRDT is made of two parts. First, the data structure itself, and second, an anti-entropy algorithm, which is responsible for ensuring convergence. `DeltaCrdt` implements Algorithm 2 from ["Delta State Replicated Data Types – Almeida et al. 2016"](https://arxiv.org/pdf/1603.01529.pdf) which is an anti-entropy algorithm for δ-CRDTs.
While it is certainly interesting to have a look at this paper and spend time grokking it, in theory I've done the hard work so that you don't have to, and this library is the result.
With this library, you can build distributed applications that share some state. [`Horde.Supervisor`](https://hexdocs.pm/horde/Horde.Supervisor.html) and [`Horde.Registry`](https://hexdocs.pm/horde/Horde.Registry.html) are both built atop `DeltaCrdt`, but there are certainly many more possibilities.
Here's a simple example for illustration:
```
iex> {:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> {:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> DeltaCrdt.add_neighbours(crdt1, [crdt2])
iex> DeltaCrdt.read(crdt1)
%{}
iex> DeltaCrdt.mutate(crdt1, :add, ["CRDT", "is magic!"])
iex> Process.sleep(10) # need to wait for propagation for the doctest
iex> DeltaCrdt.read(crdt2)
%{"CRDT" => "is magic!"}
```
"""

@default_sync_interval 50
@default_ship_interval 50
@default_ship_debounce 50

@type crdt_option ::
{:notify, {pid(), term()}}
| {:sync_interval, pos_integer()}
| {:ship_interval, pos_integer()}
| {:ship_debounce, pos_integer()}

@type crdt_options :: [crdt_option()]

@doc """
Start a DeltaCrdt and link it to the calling process.
There are a number of options you can specify to tweak the behaviour of DeltaCrdt:
- `:notify` - when the state of the CRDT has changed, `msg` will be sent to `pid`. Varying `msg` allows a single process to listen for updates from multiple CRDTs.
- `:sync_interval` - the delta CRDT will attempt to sync its local changes with its neighbours at this interval. Default is 50.
- `:ship_interval` - the delta CRDT will notify the listener at this interval, in ms. Default is 50.
- `:ship_debounce` - debounce notify messages, in milliseconds. Default is 50.
"""
@spec start_link(
crdt_module :: module(),
opts :: crdt_options(),
genserver_opts :: GenServer.options()
) :: GenServer.on_start()
def start_link(crdt_module, opts \\ [], genserver_opts \\ []) do
GenServer.start_link(
DeltaCrdt.CausalCrdt,
{crdt_module, Keyword.get(opts, :notify, nil),
Keyword.get(opts, :sync_interval, @default_sync_interval),
Keyword.get(opts, :ship_interval, @default_ship_interval),
Keyword.get(opts, :ship_debounce, @default_ship_debounce)},
genserver_opts
)
end

@doc """
Include DeltaCrdt in a supervision tree with `{DeltaCrdt, [crdt: DeltaCrdt.AWLWWMap, name: MyCRDTMap]}`
"""
def child_spec(opts \\ []) do
name = Keyword.get(opts, :name, nil)
crdt_module = Keyword.get(opts, :crdt, nil)
shutdown = Keyword.get(opts, :shutdown, 5000)

if is_nil(name) do
raise "must specify :name in options, got: #{inspect(opts)}"
end

if is_nil(crdt_module) do
raise "must specify :crdt in options, got: #{inspect(opts)}"
end

%{
id: name,
start:
{DeltaCrdt.CausalCrdt, :start_link,
[crdt_module, Keyword.drop(opts, [:name]), [name: name]]},
shutdown: shutdown
}
end

@doc """
Notify a CRDT of its neighbours.
This function allows CRDTs to communicate with each other and sync their states.
**Note: this sets up a unidirectional sync, so if you want bidirectional syncing (which is normally desirable), then you must call this function twice (or thrice for 3 nodes, etc):**
```
DeltaCrdt.add_neighbours(c1, [c2, c3])
DeltaCrdt.add_neighbours(c2, [c1, c3])
DeltaCrdt.add_neighbours(c3, [c1, c2])
```
"""
@spec add_neighbours(crdt :: GenServer.server(), neighbours :: list(GenServer.server())) :: :ok
def add_neighbours(crdt, neighbours) when is_list(neighbours) do
send(crdt, {:add_neighbours, neighbours})
:ok
end

@spec mutate(crdt :: GenServer.server(), function :: atom, arguments :: list()) :: :ok
@doc """
Mutate the CRDT synchronously.
For the asynchronous version of this function, see `mutate_async/3`.
To see which operations are available, see the documentation for the crdt module that was provided in `start_link/3`.
For example, `DeltaCrdt.AWLWWMap` has a function `add` that takes 4 arguments. The last 2 arguments are supplied by DeltaCrdt internally, so you have to provide only the first two arguments: `key` and `val`. That would look like this: `DeltaCrdt.mutate(crdt, :add, ["CRDT", "is magic!"])`. This pattern is repeated for all mutation functions. Another exaple: to call `DeltaCrdt.AWLWWMap.clear`, use `DeltaCrdt.mutate(crdt, :clear, [])`.
"""
def mutate(crdt, f, a)
when is_atom(f) and is_list(a) do
GenServer.call(crdt, {:operation, {f, a}})
end

@spec mutate_async(crdt :: GenServer.server(), function :: atom, arguments :: list()) :: :ok
@doc """
Mutate the CRDT asynchronously.
"""
def mutate_async(crdt, f, a)
when is_atom(f) and is_list(a) do
GenServer.cast(crdt, {:operation, {f, a}})
end

@doc """
Read the state of the CRDT.
"""
@spec read(crdt :: GenServer.server(), timeout :: pos_integer()) :: crdt_state :: term()
def read(crdt, timeout \\ 5000) do
{crdt_module, state} = GenServer.call(crdt, :read, timeout)
apply(crdt_module, :read, [state])
end
end
43 changes: 39 additions & 4 deletions lib/delta_crdt/aw_lww_map.ex
@@ -1,21 +1,56 @@
defmodule DeltaCrdt.AWLWWMap do
alias DeltaCrdt.{CausalDotMap, AWSet, ORMap}
@opaque crdt_state :: CausalDotMap.t()
@opaque crdt_delta :: CausalDotMap.t()
@type key :: term()
@type value :: term()
@type node_id :: term()
@moduledoc """
An add-wins last-write-wins map.
def start_link(notify_pid \\ nil, opts \\ []) do
DeltaCrdt.CausalCrdt.start_link(new(), notify_pid, opts)
end
This CRDT is an add-wins last-write-wins map. This means:
* The data structure is of a map. So you can store the following values:
```
%{key: "value"}
%{"1" => %{another_map: "what!"}}
%{123 => {:a, :tuple}}
```
* Both keys and values are of type `term()` (aka `any()`).
* Add-wins means that if there is a conflict between an add and a remove operation, the add operation will win out. This is in contrast to remove-wins, where the remove operation would win.
* Last-write-wins means that if there is a conflict between two write operations, the latest (as marked with a timestamp) will win. Underwater, every delta contains a timestamp which is used to resolve the conflicts.
"""

alias DeltaCrdt.{CausalDotMap, AWSet, ORMap}

@doc "Convenience function to create an empty add-wins last-write-wins map"
@spec new() :: crdt_state()
def new(), do: %CausalDotMap{}

@doc "Add (or overwrite) a key-value pair to the map"
@spec add(key :: key(), val :: value(), i :: node_id(), crdt_state()) :: crdt_delta()
def add(key, val, i, map) do
{AWSet, :add, [{val, System.system_time(:nanosecond)}]}
|> ORMap.apply(key, i, map)
end

@doc "Remove a key and it's corresponding value from the map"
@spec remove(key :: key(), i :: node_id(), crdt_state()) :: crdt_delta()
def remove(key, i, map), do: ORMap.remove(key, i, map)

@doc "Remove all key-value pairs from the map"
@spec clear(node_id(), crdt_state()) :: crdt_delta()
def clear(i, map), do: ORMap.clear(i, map)

@doc """
Read the state of the map
**Note: this operation is expensive, so it's best not to call this more often than necessary.**
"""
@spec read(map :: crdt_state()) :: map()
def read(%{state: map}) do
Map.new(map, fn {key, values} ->
{val, _ts} = Enum.max_by(Map.keys(values.state), fn {_val, ts} -> ts end)
Expand Down
6 changes: 6 additions & 0 deletions lib/delta_crdt/aw_set.ex
@@ -1,4 +1,10 @@
defmodule DeltaCrdt.AWSet do
@moduledoc """
An add-wins set.
This CRDT represents a set with add-wins semantics. So in the event of a conflict between an add and a remove operation, the add operation will win and the element will remain in the set.
"""

def new(), do: %DeltaCrdt.CausalDotMap{}

def add(element, i, %{causal_context: c}) do
Expand Down
1 change: 1 addition & 0 deletions lib/delta_crdt/causal_context.ex
@@ -1,4 +1,5 @@
defmodule DeltaCrdt.CausalContext do
@moduledoc false
defstruct dots: MapSet.new(),
maxima: %{}

Expand Down
79 changes: 15 additions & 64 deletions lib/delta_crdt/causal_crdt.ex
Expand Up @@ -3,11 +3,6 @@ defmodule DeltaCrdt.CausalCrdt do

require Logger

@default_sync_interval 50

@default_ship_interval 50
@default_ship_debounce 50

@outstanding_ack_timeout 20_000

@ship_after_x_deltas 100
Expand All @@ -16,64 +11,20 @@ defmodule DeltaCrdt.CausalCrdt do
@type delta :: {k :: integer(), delta :: any()}
@type delta_interval :: {a :: integer(), b :: integer(), delta :: delta()}

@moduledoc """
DeltaCrdt implements Algorithm 2 from `Delta State Replicated Data Types – Almeida et al. 2016`
which is an anti-entropy algorithm for δ-CRDTs. You can find the original paper here: https://arxiv.org/pdf/1603.01529.pdf
"""

defmodule State do
defstruct node_id: nil,
notify: nil,
neighbours: MapSet.new(),
neighbour_refs: %{},
crdt_module: nil,
crdt_state: nil,
shipped_sequence_number: 0,
sequence_number: 0,
ship_debounce: 0,
deltas: %{},
ack_map: %{},
outstanding_acks: %{}
end

def child_spec(opts \\ []) do
name = Keyword.get(opts, :name, nil)
crdt_module = Keyword.get(opts, :crdt, nil)
shutdown = Keyword.get(opts, :shutdown, 5000)

if is_nil(name) do
raise "must specify :name in options, got: #{inspect(opts)}"
end

if is_nil(crdt_module) do
raise "must specify :crdt in options, got: #{inspect(opts)}"
end

%{
id: name,
start: {__MODULE__, :start_link, [crdt_module, Keyword.drop(opts, [:name]), [name: name]]},
shutdown: shutdown
}
end

@doc """
Start a DeltaCrdt.
"""
def start_link(crdt_module, opts \\ [], genserver_opts \\ []) do
GenServer.start_link(
__MODULE__,
{crdt_module, Keyword.get(opts, :notify, nil),
Keyword.get(opts, :sync_interval, @default_sync_interval),
Keyword.get(opts, :ship_interval, @default_ship_interval),
Keyword.get(opts, :ship_debounce, @default_ship_debounce)},
genserver_opts
)
end

def read(server, timeout \\ 5000) do
{crdt_module, state} = GenServer.call(server, :read, timeout)
apply(crdt_module, :read, [state])
end
@moduledoc false

defstruct node_id: nil,
notify: nil,
neighbours: MapSet.new(),
neighbour_refs: %{},
crdt_module: nil,
crdt_state: nil,
shipped_sequence_number: 0,
sequence_number: 0,
ship_debounce: 0,
deltas: %{},
ack_map: %{},
outstanding_acks: %{}

### GenServer callbacks

Expand All @@ -85,7 +36,7 @@ defmodule DeltaCrdt.CausalCrdt do
Process.flag(:trap_exit, true)

{:ok,
%State{
%__MODULE__{
node_id: :rand.uniform(1_000_000_000),
notify: notify,
crdt_module: crdt_module,
Expand Down

0 comments on commit 2d400ea

Please sign in to comment.