Skip to content

Commit

Permalink
Merge 412d230 into 241701f
Browse files Browse the repository at this point in the history
  • Loading branch information
vdfdev committed Aug 10, 2021
2 parents 241701f + 412d230 commit 3e915e3
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 88 deletions.
34 changes: 34 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"p-queue": "^6.6.2",
"prop-types": "^15.5.10",
"react-cookies": "^0.1.0",
"redis": "^3.1.2",
"redux": "^4.1.0",
"rfc6902": "^4.0.2",
"socket.io": "^4.1.3",
Expand Down
4 changes: 3 additions & 1 deletion packages/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ import { Server } from '../src/server';
import { Origins } from '../src/server/cors';
import { FlatFile } from '../src/server/db';
import { SocketIO } from '../src/server/transport/socketio';
import { RedisPubSub } from '../src/server/transport/pubsub/redis-pub-sub';
import { InMemoryPubSub } from '../src/server/transport/pubsub/in-memory-pub-sub';

export { Server, Origins, FlatFile, SocketIO };
export { Server, Origins, FlatFile, SocketIO, RedisPubSub, InMemoryPubSub };
11 changes: 11 additions & 0 deletions src/server/transport/pubsub/generic-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/** Generic interface for pub-subs. */
export interface GenericPubSub<T> {
// Publish an event for a match.
publish(channelId: string, payload: T);

// Subscribes to events related to a single match.
subscribe(channelId: string, callback: (payload: T) => void): void;

// Cleans up subscription for a given channel.
unsubscribeAll(channelId: string);
}
47 changes: 47 additions & 0 deletions src/server/transport/pubsub/in-memory-pub-sub.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { InMemoryPubSub } from './in-memory-pub-sub';

const CHANNEL_FOO = 'foo';

describe('in-memory pubsub', () => {
it('should receive message from subscription', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO, callback);
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).toHaveBeenCalledWith(payload);
});

it('should receive message from two subscriptions', () => {
const pubSub = new InMemoryPubSub<string>();
const callback1 = jest.fn();
const callback2 = jest.fn();
pubSub.subscribe(CHANNEL_FOO, callback1);
pubSub.subscribe(CHANNEL_FOO, callback2);
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback1).toHaveBeenCalledWith(payload);
expect(callback2).toHaveBeenCalledWith(payload);
});

it('should unsubscribe', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO, callback);
pubSub.unsubscribeAll(CHANNEL_FOO);
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).not.toHaveBeenCalled();
});

it('should ignore extra unsubscribe', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO, callback);
pubSub.unsubscribeAll(CHANNEL_FOO);
pubSub.unsubscribeAll(CHANNEL_FOO); // do nothing
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).not.toHaveBeenCalled();
});
});
28 changes: 28 additions & 0 deletions src/server/transport/pubsub/in-memory-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { GenericPubSub } from './generic-pub-sub';

export class InMemoryPubSub<T> implements GenericPubSub<T> {
callbacks: Map<string, ((payload: T) => void)[]> = new Map();

publish(channelId: string, payload: T) {
if (!this.callbacks.has(channelId)) {
return;
}
const allCallbacks = this.callbacks.get(channelId);
for (const callback of allCallbacks) {
callback(payload);
}
}

subscribe(channelId: string, callback: (payload: T) => void): void {
if (!this.callbacks.has(channelId)) {
this.callbacks.set(channelId, []);
}
this.callbacks.get(channelId).push(callback);
}

unsubscribeAll(channelId: string) {
if (this.callbacks.has(channelId)) {
this.callbacks.delete(channelId);
}
}
}
67 changes: 67 additions & 0 deletions src/server/transport/pubsub/redis-pub-sub.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type redis from 'redis';
import { RedisPubSub } from './redis-pub-sub';

const CHANNEL_FOO = 'MATCH-foo';

