Skip to content

Commit

Permalink
Address review
Browse files Browse the repository at this point in the history
  • Loading branch information
vdfdev committed Aug 10, 2021
1 parent 9df06ba commit 86778d5
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 114 deletions.
18 changes: 3 additions & 15 deletions src/server/transport/pubsub/generic-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
import type { Observable } from 'rxjs';

/** Generic interface for pub-subs. */
export interface GenericPubSub<T> {
// Publish an event for a match.
publish(channelId: PubSubChannelId, payload: T);
publish(channelId: string, payload: T);

// Subscribes to events related to a single match.
subscribe(channelId: PubSubChannelId): Observable<T>;
subscribe(channelId: string, callback: (payload: T) => void): void;

// Cleans up subscription for a given channel.
unsubscribe(channelId: PubSubChannelId);
}

/** All possible namespaces for IDs that pubsubs might be used. */
export enum PubSubChannelIdNamespace {
MATCH = 'MATCH',
}

export interface PubSubChannelId {
namespace: PubSubChannelIdNamespace;
value: string;
unsubscribeAll(channelId: string);
}
19 changes: 7 additions & 12 deletions src/server/transport/pubsub/in-memory-pub-sub.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { InMemoryPubSub } from './in-memory-pub-sub';
import type { PubSubChannelId } from './generic-pub-sub';
import { PubSubChannelIdNamespace } from './generic-pub-sub';

const CHANNEL_FOO: PubSubChannelId = {
namespace: PubSubChannelIdNamespace.MATCH,
value: 'foo',
};
const CHANNEL_FOO = 'foo';

