Skip to content

Commit

Permalink
refactor message enqueueing, clients now have to acknowledge JOIN_ROOM.
Browse files Browse the repository at this point in the history
closes #260
  • Loading branch information
endel committed Jan 3, 2020
1 parent 4eb561a commit 5b69715
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 140 deletions.
6 changes: 3 additions & 3 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "colyseus",
"version": "0.11.25",
"version": "0.12.0",
"description": "Multiplayer Game Server for Node.js.",
"main": "./lib/index.js",
"typings": "./lib/index.d.ts",
Expand Down Expand Up @@ -57,13 +57,13 @@
"@types/express": "^4.16.1",
"@types/fossil-delta": "^1.0.0",
"@types/koa": "^2.0.49",
"@types/mocha": "^5.2.0",
"@types/mocha": "^5.2.7",
"@types/mongoose": "^5.5.12",
"@types/node": "^10.0.8",
"@types/sinon": "^4.3.3",
"all-contributors-cli": "^5.4.0",
"benchmark": "^2.1.1",
"colyseus.js": "^0.11.6-beta.1",
"colyseus.js": "^0.11.7",
"cors": "^2.8.5",
"express": "^4.16.2",
"httpie": "^1.1.2",
Expand Down
40 changes: 18 additions & 22 deletions src/Protocol.ts
Expand Up @@ -53,12 +53,16 @@ export function decode(message: any) {

export const send = {
raw: (client: Client, bytes: number[]) => {
if (
client.state === ClientState.JOINING &&
client.readyState !== WebSocket.OPEN
) {
if (client.readyState !== WebSocket.OPEN) { return; }

if (client.state === ClientState.JOINING) {
// sending messages during `onJoin`.
// - the client-side cannot register "onMessage" callbacks at this point.
// - enqueue the messages to be send after JOIN_ROOM message has been sent
client._enqueuedMessages.push(bytes);
return;
}

client.send(bytes, { binary: true });
},

Expand All @@ -70,7 +74,7 @@ export const send = {
client.send(buff, { binary: true });
},

[Protocol.JOIN_ROOM]: (client: Client, serializerId: string, handshake?: number[]) => {
[Protocol.JOIN_ROOM]: async (client: Client, serializerId: string, handshake?: number[]) => {
if (client.readyState !== WebSocket.OPEN) { return; }
let offset = 0;

Expand All @@ -89,17 +93,16 @@ export const send = {
}
}

client.send(buff, { binary: true });
return new Promise((resolve, reject) => {
client.send(buff, { binary: true }, (err) => {
if (err) { reject(); }
else { resolve(); }
});
})
},

[Protocol.ROOM_STATE]: (client: Client, bytes: number[]) => {
if (
client.state === ClientState.JOINING &&
client.readyState !== WebSocket.OPEN
) {
return;
}
client.send([Protocol.ROOM_STATE, ...bytes], { binary: true });
send.raw(client, [Protocol.ROOM_STATE, ...bytes]);
},

// [Protocol.ROOM_STATE_PATCH]: (client: Client, bytes: number[]) => {
Expand All @@ -118,21 +121,14 @@ export const send = {
* TODO: refactor me. Move this to `SchemaSerializer` / `FossilDeltaSerializer`
*/
[Protocol.ROOM_DATA]: (client: Client, message: any, encode: boolean = true) => {
if (
client.state === ClientState.JOINING &&
client.readyState !== WebSocket.OPEN
) {
return;
}
client.send([Protocol.ROOM_DATA, ...(encode && msgpack.encode(message) || message)], { binary: true });
send.raw(client, [Protocol.ROOM_DATA, ...(encode && msgpack.encode(message) || message)]);
},

/**
* TODO: refactor me. Move this to SchemaSerializer
*/
[Protocol.ROOM_DATA_SCHEMA]: (client: Client, typeid: number, bytes: number[]) => {
if (client.readyState !== WebSocket.OPEN) { return; }
client.send([Protocol.ROOM_DATA_SCHEMA, typeid, ...bytes], { binary: true });
send.raw(client, [Protocol.ROOM_DATA_SCHEMA, typeid, ...bytes]);
},

};
Expand Down
54 changes: 25 additions & 29 deletions src/Room.ts
Expand Up @@ -216,14 +216,7 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
}

public send(client: Client, message: any): void {
if (client.state === ClientState.JOINING) {
// sending messages during `onJoin`.
// - the client-side cannot register "onMessage" callbacks at this point.
// - enqueue the messages to be send after JOIN_ROOM message has been sent
if (!client._enqueuedMessages) { client._enqueuedMessages = []; }
client._enqueuedMessages.push(message);

} else if (message instanceof Schema) {
if (message instanceof Schema) {
send[Protocol.ROOM_DATA_SCHEMA](client, (message.constructor as typeof Schema)._typeid, message.encodeAll());

} else {
Expand Down Expand Up @@ -307,8 +300,6 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
}

public async ['_onJoin'](client: Client, req?: http.IncomingMessage) {
client.state = ClientState.JOINING;

const sessionId = client.sessionId;

if (this.reservedSeatTimeouts[sessionId]) {
Expand All @@ -322,13 +313,18 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
this._autoDisposeTimeout = undefined;
}

// bind clean-up callback when client connection closes
client.once('close', this._onLeave.bind(this, client));

// get seat reservation options and clear it
const options = this.reservedSeats[sessionId];
delete this.reservedSeats[sessionId];

// bind clean-up callback when client connection closes
client.once('close', this._onLeave.bind(this, client));

client.state = ClientState.JOINING;
client._enqueuedMessages = [];

this.clients.push(client);

const reconnection = this.reconnections[sessionId];
if (reconnection) {
reconnection.resolve(client);
Expand All @@ -345,6 +341,7 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
await this.onJoin(client, options, client.auth);
}
} catch (e) {
spliceOne(this.clients, this.clients.indexOf(client));
debugAndPrintError(e);
throw e;

Expand All @@ -366,22 +363,6 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
this._serializer.id,
this._serializer.handshake && this._serializer.handshake(),
);

client.state = ClientState.JOINED;

// dequeue messages (on user-defined `onJoin`)
if (client._enqueuedMessages) {
client._enqueuedMessages.forEach((data) => this.send(client, data));
delete client._enqueuedMessages;
}

// send current state when new client joins the room
if (this.state) {
this.sendState(client);
}

// joined successfully, add to local client list
this.clients.push(client);
}

protected _getSerializer?(): Serializer<State> {
Expand Down Expand Up @@ -551,6 +532,21 @@ export abstract class Room<State= any, Metadata= any> extends EventEmitter {
if (message[0] === Protocol.ROOM_DATA) {
this.onMessage(client, message[1]);

} else if (message[0] === Protocol.JOIN_ROOM) {
// join room has been acknowledged by the client
client.state = ClientState.JOINED;

// dequeue messages sent before client has joined effectively (on user-defined `onJoin`)
if (client._enqueuedMessages.length > 0) {
client._enqueuedMessages.forEach((bytes) => send.raw(client, bytes));
}
delete client._enqueuedMessages;

// send current state when new client joins the room
if (this.state) {
this.sendState(client);
}

} else if (message[0] === Protocol.LEAVE_ROOM) {
// stop receiving messages from this client
client.removeAllListeners('message');
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Expand Up @@ -48,5 +48,5 @@ export type Client = WebSocket & {

pingCount: number; // ping / pong
state: ClientState;
_enqueuedMessages: any;
_enqueuedMessages: any[];
};

0 comments on commit 5b69715

Please sign in to comment.