Skip to content

Commit

Permalink
fix wormhole
Browse files Browse the repository at this point in the history
  • Loading branch information
dcharbonnier committed Nov 13, 2019
1 parent 5afa00e commit f69f881
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 36 deletions.
26 changes: 21 additions & 5 deletions src/router/DummyRouterConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class DummyRouterConnector implements IRouterConnector {
public onStatus?: (key: string, status: number) => boolean;
public onBroadcast?: (data: any) => void;
public onClose?: (key: string, code: number, reason: string) => boolean;
public onRequestReadyState?: (key: string) => number;
public subscriptions: Dict<string, RoutedWebSocket[]> = new Dict();

constructor() {
Expand All @@ -25,11 +26,23 @@ export class DummyRouterConnector implements IRouterConnector {
}
}

public async requestReadyState(key: string): Promise<number> {
return new Promise((resolve) => {
for (const connector of DummyRouterConnector.instances) {
if (connector !== this && connector.onRequestReadyState && connector.onRequestReadyState(key)) {
setTimeout(() => resolve(connector.onRequestReadyState(key)), 0);
return;
}
}
});

}

public emitMessage(key: string, event: MessageEvent): void {
DummyRouterConnector.instances.forEach((connector) => {
if (connector !== this) {
(connector.subscriptions.get(key) || [])
.forEach((ws: RoutedWebSocket) => ws.emitMessage(event));
.forEach((ws: RoutedWebSocket) => setTimeout(() => ws.emitMessage(event)));
}
});
}
Expand Down Expand Up @@ -57,16 +70,19 @@ export class DummyRouterConnector implements IRouterConnector {
public broadcast(data: any): void {
DummyRouterConnector.instances.forEach((connector) => {
if (connector !== this && connector.onBroadcast) {
connector.onBroadcast(data);
setTimeout(() => connector.onBroadcast(data));
}
});
}

public readyState(key: string, status: number): void {
DummyRouterConnector.instances.forEach((connector) => {
if (connector !== this && connector.onStatus) {
connector.onStatus(key, status);
}
setTimeout(() => {
if (connector !== this && connector.onStatus) {

connector.onStatus(key, status);
}
});
});
}

Expand Down
3 changes: 3 additions & 0 deletions src/router/IRouterConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface IRouterConnector {
onBroadcast?: (data: any) => void;
onStatus?: (key: string, status: number) => void;
onClose?: (key: string, code: number, reason: string) => boolean;
onRequestReadyState?: (key: string) => number;

send(key: string, data: any): void;

Expand All @@ -14,6 +15,8 @@ export interface IRouterConnector {

close(key: string, code: number, reason: string): void;

requestReadyState(key: string): Promise<number>;

subscribe(key: string, ws: RoutedWebSocket);

unsubscribe(key: string, ws: RoutedWebSocket);
Expand Down
38 changes: 24 additions & 14 deletions src/router/Router.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import WebSocket from "../polyfill/WebSocket";

import { expect } from "chai";
import { IRouterConnector } from "./IRouterConnector";
import { Router } from "./Router";
import {expect} from "chai";
import {IRouterConnector} from "./IRouterConnector";
import {Router} from "./Router";
import {DummyRouterConnector} from "./DummyRouterConnector";
import MessageEvent from "../polyfill/MessageEvent";

Expand All @@ -13,8 +13,7 @@ class MockWebsocketClass {
public received: any[] = [];
public closed: { code: number, reason: string } = void 0;
public onmessage: any;

public listeners: { open: any[], close: any[], message: any[] } = { open: [], close: [], message: [] };
public listeners: { open: any[], close: any[], message: any[] } = {open: [], close: [], message: []};

constructor(public readyState: number = WebSocket.OPEN) {
setTimeout(() => {
Expand All @@ -30,16 +29,17 @@ class MockWebsocketClass {
this.received.push(data);
const event = new MessageEvent("message", {
data
})
if(this.onmessage) {
});
if (this.onmessage) {
this.onmessage(data);
}
this.listeners.message.forEach((callback) => callback(event));

}


public close(code?: number, reason?: string) {
this.closed = { code, reason };
this.closed = {code, reason};
this.readyState = WebSocket.CLOSED;
setTimeout(() => this.listeners.close.forEach(((clb) => clb())));

Expand Down Expand Up @@ -107,9 +107,10 @@ describe("Router", () => {
router.get("id").onmessage = (e) => {
router.destroy();
done();
}
};
ws1.send("test should get a virtual Websocket receiving message");
});

it("should get a virtual Websocket receiving messages addListener", (done) => {
const router = new Router();
const ws1 = new MockWebsocket();
Expand Down Expand Up @@ -177,7 +178,7 @@ describe("Router", () => {
router2 = new Router();
router2.connector = connector2;
ws1 = new MockWebsocket(WebSocket.CONNECTING);
ws2 = new MockWebsocket(WebSocket.CONNECTING);
ws2 = new MockWebsocket(WebSocket.OPEN);
router1.set("id1", ws1);
router2.set("id2", ws2);
});
Expand All @@ -187,9 +188,11 @@ describe("Router", () => {
it("should broadcast", (done) => {
setTimeout(() => {
router1.broadcast("test");
expect(ws1.received.length).to.equal(1);
expect(ws2.received.length).to.equal(1);
done();
setTimeout(() => {
expect(ws1.received.length).to.equal(1);
expect(ws2.received.length).to.equal(1);
done();
});
});
});
it("should broadcast to open only", (done) => {
Expand All @@ -209,6 +212,13 @@ describe("Router", () => {
done();
});
});
it("should get the remote websocket state", (done) => {
const ws = router1.get("id2");
setTimeout(() => {
expect(ws.readyState).to.equal(WebSocket.OPEN);
done();
}, 10);
});
it("should emit messages on a remote routed websocket", (done) => {
setTimeout(() => {
router1.get("id2").onmessage = () => done();
Expand Down Expand Up @@ -236,7 +246,7 @@ describe("Router", () => {
it("should close a remote websocket", (done) => {
setTimeout(() => {
router1.get("id2").close(999, "test");
expect(ws2.closed).to.deep.equal({ code: 999, reason: "test" });
expect(ws2.closed).to.deep.equal({code: 999, reason: "test"});
done();
});
});
Expand Down
52 changes: 45 additions & 7 deletions src/router/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class Router {
this._connector.onMessage = this.onMessage.bind(this);
this._connector.onClose = this.onClose.bind(this);
this._connector.onStatus = this.onStatus.bind(this);
this._connector.onRequestReadyState = this.onRequestReadyState.bind(this);
}
}

Expand All @@ -49,7 +50,21 @@ export class Router {
}
}

public onRequestReadyState(id: string, data: any): number {
const ws = this.localWebSockets.get(id);
if (ws && ws.readyState === WebSocket.OPEN) {
return ws.readyState;
}
}

public onStatus(id: string, status: number): void {
if (status === WebSocket.OPEN && this.localWebSockets.has(id) && this.localWebSockets.get(id) .readyState !== status) {
if (this.virtualWebSockets.has(id)) {
this.virtualWebSockets.get(id).setReadyState(WebSocket.CLOSED);
}
this.localWebSockets.delete(id);

}
const ws = this.virtualWebSockets.get(id);
if (ws) {
ws.setReadyState(status);
Expand Down Expand Up @@ -80,10 +95,23 @@ export class Router {
}

public set(id: string, ws: WebSocket) {
if (this.localWebSockets.get(id) === ws) {
return;
}
this.close(id, 1000, "Duplicate websocket");
ws.addEventListener("open", () => this.emitState(id, ws));
ws.addEventListener("close", () => this.emitState(id, ws));
ws.addEventListener("message", (event: MessageEvent) => this.emitMessage(id, event));
const openListener = () => this.emitState(id, ws);
const messageListener = (event: MessageEvent) => this.emitMessage(id, event);
const closeListener = (event) => {
if (this.localWebSockets.get(id) === ws) {
this.emitState(id, ws);
}
};
ws.addEventListener("open", openListener);
ws.addEventListener("message", messageListener);
ws.addEventListener("close", closeListener);
if (this.virtualWebSockets.has(id) && this.virtualWebSockets.get(id).readyState === ws.readyState) {
this.virtualWebSockets.get(id).setReadyState(WebSocket.CLOSED);
}
this.localWebSockets.set(id, ws);
this.emitState(id, ws);
}
Expand All @@ -98,12 +126,21 @@ export class Router {
const routedWs = new RoutedWebSocket(
(data: string | ArrayBufferLike | Blob | ArrayBufferView) => this.send(id, data),
(code: number, reason: string) => this.close(id, code, reason),
(vWs) => this.onMessageSubscribe(id, vWs),
(vWs) => this.onMessageUnsubscribe(id, vWs),
) ;
(vWs) => this.onMessageSubscribe(id, vWs),
(vWs) => this.onMessageUnsubscribe(id, vWs),
);
this.virtualWebSockets.set(id, routedWs);
if (this.localWebSockets.has(id)) {
routedWs.setReadyState(this.localWebSockets.get(id).readyState);
} else {
if (this._connector) {
this._connector.requestReadyState(id)
.then((readyState) => {
if (readyState && !routedWs.readyState) {
routedWs.setReadyState(readyState);
}
});
}
}
}
const ws = this.virtualWebSockets.get(id);
Expand All @@ -115,6 +152,7 @@ export class Router {
this._connector.subscribe(id, ws);
}
}

public onMessageUnsubscribe(id: string, ws: RoutedWebSocket) {
if (this._connector) {
this._connector.unsubscribe(id, ws);
Expand Down Expand Up @@ -144,7 +182,7 @@ export class Router {
}

private emitMessage(id, event: MessageEvent) {
if (this.virtualWebSockets.has(id)) {
if (this.localWebSockets.has(id) && this.virtualWebSockets.has(id)) {
this.virtualWebSockets.get(id).emitMessage(event);
}
if (this._connector) {
Expand Down
17 changes: 8 additions & 9 deletions src/wormhole/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,12 @@ export class Server {
const dataPipe = new Pipe(ws, "WOHD");

cable.register("identity", ({uuid}: { uuid: string }): Promise<void> => {
try {
this.clients.set(uuid, {data: dataPipe, cable: cablePipe});
this.router.set(uuid, ws);
} catch (e) {
// ignore
}
this.router.set(uuid, ws);
return new Promise((resolve) => setTimeout(resolve, 0));
});

cable.register("close", ({channel}: { channel: string }): Promise<void> => {
const pipes = this.channels.get(channel);

if (pipes && pipes.source) {
pipes.source.close();
}
Expand Down Expand Up @@ -89,7 +83,11 @@ export class Server {
const targetCable = new Cable(targetCablePipe);
pipes.target = new Pipe(targetDataPipe, channel, 32);
pipes.target.onmessage = (event) => {
pipes.source.send(event.data);
try {
pipes.source.send(event.data);
} catch (e) {
// ignore
}
};
pipes.source.onclose = (event) => {
pipes.target.close();
Expand All @@ -99,7 +97,6 @@ export class Server {
// ignore
}
this.channels.delete(channel);

};
targetCable.request("open", {channel})
.then(() => {
Expand All @@ -121,6 +118,8 @@ export class Server {
targetWs.addEventListener("open", () => {
connect();
});
targetWs.addEventListener("close", () => {
});
} catch (e) {
// ignore
}
Expand Down
Loading

0 comments on commit f69f881

Please sign in to comment.