describe('redis pub-sub', () => {
let pubClient: redis.RedisClient;
let subClient: redis.RedisClient;
let pubSub: RedisPubSub<string>;

beforeEach(() => {
subClient = {
subscribe: jest.fn(),
unsubscribe: jest.fn(),
on: jest.fn(),
};
pubClient = {
publish: jest.fn(),
};
pubSub = new RedisPubSub(pubClient, subClient);
});

it('should publish a payload to redis', () => {
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(pubClient.publish).toHaveBeenCalledWith(
CHANNEL_FOO,
JSON.stringify(payload)
);
});

it('should unsubscribe to a channel in redis', () => {
pubSub.unsubscribeAll(CHANNEL_FOO);
expect(subClient.unsubscribe).toHaveBeenCalledWith(CHANNEL_FOO);
});

it('should receive a message after subscription', () => {
const callback1 = jest.fn();
const callback2 = jest.fn();
const payload = 'hello world';
pubSub.subscribe(CHANNEL_FOO, callback1);
pubSub.subscribe(CHANNEL_FOO, callback2);
const redisCallback = subClient.on.mock.calls[0][1];
redisCallback(CHANNEL_FOO, JSON.stringify(payload));
expect(callback1).toHaveBeenCalledWith(payload);
expect(callback2).toHaveBeenCalledWith(payload);
});

it('should ignore message from unrelated channel', () => {
const callback = jest.fn();
const payload = 'hello world';
pubSub.subscribe(CHANNEL_FOO, callback);
const redisCallback = subClient.on.mock.calls[0][1];
redisCallback('notTheRightId', JSON.stringify(payload));
expect(callback).not.toHaveBeenCalled();
});

it('should ignore message after unsubscription', () => {
const callback = jest.fn();
const payload = 'hello world';
pubSub.subscribe(CHANNEL_FOO, callback);
pubSub.unsubscribeAll(CHANNEL_FOO);
const redisCallback = subClient.on.mock.calls[0][1];
redisCallback(CHANNEL_FOO, JSON.stringify(payload));
expect(callback).not.toHaveBeenCalled();
});
});
42 changes: 42 additions & 0 deletions src/server/transport/pubsub/redis-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type redis from 'redis';
import type { GenericPubSub } from './generic-pub-sub';

export class RedisPubSub<T> implements GenericPubSub<T> {
private pubClient: redis.RedisClient;
private subClient: redis.RedisClient;
callbacks: Map<string, ((payload: T) => void)[]> = new Map();

constructor(pubClient: redis.redisclient, subClient: redis.redisclient) {
this.pubClient = pubClient;
this.subClient = subClient;
this.subClient.on('message', (redisChannelId, message) => {
if (!this.callbacks.has(redisChannelId)) {
return;
}
const allCallbacks = this.callbacks.get(redisChannelId);
const parsedPayload = JSON.parse(message) as T;
for (const callback of allCallbacks) {
callback(parsedPayload);
}
});
}

publish(channelId: string, payload: T) {
this.pubClient.publish(channelId, JSON.stringify(payload));
}

subscribe(channelId: string, callback: (payload: T) => void) {
if (!this.callbacks.has(channelId)) {
this.callbacks.set(channelId, []);
}
this.callbacks.get(channelId).push(callback);
this.subClient.subscribe(channelId);
}

unsubscribeAll(channelId: string) {
this.subClient.unsubscribe(channelId);
if (this.callbacks.has(channelId)) {
this.callbacks.delete(channelId);
}
}
}
25 changes: 12 additions & 13 deletions src/server/transport/socketio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class SocketIOTestAdapter extends SocketIO {
this.clientInfo = clientInfo;
this.roomInfo = roomInfo;
}

public getPubSub() {
return this.pubSub;
}
}

jest.mock('../../master/master', () => {
Expand Down Expand Up @@ -156,6 +160,7 @@ describe('socketAdapter', () => {
describe('TransportAPI', () => {
let io;
let api;
const matchID = 'matchID';

beforeAll(() => {
const auth = new Auth({ authenticateCredentials: () => true });
Expand All @@ -166,23 +171,23 @@ describe('TransportAPI', () => {
const transport = new SocketIOTestAdapter({ clientInfo, roomInfo });
transport.init(app, games);
io = app.context.io;
const socket = io.socket;
const filterPlayerView = getFilterPlayerView(games[0]);
api = TransportAPI(
'matchID',
io.socket,
clientInfo,
roomInfo,
filterPlayerView
matchID,
socket,
filterPlayerView,
transport.getPubSub()
);
});

beforeEach(async () => {
io.socket.emit = jest.fn();
io.socket.id = '0';
const args0: SyncArgs = ['matchID', '0', undefined, 2];
const args0: SyncArgs = [matchID, '0', undefined];
await io.socket.receive('sync', ...args0);
io.socket.id = '1';
const args1: SyncArgs = ['matchID', '1', undefined, 2];
const args1: SyncArgs = [matchID, '1', undefined];
await io.socket.receive('sync', ...args1);
});

Expand All @@ -192,12 +197,6 @@ describe('TransportAPI', () => {
expect(io.socket.emit).toHaveBeenCalledWith('A');
});

test('send to another player', () => {
io.socket.id = '0';
api.send({ type: 'A', playerID: '1', args: [] });
expect(io.socket.emit).toHaveBeenCalledWith('A');
});

test('sendAll - function', () => {
api.sendAll({ type: 'A', args: [] });
expect(io.socket.emit).toHaveBeenCalledTimes(2);
Expand Down
Loading

0 comments on commit 3e915e3

Please sign in to comment.