Skip to content

Commit

Permalink
feat: API V2 WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Feb 5, 2024
1 parent b321af0 commit 7620de2
Show file tree
Hide file tree
Showing 3 changed files with 496 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/api/Api.ts
Expand Up @@ -7,9 +7,11 @@ import Service from '../service/Service';
import Controller from './Controller';
import { errorResponse } from './Utils';
import ApiV2 from './v2/ApiV2';
import WebSocketHandler from './v2/WebSocketHandler';

class Api {
private app: Application;
private readonly websocket: WebSocketHandler;
private readonly controller: Controller;

constructor(
Expand Down Expand Up @@ -46,6 +48,7 @@ class Api {
);

this.controller = new Controller(logger, service, countryCodes);
this.websocket = new WebSocketHandler(service, this.controller);

new ApiV2(
this.logger,
Expand All @@ -60,10 +63,11 @@ class Api {
await this.controller.init();

await new Promise<void>((resolve) => {
this.app.listen(this.config.port, this.config.host, () => {
const server = this.app.listen(this.config.port, this.config.host, () => {
this.logger.info(
`API server listening on: ${this.config.host}:${this.config.port}`,
);
this.websocket.register(server);
resolve();
});
});
Expand Down
202 changes: 202 additions & 0 deletions lib/api/v2/WebSocketHandler.ts
@@ -0,0 +1,202 @@
import http from 'http';
import ws from 'ws';
import { formatError } from '../../Utils';
import Service from '../../service/Service';
import Controller from '../Controller';
import { apiPrefix } from './Consts';

enum Operation {
// TODO: unsubscribe
Subscribe = 'subscribe',
Update = 'update',
}

enum SubscriptionChannel {
SwapUpdate = 'swap.update',
}

type WsRequest = {
op: Operation;
};

type WsSubscribeRequest = WsRequest & {
channel: SubscriptionChannel;
args: string[];
};

type WsResponse = {
event: Operation;
};

type WsErrorResponse = {
error: string;
};

class WebSocketHandler {
private static readonly pingIntervalMs = 15_000;

private readonly ws: ws.Server;
private pingInterval?: NodeJS.Timer;

private readonly swapToSockets = new Map<string, ws[]>();
private readonly socketToSwaps = new Map<ws, string[]>();

constructor(
private readonly service: Service,
private readonly controller: Controller,
) {
this.ws = new ws.Server({
noServer: true,
});
this.listenConnections();
this.listenSwapUpdates();
}

public register = (server: http.Server) => {
server.on('upgrade', (request, socket, head) => {
if (request.url !== `${apiPrefix}/ws`) {
request.destroy();
socket.destroy();
return;
}

this.ws.handleUpgrade(request, socket, head, (ws) => {
this.ws.emit('connection', ws, request);
});
});

this.pingInterval = setInterval(() => {
this.ws.clients.forEach((ws) => ws.ping());
}, WebSocketHandler.pingIntervalMs);
};

public close = () => {
this.ws.close();
clearInterval(this.pingInterval);
};

private listenConnections = () => {
this.ws.on('connection', (socket) => {
socket.on('message', (msg) => this.handleMessage(socket, msg));
socket.on('close', () => {
const ids = this.socketToSwaps.get(socket);
if (ids === undefined) {
return;
}

this.socketToSwaps.delete(socket);

for (const id of ids) {
const sockets = this.swapToSockets
.get(id)
?.filter((s) => s !== socket);
if (sockets === undefined || sockets.length === 0) {
this.swapToSockets.delete(id);
continue;
}

this.swapToSockets.set(id, sockets);
}
});
});
};

private handleMessage = (socket: ws, message: ws.RawData) => {
try {
const data = JSON.parse(message.toString('utf-8')) as WsRequest;

switch (data.op) {
case Operation.Subscribe:
this.handleSubscribe(socket, data);
break;

default:
this.sendToSocket(socket, { error: 'unknown operation' });
break;
}
} catch (e) {
this.sendToSocket(socket, {
error: `could not parse message: ${formatError(e)}`,
});
}
};

private handleSubscribe = (socket: ws, data: WsRequest) => {
const subscribeData = data as WsSubscribeRequest;
switch (subscribeData.channel) {
case SubscriptionChannel.SwapUpdate: {
const existingIds = this.socketToSwaps.get(socket) || [];
this.socketToSwaps.set(
socket,
existingIds.concat(
subscribeData.args.filter((id) => !existingIds.includes(id)),
),
);

for (const id of subscribeData.args) {
const existingSockets = this.swapToSockets.get(id) || [];
if (existingSockets.includes(socket)) {
continue;
}

this.swapToSockets.set(id, existingSockets.concat(socket));
}

break;
}

default:
this.sendToSocket(socket, { error: 'unknown channel' });
return;
}

this.sendToSocket(socket, {
event: Operation.Subscribe,
channel: subscribeData.channel,
args: subscribeData.args,
});

if (subscribeData.channel === SubscriptionChannel.SwapUpdate) {
const args = subscribeData.args
.map((id) => [id, this.controller.pendingSwapInfos.get(id)])
.filter(([, data]) => data !== undefined);

this.sendToSocket(socket, {
event: Operation.Update,
channel: SubscriptionChannel.SwapUpdate,
args: args,
});
}
};

private listenSwapUpdates = () => {
this.service.eventHandler.on('swap.update', ({ id, status }) => {
const sockets = this.swapToSockets.get(id);
if (sockets === undefined) {
return;
}

for (const socket of sockets) {
this.sendToSocket(socket, {
event: Operation.Update,
channel: SubscriptionChannel.SwapUpdate,
args: [[id, status]],
});
}
});
};

private sendToSocket = <T extends WsResponse>(
socket: ws,
msg: T | WsErrorResponse,
) => {
if (socket.readyState !== socket.OPEN) {
return;
}

socket.send(JSON.stringify(msg));
};
}

export default WebSocketHandler;
export { Operation, SubscriptionChannel };

0 comments on commit 7620de2

Please sign in to comment.