Skip to content

Commit

Permalink
refactor: move notifyListener actions out of instance-status package
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogarim committed May 10, 2024
1 parent 18e339e commit 4193d0e
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 43 deletions.
21 changes: 19 additions & 2 deletions apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { api, dbWatchersDisabled } from '@rocket.chat/core-services';
import type { IRocketChatRecord, IRoom } from '@rocket.chat/core-typings';
import { Rooms } from '@rocket.chat/models';
import type { IInstanceStatus, IRocketChatRecord, IRoom } from '@rocket.chat/core-typings';
import { InstanceStatus, Rooms } from '@rocket.chat/models';

type ClientAction = 'inserted' | 'updated' | 'removed';

Expand Down Expand Up @@ -65,3 +65,20 @@ export async function notifyOnRoomChangedByUserDM<T extends IRoom>(
void api.broadcast('watch.rooms', { clientAction, room: item });
}
}

export async function notifyOnInstanceStatusChangedById<T extends IInstanceStatus>(
id: T['_id'],
clientAction: 'inserted' | 'updated' | 'removed',
data?: Record<string, unknown>,
diff?: Record<string, unknown>,
) {
if (!dbWatchersDisabled) {
return;
}

const instanceStatus = await InstanceStatus.findOneById(id);

if (instanceStatus) {
void api.broadcast('watch.instanceStatus', { id, clientAction, data, diff });
}
}
5 changes: 5 additions & 0 deletions apps/meteor/ee/server/local-services/instance/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import EJSON from 'ejson';
import type { BrokerNode } from 'moleculer';
import { ServiceBroker, Transporters, Serializers } from 'moleculer';

import { notifyOnInstanceStatusChangedById } from '../../../../app/lib/server/lib/notifyListener';
import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import type { IInstanceService } from '../../sdk/types/IInstanceService';
import { getLogger } from './getLogger';
Expand Down Expand Up @@ -155,6 +156,10 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
};

await InstanceStatus.registerInstance('rocket.chat', instance);
void notifyOnInstanceStatusChangedById(InstanceStatus.id(), 'inserted');
process.on('exit', async () => {
void notifyOnInstanceStatusChangedById(InstanceStatus.id(), 'removed');
});

