Skip to content

Commit

Permalink
[FIX][ENTERPRISE] DDP streamer sending data to destroyed streams (#27929
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sampaiodiego committed Feb 1, 2023
1 parent 536eb15 commit 1c0a814
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
10 changes: 9 additions & 1 deletion ee/apps/ddp-streamer/src/Server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';

import type WebSocket from 'ws';
import WebSocket from 'ws';
import ejson from 'ejson';
import { v1 as uuidv1 } from 'uuid';
import { MeteorService, isMeteorError, MeteorError } from '@rocket.chat/core-services';
Expand Down Expand Up @@ -53,6 +53,10 @@ export class Server extends EventEmitter {
};

async call(client: Client, packet: IPacket): Promise<void> {
// if client is not connected we don't need to do anything
if (client.ws.readyState !== WebSocket.OPEN) {
return;
}
try {
// if method was not defined on DDP Streamer we fall back to Meteor
if (!this._methods.has(packet.method)) {
Expand Down Expand Up @@ -86,6 +90,10 @@ export class Server extends EventEmitter {
}

async subscribe(client: Client, packet: IPacket): Promise<void> {
// if client is not connected we don't need to do anything
if (client.ws.readyState !== WebSocket.OPEN) {
return;
}
try {
if (!this._subscriptions.has(packet.name)) {
throw new MeteorError(404, `Subscription '${packet.name}' not found`);
Expand Down
23 changes: 22 additions & 1 deletion ee/apps/ddp-streamer/src/Streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,22 @@ export class Stream extends Streamer {
};

for await (const { subscription } of subscriptions) {
// if the connection state is not open anymore, it somehow got to a weird state,
// we'll emit close so it can clean up the weird state, and so we stop emitting to it
if (subscription.client.ws.readyState !== WebSocket.OPEN) {
subscription.client.ws.emit('close');
continue;
}

if (this.retransmitToSelf === false && origin && origin === subscription.connection) {
continue;
}

if (await this.isEmitAllowed(subscription, eventName, ...args)) {
if (!(await this.isEmitAllowed(subscription, eventName, ...args))) {
continue;
}

try {
await new Promise<void>((resolve, reject) => {
const frame = data[subscription.client.meteorClient ? 'meteor' : 'normal'];

Expand All @@ -72,6 +83,16 @@ export class Stream extends Streamer {
resolve();
});
});
} catch (error: any) {
if (error.code === 'ERR_STREAM_DESTROYED') {
console.warn('Trying to send data to destroyed stream, closing connection.');

// if we still tried to send data to a destroyed stream, we'll try again to close the connection
if (subscription.client.ws.readyState !== WebSocket.OPEN) {
subscription.client.ws.emit('close');
}
}
console.error('Error trying to send data to stream.', error);
}
}
}
Expand Down

0 comments on commit 1c0a814

Please sign in to comment.