-
Notifications
You must be signed in to change notification settings - Fork 36
/
run-websocket-server.ts
102 lines (91 loc) · 4.27 KB
/
run-websocket-server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import * as express from "express";
import * as WebSocket from "ws";
import * as Knex from "knex";
import Augur from "augur.js";
import { clearInterval, setInterval } from "timers";
import { augurEmitter } from "../events";
import { JsonRpcRequest, WebSocketConfigs } from "../types";
import { addressFormatReviver } from "./address-format-reviver";
import { isJsonRpcRequest } from "./is-json-rpc-request";
import { dispatchJsonRpcRequest } from "./dispatch-json-rpc-request";
import { makeJsonRpcResponse } from "./make-json-rpc-response";
import { makeJsonRpcError, JsonRpcErrorCode } from "./make-json-rpc-error";
import { Subscriptions } from "./subscriptions";
import * as fs from "fs";
import * as https from "https";
import * as http from "http";
export function runWebsocketServer(db: Knex, app: express.Application, augur: Augur, webSocketConfigs: WebSocketConfigs): Array<WebSocket.Server> {
const servers: Array<WebSocket.Server> = [];
if ( webSocketConfigs.wss != null ) {
console.log("Starting websocket secure server on port", webSocketConfigs.wss.port);
const httpsOptions: https.ServerOptions = {
cert: fs.readFileSync(webSocketConfigs.wss.certificateFile),
key: fs.readFileSync(webSocketConfigs.wss.certificateKeyFile),
};
const server = https.createServer(httpsOptions, app);
server.listen(webSocketConfigs.wss.port);
servers.push( new WebSocket.Server({ server }) );
}
if ( webSocketConfigs.ws != null ) {
console.log("Starting websocket server on port", webSocketConfigs.ws.port);
const server = http.createServer(app);
server.listen(webSocketConfigs.ws.port);
servers.push( new WebSocket.Server({ server }) );
}
servers.forEach((server) => {
server.on("connection", (websocket: WebSocket): void => {
const subscriptions = new Subscriptions(augurEmitter);
const pingInterval = setInterval(() => websocket.ping(), webSocketConfigs.pingMs || 12000);
websocket.on("message", (data: WebSocket.Data): void => {
let message: any;
try {
message = JSON.parse(data as string, addressFormatReviver);
if (!isJsonRpcRequest(message)) return console.error("bad json rpc message received:", message);
} catch (exc) {
return websocket.send(makeJsonRpcError("-1", JsonRpcErrorCode.ParseError, "Bad JSON RPC Message Received", { originalText: data as string }));
}
try {
if (message.method === "subscribe") {
const eventName: string = message.params.shift();
try {
const subscription: string = subscriptions.subscribe(eventName, message.params, (data: {}): void => {
websocket.send(makeJsonRpcResponse(null, { subscription, result: data }));
});
websocket.send(makeJsonRpcResponse(message.id, { subscription }));
} catch (exc) {
websocket.send(makeJsonRpcError(message.id, JsonRpcErrorCode.MethodNotFound, exc.toString(), false));
}
} else if (message.method === "unsubscribe") {
const subscription: string = message.params.shift();
subscriptions.unsubscribe(subscription);
websocket.send(makeJsonRpcResponse(message.id, true));
} else {
dispatchJsonRpcRequest(db, message as JsonRpcRequest, augur, (err: Error|null, result?: any): void => {
if (websocket.readyState !== WebSocket.OPEN ) {
console.warn("Client disconnected during request, ignoring response");
return;
}
if (err) {
console.error("getter error: ", err);
websocket.send(makeJsonRpcError(message.id, JsonRpcErrorCode.InvalidParams, err.message, false));
} else {
websocket.send(makeJsonRpcResponse(message.id, result || null));
}
});
}
} catch (exc) {
websocket.send(makeJsonRpcError(message.id, JsonRpcErrorCode.ServerError, exc.toString(), exc));
}
});
websocket.on("close", () => {
clearInterval(pingInterval);
subscriptions.removeAllListeners();
});
});
server.on("error", (err: Error): void => {
console.log("websocket error:", err);
// TODO reconnect
});
});
return servers;
}