Redis Cluster PubSub - Scalability Issues #2672

Open
DivineTraube opened this Issue Jul 13, 2015 · 18 comments


DivineTraube commented Jul 13, 2015

Edit: full mailing list discussion

Hi all,

I also posted this on the mailing list, but as I think it is a bug, I'll add it here, too.

According to [1] and [2], PubSub works by broadcasting every publish to every other Redis Cluster node. This limits the PubSub throughput to the bisection bandwidth of the underlying network infrastructure divided by the number of nodes times the message size. So if a typical message is 1 KB, the cluster has 10 nodes and bandwidth is 1 Gbit/s, throughput is already limited to 12.5K RPS. If we increase the message size to 5 KB and the number of nodes to 50, we only get 500 RPS - much less than a single Redis instance could service (>100K RPS), while putting maximum pressure on the network. PubSub thus scales linearly with cluster size, but in the negative direction!
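
To make the arithmetic explicit, here is a back-of-the-envelope sketch of that bound (assuming full broadcast to every node and ignoring protocol overhead; 1 KB is taken as 1000 bytes to match the figures above):

const GBIT_PER_SEC = 125e6;  // 1 Gbit/s expressed in bytes per second
const KB = 1000;             // 1 KB taken as 1000 bytes, as in the figures above

// Every publish is broadcast to every node, so the wire cost per message is
// roughly nodes * messageSize, and the available bandwidth caps the publish rate.
const maxPublishRate = (bandwidth, nodes, msgBytes) => bandwidth / (nodes * msgBytes);

console.log(maxPublishRate(GBIT_PER_SEC, 10, 1 * KB)); // 12500 -> ~12.5K msg/s
console.log(maxPublishRate(GBIT_PER_SEC, 50, 5 * KB)); // 500   -> ~500 msg/s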

This leads me to my question: why not just redirect clients to the node that owns the PubSub channel's hash slot, similar to normal data operations? Cluster nodes would thus only have to publish and notify locally, similar to keyspace notifications [3], and PubSub would be perfectly scalable. Sure, this would break PSUBSCRIBE for the cluster, but PSUBSCRIBE could be limited to only allow patterns containing a {hash-tag}, meaning that the subscription only pertains to channels containing that hash tag and matching the pattern (i.e. one specific node). As PSUBSCRIBE is semantically a multi-key operation, this would make perfect sense and be consistent with the rest of Redis Cluster.
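
For illustration, a rough client-side sketch of that routing, written against an ioredis-style promise API (CLUSTER KEYSLOT and CLUSTER SLOTS are existing commands; treating the channel as bound to its slot is exactly what is proposed here, not current behaviour):

const Redis = require('ioredis');

// Sketch only: find the master that would own the channel if it were a key,
// and publish/subscribe there instead of relying on cluster-wide broadcast.
async function nodeForChannel(anyNode, channel) {
  const slot = await anyNode.cluster('KEYSLOT', channel);  // hash-tag rules apply here too
  const slots = await anyNode.cluster('SLOTS');            // [[startSlot, endSlot, [masterIp, masterPort, ...], ...], ...]
  const range = slots.find(([start, end]) => slot >= start && slot <= end);
  const [ip, port] = range[2];
  return new Redis({ host: ip, port });                    // a real client would cache these connections
}

async function shardedPublish(anyNode, channel, message) {
  const node = await nodeForChannel(anyNode, channel);
  return node.publish(channel, message);                   // only this shard ever sees the message
}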

In summary, I think the assumption that clients may publish and subscribe to any node is a dangerous guarantee that kills scalability. What do you think - could the above be the way to handle PubSub in Redis Cluster? Are there currently any workarounds to have PubSub scale in a Redis Cluster deployment?

[1] #1927
[2] http://redis.io/topics/cluster-spec
[3] #2541

P.S. Redis Cluster is a great project and I highly value all the effort that goes into it!


badboy (Contributor) commented Jul 13, 2015

Keep in mind: currently the keyspace and pubsub are mostly unrelated. A pubsub channel does not map to a key in the keyspace; that's also why it's currently not bound to the same slot assignments. It could, however, follow the same pattern, but I'm not sure about the drawbacks.


DivineTraube (Author) commented Jul 14, 2015

OK, there might be historical reasons for this, but there is no fundamental reason not to change it in order to achieve better scalability properties.

On the mailing list I added a proposal for handling existing applications without breaking them, as well as lossless handover during migration.

  1. (Not breaking existing apps) The SUBSCRIBE and PUBLISH commands could be extended with a new optional parameter "broadcast" which defaults to true. This parameter indicates whether the receiving node should broadcast the PUBLISH command over the cluster bus. Existing apps would behave as before, while all client drivers (e.g. Jedis) could add the feature of treating non-broadcast PUBLISH calls using hash slot semantics. This part is rather easy because essentially clients just have to treat PUBLISH like any other data command.
  2. (Subscriptions during hash slot migration) This is harder but totally possible. This is what I would propose:
    - When migration starts, PUBLISH calls are redirected with -ASK to the new hash slot owner.
    - The node taking over the hash slot keeps a temporary log of PUBLISH messages for some configurable time (SUBSCRIPTION_MIGRATION_TIMEOUT).
    - Atomically with the beginning of the -ASK redirections, subscribed clients receive a message indicating that they should reconnect to the new node.
    - On connecting to the new node, clients initially receive (either by default or by option) all messages the node accumulated since the migration started and before SUBSCRIPTION_MIGRATION_TIMEOUT has passed.

Using this scheme, no messages will be lost, as long as subscribed clients are able to reconnect to the new node within SUBSCRIPTION_MIGRATION_TIMEOUT. In Redis Cluster, this requires implementing the temporary PubSub migration log. Clients need to extend their subscribe-connections to handle reconnection to a new node. This part can be combined with 1. by having a SUBSCRIBE that indicates whether a client is capable of a handover and defaults to false.
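
Purely as an illustration, this is what the proposed extensions could look like from a client's perspective (the BROADCAST and HANDOVER arguments below are hypothetical placeholders for the options described above; nothing like them exists in Redis today, and redis.call stands for a generic send-command facility such as the one ioredis provides):

// Hypothetical sketch - BROADCAST and HANDOVER are placeholder names for the
// optional parameters proposed above; they are not real Redis arguments.
async function proposedUsage(redis, payload) {
  // Publish without cluster-wide broadcast: the receiving node would either
  // serve the channel locally or redirect the client to the slot owner.
  await redis.call('PUBLISH', 'news:{shard1}', payload, 'BROADCAST', 'no');

  // Subscribe while announcing that this client can follow a hash-slot
  // migration (reconnect to the new owner and replay the temporary log).
  await redis.call('SUBSCRIBE', 'news:{shard1}', 'HANDOVER', 'yes');
}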


DivineTraube (Author) commented Jul 16, 2015

After I discussed this with Josiah Carlson, he came up with a very good proposal that I posted to the redis-dev mailing list:

  • Add option: a new cluster-wide config option sharded-pubsub defaulting to 'no'; if set to 'yes', messages published on a channel X will only be sent to the nodes that are the master or slaves of the shard that would contain that channel if it were a key.
  • Treat channels similar to regular keys + forwarding: publishing to a channel whose slot is not assigned to the node that received the PUBLISH gets the message forwarded to the proper master (which then replicates it to its slaves).
  • Publish hash slot changes to subscribers: if a subscriber wants to be notified of their channel moving to another server, they must subscribe to a special cluster:slots channel (which is broadcast cluster-wide, and only cluster nodes can publish to it, like sentinel:hello in Sentinel) to be notified of the progress of slot migrations.
  • Double publishing during migration: immediately upon slot migration start, both the old and new slot master/slaves start receiving published messages for delivery to subscribers, and on completion of migration (and the slot migration completed message) plus a configurable grace period (so clients don't miss messages during fast migrations), only the new master/slaves for that slot receive messages for delivery to subscribers.

Applications can then use scalable Redis Cluster PubSub using this pattern and existing client libraries:

  1. Given a channel, find the node that owns its hash slot.
  2. Subscribe to that node and also to cluster:slots to detect migrations.
  3. Upon slot migration, subscribe to the channel on the new node and keep the old connection open.
  4. Forward messages to the application from the old connection until it closes, and remember those messages.
  5. When migration is complete and the old connection is closed, forward messages from the new connection, trimming the remembered messages from the first connection (this is necessary to guarantee exactly-once delivery during normal operation & migration). Note: this works in most cases but not all (e.g. if published messages are all identical) - to get real exactly-once semantics, a more intrusive scheme like the one in this discussion or application-implemented log sequence numbers are required. But this is OK, as PubSub was always designed to be a fire-and-forget solution.
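
A rough sketch of steps 1-5 from the subscriber's side, assuming the proposed sharded-pubsub behaviour and cluster:slots channel existed (they do not today). nodeForChannel is the slot-lookup helper from the sketch in the opening post, migration-event parsing is omitted, and duplicates are trimmed by message identity, with the caveat from step 5:

async function shardedSubscribe(anyNode, channel, onMessage) {
  // 1-2. Subscribe on the current slot owner and watch cluster:slots for migrations.
  const oldConn = await nodeForChannel(anyNode, channel);
  await oldConn.subscribe(channel);
  const control = anyNode.duplicate();
  await control.subscribe('cluster:slots');               // proposed channel, not implemented today

  const remembered = [];
  oldConn.on('message', (_ch, msg) => {                   // 4. forward + remember until the old connection closes
    remembered.push(msg);
    onMessage(msg);
  });

  control.on('message', async () => {
    // 3. On a migration event (a real client would check that it concerns our slot),
    //    also subscribe on the new slot owner while keeping the old connection open.
    const newConn = await nodeForChannel(anyNode, channel);
    await newConn.subscribe(channel);
    newConn.on('message', (_ch, msg) => {
      // 5. Only deliver from the new connection once the old one is gone,
      //    trimming messages the old connection already delivered.
      if (oldConn.status !== 'end') return;
      const i = remembered.indexOf(msg);
      if (i !== -1) { remembered.splice(i, 1); return; }
      onMessage(msg);
    });
  });
}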

Possible Additions:

  • If sharded-pubsub is switched on, PSUBSCRIBE could be limited to only allow patterns containing a {hash-tag}, meaning that the subscription only pertains to channels containing that hash tag + matching the pattern (i.e. one specific node). As PSUBSCRIBE is semantically a multi-key operation, this would make perfect sense and be consistent with the rest of Redis Cluster.
  • Instead of forwarding PUBLISH calls, regular -MOVED redirection can be used (unless a migration is happening)

What do you think - is this a sensible solution? Can this be implemented with moderate effort in Redis Cluster?


shaharmor commented Nov 22, 2015

+1. We are currently limited at 120-130k RPS and are considering moving to cluster but apparently it won't help us..


misiek08 commented Nov 22, 2015

Maybe each cluster node could act as a PubSub client towards the other nodes: if clients connect to node3 and execute SUBSCRIBE, node3 subscribes on the other nodes only for that channel. PSUBSCRIBE can be handled the same way. So if there's no need to send a message to nodeX, it is not sent, and we have perfect scalability.


shacharz commented Dec 3, 2015

+1


shaharmor commented Dec 13, 2015

@antirez is this on the roadmap for any near future versions?


antirez (Owner) commented Dec 14, 2015

Hello, I understand the issue, but this was specifically designed in this way for a couple of reasons. There is some background here: #122. The gist is that, because of the fire-and-forget nature of Pub/Sub and the fact that it does not act on the dataset, it is much better from the point of view of scalability if we don't partition vertically based on the channel name, but instead implement the smartness required in order to broadcast messages only to selected nodes.

Basically the work a Redis node has to do when it receives a message is proportional to the number of clients receiving that message, so the idea is to have small Redis Clusters handling only Pub/Sub that can partition by clients.

In the current naive implementation every message is sent to every node, which is quite an overhead, but by limiting the number of instances to 3-4 the overhead is acceptable (the Pub/Sub cluster should only be used for Pub/Sub, basically, and if normal Redis operations are also required they should be executed in a different cluster).

For larger clusters, what we want is smart broadcasting that only sends messages to the instances where there are likely receivers for a given channel. At the same time we want clients to cluster together up to a given point: otherwise, if we are using a cluster in order to serve just a couple of channels, we end up with publishers and receivers for every channel on every node, which basically means we are broadcasting everything again.

So imagine this:

  1. Redis Cluster is able, by exchanging Bloom filters among nodes, to mostly publish messages only where required.
  2. Clients are smart enough to run a small protocol before issuing SUBSCRIBE: they check the number of subscribers for the given channel on all the cluster nodes and connect to a node that already has listeners for this channel, as long as its listener count is below a given limit; otherwise they connect to the next node that is not over the limit.

This means that clients will cluster up to a given number on selected nodes, so that we save bandwidth, but at the same time, if there are too many clients, it's better to use more bandwidth for the sake of scaling the clients better, given that the reception of a message is O(number-of-clients).
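
A rough client-side sketch of point 2, assuming an ioredis-style client and a hypothetical nodes list of { host, port } entries (PUBSUB NUMSUB is an existing command; the clustering policy itself is the idea described above, not something Redis does for you):

const Redis = require('ioredis');

// Point 2 sketched client-side: before subscribing, prefer a node that
// already has listeners for the channel (to keep subscribers clustered),
// but skip nodes that are over a configured listener limit.
// `nodes` is a hypothetical list of { host, port } cluster node addresses.
async function pickSubscribeNode(nodes, channel, limit = 1000) {
  const counts = [];
  for (const { host, port } of nodes) {
    const conn = new Redis({ host, port });
    // PUBSUB NUMSUB returns [channel, count]; the count is local to this node.
    const [, count] = await conn.pubsub('NUMSUB', channel);
    counts.push({ host, port, count: Number(count) });
    conn.disconnect();
  }
  // Most-populated node that is still under the limit...
  const under = counts.filter(n => n.count < limit).sort((a, b) => b.count - a.count);
  // ...falling back to the least-loaded node if every node is over it.
  return under[0] || counts.sort((a, b) => a.count - b.count)[0];
}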


DivineTraube (Author) commented Dec 23, 2015

Hi, thanks for joining the discussion and clarifying.

Can you elaborate on why you don't see channel-based partitioning as the best solution? From the point of view of scalability, it leads to maximal locality of subscribers and publishers and hence minimal network communication (the current bottleneck).

So your suggestion is to have Redis Cluster broadcast when necessary and clients colocate on nodes with many similar subscribers. I think that is certainly a good approach. However, I do not see why Bloom filters would be beneficial for this. The complete mapping of cluster nodes to channels is small compared to message volumes and can even be transferred incrementally. Therefore, I do not see a bottleneck in transferring per-node subscriber sets that a Bloom filter could fix. Furthermore, each false positive has a substantial downside: an unnecessary message to a cluster node.

The main problem of using a Bloom filter for this is the following: either you have to generate the Bloom filter each time it is requested (computationally expensive) or you have to use a Bloom filter variant that allows removal (e.g. Counting Bloom filters, Cuckoo filters, etc.), which all have the downside of forcing you to commit to a configuration ahead of time. The capacity of the Bloom filter is then fixed and cannot grow or shrink with the actual number of subscribers. This either means that the filter is constantly too large (no space advantage) or the false positive rate is high (degradation to unnecessary broadcasting). Bloom filter variants that automatically resize (e.g. Scalable Bloom filters) are rarely used in practice, hard to implement and suffer from other downsides.
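
To make the capacity problem concrete with the standard approximation FPR ≈ (1 - e^(-kn/m))^k for m bits, k hash functions and n inserted items (back-of-the-envelope numbers, not a measurement):

// Approximate false-positive rate of a fixed-size Bloom filter as the number
// of inserted items (e.g. channels with local subscribers) outgrows its sizing.
const fpr = (m, k, n) => Math.pow(1 - Math.exp((-k * n) / m), k);

const m = 100000; // sized for ~10k items at roughly 1% FPR (~10 bits per item)
const k = 7;      // near-optimal number of hash functions for that ratio

console.log(fpr(m, k, 10000)); // ~0.008 - behaves as designed
console.log(fpr(m, k, 50000)); // ~0.81  - almost every non-subscribed channel now gets broadcast needlessly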

As a consequence, I would suggest changing point 1 to this:
Redis Cluster nodes can retrieve the set of subscribers for each node (e.g. for initial syncing) and receive newly connected subscribers + the respective node incrementally (e.g. over the cluster bus).

What do you think?


kankedong commented May 16, 2016

Hi all,

My messages are about 4 bytes each; I want to know the size of the message that will be broadcast by a Redis Cluster node.

Also, how does clustering currently work with pub/sub - does PubSub really broadcast every publish to every other Redis Cluster node, and does that limit the PubSub throughput?

Thanks for every answer.



shaharmor commented May 31, 2016

Is there any intention to change this behavior?
Obviously today Redis Cluster PubSub is not built for massive scale. (We are already doing more than 100k/s and reaching the CPU limit.)


misiek08 commented May 31, 2016

I have a proof-of-concept solution that monitors commands and manually passes messages to other nodes, but I've set it up on separate nodes (not clustered by Redis' internal cluster system).
It was handling traffic well, but I still don't have anywhere to test it on a real high-traffic setup.


shaharmor commented Jun 3, 2016

From my very limited understanding, this is the line that causes the propagation of messages throughout the cluster: https://github.com/antirez/redis/blob/unstable/src/pubsub.c#L320

I have created a local build without that line, and updated my redis client to use the same slot hashing for pubsub connections.

I believe this will allow me to create a Redis PubSub cluster without being limited to the capacity of a single server.

It does not solve the issue of having only a single channel, but as this is not my case it's not a problem.


aep commented Dec 9, 2016

@shaharmor can you share the code? I'm implementing a simple wrapper service that just scales pubsub properly. Wondering if a fork is easier?


shaharmor commented Dec 9, 2016

@aep #3381


aabhasb commented Nov 23, 2017

@DivineTraube I was wondering, in the keyspace approach how would you handle the case where a subscriber wants to access channels which are spread across the cluster, i.e. on nodes other than the one the subscriber (client) is connected to?
In the keyspace, a client issues a "get key" command and if the key is not in the hash slots of the connected server, the server gets the value from the right server.
In pub/sub, the client (subscriber) is in a listening mode; it won't go and issue a "get new messages" sort of command, so how will it be notified when a new message has been published on a channel on a different node?


brugnara commented Jan 9, 2018

We have 12 instances handling Redis messages, so I used RPUSH and BLPOP in a while(true) loop in order to have each message handled by exactly one instance of my code, without overhead.

const MAX_WAIT = 0; // 0 = block indefinitely (see the BLPOP docs about this timeout)
const key = 'my-key';
const digest = require('my-digestion-module');

// Runs inside a co-style generator; `redis` is a promise-returning client, `debug` a logger.
while (true) {
  // this will wait until an LPUSH or RPUSH happens or MAX_WAIT seconds have passed
  const payload = yield redis.blpop(key, MAX_WAIT);

  // no payload if timed out
  if (!payload) continue;

  // BLPOP returns [listName, element]
  debug.verbose('got %d bytes from list: %s', payload[1].length, payload[0]);
  yield digest(JSON.parse(payload[1]));
  debug.verbose('payload digested');
}

On the other side, instead of using PUBLISH I'm using:

redis.rpush(key, JSON.stringify(payload));

This is also very useful because if the main app goes down, Redis keeps the data, since it is a list.

I know it's totally different from pubsub's fire-and-forget method, but I believe that using lists the app can scale more easily.
