// transport-local.ts
import { FnsBag } from './types-bag.ts';
import { IConnection, ITransport, Thunk, TransportStatus } from './types.ts';
import { Envelope } from './types-envelope.ts';
import { Watchable, WatchableSet } from './watchable.ts';
import { Connection } from './connection.ts';
import { logTransport as log } from './log.ts';
/**
 * Construction options for a `TransportLocal`.
 *
 * `BagType` is the bag of functions this transport exposes via `methods`.
 */
export interface ITransportLocalOpts<BagType extends FnsBag> {
  /** Id of this device. */
  deviceId: string;

  /** The bag of functions this transport is constructed with. */
  methods: BagType;

  // Not implemented yet:
  // streams: { [method: string]: Fn },

  /** Short human-readable label; it appears in the transport's description and log output. */
  description: string;
}
/**
 * A Transport that connects directly to other Transports in memory, on the
 * same machine.
 *
 * Each call to `addConnection` creates a pair of `Connection` objects whose
 * `sendEnvelope` hands the envelope straight to the peer connection's
 * `handleIncomingEnvelope`.
 *
 * This is mostly useful for testing.
 */
export class TransportLocal<BagType extends FnsBag> implements ITransport<BagType> {
  /** Lifecycle state: starts as 'OPEN' and is set to 'CLOSED' by `close()`. */
  status: Watchable<TransportStatus> = new Watchable('OPEN' as TransportStatus);
  /** Id of this device, copied from the constructor options. */
  deviceId: string;
  /** The bag of functions handed to every connection made by this transport. */
  methods: BagType;
  /** Connections currently registered on this transport. */
  connections: WatchableSet<IConnection<BagType>> = new WatchableSet();
  /** `"transport "` followed by the description given in the options. */
  description: string;

  /**
   * @param opts - Device id, method bag and a short label for this transport.
   */
  constructor(opts: ITransportLocalOpts<BagType>) {
    log(`TransportLocal constructor: ${opts.deviceId} "${opts.description}"`);
    this.deviceId = opts.deviceId;
    this.methods = opts.methods;
    this.description = `transport ${opts.description}`;
  }

  /** True once `close()` has set the status to 'CLOSED'. */
  get isClosed(): boolean {
    return this.status.value === 'CLOSED';
  }

  /**
   * Register a callback for when the status changes to 'CLOSED'.
   *
   * @param cb - Callback to register.
   * @returns Whatever `status.onChangeTo` returns — presumably an unsubscribe
   *   thunk; confirm against `Watchable`.
   */
  onClose(cb: Thunk): Thunk {
    return this.status.onChangeTo('CLOSED', cb);
  }

  /**
   * Close this transport and every connection registered on it.
   * Calling it again after the transport is closed does nothing.
   */
  close(): void {
    if (this.isClosed) return;
    log(`${this.deviceId} | closing...`);
    this.status.set('CLOSED');
    log(`${this.deviceId} | ...closing connections...`);
    // Iterate over a snapshot: the close handlers installed by addConnection
    // delete entries from this.connections while we are walking it.
    for (const conn of Array.from(this.connections)) {
      conn.close();
    }
    this.connections.clear();
    log(`${this.deviceId} | ...closed`);
  }

  /**
   * Create a linked pair of connections between this transport and
   * `otherTrans`, registering one side on each transport.
   *
   * Closing either connection closes the other, and each one is removed from
   * its own transport's `connections` set when it closes.
   *
   * @param otherTrans - The transport to connect to.
   * @returns The connection owned by this transport (`thisConn`) and the one
   *   owned by `otherTrans` (`otherConn`).
   * @throws Error if this transport or `otherTrans` is already closed.
   */
  addConnection(
    otherTrans: TransportLocal<BagType>,
  ): { thisConn: Connection<BagType>; otherConn: Connection<BagType> } {
    // A connection registered on a closed transport would never be closed by
    // it, so refuse when either end is closed.
    if (this.isClosed || otherTrans.isClosed) {
      throw new Error('Can\'t use a transport after it\'s closed');
    }

    // The two connections refer to each other, but only from inside callbacks
    // that run after both constants are initialised.
    const thisConn: Connection<BagType> = new Connection({
      description: `conn ${this.deviceId} to ${otherTrans.deviceId}`,
      transport: this,
      deviceId: this.deviceId,
      methods: this.methods,
      sendEnvelope: async (_conn: IConnection<BagType>, env: Envelope<BagType>) => {
        await otherConn.handleIncomingEnvelope(env);
      },
    });
    const otherConn: Connection<BagType> = new Connection({
      description: `conn ${otherTrans.deviceId} to ${this.deviceId}`,
      transport: otherTrans,
      deviceId: otherTrans.deviceId,
      methods: otherTrans.methods,
      sendEnvelope: async (_conn: IConnection<BagType>, env: Envelope<BagType>) => {
        await thisConn.handleIncomingEnvelope(env);
      },
    });

    // Close one side of the connection and the other side closes too; each
    // side also unregisters itself from the transport that owns it.
    thisConn.onClose(() => {
      otherConn.close();
      this.connections.delete(thisConn);
    });
    otherConn.onClose(() => {
      thisConn.close();
      otherTrans.connections.delete(otherConn);
    });

    this.connections.add(thisConn);
    otherTrans.connections.add(otherConn);
    return { thisConn, otherConn };
  }
}
/**
 * Build two `TransportLocal` instances, 'device:A' and 'device:B', that share
 * the same `methods`, and connect A to B.
 *
 * @param methods - The bag of functions given to both transports.
 * @returns Both transports plus the two connections returned by
 *   `transA.addConnection(transB)`.
 */
export function makeLocalTransportPair<BagType extends FnsBag>(methods: BagType) {
  // Both ends are built the same way; only the id and label differ.
  const makeTransport = (deviceId: string, description: string) =>
    new TransportLocal({ deviceId, methods, description });

  const transA = makeTransport('device:A', 'A');
  const transB = makeTransport('device:B', 'B');

  const link = transA.addConnection(transB);
  return {
    transA,
    transB,
    thisConn: link.thisConn,
    otherConn: link.otherConn,
  };
}