Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Subscriptions model 6/6 #28675

Merged
merged 8 commits into from Mar 30, 2023
Merged
18 changes: 10 additions & 8 deletions apps/meteor/app/apps/server/bridges/internal.ts
@@ -1,10 +1,10 @@
import { InternalBridge } from '@rocket.chat/apps-engine/server/bridges/InternalBridge';
import type { ISetting } from '@rocket.chat/apps-engine/definition/settings';
import type { ISubscription } from '@rocket.chat/core-typings';
import { Settings } from '@rocket.chat/models';
import { Settings, Subscriptions } from '@rocket.chat/models';

import type { AppServerOrchestrator } from '../../../../ee/server/apps/orchestrator';
import { Subscriptions } from '../../../models/server';
import { isTruthy } from '../../../../lib/isTruthy';

export class AppInternalBridge extends InternalBridge {
// eslint-disable-next-line no-empty-function
Expand All @@ -17,17 +17,19 @@ export class AppInternalBridge extends InternalBridge {
return [];
}

const records = Subscriptions.findByRoomIdWhenUsernameExists(roomId, {
fields: {
'u.username': 1,
},
}).fetch();
const records = Promise.await(
Subscriptions.findByRoomIdWhenUsernameExists(roomId, {
projection: {
'u.username': 1,
},
}).toArray(),
);

if (!records || records.length === 0) {
return [];
}

return records.map((s: ISubscription) => s.u.username);
return records.map((s: ISubscription) => s.u.username).filter(isTruthy);
}

