Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
1148 lines (699 sloc) 36.5 KB
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
<meta charset="utf-8">
<style>
@import url(https://fonts.googleapis.com/css?family=Yanone+Kaffeesatz);
@import url(https://fonts.googleapis.com/css?family=Abel);
@import url(https://fonts.googleapis.com/css?family=Ubuntu);
@import url(https://fonts.googleapis.com/css?family=Droid+Serif:400,700,400italic);
@import url(https://fonts.googleapis.com/css?family=Ubuntu+Mono:400,700,400italic);
body { font-family: 'sans-serif'; background-color: black;}
h1, h2, h3 {
font-family: 'Abel';
font-weight: normal;
}
.remark-code, .remark-inline-code { font-family: 'Ubuntu Mono'; }
.small-caps { font-variant: small-caps }
.small { font-size: 70%; }
.fade{ color: #aaa; }
.fittoslide > * { width: 100%; }
.eighty > * { width: 80%; }
/* Thanks to http://www.partage-it.com/animez-vos-presentations-remark-js/ (in French) */
.remark-fading { z-index: 9; }
.remark-slide-container {transition: opacity 0.5s ease-out; opacity: 0;}
.remark-visible {transition: opacity 0.5s ease-out; opacity: 1;}
.ably > img { width: 40%; }
</style>
</head>
<body>
<textarea id="source">
class: center, middle
# How to build your own Ably with Riak Core
.ably[![](./header_logo.svg)]
## Simon Woolf .small[&nbsp;&nbsp; simon@ably.io &nbsp;&nbsp; github.com/simonwoolf]
???
remark.js shortcuts:
p for presenter mode
c to create a new synced window
So, we're not going to start out by looking at Riak core. We're just gonna think about how we can design a system, starting in the simplest way possible and solving problems as we go, and see where we end up.
So, first things first, what is Ably? Ably is a Platform-as-a-service implementation of the pub-sub
---
# pubsub
???
i.e. 'publish-subscribe' pattern. Others include Pusher and PubNub.
The pub-sub pattern is pretty simple:
--
.eighty[![](./pubsub.png)]
???
it's a messaging pattern where the sender is decoupled from the receiver. A publisher publishes messages into topics, or 'channels', that subscribers can subscribe to. So it's the same amount of work for them to publish to a channel with one subscriber or a million.
Sounds simple enough. Sounds like a fun thing to build from scratch in Elixir. So, what's the simplest and most obvious way we could do that?
well, just keep all subscription state in a process
---
# approaches to pubsub in elixir
1. keep subscription state in a process
???
Maybe
--
```elixir
%{
"another_channel" => #MapSet<[#PID<0.57.0>]>,
"some_channel" => #MapSet<[#PID<0.57.0>, #PID<0.66.0>]>
}
```
???
in GenServer state as a Map of channels to a Set of subscribers to those channels. or perhaps
--
```elixir
# add a subscriber
GenEvent.add_handler(event_manager, {EventHandler, pid}, {pid, channels})
# emit an event
def handle_event(message, state = {subscriber_pid, channels}) do
if message.channel in channels do
send(subscriber_pid, {:message, message})
end
end
```
???
with GenEvent, where each subscriber adds a genevent handler. A bit cleaner in some ways, but the effect is much the same, all information on who is subscribed to what channel is kept in one process.
---
### approach 1. keep subscription state in a process
???
This has the advantage that
--
#### advantages:
- single point of synchronisation - consistent ordering, channel rules, channel outputs, etc.
???
there's one point of synchronisation, so consistent message ordering just happens, and it's very easy to add additional rules, or filters, or extra outputs, or anything else -- there's only one place to put them.
The downside is of course that
--
#### disadvantages:
- doesn't scale. at all
???
It doesn't scale. At all. That one process on one node is a bottleneck, everything goes through that.
---
### approach 1. keep subscription state in a process
#### advantages:
- single point of synchronisation - consistent ordering, channel rules, channel outputs, etc.
#### disadvantages:
- doesn't scale. at all
???
OK, so that's not ideal. So how can we make it better?
One way would be to say, ok, so what if messages don't actually go *through* that process, what if it just stores subscriber info, and the publisher process can query it to find out who's subscribing to that channel, and send the messages itself?
Which is a fair point. So lets take that idea and say, well, if that's all it's doing, we can make it even better by not using a process at all, and instead using
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. property-based process registration
???
ETS tables!
Yeah, I know the slide says Property-based process registration, but under the hood, they're basically all ETS tables.
The three most common ones are
---
### approach 2. property-based process registration
a. the Elixir registry
b. `gproc`
c. PG2
???
the Elixir registry, gproc, and PG2.
So: the elixir registry can be used in either a mode where at most one process can register itself under a name, called 'unique' mode
or where any number of processes can register themselves under a name, called 'duplicate' mode. This approach will use duplicate mode.
So the idea is that if a subscriber process wants to subscribe to channel "foo", it
---
### approach 2. property-based process registration
#### the Elixir Registry
```elixir
Registry.register(SubscriptionsRegistry, "foo", null)
```
???
registers itself to a registry in duplicate mode under the key 'foo'. Then when a publisher wants to send a message to that channel, it uses
--
```elixir
Registry.dispatch(SubscriptionsRegistry, "channelname", fn (subscribers) ->
for {pid, _} <- entries, do: send(pid, message)
end)
```
???
Registry.dispatch to dispatch the message to all the subscribers.
Gproc offers even more different modes:
---
### approach 2. property-based process registration
#### `gproc`
'property', is non-unique, i.e. different processes can each register a property with the same name.
'name', is unique within the given context (local or global).
'counter', is similar to a property, but has a numeric value and behaves roughly as an ets counter (see update_counter/2.)
'aggregated counter', is automatically updated by gproc, and reflects the sum of all counter objects with the same name in the given scope. The initial value for an aggregated counter must be undefined.
'resource property', behaves like a property, but can be tracked with a 'resource counter'.
'resource counter', tracks the number of resource properties with the same name. When the resource count reaches 0, any triggers specified using an on_zero attribute may be executed (see below).
???
with its 'property' mode corresponding to the Elixir registry's 'duplicate' mode, and its 'name' mode corresponding to the Elixir registry's 'unique' mode. So for the moment we're looking at 'property' mode.
Finally we have PG2
---
### approach 2. property-based process registration
#### PG2
???
PG stands for 'process groups'. This is an erlang builtin. and is one of the two backends that Phoenix PubSub can use.
(The other thing Phoneix pubsub can do is just... *give up* and pass the problem onto redis. But we're not going to give up, are we? [raise arms, await 'no'])
No more audience participation for you guys.
---
### approach 2. property-based process registration
#### advantages:
- On a single node, scales a lot better than option 1
???
The registry / gproc / process groups / ets tables approach
scales a lot better than the previous approach. All three registries make efficient use of ets tables to avoid bottlenecking.
But there are some disadvantages:
--
#### Disadvantages:
- no single point of synchronisation, so linearisability is harder
???
Since you no longer a single process that all messages for a channel flow through, it's less obvious under what circumstances message ordering is still guaranteed, and it's harder to do channel rules, filters, extra outputs, and so on. Still possible, but no longer trivial.
---
### approach 2. property-based process registration
#### advantages:
- On a single node, scales a lot better than option 1
#### Disadvantages:
- no single point of synchronisation, so linearisability is harder
- distributed? ...sorta
???
Distribution -- so, we're using the beam, we want to scale out by adding more nodes to our mesh. Can we?
With the elixir registry: no, it's explicitly a local-only registry.
With PG2: yes, but all changes lock the cluster to ensure they're atomic, which causes scalability issues:
---
#### PG2 performance
.fittoslide[![](./pg2-perf.png)]
source: http://www.ostinelli.net/an-evaluation-of-erlang-global-process-registries-meet-syn/
???
You can see that as soon as we go from one node to 2, the max rate of changes decreases dramatically.
OK, what what about gproc? It has a global mode.
---
#### gproc global mode
???
This is an interesting one.
Gproc's global mode is kindof in flux at the moment. The released version uses a leader election module called `gen_leader`, which is used to elect a 'leader node', to keep everyone synchronised and resolve conflicts between nodes without having to do the PG2 / erlang global module thing of locking the cluster. Unfortunately the released version has quite a few known issues due to `gen_leader` issues -- fox example, you can't add a new node to the cluster after it's started.
---
#### gproc global mode
???
As far as I can tell, the author is part way through a decade-long project to solve these by moving away from `gen_leader` to a different solution called `locks_leader`, but the locks leader branch has had only two commits since 2013, so I guess progress is slow.
In short, gproc in global mode doesn't seem to be a great solution at the moment, sadly.
---
### approach 2. property-based process registration
#### advantages:
- On a single node, scales a lot better than option 1
#### Disadvantages:
- no single point of synchronisation, so linearisability is harder
- distributed? ...sorta
--
- scalability with number of subscribers per channel when distributed: ...
???
Scalability issues: channels with lots of subscribers. imagine you're on a distributed system with 4 nodes, and you have 40 thousand users subscribed to a channel, distributed evenly among the nodes.
In a distributed process registration system, when someone publishes to that channel,
---
### distributed scalability: channels with lots of subscribers
![](./scaling-bad.png)
???
it sends 40 thousand messages, and around 10 thousand get delivered to each node -- all separately, over the network.
This isn't very inefficient. Really what you want is something that lets you have one message be delivered to any node with more than zero subscribers on it, and then have local fan-out within that node, something like
---
### distributed scalability: channels with lots of subscribers
![](./scaling-good.png)
???
this.
Another issue is
---
### distributed scalability: connection state recovery
???
connection state recovery. In a decent pub-sub service, the end-user should be able to transparently recover over short periods of lost connection, say of up to a minute or two.
The 'obvious' way -- and the only way this is possible in the process group model -- is to have the subscriber process accumulate messages if they can't be pushed to the user. But there are subtleties here.
---
### distributed scalability: connection state recovery
???
For one thing, the connection process may not know what messages the end-user has received. It might be happily pushing messages over what it thinks is an active websocket, only for the websocket to die and the user to reconnect and say that they haven't received any messages since number 312 which was from 45 seconds ago. What do you do then?
---
### distributed scalability: connection state recovery
???
Maybe the connection process could keep the last (say) two minutes of messages, in a sort-of rolling message window, so it can handle that situation. Great. Except now go back to that example of having 10k subscribers to a channel on each node. Now 10k processes on that node each have to keep a copy of every message sent in the last two minutes on that channel. That's not gonna work.
So lets try approach three:
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration
???
One 'master' process per channel, zero or one 'client' processes per channel per node, with global process registration.
going back to that diagram from before:
---
### approach 3: master-client; global process registration
![](./scaling-good.png)
???
The client can manage its local subscriptions however you like -- e.g. using a local process group, maybe gproc in local property mode, or the Elixir registry in :duplicate mode.
This solves several problems with the previous approach:
---
### approach 3: master-client; global process registration
#### advantages:
--
- efficient messaging fanout, channels can scale to many subscribers
???
- the client-master model solves the millions-of-messages-being-sent-over-the-mesh problem
--
- each channel has a single point of synchronisation
???
- each channel has a single point of synchronisation - the channelmaster - so we get we get the advantages of that - channel rules and so on.
---
### approach 3: master-client; global process registration
#### advantages:
- efficient messaging fanout, channels can scale to many subscribers
- each channel has a single point of synchronisation
???
(This does mean that the channelmaster is a bottleneck for that channel, but that's ok, because the channel can be our unit of scalability. This is actually a pretty useful thing to do -- to make a feature scale, we just need to make it scale within the limits of a channel, and make sure number of channels can scale. And if someone needs a per-channel message rate higher than one channel can sustain, they can always shard messages across multiple channels.
---
### approach 3: master-client; global process registration
#### advantages:
- efficient messaging fanout, channels can scale to many subscribers
- each channel has a single point of synchronisation
- connection state recovery window can live on the channel client
???
Having one client process per channel per node means that for connection state recovery, the two minute 'message window' can live there rather than in each individual subscriber process, so only duplicated once per node.
---
### approach 3: master-client; global process registration
#### advantages:
- efficient messaging fanout, channels can scale to many subscribers
- each channel has a single point of synchronisation
- connection state recovery window can live on the channel ~~client~~ master
???
Or even better, we can have it live in the channelmaster -- that way it's only in one place in the entire cluster. the channelclient can request a 'rewind' from the channelmaster as necessary.
(Why's that better? Because we want to be able to scale as close to linearly with number of nodes as possible -- we don't want a situation where the channelclients have to do/store so much that adding a new node to the cluster means it's already fully loaded with channelclient work before we add a single channelmaster to it.)
---
### approach 3: master-client; global process registration
#### advantages:
- efficient messaging fanout, channels can scale to many subscribers
- each channel has a single point of synchronisation
- connection state recovery window can live on the channel ~~client~~ master
#### disadvantages:
- distributed scalability with number of channels: global process registration
???
But still need a global process registry for channelmasters. And really we want a strongly consistent one -- having multiple channelmasters for the same channel would be disastrous, we'd effectively get a channel split, so eventually-consistent registries like Syn aren't really in the running. So we need something like the erlang :global process registry).
But then we run into the same problem that we had earlier with PG2, which is that if modifications lock the entire mesh, as they do with the global module, the rate at which we can add new channels goes sharply down as the size of the mesh grows:
---
#### `:global` performance
.fittoslide[![](./global-perf.png)]
source: http://www.ostinelli.net/an-evaluation-of-erlang-global-process-registries-meet-syn/
???
As you can see, pretty similar to the PG2 results.
The other issue is that
---
### approach 3: master-client; global process registration
#### advantages:
- efficient messaging fanout, channels can scale to many subscribers
- each channel has a single point of synchronisation
- connection state recovery window can live on the channel ~~client~~ master
#### disadvantages:
- distributed scalability with number of channels: global process registration
- adding new nodes doesn't take any of the existing load
???
adding a new node to the cluster takes a while to actually help much. Channelmasters will be at whatever node first registered them, and won't move unless that node shuts down. So they don't really take any of the existing load off your other nodes, they just take a share of any *new* channels -- and even that's only after they get their share of new connections.
Really, what would be nice would be something where, on adding a new node, it automatically took up a fair proportion of the work to be done.
We can have a stab at solving both of those problems at once using:
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. .fade[one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration]
4. one 'master' process per channel, zero or one 'client' processes per channel per node, hashing
???
hashing! Hash the name of the channel and modulo that by the number of nodes you have.
That way you don't need any global process registration because the channelmaster lives in a deterministic place in the cluster. That is, given the name of a channel, every node in the mesh can just work out which other node has that channel's channelmaster, without having to do any process registration.
It also means that when a new node is added, it automatically takes up a fair share of the work to be done in the cluster.
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. .fade[one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration]
4. one 'master' process per channel, zero or one 'client' processes per channel per node, hashing
???
Unfortunately, it means almost all the channelmasters have to move when you add or remove a node. If you scale up from 4 to 5 nodes and start taking the channel name hash modulo 5 instead of modulo 4, only a fifth of channels will be on the same node they were on before.
This is solved with:
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. .fade[one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration]
4. one 'master' process per channel, zero or one 'client' processes per channel per node, *consistent* hashing
???
Consistent hashing. For people who aren't familiar with consistent hashing, basically:
---
![](./riak-ring.png)
Source: Basho
???
You have a hash ring. This is where you identify 0 with you max hash value, i.e. wrap it around into a circle. So every hashed channelname lives somewhere on the ring. Nodes then lay claim to segments of the ring, so node 1 here has claimed all the green bits of the ring, so any channels which hash to a value on those bits will live on node 1.
If a new node is added, it will take a few segments of the ring for itself, so any channels that were on those segments will move to it. But no other channel has to move -- the only channels that move are ones moving to the new node.
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. .fade[one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration]
4. one 'master' process per channel, zero or one 'client' processes per channel per node, *consistent* hashing
???
And we have finally arrived at our destination, because what this is is:
---
# approaches to pubsub in elixir
1. .fade[keep subscription state in a process]
2. .fade[property-based process registration]
3. .fade[one 'master' process per channel, zero or one 'client' processes per channel per node, global process registration]
4. riak core!
???
riak core.
---
### approach 4: riak core
#### advantages:
- channelmaster is a per-channel synchronisation point. each rules, outputs, filters etc.
- connection state recovery window: lives on the channelmaster
- distributed scaling (subscribers per channel): solved by client/master model; efficient message fanout
- distributed scaling (number of channels): solved by the the hashring
???
This, finally, seems to address all our objections.
So now lets look at what a basic pub/sub system in riak core might look like. Here's a half-finished and pretty rudimentary one I made earlier, which I called "ably-ng"
---
### mix.exs
```elixir
defp deps do
[
{:riak_core, "~> 3.0", hex: :riak_core_ng},
{:cowboy, github: "ninenines/cowboy", tag: "2.0.0-pre.8"},
{:poison, "~> 3.1"},
# nb see https://github.com/project-fifo/riak_core/issues/20
{:cuttlefish, github: "basho/cuttlefish", tag: "2.0.11", manager: :rebar3, override: true},
{:lager, github: "basho/lager", tag: "3.2.4", manager: :rebar3, override: true},
{:goldrush, github: "basho/goldrush", tag: "0.1.9", manager: :rebar3, override: true},
# Test deps
{:socket, "~> 0.3.11", only: :test},
]
end
```
???
First a quick note on installing it. The hex package is called riak-core-ng. Some of the dependencies specified by the hex package seem to be a bit tempermental about compiling on recent versions of Erlang.
I found a combination that worked with erlang 19, but you have to manually force some more recent versions of deps like cuttlefish than are specified in the riak core package. If you try riak core and you run into the same difficulties, you could always clone the repo with this on it, I'll put a github url up at the end.
---
### application
```elixir
defmodule AblyNg do
use Application
require Logger
def start(_type, _args) do
case AblyNg.Supervisor.start_link do
{:ok, pid} ->
:ok = :riak_core.register(vnode_module: AblyNg.Vnode)
:ok = :riak_core_node_watcher.service_up(AblyNg.Service, self())
Logger.debug("Started AblyNg")
{:ok, pid}
{:error, reason} ->
Logger.error("Unable to start AblyNg supervisor: #{inspect reason}")
end
end
end
```
???
OK, so here's the application. I start the supervision tree, then call riak_core.register, and pass in a vnode module.
hang on, what's a vnode?
---
![](./riak-ring.png)
Source: Basho
???
Here's that diagram of the riak core ring again. (I didn't mention at the time that it was the riak core ring specifically, but it is -- Basho are the company that make Riak).
rather than having channelmasters live in some sense 'directly on the ring', riak uses an abstraction called a vnode -- 'virtual node', which is basically just a small segment of the ring -- each of those individual little segments is a vnode.
Each erlang node will have several vnodes -- in that diagram, 8. The channelmasters live on the vnodes, the vnodes live on the ring.
---
![](./riak-ring.png)
Source: Basho
???
This might seem like it complicates things, but it's actually a quite nice abstraction - it separates the job of 'knowing where I am on the ring' from the job of 'being a channelmaster'. So when the ring state changes, rather than all 100 thousand channels on the node having to work out if they need to relocate themselves, only the (say) 16 vnodes do that.
The channelmasters still live on the ring in the sense that they have a hash, but that hash just tells them which vnode they're on. When nodes are added and removed, the vnodes move around between them, but conceptually the channelmaster is always on the 'same' vnode.
---
### supervisor
```elixir
defmodule AblyNg.Supervisor do
use Supervisor
def start_link do
# riak_core appends _sup to the application name.
Supervisor.start_link(__MODULE__, [], [name: :ably_ng_sup])
end
def init(_args) do
children = [
worker(:riak_core_vnode_master, [AblyNg.Vnode], id: AblyNg.Vnode_master_worker),
worker(AblyNg.Frontend.WsServer, []),
supervisor(Registry, [:duplicate, AblyNg.AttachmentRegistry,
[partitions: System.schedulers_online()]], id: :attachment_registry),
supervisor(AblyNg.Frontend.ConnectionSupervisor, []),
supervisor(AblyNg.ChannelClientSupervisor, []),
]
supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end
end
```
???
With that in mind, here's the top-level supervisor.
First thing it starts is the riak core vnode master, which is a riak core process that handles sending messages to the vnodes. Again, we pass it our vnode module.
Next we start our websocket server, which
---
```elixir
defmodule AblyNg.Frontend.WsServer do
require Logger
def start_link() do
port = Application.fetch_env!(:ably_ng, :ws_port)
{:ok, cowboy_pid} = :cowboy.start_clear(
:ws,
100,
[ # ranch options
port: port,
max_connections: 10000
],
%{ # cowboy options
env: %{dispatch: routes()}
}
)
Logger.info "Started cowboy on pid #{inspect cowboy_pid}, listening on port #{inspect port}."
{:ok, cowboy_pid}
end
defp routes do
:cowboy_router.compile([
{:_, # Match all hostnames
[{"/", AblyNg.Frontend.WsHandler, []}]
}])
end
end
```
???
basically just starts cowboy.
---
### supervisor
```elixir
defmodule AblyNg.Supervisor do
use Supervisor
def start_link do
# riak_core appends _sup to the application name.
Supervisor.start_link(__MODULE__, [], [name: :ably_ng_sup])
end
def init(_args) do
children = [
worker(:riak_core_vnode_master, [AblyNg.Vnode], id: AblyNg.Vnode_master_worker),
worker(AblyNg.Frontend.WsServer, []),
supervisor(Registry, [:duplicate, AblyNg.AttachmentRegistry,
[partitions: System.schedulers_online()]], id: :attachment_registry),
supervisor(AblyNg.Frontend.ConnectionSupervisor, []),
supervisor(AblyNg.ChannelClientSupervisor, []),
]
supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end
end
```
???
Next comes the attachment registry, which keeps a list of what connections are attached to which channels.
Finally comes the connection supervisor and the channelclient supervisor. I'll skip over the details of websocket and connection handling as not really relevant to the talk. suffice to say that if the connection wants to attach to a channel it first makes sure there's a channelclient for it on that node. So here's part of a channelclient:
---
```elixir
defmodule AblyNg.ChannelClient do
...
def init(key) do
[app_id, channel_name] = String.split(key, ":")
{:ok, channel_master} = Service.get_master(:channel, key)
ChannelMaster.add_client(channel_master)
Logger.debug("New ChannelClient initialized: #{inspect self()}, key: #{key}")
{:ok, %{key: key, channel_master: channel_master, channel_name: channel_name, app_id: app_id}}
end
def handle_cast({:incoming, messages}, state) do
protocol_message = %ProtocolMessage{
action: @action_message,
channel: state.channel_name,
messages: messages
}
Registry.dispatch(AttachmentRegistry, state.key, fn(attachments) ->
for {connection, _} <- attachments do
ConnectionMaster.send_protocol_message(connection, protocol_message)
end
end)
{:noreply, state}
end
def handle_cast({:publish, message}, state) do
ChannelMaster.on_message(state.channel_master, message)
{:noreply, state}
end
...
end
```
???
On start, it gets the pid of the channelmaster with Service.get_master, and then simply passes any messages it's given on to it. If it gets a message from the channelmaster, it distributes it out to all connections attached to that channel using the attachment registry.
OK, so what's Service.get_master?
---
```elixir
defmodule AblyNg.Service do
require Logger
def get_master(type, key) do
# Note that we pass in the type/key twice -- first so riak core can locate
# the vnode handling that master, second to pass to that vnode
run_on_vnode(type, key, {:master, {type, key}})
end
defp run_on_vnode(type, key, command) do
# Hash the key to a ring position
idx = :riak_core_util.chash_key({type, key})
# get an active preference list (ordered list of vnodes responsible for
# that key). Want only one vnode per channel etc, so require length 1.
# Last param: restrict to nodes that implement this service
[{index_node, _}] = :riak_core_apl.get_primary_apl(idx, 1, AblyNg.Service)
# riak core appends "_master" to the vnode name.
:riak_core_vnode_master.sync_spawn_command(index_node, command, AblyNg.Vnode_master)
end
...
end
```
???
Here's part of the Service module. so get_master takes a type and a key. Type here is "channel", we want a channelmaster. (In the real ably there are also applicationmasters and accountmasters). The key is the app ID plus the name of the channel. It calls run_on_vnode with the type, the key, and the data it wants to actually pass to the vnode.
run_on_vnode takes the type and the key and calls riak_core_util.chash_key, which returns a ring position -- basically an integer from 0 to 2 to the power 160, expressed as a binary
we then pass that to another function to get the index of the vnode that that ring position lies on, together with the instance that that vnode is on. (You can have this give you multiple vnodes if you're storing data and want to replicate it).
---
```elixir
defmodule AblyNg.Service do
require Logger
def get_master(type, key) do
# Note that we pass in the type/key twice -- first so riak core can locate
# the vnode handling that master, second to pass to that vnode
run_on_vnode(type, key, {:master, {type, key}})
end
defp run_on_vnode(type, key, command) do
# Hash the key to a ring position
idx = :riak_core_util.chash_key({type, key})
# get an active preference list (ordered list of vnodes responsible for
# that key). Want only one vnode per channel etc, so require length 1.
# Last param: restrict to nodes that implement this service
[{index_node, _}] = :riak_core_apl.get_primary_apl(idx, 1, AblyNg.Service)
# riak core appends "_master" to the vnode name.
:riak_core_vnode_master.sync_spawn_command(index_node, command, AblyNg.Vnode_master)
end
...
end
```
???
Finally, we pass the vnode index to sync_spawn_command, which sends your message off to that vnode.
Note that we don't do all of this every time the channelclient sends a message to the channelmaster. Once the channelclient has the channelmaster's pid, it can just use normal erlang message passing. If the channelmaster moves, it can notify all its clients that they should discard that pid -- or better still, the clients can just use process monitoring, so erlang will notify them if that master has died, and they can do all this again to get the new location of the master.
OK, so lets have a look at the vnode itself!
---
```elixir
defmodule AblyNg.Vnode do
require Logger
@behaviour :riak_core_vnode
def start_vnode(partition) do
:riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
end
def init([partition]) do
{:ok, %{partition: partition, masters: %{channel: %{}}}}
end
...
end
```
???
It's a process. that implements the riak_core_vnode behaviour. It knows its partition (which is what section of the ring it claims), and what masters live on it.
---
```elixir
defmodule AblyNg.Vnode do
...
def handle_command({:master, {type, key}}, _sender, state) do
try do
{master, state} = case state.masters[type] do
%{^key => master} ->
{master, state}
_ ->
master = spawn_master(type, key)
{master, put_in(state.masters[type][key], master)}
end
{:reply, {:ok, master}, state}
rescue
e -> {:reply, {:error, e}, state}
end
end
defp spawn_master(:channel, key) do
{:ok, pid} = AblyNg.ChannelMaster.start_link(key)
pid
end
...
end
```
???
When a client sends this master command to it -- which is what the get_master function we were looking at before does -- it returns the pid of the master if one exists, or spawns a new one if not.
Vnodes have a bunch of other callbacks to implement, for example you define Vnode callbacks to run at different stages of of riak core calls hinted handoff, which is when vnodes migrate from one core to another, to pass data from the old to the new node, and make sure it happens without the channel losing continuity.
---
```elixir
defmodule AblyNg.ChannelMaster do
...
def handle_call({:add_client, client_pid, node_name}, _, state = %{
clients: clients
}) do
if Map.has_key?(clients, node_name) do
Logger.warn("existing client for node #{node_name}: " <>
"#{inspect clients[node_name]}. Replacing with #{inspect client_pid}")
end
Process.monitor(client_pid)
state = put_in(state.clients[node_name], client_pid)
{:reply, :ok, state}
end
def handle_cast({:incoming, message}, state) do
for {_, client} <- state.clients do
ChannelClient.on_message(client, message)
end
{:noreply, state}
end
...
end
```
???
And those are the main bits, really. Channelclients stay alive as long as one or more connections are attached to them. Channelmasters stay alive as long as they have one or more channelclients. Clients and channels can monitor each other with normal erlang process monitoring, so can get notifications if the other one dies (even if that's due to the whole node dying).
---
### So, is this how Ably works?
???
So, an obvious question is, is this what Ably actually does? Sortof. We don't use Riak core itself -- for one thing, while some of our system's in Elixir, a lot of it's in NodeJS, and a bit of Go. But the basic architecture is more or less the same as what I've described, with some extra complicating factors.
The biggest of those is that the real ably is a global system -- it runs in multiple different AWS regions, with the idea that if you publish a message in australia, then subscribers to that channel who're also in australia shouldn't have to wait for the message to go to the US and back in a realtime system. But obviously if someone in the US attaches to that channel, they still need to get the message, we can't only deliver it to people in australia.
---
### So, is this how Ably works?
???
So every region that a channel is active in has its own channelmaster that lives on a regional ring, and there's also a global channelmaster that lives on a global ring that coordinates the regional channelmasters and make sure they're all subscribing to each other's messages.
The global ring also holds app and accountmasters (which accumulate stats, control access and so forth).
---
### So, is this how Ably works?
???
And then there's all the other stuff you need for an actual realtime message platform like authentication, stats & analytics, presence, permissions and capabilities, history, a REST api, fallback transports, channel rules, end-to-end encryption, queue outputs, webhooks, firehose, adaptors for other protocols like mqtt, channel and connection lifecycle events, a dev console, mobile push notifications, and so on and so forth.
---
class: center, middle
.ably[![](./header_logo.svg)]
## https://github.com/simonwoolf/ably-ng
## we're hiring (remote friendly): https://jobs.ably.io
## simon@ably.io
???
There's the repo url if you want to play around with it, and if that all sounds interesting, we're hiring developers and are remote friendly, so there's the url for that, if you're ok with doing some nodejs too.
Any questions?
</textarea>
<script src="remark-latest.min.js">
</script>
<script>
var slideshow = remark.create({highlightStyle: 'github'});
</script>
</body>
</html>