describe('in-memory pubsub', () => {
it('should receive message from subscription', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO).subscribe(callback);
pubSub.subscribe(CHANNEL_FOO, callback);
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).toHaveBeenCalledWith(payload);
Expand All @@ -20,8 +15,8 @@ describe('in-memory pubsub', () => {
it('should unsubscribe', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO).subscribe(callback);
pubSub.unsubscribe(CHANNEL_FOO);
pubSub.subscribe(CHANNEL_FOO, callback);
pubSub.unsubscribeAll(CHANNEL_FOO);
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).not.toHaveBeenCalled();
Expand All @@ -30,9 +25,9 @@ describe('in-memory pubsub', () => {
it('should ignore extra unsubscribe', () => {
const pubSub = new InMemoryPubSub<string>();
const callback = jest.fn();
pubSub.subscribe(CHANNEL_FOO).subscribe(callback);
pubSub.unsubscribe(CHANNEL_FOO);
pubSub.unsubscribe(CHANNEL_FOO); // do nothing
pubSub.subscribe(CHANNEL_FOO, callback);
pubSub.unsubscribeAll(CHANNEL_FOO);
pubSub.unsubscribeAll(CHANNEL_FOO); // do nothing
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(callback).not.toHaveBeenCalled();
Expand Down
39 changes: 17 additions & 22 deletions src/server/transport/pubsub/in-memory-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,28 @@
import { Subject } from 'rxjs';
import type { Observable } from 'rxjs';
import type { GenericPubSub, PubSubChannelId } from './generic-pub-sub';
import { globalChannelId } from './util';
import type { GenericPubSub } from './generic-pub-sub';

export class InMemoryPubSub<T> implements GenericPubSub<T> {
subjects: Map<string, Subject<T>> = new Map();
callbacks: Map<string, ((payload: T) => void)[]> = new Map();

publish(channelId: PubSubChannelId, payload: T) {
this.initializeSubject(channelId);
const subject = this.subjects.get(globalChannelId(channelId));
subject.next(payload);
}

subscribe(channelId: PubSubChannelId): Observable<T> {
this.initializeSubject(channelId);
const subject = this.subjects.get(globalChannelId(channelId));
return subject;
publish(channelId: string, payload: T) {
if (!this.callbacks.has(channelId)) {
return;
}
const allCallbacks = this.callbacks.get(channelId);
for (const callback of allCallbacks) {
callback(payload);
}
}

unsubscribe(channelId: PubSubChannelId) {
if (this.subjects.has(globalChannelId(channelId))) {
this.subjects.get(globalChannelId(channelId)).unsubscribe();
this.subjects.delete(globalChannelId(channelId));
subscribe(channelId: string, callback: (payload: T) => void): void {
if (!this.callbacks.has(channelId)) {
this.callbacks.set(channelId, []);
}
this.callbacks.get(channelId).push(callback);
}

private initializeSubject(channelId: PubSubChannelId) {
if (!this.subjects.has(globalChannelId(channelId))) {
this.subjects.set(globalChannelId(channelId), new Subject<T>());
unsubscribeAll(channelId: string) {
if (this.callbacks.has(channelId)) {
this.callbacks.delete(channelId);
}
}
}
39 changes: 11 additions & 28 deletions src/server/transport/pubsub/redis-pub-sub.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import type redis from 'redis';
import { RedisPubSub } from './redis-pub-sub';
import { PubSubChannelIdNamespace } from './generic-pub-sub';
import type { PubSubChannelId } from './generic-pub-sub';

const CHANNEL_FOO: PubSubChannelId = {
namespace: PubSubChannelIdNamespace.MATCH,
value: 'foo',
};
const CHANNEL_FOO_GLOBAL_ID = 'MATCH-foo';
const CHANNEL_FOO = 'MATCH-foo';

describe('redis pub-sub', () => {
let pubClient: redis.RedisClient;
Expand All @@ -30,42 +24,31 @@ describe('redis pub-sub', () => {
const payload = 'hello world';
pubSub.publish(CHANNEL_FOO, payload);
expect(pubClient.publish).toHaveBeenCalledWith(
CHANNEL_FOO_GLOBAL_ID,
CHANNEL_FOO,
JSON.stringify(payload)
);
});

it('should unsubscribe to a channel in redis', () => {
pubSub.unsubscribe(CHANNEL_FOO);
expect(subClient.unsubscribe).toHaveBeenCalledWith(CHANNEL_FOO_GLOBAL_ID);
});

it('should subscribe to a channel in redis', () => {
pubSub.subscribe(CHANNEL_FOO);
expect(subClient.subscribe).toHaveBeenCalledWith(CHANNEL_FOO_GLOBAL_ID);
});

it('should not subscribe twice to the same channel in redis', () => {
pubSub.subscribe(CHANNEL_FOO);
pubSub.subscribe(CHANNEL_FOO);
expect(subClient.subscribe).toHaveBeenCalledTimes(1);
pubSub.unsubscribeAll(CHANNEL_FOO);
expect(subClient.unsubscribe).toHaveBeenCalledWith(CHANNEL_FOO);
});

it('should receive a message after subscription', () => {
const rxjsCallback = jest.fn();
const callback = jest.fn();
const payload = 'hello world';
pubSub.subscribe(CHANNEL_FOO).subscribe(rxjsCallback);
pubSub.subscribe(CHANNEL_FOO, callback);
const redisCallback = subClient.on.mock.calls[0][1];
redisCallback(CHANNEL_FOO_GLOBAL_ID, JSON.stringify(payload));
expect(rxjsCallback).toHaveBeenCalledWith(payload);
redisCallback(CHANNEL_FOO, JSON.stringify(payload));
expect(callback).toHaveBeenCalledWith(payload);
});

it('should ignore message from unrelated channel', () => {
const rxjsCallback = jest.fn();
const callback = jest.fn();
const payload = 'hello world';
pubSub.subscribe(CHANNEL_FOO).subscribe(rxjsCallback);
pubSub.subscribe(CHANNEL_FOO, callback);
const redisCallback = subClient.on.mock.calls[0][1];
redisCallback('notTheRightId', JSON.stringify(payload));
expect(rxjsCallback).not.toHaveBeenCalled();
expect(callback).not.toHaveBeenCalled();
});
});
47 changes: 24 additions & 23 deletions src/server/transport/pubsub/redis-pub-sub.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
import type redis from 'redis';
import { Observable } from 'rxjs';
import type { GenericPubSub, PubSubChannelId } from './generic-pub-sub';
import { globalChannelId } from './util';
import type { GenericPubSub } from './generic-pub-sub';

export class RedisPubSub<T> implements GenericPubSub<T> {
private pubClient: redis.RedisClient;
private subClient: redis.RedisClient;
private subscriptions: Map<string, Observable<T>> = new Map();
callbacks: Map<string, ((payload: T) => void)[]> = new Map();

constructor(pubClient: redis.redisclient, subClient: redis.redisclient) {
this.pubClient = pubClient;
this.subClient = subClient;
this.subClient.on('message', (redisChannelId, message) => {
if (!this.callbacks.has(redisChannelId)) {
return;
}
const allCallbacks = this.callbacks.get(redisChannelId);
const parsedPayload = JSON.parse(message) as T;
for (const callback of allCallbacks) {
callback(parsedPayload);
}
});
}

publish(channelId: PubSubChannelId, payload: T) {
this.pubClient.publish(globalChannelId(channelId), JSON.stringify(payload));
publish(channelId: string, payload: T) {
this.pubClient.publish(channelId, JSON.stringify(payload));
}

subscribe(channelId: PubSubChannelId): Observable<T> {
if (this.subscriptions.has(globalChannelId(channelId))) {
return this.subscriptions.get(globalChannelId(channelId));
subscribe(channelId: string, callback: (payload: T) => void) {
if (!this.callbacks.has(channelId)) {
this.callbacks.set(channelId, []);
}
const observable = new Observable<T>((subscribe) => {
this.subClient.on('message', (redisChannelId, message) => {
if (redisChannelId !== globalChannelId(channelId)) {
return;
}
subscribe.next(JSON.parse(message) as T);
});
});
this.subClient.subscribe(globalChannelId(channelId));
this.subscriptions.set(globalChannelId(channelId), observable);
return observable;
this.callbacks.get(channelId).push(callback);
this.subClient.subscribe(channelId);
}

unsubscribe(channelId: PubSubChannelId) {
this.subscriptions.delete(globalChannelId(channelId));
this.subClient.unsubscribe(globalChannelId(channelId));
unsubscribeAll(channelId: string) {
this.subClient.unsubscribe(channelId);
if (this.callbacks.has(channelId)) {
this.callbacks.delete(channelId);
}
}
}
5 changes: 0 additions & 5 deletions src/server/transport/pubsub/util.ts

This file was deleted.

17 changes: 8 additions & 9 deletions src/server/transport/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import type {
import { getFilterPlayerView } from '../../master/filter-player-view';
import type { Game, Server } from '../../types';
import type { GenericPubSub } from './pubsub/generic-pub-sub';
import { PubSubChannelIdNamespace } from './pubsub/generic-pub-sub';
import type { IntermediateTransportData } from '../../master/master';
import { InMemoryPubSub } from './pubsub/in-memory-pub-sub';
import type { PubSubChannelId } from './pubsub/generic-pub-sub';

const PING_TIMEOUT = 20 * 1e3;
const PING_INTERVAL = 10 * 1e3;
Expand All @@ -30,8 +28,8 @@ const emit = (socket: IOTypes.Socket, { type, args }: TransportData) => {
socket.emit(type, ...args);
};

function getPubSubChannelId(matchID: string): PubSubChannelId {
return { namespace: PubSubChannelIdNamespace.MATCH, value: matchID };
function getPubSubChannelId(matchID: string): string {
return `MATCH-${matchID}`;
}

/**
Expand Down Expand Up @@ -133,19 +131,20 @@ export class SocketIO {
}

private subscribePubSubChannel(matchID: string, game: Game) {
this.pubSub
.subscribe(getPubSubChannelId(matchID))
.subscribe((payload: IntermediateTransportData) => {
this.pubSub.subscribe(
getPubSubChannelId(matchID),
(payload: IntermediateTransportData) => {
this.roomInfo.get(matchID).forEach((clientID) => {
const client = this.clientInfo.get(clientID);
const data = getFilterPlayerView(game)(client.playerID, payload);
emit(client.socket, data);
});
});
}
);
}

private unsubscribePubSubChannel(matchID: string) {
this.pubSub.unsubscribe(getPubSubChannelId(matchID));
this.pubSub.unsubscribeAll(getPubSubChannelId(matchID));
}

init(
Expand Down

0 comments on commit 86778d5

Please sign in to comment.