Skip to content

Commit

Permalink
Migrate Socket.IO from Version 2 to Version 3 馃殌 (#6152)
Browse files Browse the repository at this point in the history
* feat :migrate socket.io 2 -> 3

* fix: backend test

* fix: ts error

* rm

* reset the test timeout

* Updated cli client.

* Updated lock file.

* Use updated load tester.

---------

Co-authored-by: SamTV12345 <40429738+samtv12345@users.noreply.github.com>
  • Loading branch information
HMarzban and SamTV12345 committed Feb 17, 2024
1 parent 47f0113 commit b2be2ca
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 318 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/load-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
run: src/bin/installDeps.sh
-
name: Install etherpad-load-test
run: sudo npm install -g etherpad-load-test
run: sudo npm install -g etherpad-load-test-socket-io
-
name: Run load test
run: src/tests/frontend/travis/runnerLoadTest.sh 25 50
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
run: pnpm config set auto-install-peers false
-
name: Install etherpad-load-test
run: pnpm install -g etherpad-load-test
run: pnpm install -g etherpad-load-test-socket-io
-
name: Install etherpad plugins
# The --legacy-peer-deps flag is required to work around a bug in npm v7:
Expand Down Expand Up @@ -161,7 +161,7 @@ jobs:
run: src/bin/installDeps.sh
-
name: Install etherpad-load-test
run: sudo npm install -g etherpad-load-test
run: sudo npm install -g etherpad-load-test-socket-io
-
name: Run load test
run: src/tests/frontend/travis/runnerLoadTest.sh 5000 5
2 changes: 1 addition & 1 deletion settings.json.docker
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@
/*
* Restrict socket.io transport methods
*/
"socketTransportProtocols" : ["xhr-polling", "jsonp-polling", "htmlfile"],
"socketTransportProtocols" : ["websocket", "polling"],

"socketIo": {
/*
Expand Down
2 changes: 1 addition & 1 deletion settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@
/*
* Restrict socket.io transport methods
*/
"socketTransportProtocols" : ["xhr-polling", "jsonp-polling", "htmlfile"],
"socketTransportProtocols" : ["websocket", "polling"],

"socketIo": {
/*
Expand Down
59 changes: 31 additions & 28 deletions src/node/handler/PadMessageHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ exports.socketio = () => {
const sessioninfos = {};
exports.sessioninfos = sessioninfos;

stats.gauge('totalUsers', () => socketio ? Object.keys(socketio.sockets.sockets).length : 0);
stats.gauge('totalUsers', () => socketio ? socketio.sockets.size : 0);
stats.gauge('activePads', () => {
const padIds = new Set();
for (const {padId} of Object.values(sessioninfos)) {
Expand Down Expand Up @@ -155,13 +155,13 @@ exports.handleConnect = (socket) => {
* Kicks all sessions from a pad
*/
exports.kickSessionsFromPad = (padID) => {
if (typeof socketio.sockets.clients !== 'function') return;
if (typeof socketio.sockets.clients !== 'object') return;

// skip if there is nobody on this pad
if (_getRoomSockets(padID).length === 0) return;

// disconnect everyone from this pad
socketio.sockets.in(padID).json.send({disconnect: 'deleted'});
socketio.in(padID).emit('message', {disconnect: 'deleted'});
};

/**
Expand All @@ -183,7 +183,7 @@ exports.handleDisconnect = async (socket) => {
` authorID:${session.author}` +
(user && user.username ? ` username:${user.username}` : ''));
/* eslint-enable prefer-template */
socket.broadcast.to(session.padId).json.send({
socket.broadcast.to(session.padId).emit('message', {
type: 'COLLABROOM',
data: {
type: 'USER_LEAVE',
Expand Down Expand Up @@ -216,7 +216,7 @@ exports.handleMessage = async (socket, message) => {
messageLogger.warn(`Rate limited IP ${socket.request.ip}. To reduce the amount of rate ` +
'limiting that happens edit the rateLimit values in settings.json');
stats.meter('rateLimited').mark();
socket.json.send({disconnect: 'rateLimited'});
socket.emit('message', {disconnect: 'rateLimited'});
throw err;
}
}
Expand Down Expand Up @@ -267,11 +267,11 @@ exports.handleMessage = async (socket, message) => {
const {accessStatus, authorID} =
await securityManager.checkAccess(auth.padID, auth.sessionID, auth.token, user);
if (accessStatus !== 'grant') {
socket.json.send({accessStatus});
socket.emit('message', {accessStatus});
throw new Error('access denied');
}
if (thisSession.author != null && thisSession.author !== authorID) {
socket.json.send({disconnect: 'rejected'});
socket.emit('message', {disconnect: 'rejected'});
throw new Error([
'Author ID changed mid-session. Bad or missing token or sessionID?',
`socket:${socket.id}`,
Expand Down Expand Up @@ -393,10 +393,10 @@ exports.handleCustomObjectMessage = (msg, sessionID) => {
if (msg.data.type === 'CUSTOM') {
if (sessionID) {
// a sessionID is targeted: directly to this sessionID
socketio.sockets.socket(sessionID).json.send(msg);
socketio.sockets.socket(sessionID).emit('message', msg);
} else {
// broadcast to all clients on this pad
socketio.sockets.in(msg.data.payload.padId).json.send(msg);
socketio.sockets.in(msg.data.payload.padId).emit('message', msg);
}
}
};
Expand All @@ -416,7 +416,7 @@ exports.handleCustomMessage = (padID, msgString) => {
time,
},
};
socketio.sockets.in(padID).json.send(msg);
socketio.sockets.in(padID).emit('message', msg);
};

/**
Expand Down Expand Up @@ -453,7 +453,7 @@ exports.sendChatMessageToPadClients = async (mt, puId, text = null, padId = null
// authorManager.getAuthorName() to resolve before saving the message to the database.
const promise = pad.appendChatMessage(message);
message.displayName = await authorManager.getAuthorName(message.authorId);
socketio.sockets.in(padId).json.send({
socketio.sockets.in(padId).emit('message', {
type: 'COLLABROOM',
data: {type: 'CHAT_MESSAGE', message},
});
Expand Down Expand Up @@ -483,7 +483,7 @@ const handleGetChatMessages = async (socket, {data: {start, end}}) => {
};

// send the messages back to the client
socket.json.send(infoMsg);
socket.emit('message', infoMsg);
};

/**
Expand All @@ -500,7 +500,7 @@ const handleSuggestUserName = (socket, message) => {
_getRoomSockets(padId).forEach((socket) => {
const session = sessioninfos[socket.id];
if (session && session.author === unnamedId) {
socket.json.send(message);
socket.emit('message', message);
}
});
};
Expand Down Expand Up @@ -539,7 +539,7 @@ const handleUserInfoUpdate = async (socket, {data: {userInfo: {name, colorId}}})
};

// Send the other clients on the pad the update message
socket.broadcast.to(padId).json.send(infoMsg);
socket.broadcast.to(padId).emit('message',infoMsg);

// Block until the authorManager has stored the new attributes.
await p;
Expand Down Expand Up @@ -654,12 +654,12 @@ const handleUserChanges = async (socket, message) => {
// The client assumes that ACCEPT_COMMIT and NEW_CHANGES messages arrive in order. Make sure we
// have already sent any previous ACCEPT_COMMIT and NEW_CHANGES messages.
assert.equal(thisSession.rev, r);
socket.json.send({type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}});
socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}});
thisSession.rev = newRev;
if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev);
await exports.updatePadClients(pad);
} catch (err) {
socket.json.send({disconnect: 'badChangeset'});
socket.emit('message', {disconnect: 'badChangeset'});
stats.meter('failedChangesets').mark();
messageLogger.warn(`Failed to apply USER_CHANGES from author ${thisSession.author} ` +
`(socket ${socket.id}) on pad ${thisSession.padId}: ${err.stack || err}`);
Expand Down Expand Up @@ -716,7 +716,7 @@ exports.updatePadClients = async (pad) => {
},
};
try {
socket.json.send(msg);
socket.emit('message', msg);
} catch (err) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
return;
Expand Down Expand Up @@ -833,7 +833,7 @@ const handleClientReady = async (socket, message) => {
// fix user's counter, works on page refresh or if user closes browser window and then rejoins
sessioninfos[otherSocket.id] = {};
otherSocket.leave(sessionInfo.padId);
otherSocket.json.send({disconnect: 'userdup'});
otherSocket.emit('message', {disconnect: 'userdup'});
}
}

Expand Down Expand Up @@ -899,15 +899,15 @@ const handleClientReady = async (socket, message) => {
apool: forWire.pool,
author: changesets[r].author,
currentTime: changesets[r].timestamp}};
socket.json.send(wireMsg);
socket.emit('message', wireMsg);
}

if (startNum === endNum) {
const Msg = {type: 'COLLABROOM',
data: {type: 'CLIENT_RECONNECT',
noChanges: true,
newRev: pad.getHeadRevisionNumber()}};
socket.json.send(Msg);
socket.emit('message', Msg);
}
} else {
// This is a normal first connect
Expand All @@ -921,7 +921,7 @@ const handleClientReady = async (socket, message) => {
atext.attribs = attribsForWire.translated;
} catch (e) {
messageLogger.error(e.stack || e);
socket.json.send({disconnect: 'corruptPad'}); // pull the brakes
socket.emit('message', {disconnect: 'corruptPad'}); // pull the brakes
throw new Error('corrupt pad');
}

Expand Down Expand Up @@ -1005,14 +1005,14 @@ const handleClientReady = async (socket, message) => {
socket.join(sessionInfo.padId);

// Send the clientVars to the Client
socket.json.send({type: 'CLIENT_VARS', data: clientVars});
socket.emit('message', {type: 'CLIENT_VARS', data: clientVars});

// Save the current revision in sessioninfos, should be the same as in clientVars
sessionInfo.rev = pad.getHeadRevisionNumber();
}

// Notify other users about this new user.
socket.broadcast.to(sessionInfo.padId).json.send({
socket.broadcast.to(sessionInfo.padId).emit('message', {
type: 'COLLABROOM',
data: {
type: 'USER_NEWINFO',
Expand Down Expand Up @@ -1062,7 +1062,7 @@ const handleClientReady = async (socket, message) => {
},
};

socket.json.send(msg);
socket.emit('message', msg);
}));

await hooks.aCallAll('userJoin', {
Expand Down Expand Up @@ -1092,7 +1092,7 @@ const handleChangesetRequest = async (socket, {data: {granularity, start, reques
start = headRev;
const data = await getChangesetInfo(pad, start, end, granularity);
data.requestID = requestID;
socket.json.send({type: 'CHANGESET_REQ', data});
socket.emit('message', {type: 'CHANGESET_REQ', data});
};

/**
Expand Down Expand Up @@ -1236,13 +1236,16 @@ const composePadChangesets = async (pad, startNum, endNum) => {

const _getRoomSockets = (padID) => {
const ns = socketio.sockets; // Default namespace.
const adapter = ns.adapter;
// We could call adapter.clients(), but that method is unnecessarily asynchronous. Replicate what
// it does here, but synchronously to avoid a race condition. This code will have to change when
// we update to socket.io v3.
const room = adapter.rooms[padID];
const room = ns.adapter.rooms?.get(padID);

if (!room) return [];
return Object.keys(room.sockets).map((id) => ns.connected[id]).filter((s) => s);

return Array.from(room)
.map(socketId => ns.sockets.get(socketId))
.filter(socket => socket);
};

/**
Expand Down
46 changes: 25 additions & 21 deletions src/node/hooks/express/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const express = require('../express');
const log4js = require('log4js');
const proxyaddr = require('proxy-addr');
const settings = require('../../utils/Settings');
const socketio = require('socket.io');
const {Server} = require('socket.io');
const socketIORouter = require('../../handler/SocketIORouter');
const hooks = require('../../../static/js/pluginfw/hooks');
const padMessageHandler = require('../../handler/PadMessageHandler');
Expand Down Expand Up @@ -48,12 +48,29 @@ exports.expressCloseServer = async () => {
logger.info('All socket.io clients have disconnected');
};

exports.socketSessionMiddleware = (args: any) => (socket: any, next: Function) => {
const req = socket.request;
// Express sets req.ip but socket.io does not. Replicate Express's behavior here.
if (req.ip == null) {
if (settings.trustProxy) {
req.ip = proxyaddr(req, args.app.get('trust proxy fn'));
} else {
req.ip = socket.handshake.address;
}
}
if (!req.headers.cookie) {
// socketio.js-client on node.js doesn't support cookies, so pass them via a query parameter.
req.headers.cookie = socket.handshake.query.cookie;
}
express.sessionMiddleware(req, {}, next);
};

exports.expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Function) => {
// init socket.io and redirect all requests to the MessageHandler
// there shouldn't be a browser that isn't compatible to all
// transports in this list at once
// e.g. XHR is disabled in IE by default, so in IE it should use jsonp-polling
io = socketio({
io = new Server(args.server, {
transports: settings.socketTransportProtocols,
}).listen(args.server, {
/*
Expand All @@ -79,33 +96,20 @@ exports.expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Functio
maxHttpBufferSize: settings.socketIo.maxHttpBufferSize,
});

io.on('connect', (socket:any) => {
io.on('connection', (socket:any) => {
sockets.add(socket);
socketsEvents.emit('updated');
// https://socket.io/docs/v3/faq/index.html
const session = socket.request.session;
session.connections++;
session.save();
socket.on('disconnect', () => {
sockets.delete(socket);
socketsEvents.emit('updated');
});
});

io.use((socket:any, next: Function) => {
const req = socket.request;
// Express sets req.ip but socket.io does not. Replicate Express's behavior here.
if (req.ip == null) {
if (settings.trustProxy) {
req.ip = proxyaddr(req, args.app.get('trust proxy fn'));
} else {
req.ip = socket.handshake.address;
}
}
if (!req.headers.cookie) {
// socketio.js-client on node.js doesn't support cookies (see https://git.io/JU8u9), so the
// token and express_sid cookies have to be passed via a query parameter for unit tests.
req.headers.cookie = socket.handshake.query.cookie;
}
// See: https://socket.io/docs/faq/#Usage-with-express-session
express.sessionMiddleware(req, {}, next);
});
io.use(exports.socketSessionMiddleware(args));

io.use((socket:any, next:Function) => {
socket.conn.on('packet', (packet:string) => {
Expand Down
2 changes: 1 addition & 1 deletion src/node/utils/Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ exports.ssl = false;
/**
* socket.io transport methods
**/
exports.socketTransportProtocols = ['xhr-polling', 'jsonp-polling', 'htmlfile'];
exports.socketTransportProtocols = ['websocket', 'polling'];

exports.socketIo = {
/**
Expand Down

0 comments on commit b2be2ca

Please sign in to comment.