protected async getWorkspacePublicKey(): Promise<ISetting> {
Expand Down
10 changes: 4 additions & 6 deletions apps/meteor/app/apps/server/bridges/messages.ts
Expand Up @@ -3,10 +3,10 @@ import { MessageBridge } from '@rocket.chat/apps-engine/server/bridges/MessageBr
import type { IMessage } from '@rocket.chat/apps-engine/definition/messages';
import type { IUser } from '@rocket.chat/apps-engine/definition/users';
import type { IRoom } from '@rocket.chat/apps-engine/definition/rooms';
import type { ISubscription } from '@rocket.chat/core-typings';
import { api } from '@rocket.chat/core-services';
import { Subscriptions } from '@rocket.chat/models';

import { Messages, Users, Subscriptions } from '../../../models/server';
import { Messages, Users } from '../../../models/server';
import { updateMessage } from '../../../lib/server/functions/updateMessage';
import { executeSendMessage } from '../../../lib/server/methods/sendMessage';
import notifications from '../../../notifications/server/lib/Notifications';
Expand Down Expand Up @@ -68,15 +68,13 @@ export class AppMessageBridge extends MessageBridge {
protected async notifyRoom(room: IRoom, message: IMessage, appId: string): Promise<void> {
this.orch.debugLog(`The App ${appId} is notifying a room's users.`);

if (!room || !room.id) {
if (!room?.id) {
return;
}

const msg = this.orch.getConverters()?.get('messages').convertAppMessage(message);

const users = Subscriptions.findByRoomIdWhenUserIdExists(room.id, { fields: { 'u._id': 1 } })
.fetch()
.map((s: ISubscription) => s.u._id);
const users = (await Subscriptions.findByRoomIdWhenUserIdExists(room.id, { projection: { 'u._id': 1 } }).toArray()).map((s) => s.u._id);

Users.findByIds(users, { fields: { _id: 1 } })
.fetch()
Expand Down
14 changes: 5 additions & 9 deletions apps/meteor/app/federation/server/functions/helpers.ts
@@ -1,8 +1,8 @@
import { isDirectMessageRoom } from '@rocket.chat/core-typings';
import type { ISubscription, IRegisterUser, IUser, IRoom } from '@rocket.chat/core-typings';
import { Settings } from '@rocket.chat/models';
import { Settings, Subscriptions } from '@rocket.chat/models';

import { Subscriptions, Users } from '../../../models/server';
import { Users } from '../../../models/server';
import { STATUS_ENABLED, STATUS_REGISTERING } from '../constants';

export const getNameAndDomain = (fullyQualifiedName: string): string[] => fullyQualifiedName.split('@');
Expand Down Expand Up @@ -37,13 +37,9 @@ export const hasExternalDomain = ({ federation }: { federation: { origin: string
export const isLocalUser = ({ federation }: { federation: { origin: string } }, localDomain: string): boolean =>
!federation || federation.origin === localDomain;

export const getFederatedRoomData = (
export const getFederatedRoomData = async (
room: IRoom,
): {
hasFederatedUser: boolean;
users: IUser[];
subscriptions: { [k: string]: ISubscription } | undefined;
} => {
): Promise<{ hasFederatedUser: boolean; users: IUser[]; subscriptions: { [k: string]: ISubscription } | undefined }> => {
if (isDirectMessageRoom(room)) {
// Check if there is a federated user on this room

Expand All @@ -55,7 +51,7 @@ export const getFederatedRoomData = (
}

// Find all subscriptions of this room
const s = Subscriptions.findByRoomIdWhenUsernameExists(room._id).fetch() as ISubscription[];
const s = await Subscriptions.findByRoomIdWhenUsernameExists(room._id).toArray();
const subscriptions = s.reduce((acc, s) => {
acc[s.u._id] = s;
return acc;
Expand Down
Expand Up @@ -19,7 +19,7 @@ async function afterAddedToRoom(involvedUsers, room) {
clientLogger.debug({ msg: 'afterAddedToRoom', involvedUsers, room });

// If there are not federated users on this room, ignore it
const { users, subscriptions } = getFederatedRoomData(room);
const { users, subscriptions } = await getFederatedRoomData(room);

// Load the subscription
const subscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, addedUser._id);
Expand Down
@@ -1,7 +1,6 @@
import { FederationRoomEvents } from '@rocket.chat/models';
import { FederationRoomEvents, Subscriptions } from '@rocket.chat/models';

import { clientLogger } from '../lib/logger';
import { Subscriptions } from '../../../models/server';
import { normalizers } from '../normalizers';
import { deleteRoom } from '../../../lib/server/functions';
import { getFederationDomain } from '../lib/getFederationDomain';
Expand Down Expand Up @@ -36,10 +35,10 @@ async function afterCreateDirectRoom(room, extras) {
const genesisEvent = await FederationRoomEvents.createGenesisEvent(getFederationDomain(), normalizedRoom);

const events = await Promise.all(
extras.members.map((member) => {
extras.members.map(async (member) => {
const normalizedMember = normalizers.normalizeUser(member);

const sourceSubscription = Subscriptions.findOne({
const sourceSubscription = await Subscriptions.findOne({
'rid': normalizedRoom._id,
'u._id': normalizedMember._id,
});
Expand Down
6 changes: 3 additions & 3 deletions apps/meteor/app/federation/server/hooks/afterCreateRoom.js
@@ -1,7 +1,7 @@
import { FederationRoomEvents } from '@rocket.chat/models';
import { FederationRoomEvents, Subscriptions } from '@rocket.chat/models';

import { clientLogger } from '../lib/logger';
import { Subscriptions, Users } from '../../../models/server';
import { Users } from '../../../models/server';
import { normalizers } from '../normalizers';
import { deleteRoom } from '../../../lib/server/functions';
import { getFederationDomain } from '../lib/getFederationDomain';
Expand Down Expand Up @@ -64,7 +64,7 @@ async function afterCreateRoom(roomOwner, room) {
}

// Find all subscriptions of this room
let subscriptions = Subscriptions.findByRoomIdWhenUsernameExists(room._id).fetch();
let subscriptions = await Subscriptions.findByRoomIdWhenUsernameExists(room._id).toArray();
subscriptions = subscriptions.reduce((acc, s) => {
acc[s.u._id] = s;

Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/app/federation/server/hooks/afterLeaveRoom.js
Expand Up @@ -16,7 +16,7 @@ async function afterLeaveRoom(user, room) {

clientLogger.debug({ msg: 'afterLeaveRoom', user, room });

const { users } = getFederatedRoomData(room);
const { users } = await getFederatedRoomData(room);

try {
// Get the domains after leave
Expand Down
Expand Up @@ -18,7 +18,7 @@ async function afterRemoveFromRoom(involvedUsers, room) {

clientLogger.debug({ msg: 'afterRemoveFromRoom', involvedUsers, room });

const { users } = getFederatedRoomData(room);
const { users } = await getFederatedRoomData(room);

try {
// Get the domains after removal
Expand Down
Expand Up @@ -14,10 +14,10 @@ import type {
IImportData,
IImportRecordType,
} from '@rocket.chat/core-typings';
import { ImportData, Rooms as RoomsRaw } from '@rocket.chat/models';
import { ImportData, Rooms as RoomsRaw, Subscriptions } from '@rocket.chat/models';

import type { IConversionCallbacks } from '../definitions/IConversionCallbacks';
import { Users, Rooms, Subscriptions } from '../../../models/server';
import { Users, Rooms } from '../../../models/server';
import { generateUsernameSuggestion, insertMessage, saveUserIdentity, addUserToDefaultChannels } from '../../../lib/server';
import { setUserActiveStatus } from '../../../lib/server/functions/setUserActiveStatus';
import type { Logger } from '../../../../server/lib/logger/Logger';
Expand Down Expand Up @@ -953,7 +953,7 @@ export class ImportDataConverter {

async archiveRoomById(rid: string) {
await RoomsRaw.archiveById(rid);
Subscriptions.archiveByRoomId(rid);
await Subscriptions.archiveByRoomId(rid);
}

async convertData(startedByUserId: string, callbacks: IConversionCallbacks = {}): Promise<void> {
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/archiveRoom.ts
@@ -1,13 +1,12 @@
import { Messages, Rooms } from '@rocket.chat/models';
import { Messages, Rooms, Subscriptions } from '@rocket.chat/models';
import type { IMessage } from '@rocket.chat/core-typings';

import { Subscriptions } from '../../../models/server';
import { callbacks } from '../../../../lib/callbacks';
import { settings } from '../../../settings/server';

export const archiveRoom = async function (rid: string, user: IMessage['u']): Promise<void> {
await Rooms.archiveById(rid);
Subscriptions.archiveByRoomId(rid);
await Subscriptions.archiveByRoomId(rid);
await Messages.createWithTypeRoomIdMessageUserAndUnread('room-archived', rid, '', user, settings.get('Message_Read_Receipt_Enabled'));

callbacks.run('afterRoomArchived', await Rooms.findOneById(rid), user);
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/unarchiveRoom.ts
@@ -1,11 +1,10 @@
import { Messages, Rooms } from '@rocket.chat/models';
import { Messages, Rooms, Subscriptions } from '@rocket.chat/models';
import type { IMessage } from '@rocket.chat/core-typings';

import { Subscriptions } from '../../../models/server';
import { settings } from '../../../settings/server';

export const unarchiveRoom = async function (rid: string, user: IMessage['u']): Promise<void> {
await Rooms.unarchiveById(rid);
Subscriptions.unarchiveByRoomId(rid);
await Subscriptions.unarchiveByRoomId(rid);
await Messages.createWithTypeRoomIdMessageUserAndUnread('room-unarchived', rid, '', user, settings.get('Message_Read_Receipt_Enabled'));
};
27 changes: 13 additions & 14 deletions apps/meteor/app/lib/server/lib/notifyUsersOnMessage.js
@@ -1,8 +1,7 @@
import moment from 'moment';
import { escapeRegExp } from '@rocket.chat/string-helpers';
import { Subscriptions as SubscriptionsRaw, Rooms } from '@rocket.chat/models';
import { Subscriptions, Rooms } from '@rocket.chat/models';

import { Subscriptions } from '../../../models/server';
import { settings } from '../../../settings/server';
import { callbacks } from '../../../../lib/callbacks';

Expand Down Expand Up @@ -64,19 +63,19 @@ const incGroupMentions = async (rid, roomType, excludeUserId, unreadCount) => {
const incUnreadByGroup = ['all_messages', 'group_mentions_only', 'user_and_group_mentions_only'].includes(unreadCount);
const incUnread = roomType === 'd' || roomType === 'l' || incUnreadByGroup ? 1 : 0;

await SubscriptionsRaw.incGroupMentionsAndUnreadForRoomIdExcludingUserId(rid, excludeUserId, 1, incUnread);
await Subscriptions.incGroupMentionsAndUnreadForRoomIdExcludingUserId(rid, excludeUserId, 1, incUnread);
};

const incUserMentions = async (rid, roomType, uids, unreadCount) => {
const incUnreadByUser = ['all_messages', 'user_mentions_only', 'user_and_group_mentions_only'].includes(unreadCount);
const incUnread = roomType === 'd' || roomType === 'l' || incUnreadByUser ? 1 : 0;

await SubscriptionsRaw.incUserMentionsAndUnreadForRoomIdAndUserIds(rid, uids, 1, incUnread);
await Subscriptions.incUserMentionsAndUnreadForRoomIdAndUserIds(rid, uids, 1, incUnread);
};

const getUserIdsFromHighlights = (rid, message) => {
const highlightOptions = { fields: { 'userHighlights': 1, 'u._id': 1 } };
const subs = Subscriptions.findByRoomWithUserHighlights(rid, highlightOptions).fetch();
const getUserIdsFromHighlights = async (rid, message) => {
const highlightOptions = { projection: { 'userHighlights': 1, 'u._id': 1 } };
const subs = await Subscriptions.findByRoomWithUserHighlights(rid, highlightOptions).toArray();

return subs
.filter(
Expand Down Expand Up @@ -114,7 +113,7 @@ async function updateUsersSubscriptions(message, room) {

const unreadCount = getUnreadSettingCount(room.t);

getUserIdsFromHighlights(room._id, message).forEach((uid) => userIds.add(uid));
(await getUserIdsFromHighlights(room._id, message)).forEach((uid) => userIds.add(uid));

// give priority to user mentions over group mentions
if (userIds.size > 0) {
Expand All @@ -125,16 +124,16 @@ async function updateUsersSubscriptions(message, room) {

// this shouldn't run only if has group mentions because it will already exclude mentioned users from the query
if (!toAll && !toHere && unreadCount === 'all_messages') {
await SubscriptionsRaw.incUnreadForRoomIdExcludingUserIds(room._id, [...userIds, message.u._id]);
await Subscriptions.incUnreadForRoomIdExcludingUserIds(room._id, [...userIds, message.u._id]);
}
}

// Update all other subscriptions to alert their owners but without incrementing
// the unread counter, as it is only for mentions and direct messages
// We now set alert and open properties in two separate update commands. This proved to be more efficient on MongoDB - because it uses a more efficient index.
await Promise.all([
SubscriptionsRaw.setAlertForRoomIdExcludingUserId(message.rid, message.u._id),
SubscriptionsRaw.setOpenForRoomIdExcludingUserId(message.rid, message.u._id),
Subscriptions.setAlertForRoomIdExcludingUserId(message.rid, message.u._id),
Subscriptions.setOpenForRoomIdExcludingUserId(message.rid, message.u._id),
]);
}

Expand All @@ -143,13 +142,13 @@ export async function updateThreadUsersSubscriptions(message, room, replies) {

// incUserMentions(room._id, room.t, replies, unreadCount);

await SubscriptionsRaw.setAlertForRoomIdAndUserIds(message.rid, replies);
await Subscriptions.setAlertForRoomIdAndUserIds(message.rid, replies);

const repliesPlusSender = [...new Set([message.u._id, ...replies])];

await SubscriptionsRaw.setOpenForRoomIdAndUserIds(message.rid, repliesPlusSender);
await Subscriptions.setOpenForRoomIdAndUserIds(message.rid, repliesPlusSender);

await SubscriptionsRaw.setLastReplyForRoomIdAndUserIds(message.rid, repliesPlusSender, new Date());
await Subscriptions.setLastReplyForRoomIdAndUserIds(message.rid, repliesPlusSender, new Date());
}

export async function notifyUsersOnMessage(message, room) {
Expand Down