try {
const hasLicense = await License.hasModule('scalability');
Expand Down
19 changes: 12 additions & 7 deletions apps/meteor/ee/server/startup/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import { Accounts } from 'meteor/accounts-base';
import { Meteor } from 'meteor/meteor';
import { throttle } from 'underscore';

import { notifyOnInstanceStatusChangedById } from '../../../app/lib/server/lib/notifyListener';

// update connections count every 30 seconds
const updateConns = throttle(function _updateConns() {
void InstanceStatus.updateConnections(Meteor.server.sessions.size);
const updateConns = throttle(async () => {
const { modifiedCount } = await InstanceStatus.updateConnections(Meteor.server.sessions.size);

if (modifiedCount) {
void notifyOnInstanceStatusChangedById(InstanceStatus.id(), 'updated', { 'extraInformation.conns': Meteor.server.sessions.size });
}
}, 30000);

Meteor.startup(() => {
Expand All @@ -21,7 +27,7 @@ Meteor.startup(() => {
}

await Presence.removeConnection(session.userId, connection.id, nodeId);
updateConns();
await updateConns();
});
});

Expand All @@ -42,13 +48,12 @@ Meteor.startup(() => {

void (async function () {
await Presence.newConnection(login.user._id, login.connection.id, nodeId);
updateConns();
await updateConns();
})();
});

Accounts.onLogout((login: any): void => {
Accounts.onLogout(async (login: any): Promise<void> => {
void Presence.removeConnection(login.user._id, login.connection.id, nodeId);

updateConns();
await updateConns();
});
});
12 changes: 9 additions & 3 deletions ee/apps/ddp-streamer/src/DDPStreamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import WebSocket from 'ws';
import { ListenersModule } from '../../../../apps/meteor/server/modules/listeners/listeners.module';
import type { NotificationsModule } from '../../../../apps/meteor/server/modules/notifications/notifications.module';
import { StreamerCentral } from '../../../../apps/meteor/server/modules/streamer/streamer.module';
import { notifyOnInstanceStatusChangedById } from '../../../../apps/meteor/app/lib/server/lib/notifyListener';
import { Client, clientMap } from './Client';
import { events, server } from './configureServer';
import { DDP_EVENTS } from './constants';
Expand Down Expand Up @@ -64,8 +65,12 @@ export class DDPStreamer extends ServiceClass {
}

// update connections count every 30 seconds
updateConnections = throttle(() => {
InstanceStatus.updateConnections(this.wss?.clients.size ?? 0);
updateConnections = throttle(async () => {
const { modifiedCount } = await InstanceStatus.updateConnections(this.wss?.clients.size ?? 0);

if (modifiedCount) {
notifyOnInstanceStatusChangedById(InstanceStatus.id(), 'updated', { 'extraInformation.conns': this.wss?.clients.size ?? 0 });
}
}, 30000);

async created(): Promise<void> {
Expand Down Expand Up @@ -205,7 +210,8 @@ export class DDPStreamer extends ServiceClass {

this.wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket', req));

InstanceStatus.registerInstance('ddp-streamer', {});
await InstanceStatus.registerInstance('ddp-streamer', {});
void notifyOnInstanceStatusChangedById(InstanceStatus.id(), 'inserted');
} catch (err) {
console.error('DDPStreamer did not start correctly', err);
}
Expand Down
36 changes: 5 additions & 31 deletions packages/instance-status/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// import { IInstanceStatus } from '@rocket.chat/core-typings';
import { EventEmitter } from 'events';

import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models';
import { v4 as uuidv4 } from 'uuid';
import type { UpdateResult } from 'mongodb';

Check failure on line 5 in packages/instance-status/src/index.ts

View workflow job for this annotation

GitHub Actions / 🔎 Code Check / Code Lint

`mongodb` import should occur before import of `uuid`

const events = new EventEmitter();

Expand All @@ -11,21 +11,6 @@ const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_
// if not set via env var ensures at least 3 ticks before expiring (multiple of 60s)
const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60;

const dbWatchersDisabled = ['yes', 'true'].includes(String(process.env.DISABLE_DB_WATCHERS).toLowerCase());

function notifyOnInstanceStatusChangedById(
id: string,
clientAction: 'inserted' | 'updated' | 'removed',
data?: Record<string, unknown>,
diff?: Record<string, unknown>,
) {
if (!dbWatchersDisabled) {
return;
}

events.emit('watch.instanceStatus', { clientAction, id, data, diff });
}

let createIndexes = async () => {
await InstanceStatusModel.col
.indexes()
Expand Down Expand Up @@ -90,16 +75,13 @@ async function registerInstance(name: string, extraInformation: Record<string, u
};

try {
const result = await InstanceStatusModel.updateOne({ _id: ID }, instance as any, { upsert: true });

const instanceStatus = await InstanceStatusModel.findOne({ _id: ID });
await InstanceStatusModel.updateOne({ _id: ID }, instance as any, { upsert: true });
const instanceStatus = await InstanceStatusModel.findOneById(ID);

start();

events.emit('registerInstance', instanceStatus, instance);

notifyOnInstanceStatusChangedById(ID, result.upsertedId ? 'inserted' : 'updated', { ...instanceStatus });

process.on('exit', onExit);

return instanceStatus;
Expand All @@ -115,10 +97,6 @@ async function unregisterInstance() {

events.emit('unregisterInstance', ID);

if (result.deletedCount) {
notifyOnInstanceStatusChangedById(ID, 'removed');
}

process.removeListener('exit', onExit);

return result;
Expand Down Expand Up @@ -168,8 +146,8 @@ async function onExit() {
await unregisterInstance();
}

async function updateConnections(conns: number) {
const result = await InstanceStatusModel.updateOne(
async function updateConnections(conns: number): Promise<UpdateResult> {
return InstanceStatusModel.updateOne(
{
_id: ID,
},
Expand All @@ -179,10 +157,6 @@ async function updateConnections(conns: number) {
},
},
);

if (result.modifiedCount) {
notifyOnInstanceStatusChangedById(ID, 'updated', undefined, { 'extraInformation.conns': conns });
}
}

export const InstanceStatus = {
Expand Down

0 comments on commit 4193d0e

Please sign in to comment.