Skip to content

Commit

Permalink
Make Redis backend shardable
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgodwin committed Nov 6, 2015
1 parent 5106c78 commit a41516f
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 40 deletions.
104 changes: 82 additions & 22 deletions channels/backends/redis_py.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import time
import json
import datetime
import math
import redis
import random
import binascii
import uuid

from django.utils import six

from .base import BaseChannelBackend


Expand All @@ -13,41 +18,81 @@ class RedisChannelBackend(BaseChannelBackend):
multiple processes fine, but it's going to be pretty bad at throughput.
"""

def __init__(self, routing, expiry=60, hosts=None, prefix="django-channels:"):
    """
    Set up the sharded Redis backend.

    hosts: list of (host, port) tuples naming the Redis servers in the
    ring; defaults to a single local server. prefix namespaces every key
    this backend writes.
    """
    super(RedisChannelBackend, self).__init__(routing=routing, expiry=expiry)
    # Make sure they provided some hosts, or provide a default
    if not hosts:
        hosts = [("localhost", 6379)]
    # Sanity-check the host list shape early, before any connection is made.
    for host, port in hosts:
        assert isinstance(host, six.string_types)
        assert int(port)
    self.hosts = hosts
    self.prefix = prefix
    # Precalculate some values for ring selection
    self.ring_size = len(self.hosts)
    # consistent_hash maps values into 0..4095; this divisor folds that
    # range down onto 0..ring_size-1.
    self.ring_divisor = int(math.ceil(4096 / float(self.ring_size)))

@property
def connection(self):
def consistent_hash(self, value):
    """
    Maps the value to a node value between 0 and 4095
    using CRC32, then down to one of the ring nodes.

    (The docstring previously said MD5, but the implementation has always
    used binascii.crc32.)
    """
    # binascii.crc32 requires a bytes-like value on Python 3, but channel
    # names are handled as text elsewhere (send() decodes bytes to str),
    # so encode text before hashing. UTF-8 keeps py2/py3 hashes identical
    # for ASCII channel names.
    if not isinstance(value, bytes):
        value = value.encode("utf8")
    bigval = binascii.crc32(value) & 0xffffffff
    return (bigval // 0x100000) // self.ring_divisor

def random_index(self):
    """Return a uniformly random index into self.hosts."""
    # randrange(n) is exactly randint(0, n - 1).
    return random.randrange(len(self.hosts))

def connection(self, index):
    """
    Returns a Redis client for the server at the given ring index.

    Pass the result of consistent_hash() to get a deterministic server
    for a key; pass None to use a random server instead.

    Raises ValueError if index is outside the ring.
    """
    # If index is explicitly None, pick a random server
    if index is None:
        index = self.random_index()
    # Catch bad indexes
    if not (0 <= index < self.ring_size):
        raise ValueError("There are only %s hosts - you asked for %s!" % (self.ring_size, index))
    host, port = self.hosts[index]
    return redis.Redis(host=host, port=port)

@property
def connections(self):
    """
    Yields a connection to every host in the ring, in index order.

    BUG FIX: the previous body used ``return`` inside the loop, which
    handed back only the first host's connection and skipped the rest;
    ``yield`` makes this a generator over all hosts.
    """
    for index in range(len(self.hosts)):
        yield self.connection(index)

def send(self, channel, message):
    """
    Sends a message onto the named channel: writes the JSON-encoded
    message into its own expiring key, then pushes that key onto the
    channel's list on the appropriate Redis server.
    """
    # Channel names are handled as text internally; decode if given bytes.
    if not isinstance(channel, str):
        channel = channel.decode('utf-8')
    # Pick a connection to the right server - consistent for response
    # channels, random for normal channels
    if channel.startswith("!"):
        # BUG FIX: previously hashed the not-yet-defined name `key`
        # (NameError). The response channel name is what must be hashed so
        # interface server and worker agree on the same Redis server.
        index = self.consistent_hash(channel)
        connection = self.connection(index)
    else:
        connection = self.connection(None)
    # Write out message into expiring key (avoids big items in list)
    # TODO: Use extended set, drop support for older redis?
    # BUG FIX: uuid4().get_hex() is Python-2-only; the .hex attribute
    # exists on both Python 2 and 3.
    key = self.prefix + uuid.uuid4().hex
    connection.set(
        key,
        json.dumps(message),
    )
    # Expire the payload a little after the nominal message expiry so a
    # reader that just popped the key can still fetch it.
    connection.expire(
        key,
        self.expiry + 10,
    )
    # Add key to list
    connection.rpush(
        self.prefix + channel,
        key,
    )
    # Set list to expire when message does (any later messages will bump this)
    connection.expire(
        self.prefix + channel,
        self.expiry + 10,
    )
Expand All @@ -56,13 +101,27 @@ def send(self, channel, message):
def receive_many(self, channels):
    """
    Blocks until a message is available on one of the given channels and
    returns (channel_name, message). Response channels ("!"-prefixed) are
    routed to their consistently-hashed server; normal channels all go to
    one randomly chosen server for this call.

    Raises ValueError if channels is empty.
    """
    if not channels:
        raise ValueError("Cannot receive on empty channel list!")
    # Work out what servers to listen on for the given channels
    indexes = {}
    random_index = self.random_index()
    for channel in channels:
        if channel.startswith("!"):
            indexes.setdefault(self.consistent_hash(channel), []).append(channel)
        else:
            indexes.setdefault(random_index, []).append(channel)
    # Get a message from one of our channels
    while True:
        # Select a random connection to use
        # TODO: Would we be better trying to do this truly async?
        # BUG FIX: random.choice needs a sequence; dict.keys() is a view
        # on Python 3, so materialise the keys as a list first.
        index = random.choice(list(indexes))
        connection = self.connection(index)
        channels = indexes[index]
        # Shuffle channels to avoid the first ones starving others of workers
        random.shuffle(channels)
        # Pop off any waiting message
        result = connection.blpop([self.prefix + channel for channel in channels], timeout=1)
        if result:
            content = connection.get(result[1])
            if content is None:
                # Payload key expired between rpush and now; keep waiting.
                continue
            return result[0][len(self.prefix):].decode("utf-8"), json.loads(content.decode("utf-8"))
Expand All @@ -75,7 +134,7 @@ def group_add(self, group, channel, expiry=None):
seconds (expiry defaults to message expiry if not provided).
"""
key = "%s:group:%s" % (self.prefix, group)
self.connection.zadd(
self.connection(self.consistent_hash(group)).zadd(
key,
**{channel: time.time() + (expiry or self.expiry)}
)
Expand All @@ -86,7 +145,7 @@ def group_discard(self, group, channel):
does nothing otherwise (does not error)
"""
key = "%s:group:%s" % (self.prefix, group)
self.connection.zrem(
self.connection(self.consistent_hash(group)).zrem(
key,
channel,
)
Expand All @@ -96,10 +155,11 @@ def group_channels(self, group):
Returns an iterable of all channels in the group.
"""
key = "%s:group:%s" % (self.prefix, group)
connection = self.connection(self.consistent_hash(group))
# Discard old channels
self.connection.zremrangebyscore(key, 0, int(time.time()) - 10)
connection.zremrangebyscore(key, 0, int(time.time()) - 10)
# Return current lot
return self.connection.zrange(
return connection.zrange(
key,
0,
-1,
Expand All @@ -113,14 +173,14 @@ def lock_channel(self, channel, expiry=None):
obtained, False if lock not obtained.
"""
key = "%s:lock:%s" % (self.prefix, channel)
return bool(self.connection.setnx(key, "1"))
return bool(self.connection(self.consistent_hash(channel)).setnx(key, "1"))

def unlock_channel(self, channel):
    """
    Unlocks the named channel. Always succeeds.
    """
    key = "%s:lock:%s" % (self.prefix, channel)
    # Locks live on the channel's consistently-hashed server, matching
    # lock_channel, so the same server is always consulted.
    self.connection(self.consistent_hash(channel)).delete(key)

def __str__(self):
return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port)
60 changes: 44 additions & 16 deletions docs/backends.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,64 @@ Multiple choices of backend are available, to fill different tradeoffs of
complexity, throughput and scalability. You can also write your own backend if
you wish; the API is very simple and documented below.

In-memory
---------

The in-memory backend is the simplest, and not really a backend as such;
it exists purely to enable Django to run in a "normal" mode where no Channels
functionality is available, just normal HTTP request processing. You should
never need to set it explicitly.

This backend provides no network transparency or non-blocking guarantees.

Database
--------

Redis
-----

The Redis backend is the recommended backend to run Channels with, as it
supports both high throughput on a single Redis server as well as the ability
to run against a set of Redis servers in a sharded mode.

To use the Redis backend you have to install the redis package::

pip install -U redis

Also you need to set the following in the ``CHANNEL_BACKENDS`` setting::
By default, it will attempt to connect to a Redis server on ``localhost:6379``,
but you can override this with the ``HOSTS`` setting::

CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.redis_py.RedisChannelBackend",
"HOST": "redis-hostname",
"BACKEND": "channels.backends.redis.RedisChannelBackend",
"HOSTS": [("redis-channel-1", 6379), ("redis-channel-2", 6379)],
},
}

Sharding
~~~~~~~~

The sharding model is based on consistent hashing - in particular,
:ref:`response channels <channel-types>` are hashed and used to pick a single
Redis server that both the interface server and the worker will use.

For normal channels, since any worker can service any channel request, messages
are simply distributed randomly among all possible servers, and workers will
pick a single server to listen to. Note that if you run more Redis servers than
workers, it's very likely that some servers will not have workers listening to
them; we recommend you always have at least ten workers for each Redis server
to ensure good distribution. Workers will, however, change server periodically
(every five seconds or so) so queued messages should eventually get a response.

Note that if you change the set of sharding servers you will need to restart
all interface servers and workers with the new set before anything works,
and any in-flight messages may be lost (even with persistence enabled, some will be);
the consistent hashing model relies on all running clients having the same
settings. Any misconfigured interface server or worker will drop some or all
messages.


In-memory
---------

The in-memory backend is the simplest, and not really a backend as such;
it exists purely to enable Django to run in a "normal" mode where no Channels
functionality is available, just normal HTTP request processing. You should
never need to set it explicitly.

This backend provides no network transparency or non-blocking guarantees.

Database
--------

Writing Custom Backends
-----------------------

Expand Down
4 changes: 2 additions & 2 deletions docs/deploying.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ here's an example for a remote Redis server::

CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.redis_py.RedisChannelBackend",
"HOST": "redis-channel",
"BACKEND": "channels.backends.redis.RedisChannelBackend",
"HOSTS": [("redis-channel", 6379)],
},
}

Expand Down
Empty file modified docs/faqs.rst
100644 → 100755
Empty file.
7 changes: 7 additions & 0 deletions docs/scaling.rst
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@ That's why Channels labels any *response channel* with a leading ``!``, letting
you know that only one server is listening for it, and thus letting you scale
and shard the two different types of channels accordingly (for more on
the difference, see :ref:`channel-types`).

This is the underlying theory behind Channels' sharding model - normal channels
are sent to random Redis servers, while response channels are sent to a
predictable server that both the interface server and worker can derive.

Currently, sharding is implemented as part of the Redis backend only;
see the :doc:`backend documentation <backends>` for more information.

0 comments on commit a41516f

Please sign in to comment.