Skip to content

Commit

Permalink
DEV: Various behind-the-scenes improvements to PresenceChannel (#14518)
Browse files Browse the repository at this point in the history
- Allow the `/presence/get` endpoint to return multiple channels in a single request (limited to 50)
- When multiple presence channels are initialized in a single Ember runloop, batch them into a single GET request
- Introduce the `presence-pretender` to allow easy testing of PresenceChannel-related features
- Introduce a `use_cache` boolean (default true) on the the server-side PresenceChannel initializer. Useful during testing.
  • Loading branch information
davidtaylorhq committed Oct 7, 2021
1 parent ba380c5 commit a55642a
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 81 deletions.
85 changes: 68 additions & 17 deletions app/assets/javascripts/discourse/app/services/presence.js
Expand Up @@ -2,7 +2,7 @@ import Service from "@ember/service";
import EmberObject, { computed, defineProperty } from "@ember/object";
import { readOnly } from "@ember/object/computed";
import { ajax } from "discourse/lib/ajax";
import { cancel, debounce, later, throttle } from "@ember/runloop";
import { cancel, debounce, later, next, once, throttle } from "@ember/runloop";
import Session from "discourse/models/session";
import { Promise } from "rsvp";
import { isTesting } from "discourse-common/config/environment";
Expand All @@ -12,6 +12,8 @@ const PRESENCE_INTERVAL_S = 30;
const PRESENCE_DEBOUNCE_MS = isTesting() ? 0 : 500;
const PRESENCE_THROTTLE_MS = isTesting() ? 0 : 5000;

const PRESENCE_GET_RETRY_MS = 5000;

function createPromiseProxy() {
const promiseProxy = {};
promiseProxy.promise = new Promise((resolve, reject) => {
Expand Down Expand Up @@ -121,21 +123,7 @@ class PresenceChannelState extends EmberObject {
}

if (!initialData) {
try {
initialData = await ajax("/presence/get", {
data: {
channel: this.name,
},
});
} catch (e) {
if (e.jqXHR?.status === 404) {
throw new PresenceChannelNotFound(
`PresenceChannel '${this.name}' not found`
);
} else {
throw e;
}
}
initialData = await this.presenceService._getInitialData(this.name);
}

this.set("count", initialData.count);
Expand Down Expand Up @@ -231,6 +219,7 @@ export default class PresenceService extends Service {
this._presenceChannelStates = EmberObject.create();
this._presentProxies = {};
this._subscribedProxies = {};
this._initialDataRequests = {};
window.addEventListener("beforeunload", () => {
this._beaconLeaveAll();
});
Expand All @@ -244,6 +233,64 @@ export default class PresenceService extends Service {
});
}

_getInitialData(channelName) {
let promiseProxy = this._initialDataRequests[channelName];
if (!promiseProxy) {
promiseProxy = this._initialDataRequests[
channelName
] = createPromiseProxy();
}

once(this, this._makeInitialDataRequest);

return promiseProxy.promise;
}

async _makeInitialDataRequest() {
if (this._initialDataAjax) {
// try again next runloop
next(this, () => once(this, this._makeInitialDataRequest));
}

if (Object.keys(this._initialDataRequests).length === 0) {
// Nothing to request
return;
}

this._initialDataAjax = ajax("/presence/get", {
data: {
channels: Object.keys(this._initialDataRequests).slice(0, 50),
},
});

let result;
try {
result = await this._initialDataAjax;
} catch (e) {
later(this, this._makeInitialDataRequest, PRESENCE_GET_RETRY_MS);
throw e;
} finally {
this._initialDataAjax = null;
}

for (const channel in result) {
if (!result.hasOwnProperty(channel)) {
continue;
}

const state = result[channel];
if (state) {
this._initialDataRequests[channel].resolve(state);
} else {
const error = new PresenceChannelNotFound(
`PresenceChannel '${channel}' not found`
);
this._initialDataRequests[channel].reject(error);
}
delete this._initialDataRequests[channel];
}
}

_addPresent(channelProxy) {
let present = this._presentProxies[channelProxy.name];
if (!present) {
Expand Down Expand Up @@ -459,7 +506,11 @@ export default class PresenceService extends Service {
} else if (this._queuedEvents.length > 0) {
this._cancelTimer();
debounce(this, this._throttledUpdateServer, PRESENCE_DEBOUNCE_MS);
} else if (!this._nextUpdateTimer && !isTesting()) {
} else if (
!this._nextUpdateTimer &&
this._presentChannels.size > 0 &&
!isTesting()
) {
this._nextUpdateTimer = later(
this,
this._throttledUpdateServer,
Expand Down
@@ -0,0 +1,84 @@
import { publishToMessageBus } from "discourse/tests/helpers/qunit-helpers";
import User from "discourse/models/user";
import { settled } from "@ember/test-helpers";

let channels = {};

export default function (helper) {
this.post("/presence/update", (request) => {
const params = new URLSearchParams(request.requestBody);
const presentChannels = params.getAll("present_channels[]");
const leaveChannels = params.getAll("leave_channels[]");

const user = User.current();
if (!user) {
return helper.response(403, {});
}
const userInfo = {
id: user.id,
username: user.username,
name: user.name,
avatar_template: "/letter_avatar_proxy/v4/letter/b/35a633/{size}.png",
};

presentChannels.forEach((c) => joinChannel(c, userInfo));
leaveChannels.forEach((c) => leaveChannel(c, userInfo));

return helper.response({ success: "OK" });
});
this.get("/presence/get", (request) => {
const channelNames = request.queryParams.channels;
const response = {};
channelNames.forEach((c) => (response[c] = getChannelInfo(c)));
return helper.response(response);
});
}

export function getChannelInfo(name) {
channels[name] ||= { count: 0, users: [], last_message_id: 0 };
return channels[name];
}

export function joinChannel(name, user) {
const channel = getChannelInfo(name);
if (!channel.users.any((u) => u.id === user.id)) {
channel.users.push(user);
channel.count += 1;
channel.last_message_id += 1;
publishToMessageBus(
`/presence${name}`,
{
entering_users: [user],
},
0,
channel.last_message_id
);
}
return settled();
}

export function leaveChannel(name, user) {
const channel = getChannelInfo(name);
if (channel.users.any((u) => u.id === user.id)) {
channel.users = channel.users.reject((u) => u.id === user.id);
channel.count -= 1;
channel.last_message_id += 1;
publishToMessageBus(
`/presence${name}`,
{
leaving_user_ids: [user.id],
},
0,
channel.last_message_id
);
}
return settled();
}

export function presentUserIds(channelName) {
return getChannelInfo(channelName).users.map((u) => u.id);
}

export function clearState() {
channels = {};
}
2 changes: 2 additions & 0 deletions app/assets/javascripts/discourse/tests/setup-tests.js
Expand Up @@ -33,6 +33,7 @@ import { registerObjects } from "discourse/pre-initializers/inject-discourse-obj
import sinon from "sinon";
import { run } from "@ember/runloop";
import { isLegacyEmber } from "discourse-common/config/environment";
import { clearState as clearPresenceState } from "discourse/tests/helpers/presence-pretender";

const Plugin = $.fn.modal;
const Modal = Plugin.Constructor;
Expand Down Expand Up @@ -308,6 +309,7 @@ function setupTestsCommon(application, container, config) {
QUnit.testDone(function () {
sinon.restore();
resetPretender();
clearPresenceState();

// Destroy any modals
$(".modal-backdrop").remove();
Expand Down
Expand Up @@ -27,23 +27,31 @@ function usersFixture() {
},
];
}

acceptance("Presence - Subscribing", function (needs) {
needs.pretender((server, helper) => {
server.get("/presence/get", (request) => {
if (request.queryParams.channel?.startsWith("/test/")) {
return helper.response({
count: 3,
last_message_id: 1,
users: usersFixture(),
});
} else if (request.queryParams.channel?.startsWith("/countonly/")) {
return helper.response({
count: 3,
last_message_id: 1,
});
}

return helper.response(404, {});
const channels = request.queryParams.channels;
const response = {};

channels.forEach((c) => {
if (c.startsWith("/test/")) {
response[c] = {
count: 3,
last_message_id: 1,
users: usersFixture(),
};
} else if (c.startsWith("/countonly/")) {
response[c] = {
count: 3,
last_message_id: 1,
};
} else {
response[c] = null;
}
});

return helper.response(200, response);
});
});

Expand Down
35 changes: 24 additions & 11 deletions app/controllers/presence_controller.rb
Expand Up @@ -4,22 +4,35 @@ class PresenceController < ApplicationController
skip_before_action :check_xhr
before_action :ensure_logged_in, only: [:update]

MAX_CHANNELS_PER_REQUEST ||= 50

def get
name = params.require(:channel)
names = params.require(:channels)
raise Discourse::InvalidParameters.new(:channels) if !(names.is_a?(Array) && names.all? { |n| n.is_a? String })

begin
channel = PresenceChannel.new(name)
rescue PresenceChannel::NotFound
raise Discourse::NotFound
names.uniq!

raise Discourse::InvalidParameters.new("Too many channels") if names.length > MAX_CHANNELS_PER_REQUEST

user_group_ids = if current_user
GroupUser.where(user_id: current_user.id).pluck("group_id")
else
[]
end

if !channel.can_view?(user_id: current_user&.id)
# Do not reveal existence of channel
raise Discourse::NotFound
result = {}
names.each do |name|
channel = PresenceChannel.new(name)
if channel.can_view?(user_id: current_user&.id, group_ids: user_group_ids)
result[name] = PresenceChannelStateSerializer.new(channel.state, root: nil)
else
result[name] = nil
end
rescue PresenceChannel::NotFound
result[name] = nil
end

state = channel.state
render json: state, serializer: PresenceChannelStateSerializer, root: nil
render json: result
end

def update
Expand All @@ -39,7 +52,7 @@ def update
raise Discourse::InvalidParameters.new(:leave_channels)
end

if present_channels && present_channels.length > 50
if present_channels && present_channels.length > MAX_CHANNELS_PER_REQUEST
raise Discourse::InvalidParameters.new("Too many present_channels")
end

Expand Down
21 changes: 12 additions & 9 deletions lib/presence_channel.rb
Expand Up @@ -70,13 +70,13 @@ def to_json

attr_reader :name, :timeout, :message_bus_channel_name, :config

def initialize(name, raise_not_found: true)
def initialize(name, raise_not_found: true, use_cache: true)
@name = name
@timeout = DEFAULT_TIMEOUT
@message_bus_channel_name = "/presence#{name}"

begin
@config = fetch_config
@config = fetch_config(use_cache: use_cache)
rescue PresenceChannel::NotFound
raise if raise_not_found
@config = Config.new
Expand All @@ -85,21 +85,22 @@ def initialize(name, raise_not_found: true)

# Is this user allowed to view this channel?
# Pass `nil` for anonymous viewers
def can_view?(user_id: nil)
def can_view?(user_id: nil, group_ids: nil)
return true if config.public
return true if user_id && config.allowed_user_ids&.include?(user_id)

if user_id && config.allowed_group_ids.present?
user_group_ids = GroupUser.where(user_id: user_id).pluck("group_id")
return true if (user_group_ids & config.allowed_group_ids).present?
group_ids ||= GroupUser.where(user_id: user_id).pluck("group_id")
return true if (group_ids & config.allowed_group_ids).present?
end
false
end

# Is a user allowed to enter this channel?
# Currently equal to the the can_view? permission
def can_enter?(user_id: nil)
def can_enter?(user_id: nil, group_ids: nil)
return false if user_id.nil?
can_view?(user_id: user_id)
can_view?(user_id: user_id, group_ids: group_ids)
end

# Mark a user's client as present in this channel. The client_id should be unique per
Expand Down Expand Up @@ -297,8 +298,10 @@ def self.unregister_prefix(prefix)

private

def fetch_config
cached_config = PresenceChannel.redis.get(redis_key_config)
def fetch_config(use_cache: true)
cached_config = if use_cache
PresenceChannel.redis.get(redis_key_config)
end

if cached_config == Config::NOT_FOUND
raise PresenceChannel::NotFound
Expand Down

0 comments on commit a55642a

Please sign in to comment.