-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsocket-server.ts
117 lines (95 loc) · 3.94 KB
/
socket-server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import { IComponent, DownstreamStreamsConfig, IObjectLogger } from "@scramjet/types";
import net from "net";
import { isDefined, TypedEmitter } from "@scramjet/utility";
import { ObjLogger } from "@scramjet/obj-logger";
import { CommunicationChannel } from "@scramjet/symbols";
/** A channel slot: the socket registered for it, or `null` while none has been registered. */
type MaybeSocket = net.Socket | null
/**
 * Sockets collected so far for a single Runner id.
 * The tuple has nine slots and is indexed by the channel number read from the connection;
 * a slot stays `null` until a connection for that channel is registered.
 */
type RunnerConnectionsInProgress = [
MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket, MaybeSocket
]
/**
 * The same nine-slot tuple once every slot holds a socket.
 * This is the shape passed as the `streams` argument of the "connect" event.
 */
type RunnerOpenConnections = [
net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket, net.Socket
]
/** Events emitted by SocketServer. */
type Events = {
// Emitted once all slots for the Runner with the given id are filled.
connect: (id: string, streams: DownstreamStreamsConfig) => void
}
/**
 * Server accepting incoming TCP connections from Runners.
 *
 * A Runner opens one socket per communication channel. Every socket starts with a short
 * handshake: a 36-byte Runner id followed by a single ASCII digit holding the channel
 * number. When sockets for all channels of a given id have been registered, the "connect"
 * event is emitted with the id and the complete tuple of sockets.
 */
export class SocketServer extends TypedEmitter<Events> implements IComponent {
    /** Number of bytes of the Runner id sent at the beginning of each connection. */
    private static readonly ID_LENGTH = 36;
    /** Number of bytes of the channel number that follows the id. */
    private static readonly CHANNEL_LENGTH = 1;

    /** Underlying net server; defined after `start()` has been called. */
    server?: net.Server;
    logger: IObjectLogger;

    /** Partially connected Runners, keyed by Runner id. */
    private runnerConnectionsInProgress = new Map<string, RunnerConnectionsInProgress>();

    /**
     * @param port Port the server listens on.
     * @param hostname Hostname (interface) the server binds to.
     */
    constructor(private port: number, private hostname: string) {
        super();
        this.logger = new ObjLogger(this);
    }

    /**
     * Creates the server and starts listening on the configured port and hostname.
     *
     * @returns Promise resolved when the server is listening, rejected when listening fails.
     */
    async start(): Promise<void> {
        const server = net.createServer();

        this.server = server;

        server.on("connection", (connection) => {
            // acceptConnection handles its own failures, so the promise can be ignored here.
            void this.acceptConnection(connection);
        });

        return new Promise<void>((res, rej) => {
            const onStartupError = (err: Error) => rej(err);

            server.once("error", onStartupError);
            server.listen(this.port, this.hostname, () => {
                // After a successful start an error can no longer reject the (already
                // settled) promise - log it instead of dropping it silently.
                server.off("error", onStartupError);
                server.on("error", (err) => this.logger.error("SocketServer error", err));

                this.logger.info("SocketServer on", server.address());
                res();
            });
        });
    }

    /**
     * Performs the handshake on a fresh connection (reads Runner id and channel number)
     * and registers the socket. Any failure is logged and the connection is destroyed.
     */
    private async acceptConnection(connection: net.Socket): Promise<void> {
        connection.setNoDelay(true);
        connection.on("error", (err) => {
            this.logger.error("Error on connection from runner", err);
        });

        try {
            const idBytes = await this.readExactly(connection, SocketServer.ID_LENGTH);
            // Always convert to string - the id is used as a Map key and Buffers compare by identity.
            const id = idBytes.toString();

            const channelBytes = await this.readExactly(connection, SocketServer.CHANNEL_LENGTH);
            const channel = parseInt(channelBytes.toString(), 10);

            connection
                .on("error", (err) => this.logger.error("Error on Instance in stream", id, channel, err))
                .on("end", () => this.logger.debug(
                    `Channel [${id}:${channel}] ended. tx/rx: ${connection.bytesWritten}/${connection.bytesRead}`
                ));

            await this.handleConnection(id, channel, connection);
        } catch (err: unknown) {
            this.logger.error("Rejecting connection from runner", err);
            connection.destroy();
        }
    }

    /**
     * Reads exactly `size` bytes from the socket without switching it to flowing mode.
     *
     * `read(size)` returns `null` until at least `size` bytes are buffered, so the read is
     * retried on every "readable" event. Data already buffered is consumed immediately.
     *
     * @returns Promise resolved with a Buffer of `size` bytes, rejected when the socket
     * ends or closes before that many bytes were received.
     */
    private readExactly(connection: net.Socket, size: number): Promise<Buffer> {
        return new Promise<Buffer>((resolve, reject) => {
            const detach = () => {
                connection.off("readable", attempt);
                connection.off("end", onFinished);
                connection.off("close", onFinished);
            };
            const fail = () => {
                reject(new Error(`Connection closed before ${size} byte(s) of handshake were received`));
            };
            const attempt = (): boolean => {
                const chunk: Buffer | null = connection.read(size);

                if (chunk === null) {
                    return false;
                }

                detach();

                // After the stream has ended read() may hand out fewer bytes than requested.
                if (chunk.length < size) {
                    fail();
                } else {
                    resolve(chunk);
                }

                return true;
            };
            const onFinished = () => {
                detach();
                fail();
            };

            if (attempt()) {
                return;
            }

            if (connection.destroyed) {
                fail();

                return;
            }

            connection.on("readable", attempt);
            connection.on("end", onFinished);
            connection.on("close", onFinished);
        });
    }

    /**
     * Registers a socket as the given channel of the Runner with the given id. When it was
     * the last missing channel, emits "connect" and forgets the in-progress entry.
     *
     * @param id Runner id read from the connection.
     * @param channel Channel number read from the connection.
     * @param connection Socket to register.
     * @throws Error when the channel number is not a valid slot index or the slot is
     * already taken.
     */
    async handleConnection(id: string, channel: number, connection: net.Socket): Promise<void> {
        const known = this.runnerConnectionsInProgress.get(id);
        const runner: RunnerConnectionsInProgress =
            known ?? [null, null, null, null, null, null, null, null, null];

        // Validate before touching the map so invalid requests do not leave entries behind.
        if (!Number.isInteger(channel) || channel < 0 || channel >= runner.length) {
            throw new Error(`Runner(${id}) wanted to connect on invalid channel ${channel}`);
        }

        if (runner[channel] !== null) {
            throw new Error(`Runner(${id}) wanted to connect on already initialized channel ${channel}`);
        }

        runner[channel] = connection;

        if (!known) {
            this.runnerConnectionsInProgress.set(id, runner);
        }

        if (runner.every(isDefined)) {
            this.runnerConnectionsInProgress.delete(id);

            runner[CommunicationChannel.OUT]?.setDefaultEncoding("binary");

            this.emit("connect", id, runner as RunnerOpenConnections);
        }
    }

    /**
     * Stops the server from accepting new connections; an error reported by the
     * underlying `close` call is logged.
     */
    close() {
        this.server?.close((err?: Error) => {
            if (err) {
                this.logger.error(err);
            }
        });
    }
}