Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Commit

Permalink
Scope pub/sub channels in Redis by appId to avoid crosstalk between apps
Browse files Browse the repository at this point in the history
  • Loading branch information
francislavoie committed Mar 29, 2019
1 parent e58cfea commit 2e4569e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
47 changes: 26 additions & 21 deletions src/PubSub/Redis/RedisClient.php
Expand Up @@ -78,40 +78,45 @@ public function boot(LoopInterface $loop): ReplicationInterface
*
* @param string $redisChannel
* @param string $payload
* @return bool
*/
protected function onMessage(string $redisChannel, string $payload)
{
$payload = json_decode($payload);

// Ignore messages sent by ourselves
if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
return false;
return;
}

// We need to put the channel name in the payload
$payload->channel = $redisChannel;
// Pull out the app ID. See RedisPusherBroadcaster
$appId = $payload->appId;

// We need to put the channel name in the payload.
// We strip the app ID from the channel name, websocket clients
// expect the channel name to not include the app ID.
$payload->channel = Str::after($redisChannel, "$appId:");

/* @var $channelManager ChannelManager */
$channelManager = app(ChannelManager::class);

// Load the Channel instance, if any
$channel = $channelManager->find($payload->appId, $payload->channel);
if ($channel === null) {
return false;
$channel = $channelManager->find($appId, $payload->channel);

// If no channel is found, none of our connections want to
// receive this message, so we ignore it.
if (! $channel) {
return;
}

$socket = $payload->socket;
$socket = $payload->socket ?? null;

// Remove the internal keys from the payload
// Remove fields intended for internal use from the payload
unset($payload->socket);
unset($payload->serverId);
unset($payload->appId);

// Push the message out to connected websocket clients
$channel->broadcastToEveryoneExcept($payload, $socket);

return true;
}

/**
Expand All @@ -123,13 +128,13 @@ protected function onMessage(string $redisChannel, string $payload)
*/
public function subscribe(string $appId, string $channel): bool
{
if (! isset($this->subscribedChannels[$channel])) {
if (! isset($this->subscribedChannels["$appId:$channel"])) {
// We're not subscribed to the channel yet, subscribe and set the count to 1
$this->subscribeClient->__call('subscribe', [$channel]);
$this->subscribedChannels[$channel] = 1;
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]);
$this->subscribedChannels["$appId:$channel"] = 1;
} else {
// Increment the subscribe count if we've already subscribed
$this->subscribedChannels[$channel]++;
$this->subscribedChannels["$appId:$channel"]++;
}

return true;
Expand All @@ -144,17 +149,17 @@ public function subscribe(string $appId, string $channel): bool
*/
public function unsubscribe(string $appId, string $channel): bool
{
if (! isset($this->subscribedChannels[$channel])) {
if (! isset($this->subscribedChannels["$appId:$channel"])) {
return false;
}

// Decrement the subscription count for this channel
$this->subscribedChannels[$channel]--;
$this->subscribedChannels["$appId:$channel"]--;

// If we no longer have subscriptions to that channel, unsubscribe
if ($this->subscribedChannels[$channel] < 1) {
$this->subscribeClient->__call('unsubscribe', [$channel]);
unset($this->subscribedChannels[$channel]);
if ($this->subscribedChannels["$appId:$channel"] < 1) {
$this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]);
unset($this->subscribedChannels["$appId:$channel"]);
}

return true;
Expand All @@ -173,7 +178,7 @@ public function publish(string $appId, string $channel, stdClass $payload): bool
$payload->appId = $appId;
$payload->serverId = $this->serverId;

$this->publishClient->__call('publish', [$channel, json_encode($payload)]);
$this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]);

return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/PubSub/Redis/RedisPusherBroadcaster.php
Expand Up @@ -144,7 +144,7 @@ public function broadcast(array $channels, $event, array $payload = [])
]);

foreach ($this->formatChannels($channels) as $channel) {
$connection->publish($channel, $payload);
$connection->publish("{$this->appId}:$channel", $payload);
}
}
}
7 changes: 5 additions & 2 deletions src/WebSockets/Channels/Channel.php
Expand Up @@ -107,11 +107,14 @@ public function broadcastToOthers(ConnectionInterface $connection, $payload)
->publish($connection->app->id, $payload);
}

$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
}

public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
{
// Performance optimization, if we don't have a socket ID,
// then we avoid running the if condition in the foreach loop below
// by calling broadcast() instead.
if (is_null($socketId)) {
$this->broadcast($payload);

Expand Down

0 comments on commit 2e4569e

Please sign in to comment.