From 5d0cea1468a715052861b5055ac32961b55b9891 Mon Sep 17 00:00:00 2001 From: Manuel Tumiati <36959970+iltumio@users.noreply.github.com> Date: Mon, 29 Nov 2021 15:54:53 +0100 Subject: [PATCH] feat(webrtc): add low level abstractions for handling p2p connections and audio/video calls (#324) * wip: WebRTC implementation * chore: better structure * chore: documented all methods * fix: prettier config * fix: replaced p2pt dependency with the internal one * Update encoders.ts * fix: typos in comments and comma dangle Co-authored-by: Matt Wisniewski Co-authored-by: Matt Wisniewski --- .prettierrc | 2 +- libraries/Textile/IdentityManager.ts | 4 +- libraries/Textile/encoders.ts | 9 +- libraries/WebRTC/Call.ts | 284 +++++++++++++++++++++++++++ libraries/WebRTC/Emitter.ts | 76 ++++--- libraries/WebRTC/WebRTC.ts | 53 +++-- libraries/WebRTC/Wire.ts | 242 +++++++++++++++++++++++ libraries/WebRTC/encoders.ts | 45 +++++ libraries/WebRTC/types.ts | 119 +++++++++-- p2pt.d.ts | 44 +++++ package.json | 3 + tsconfig.json | 20 +- types/webrtc/User.d.ts | 6 +- 13 files changed, 812 insertions(+), 95 deletions(-) create mode 100644 libraries/WebRTC/Call.ts create mode 100644 libraries/WebRTC/Wire.ts create mode 100644 libraries/WebRTC/encoders.ts create mode 100644 p2pt.d.ts diff --git a/.prettierrc b/.prettierrc index ce6574f3ef..a6c50e64e8 100644 --- a/.prettierrc +++ b/.prettierrc @@ -2,5 +2,5 @@ "semi": false, "singleQuote": true, "endOfLine": "auto", - "trailingComma": "none" + "trailingComma": "all" } diff --git a/libraries/Textile/IdentityManager.ts b/libraries/Textile/IdentityManager.ts index 88684dffea..ebcb3c6532 100644 --- a/libraries/Textile/IdentityManager.ts +++ b/libraries/Textile/IdentityManager.ts @@ -132,7 +132,7 @@ export default class IdentityManager { this.client = await Client.withKeyInfo({ key: Config.textile.key }) this.users = await Users.withKeyInfo({ - key: Config.textile.key, + key: Config.textile.key }) await this.users.getToken(identity) @@ -142,7 +142,7 @@ export default class IdentityManager { return { client: this.client, token, - users: this.users, + users: this.users } } diff --git a/libraries/Textile/encoders.ts b/libraries/Textile/encoders.ts index 99d14b7372..afa6d7b436 100644 --- a/libraries/Textile/encoders.ts +++ b/libraries/Textile/encoders.ts @@ -1,6 +1,4 @@ import * as t from 'io-ts' -// import * as D from 'io-ts/Decoder' -// import { MessageFromThread } from '~/types/textile/mailbox' const isBase64 = (s: string) => /^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$/gm.test(s) @@ -12,7 +10,7 @@ interface Base64Brand { const base64 = t.brand( t.string, (s: string): s is t.Branded => isBase64(s), - 'Base64' + 'Base64', ) export type Base64 = t.TypeOf @@ -52,11 +50,6 @@ export const messageFromThread = t.intersection([ t.partial({ read_at: t.number }), ]) -// export const messageFromThread: D.Decoder = { -// decode: (u) => -// typeof u === 'string' ? D.success(u) : D.failure(u, 'string'), -// } - const baseMessage = t.intersection([ t.type({ id: t.string, diff --git a/libraries/WebRTC/Call.ts b/libraries/WebRTC/Call.ts new file mode 100644 index 0000000000..af1d8231f8 --- /dev/null +++ b/libraries/WebRTC/Call.ts @@ -0,0 +1,284 @@ +import Peer, { SignalData } from 'simple-peer' +import Emitter from './Emitter' +import { CallEventListeners } from './types' +import { Wire } from './Wire' + +/** + * @class Call + * @description The Call class manages a p2p connection for audio/video calls + * It makes use of a Wire as a communication bus for signaling. + */ +export class Call extends Emitter { + communicationBus: Wire + + peer?: Peer.Instance // The Simple Peer instance for the active call + signalingBuffer?: Peer.SignalData // A variable to store the signaling data before the answer + + stream?: MediaStream // MediaStream for the active call + + /** + * @constructor + * @param communicationBus The Wire instance used for signaling (it happens through p2p connection) + */ + constructor(communicationBus: Wire) { + super() + + this.communicationBus = communicationBus + + this._bindBusListeners() + } + + /** + * @method start + * @description It's used for initiate a call + * @param stream MediaStream object containing the audio/video tracks + * @example + * const call = new Call(wireInstance) + * call.start(mediaStream) + */ + start(stream: MediaStream) { + // A new Simple Peer instance is created with the initiator flag set to true + this.peer = new Peer({ initiator: true, trickle: false, stream }) + this._bindPeerListeners() + + // Store the stream of the active call to destroy it after hang up + this.stream = stream + } + + /** + * @method answer + * @description It's used for answering a call. A call request must be active before + * to call this method + * @param stream MediaStream object containing the audio/video stream + * @example + * const call = new Call(wireInstance) + * call.answer(mediaStream) + */ + answer(stream: MediaStream) { + if (!this.signalingBuffer) { + throw new Error('No call to answer') + } + + // A new Simple Peer instance is created with the initiator flag set to false + this.peer = new Peer({ initiator: false, trickle: false, stream }) + this._bindPeerListeners() + + // Store the stream of the active call to destroy it after hang up + this.stream = stream + + // Signal to the peer with previously received Signaling Data + this.peer.signal(this.signalingBuffer) + } + + /** + * @method hangUp + * @description It's used to close the call + * @example + * const call = new Call(wireInstance) + * call.hangUp() + */ + hangUp() { + console.log('hangup') + + this.peer?.destroy() + this.stream?.getTracks().forEach((track) => track.stop()) + + delete this.peer + delete this.stream + } + + /** + * @method addStream + * @description Adds a stream to the call + * @param stream MediaStream to add + * @example + * const call = new Call(wireInstance) + * call.addStream(mediaStream) + */ + addStream(stream: MediaStream) { + this.peer?.addStream(stream) + } + + /** + * @method removeStream + * @description Removes a stream from the call + * @param stream MediaStream to remove + * @example + * const call = new Call(wireInstance) + * call.removeStream(mediaStream) + */ + removeStream(stream: MediaStream) { + this.peer?.removeStream(stream) + } + + /** + * @method addTrack + * @description Adds a track to the call + * @param track MediaStreamTrack to add + * @param stream Related stream + * @example + * const call = new Call(wireInstance) + * call.addTrack(newTrack, mediaStream) + */ + addTrack(track: MediaStreamTrack, stream: MediaStream) { + this.peer?.addTrack(track, stream) + } + + /** + * @method removeTrack + * @description Removes a track from the call + * @param track MediaStreamTrack to remove + * @param stream Related stream + * @example + * const call = new Call(wireInstance) + * call.removeTrack(trackToRemove, mediaStream) + */ + removeTrack(track: MediaStreamTrack, stream: MediaStream) { + this.peer?.removeTrack(track, stream) + } + + /** + * @method replaceTrack + * @description Replaces a track with a new one + * @param oldTrack old MediaStreamTrack to remove + * @param newTrack new MediaStreamTrack to add + * @param stream Related stream + * @example + * const call = new Call(wireInstance) + * call.replaceTrack(trackToRemove, trackToAdd, mediaStream) + */ + replaceTrack( + oldTrack: MediaStreamTrack, + newTrack: MediaStreamTrack, + stream: MediaStream + ) { + this.peer?.replaceTrack(oldTrack, newTrack, stream) + } + + /** + * @method _bindPeerListeners + * @description Internal function to bind listeners to the communiciationBus events + * @example + * this._bindBusListeners() + */ + protected _bindBusListeners() { + this.communicationBus?.on('SIGNAL', this._onBusSignal.bind(this)) + // this.communicationBus?.on('REFUSE', this.) + } + + /** + * @method _bindPeerListeners + * @description Internal function to bind listeners to the main peer events + * @example + * this._bindPeerListeners() + */ + protected _bindPeerListeners() { + this.peer?.on('signal', this._onSignal.bind(this)) + this.peer?.on('connect', this._onConnect.bind(this)) + this.peer?.on('error', this._onError.bind(this)) + this.peer?.on('track', this._onTrack.bind(this)) + this.peer?.on('stream', this._onStream.bind(this)) + this.peer?.on('close', this._onClose.bind(this)) + } + + /** + * @method _onSignal + * @description Callback for the Simple Peer signal event + * @param data Simple Peer signaling data + */ + protected _onSignal(data: Peer.SignalData) { + this._sendSignal(data) + } + + /** + * @method _sendSignal + * @description Sends the signaling data through the communication bus + * @param data Signaling data to send + */ + protected _sendSignal(data: Peer.SignalData) { + if (!this.communicationBus) { + throw new Error('Communication bus not found') + } + + this.communicationBus.send({ + type: 'SIGNAL', + payload: { + peerId: this.communicationBus.identifier, + data, + }, + sentAt: Date.now(), + }) + } + + /** + * @method _onConnect + * @description Callback for the Simple Peer signal event + */ + protected _onConnect() { + this.emit('CONNECTED', { peerId: this.communicationBus.identifier }) + } + + /** + * @method _onError + * @description Callback for the Simple Peer error event + */ + protected _onError(error: Error) { + this.emit('ERROR', { peerId: this.communicationBus.identifier, error }) + } + + /** + * @method _onTrack + * @description Callback for the Simple Peer track event + * @param track MediaStreamTrack object for the audio/video tracks + * @param stream MediaStream object for the audio/video stream + */ + protected _onTrack(track: MediaStreamTrack, stream: MediaStream) { + this.emit('TRACK', { + peerId: this.communicationBus.identifier, + track, + stream, + }) + } + + /** + * @method _onStream + * @description Callback for the Simple Peer stream event + * @param stream MediaStream object for the audio/video stream + */ + protected _onStream(stream: MediaStream) { + this.emit('STREAM', { peerId: this.communicationBus.identifier, stream }) + } + + /** + * @method _onClose + * @description Callback for the Simple Peer close event + */ + protected _onClose() { + this.hangUp() + this.emit('HANG_UP', { peerId: this.communicationBus.identifier }) + } + + /** + * @method _onBusSignal + * @description Callback for the Wire on signal event + * @param message Wire Message containing the signal data + */ + protected _onBusSignal(message: { peerId: string; data: SignalData }) { + this.signalingBuffer = message.data + + if (message.data.type === 'offer') { + this.emit('INCOMING_CALL', { peerId: this.communicationBus.identifier }) + } else { + this.peer?.signal(this.signalingBuffer) + } + } + + /** + * @method _onBusRefuse + * @description Callback for the Wire on refuse event. Used for the hang up + * before the call started + */ + protected _onBusRefuse() { + this._onClose() + } +} diff --git a/libraries/WebRTC/Emitter.ts b/libraries/WebRTC/Emitter.ts index 214acfc4ed..379cd905e0 100644 --- a/libraries/WebRTC/Emitter.ts +++ b/libraries/WebRTC/Emitter.ts @@ -1,66 +1,64 @@ -import { - WebRTCEventBox, - WebRTCEvents, - WebRTCEvent, - WebRTCData, -} from '~/libraries/WebRTC/types' - -export default class Emitter { - private _events: typeof WebRTCEventBox = WebRTCEventBox +/** + * @class Emitter + * @description The emitter class provides an interface for + * listening and emitting custom events in a strongly typed way + */ +export default class Emitter< + Listeners extends { [key in keyof Listeners]: (...args: any[]) => any }, +> { + private _events: { [key in keyof Listeners]?: Array } = {} /** * @method on - * @description Bind event listeners to WebRTCEvents - * @param event WebRTCEvent to listen to - * @param listener to call on any WebRTC Event - * @example Emitter.on(WebRTCEvents.INIT, () => {}) + * @description Bind event listeners + * @param event Name of the event to listen to + * @param listener to call on any Event + * @example Emitter.on('EVENT_NAME', (parameter: ParameterType) => {}) */ - on(event: WebRTCEvent, listener: Function) { - this._events[WebRTCEvents[event]].push(listener) + on(event: T, listener: Listeners[T]) { + if (!this._events[event]) { + this._events[event] = [] + } + + this._events[event]!.push(listener) } /** * @method off * @description Removes an event listener from the listener box - * @param event WebRTCEvent to unsubscribe from + * @param event Name of the event to unsubscribe from * @param listener Listener function to remove - * @example Emitter.off(WebRTCEvents.INIT, () => {}) + * @example Emitter.off("EVENT_NAME", (parameter: ParameterType) => {}) */ - off(event: WebRTCEvent, listenerToRemove: Function) { - if (!this._events[WebRTCEvents[event]]) { + off(event: T, listenerToRemove: Listeners[T]) { + if (!this._events[event]) { throw new Error( - `Can't remove a listener. Event "${event}" doesn't exits.` + `Can't remove a listener. Event "${event}" doesn't exits.`, ) } - const filterListeners = (listener: Function) => - listener !== listenerToRemove + const filteredListeners = this._events[event]!.filter( + (listener: Listeners[T]) => listener !== listenerToRemove, + ) - this._events[WebRTCEvents[event]] = - this._events[WebRTCEvents[event]].filter(filterListeners) + this._events[event] = filteredListeners } /** * @method emit * @description Emits an event to all listeners - * @param event WebRTCEvent to emit + * @param event Name of the event to emit * @param data Data to provide to listeners - * @example Emitter.emit(WebRTCEvents.INIT, 'something') + * @example Emitter.emit('EVENT_NAME', {eventParam, otherParam}) */ - protected emit(event: WebRTCEvent, data: any) { - if (!this._events[WebRTCEvents[event]]) { - throw new Error(`Can't emit an event. Event "${event}" doesn't exits.`) - } - - const fireCallbacks = (callback: Function) => { - // eslint-disable-next-line node/no-callback-literal - callback({ - at: Date.now(), - event: WebRTCEvents[event], - data, - } as Object as WebRTCData) + protected emit( + event: T, + ...[data]: Parameters + ) { + if (!this._events[event]) { + return } - this._events[WebRTCEvents[event]].forEach(fireCallbacks) + this._events[event]!.forEach((cb) => cb(data)) } } diff --git a/libraries/WebRTC/WebRTC.ts b/libraries/WebRTC/WebRTC.ts index f810724fb7..8dfa464b5a 100644 --- a/libraries/WebRTC/WebRTC.ts +++ b/libraries/WebRTC/WebRTC.ts @@ -1,17 +1,20 @@ -import { WebRTCUser } from '~/types/webrtc/User' - import { Config } from '~/config' import Emitter from '~/libraries/WebRTC/Emitter' -import { WebRTCEvents } from '~/libraries/WebRTC/types' - -export default class WebRTC extends Emitter { +import { Wire } from '~/libraries/WebRTC/Wire' +import { + WebRTCEventListeners, + WebRTCEvents, + WireEvents, +} from '~/libraries/WebRTC/types' + +export default class WebRTC extends Emitter { // Identifier to connect to signaling server with id: string | undefined // If this is undefined, the WebRTC services cannot run initalized: boolean | undefined // List of peers we're actively or have been connected to - peers: Map | undefined + peers: Map | undefined // --- Internal --- // @@ -36,7 +39,7 @@ export default class WebRTC extends Emitter { this.initalized = true this._runQueue() - this.emit(WebRTCEvents.INIT, '') + this.emit(WebRTCEvents.INIT) } /** @@ -87,11 +90,35 @@ export default class WebRTC extends Emitter { * @returns * @example */ - protected _connect(peerId: string): void { + protected _connect(peerId: string, channel: string): void { console.log('connecting to', peerId) + const wire = new Wire( + 'originator', + peerId, + channel, + this._announceURLs, + false + ) + + this._bindWireListeners(wire) + return undefined } + protected _bindWireListeners(wire: Wire) { + wire.on('CONNECT', ({ peerId }) => { + this.emit(WebRTCEvents.PEER_CONNECT, { peerId }) + }) + + wire.on('DATA', ({ peerId, data }) => { + console.log(peerId, data) + }) + + wire.on('ERROR', ({ peerId, error }) => { + console.log(peerId, error) + }) + } + // --- Public Methods --- // // Methods who are exposed for interaction @@ -110,12 +137,12 @@ export default class WebRTC extends Emitter { /** * @method getPeer - * @description Get a WebRTCUser from the list of connected peers + * @description Get a Wire from the list of connected peers * @param peerId identifier of peer we're seeking * @returns * @example */ - getPeer(peerId: string): WebRTCUser | undefined { + getPeer(peerId: string): Wire | undefined { return this.peers?.get(peerId) } @@ -125,11 +152,11 @@ export default class WebRTC extends Emitter { * @param peerId identifier of peer we're connecting to * @example */ - connect(peerId: string) { + connect(peerId: string, channel: string) { if (!this.initalized) { - this._queue(() => this._connect(peerId)) + this._queue(() => this._connect(peerId, channel)) } else { - this._connect(peerId) + this._connect(peerId, channel) } } } diff --git a/libraries/WebRTC/Wire.ts b/libraries/WebRTC/Wire.ts new file mode 100644 index 0000000000..106dbc1aed --- /dev/null +++ b/libraries/WebRTC/Wire.ts @@ -0,0 +1,242 @@ +import { isRight } from 'fp-ts/lib/Either' +import P2PT from 'p2pt' +import Peer, { SignalData } from 'simple-peer' +import Emitter from '~/libraries/WebRTC/Emitter' +import { + WireEventListeners, + WireIdentificationMessage, + WireMessage, +} from '~/libraries/WebRTC/types' +import { + wireDataMessage, + wireIdentificationMessage, + wireRefuseConnectionMessage, + wireSignalMessage, +} from './Encoders' + +/** + * @description A wire is a connection between peers on a specific channel. + * In the 1 to 1 connection we will use a deterministic generated secret between + * 2 parties as channel id. + */ +export class Wire extends Emitter { + originator: string + identifier: string + channel: string + sendIdentification: boolean + + instance: P2PT // P2PT Instance used for connecting peers through Web Torrent signaling servers + peer?: Peer.Instance // Simple Peer object instantiated right after the connection occurred through p2pt library + + /** + * @constructor + * @param originator Identifier of the originator + * @param identifier Identifier of the recipient + * @param channel Secret communication channel you want to connect (shared secret between parties) + * @param announceURLs Array of Web Torrent trackers for the signaling + * @param sendIdentification Setting for enabling/disabling the identification mechanism + * @example + * const wire = new Wire('originator', 'identifier', 'secret_channel', ['announce_url_1', 'announce_url_1'], false); + */ + constructor( + originator: string, + identifier: string, + channel: string, + announceURLs: string[] = [], + sendIdentification: boolean = false, + ) { + super() + + this.originator = originator + this.identifier = identifier + this.channel = channel + this.sendIdentification = sendIdentification + + // Create a new P2PT instance using the channel as identifier + // This way any other peer with the same identifier will be announced + // by webtorrent trackers. We are going to use an ECDH shared secret between 2 + // parties to connect them + this.instance = new P2PT(announceURLs, channel) + + this._bindListeners() + } + + /** + * @method _bindListeners + * @description Binds listeners for p2pt library + * @example + * this._bindListeners() + */ + protected _bindListeners() { + this.instance.on('trackerconnect', this._onTrackerConnect.bind(this)) + + this.instance.on('peerconnect', this._onPeerConnect.bind(this)) + + this.instance.start() + } + + /** + * @method _onTrackerConnect + * @description Callback for the trackerconnect event provided by p2pt library + * @param tracker Tracker information + */ + protected _onTrackerConnect(tracker: any) { + this.emit('TRACKER_CONNECT', { tracker }) + } + + /** + * @method _onPeerConnect + * @description Callback for the peerconnect event provided by p2pt library + * @param peer Simple Peer instance + */ + protected _onPeerConnect(peer: Peer.Instance) { + peer.on('connect', this._onPeerConnect.bind(this)) + peer.on('error', this._onError.bind(this)) + peer.on('data', this._onData.bind(this)) + peer.on('close', this._onClose.bind(this)) + + this.peer = peer + + this._onConnectionHappened(peer) + } + + /** + * @method _onConnectionHappened + * @description Callback for the connect event provided by Simple Peer + * @param peer Simple Peer instance + */ + protected _onConnectionHappened(peer: Peer.Instance) { + if (this.sendIdentification) { + const identificationMessage: WireIdentificationMessage = { + type: 'IDENTIFICATION', + payload: { peerId: this.originator }, + sentAt: Date.now(), + } + + // Immediately send an identification message to let + // the peer know what is the identifier + // TODO: use a signed message for the future + peer.send(JSON.stringify(identificationMessage)) + } + + this.emit('CONNECT', { peerId: this.identifier }) + } + + /** + * @method _onError + * @description Callback for the error event provided by Simple Peer + * @param error Error received + */ + protected _onError(error: Error) { + this.emit('ERROR', { peerId: this.identifier, error }) + } + + /** + * @method _onData + * @description Callback for the data event provided by Simple Peer + * @param data Data buffer received + */ + protected _onData(data: Buffer) { + const decoder = new TextDecoder() + const decodedString = decoder.decode(data) + const parsedData = JSON.parse(decodedString) + + const identificationMessage = wireIdentificationMessage.decode(parsedData) + + if (isRight(identificationMessage)) { + const { peerId } = identificationMessage.right.payload + + if (peerId !== this.identifier) { + console.log('Not recognized. Drop connection') + } else { + console.log('identified') + } + + this.emit('IDENTIFICATION', { + peerId, + }) + + return + } + + const signalMessage = wireSignalMessage.decode(parsedData) + + if (isRight(signalMessage)) { + const { peerId, data } = signalMessage.right.payload + + this.emit('SIGNAL', { + peerId, + data: data as SignalData, + }) + + return + } + + const dataMessage = wireDataMessage.decode(parsedData) + + if (isRight(dataMessage)) { + const data = dataMessage.right.payload + + this.emit('DATA', { + peerId: this.identifier, + data, + }) + + return + } + + const refuseMessage = wireRefuseConnectionMessage.decode(parsedData) + + if (isRight(refuseMessage)) { + const data = refuseMessage.right.payload + + this.emit('REFUSE', { + peerId: this.identifier, + }) + + return + } + + this.emit('RAW_DATA', { + peerId: this.identifier, + data: parsedData.data.payload, + }) + } + + /** + * @method _onClose + * @description Callback for the close event provided by Simple Peer + */ + protected _onClose() { + this.emit('CLOSE', { peerId: this.identifier }) + } + + /** + * @method destroy + * @description Removes all the listeners and destroys all the peer + * instances + * @example + * const wire = new Wire('originator', 'identifier', 'secret_channel', ['announce_url_1', 'announce_url_1'], false); + * wire.destroy() + */ + destroy() { + this.peer?.removeAllListeners() + this.peer?.destroy() + this.instance.removeAllListeners() + this.instance.destroy() + + this.emit('CLOSE', { peerId: this.identifier }) + } + + /** + * @method send + * @description Sends a message to the connected peer + * @param message Wire Message to send + * @example + * const wire = new Wire('originator', 'identifier', 'secret_channel', ['announce_url_1', 'announce_url_1'], false); + * wire.send({ type: 'SIGNAL', payload: { peerId: 'id', data }, sentAt: Date.now() }) + */ + send(message: WireMessage) { + this.peer?.send(JSON.stringify(message)) + } +} diff --git a/libraries/WebRTC/encoders.ts b/libraries/WebRTC/encoders.ts new file mode 100644 index 0000000000..3e270d1445 --- /dev/null +++ b/libraries/WebRTC/encoders.ts @@ -0,0 +1,45 @@ +import * as t from 'io-ts' + +export const wireBaseMessage = t.type({ + type: t.string, + payload: t.unknown, + sentAt: t.number, +}) + +export const wireIdentificationMessage = t.intersection([ + wireBaseMessage, + t.type({ + type: t.literal('IDENTIFICATION'), + payload: t.type({ + peerId: t.string, + }), + }), +]) + +export const wireDataMessage = t.intersection([ + wireBaseMessage, + t.type({ + type: t.literal('DATA'), + }), +]) + +export const wireSignalMessage = t.intersection([ + wireBaseMessage, + t.type({ + type: t.literal('SIGNAL'), + payload: t.type({ + peerId: t.string, + data: t.unknown, + }), + }), +]) + +export const wireRefuseConnectionMessage = t.intersection([ + wireBaseMessage, + t.type({ + type: t.literal('REFUSE'), + payload: t.type({ + peerId: t.string, + }), + }), +]) diff --git a/libraries/WebRTC/types.ts b/libraries/WebRTC/types.ts index 8b928df49d..47de2eb90b 100644 --- a/libraries/WebRTC/types.ts +++ b/libraries/WebRTC/types.ts @@ -1,19 +1,116 @@ +import { TypeOf } from 'io-ts' +import { + wireDataMessage, + wireIdentificationMessage, + wireRefuseConnectionMessage, + wireSignalMessage, +} from './Encoders' +import { SignalData } from 'simple-peer' + +export interface WireEventListeners { + ERROR: (data: { peerId: string; error: Error }) => void + TRACKER_CONNECT: (data: { tracker: string }) => void + CONNECT: (data: { peerId: string }) => void + DATA: (data: { peerId: string; data: any }) => void + RAW_DATA: (data: { peerId: string; data: any }) => void + CLOSE: (data: { peerId: string }) => void + IDENTIFICATION: (data: { peerId: string }) => void + SIGNAL: (data: { peerId: string; data: SignalData }) => void + REFUSE: (data: { peerId: string }) => void +} + +export type WireEvents = keyof WireEventListeners + +export type WireIdentificationMessage = TypeOf +export type WireDataMessage = TypeOf +export type WireSignalMessage = TypeOf +export type WireRefuseConnectionMessage = TypeOf< + typeof wireRefuseConnectionMessage +> +export interface WireMessages { + IDENTIFICATION: WireIdentificationMessage + DATA: WireDataMessage + SIGNAL: WireSignalMessage + REFUSE: WireRefuseConnectionMessage +} + +export type WireMessageType = keyof WireMessages +export type WireMessage = WireMessages[WireMessageType] + +export interface CallEventListeners { + INCOMING_CALL: (data: { peerId: string }) => void + CONNECTED: (data: { peerId: string }) => void + HANG_UP: (data: { peerId: string }) => void + ERROR: (data: { peerId: string; error: Error }) => void + TRACK: (data: { + peerId: string + track: MediaStreamTrack + stream: MediaStream + }) => void + STREAM: (data: { peerId: string; stream: MediaStream }) => void +} + +export type CallEvents = keyof CallEventListeners + +export enum WebRTCUserEvents { + INCOMING_CALL = 'INCOMING_CALL', + CALL_CONNECTED = 'CALL_CONNECTED', + STREAM_RECEIVED = 'STREAM_RECEIVED', + TRACK_RECEIVED = 'TRACK_RECEIVED', + CALL_ENDED = 'CALL_ENDED', + CALL_ANSWERED = 'CALL_ANSWERED', + CALL_BUSY = 'CALL_BUSY', +} + +export interface WebRTCUserEventListeners { + [WebRTCUserEvents.INCOMING_CALL]: () => void + [WebRTCUserEvents.CALL_CONNECTED]: () => void + [WebRTCUserEvents.STREAM_RECEIVED]: () => void + [WebRTCUserEvents.TRACK_RECEIVED]: () => void + [WebRTCUserEvents.CALL_ENDED]: () => void + [WebRTCUserEvents.CALL_ANSWERED]: () => void + [WebRTCUserEvents.CALL_BUSY]: () => void +} + export enum WebRTCEvents { INIT = 'INIT', KILL = 'KILL', + ERROR = 'ERROR', + CLOSE = 'CLOSE', + TRACKER_CONNECT = 'TRACKER_CONNECT', + PEER_CONNECT = 'PEER_CONNECT', + PEER_DATA = 'PEER_DATA', +} +export interface WebRTCEventListeners { + [WebRTCEvents.INIT]: () => void + [WebRTCEvents.KILL]: () => void + [WebRTCEvents.ERROR]: (data: { error: Error }) => void + [WebRTCEvents.CLOSE]: (data: { peerId: string }) => void + [WebRTCEvents.TRACKER_CONNECT]: (data: { tracker: string }) => void + [WebRTCEvents.PEER_CONNECT]: (data: { peerId: string }) => void + [WebRTCEvents.PEER_DATA]: (data: { peerId: string; data: any }) => void } -export type WebRTCEvent = keyof typeof WebRTCEvents +export type WebRTCEvent = keyof WebRTCEventListeners -// Declare empty function arrays here for use in -// event emitters and pubsub methods. -export const WebRTCEventBox = { - [WebRTCEvents.INIT]: [] as Function[], - [WebRTCEvents.KILL]: [] as Function[], -} +export type WebRTCEventListenerOf = + WebRTCEventListeners[T] -export type WebRTCData = { - at: Date - event: WebRTCEvent - data: any +export type WebRTCEventBox = { + [key in WebRTCEvent]?: Array> } + +export type OptionalPayload< + T extends keyof B, + B extends { [key in keyof B]: (...args: any[]) => any } +> = Parameters extends never + ? { data?: undefined } + : { data: Parameters[0] } + +export type DataOf< + T extends keyof B, + B extends { [key in keyof B]: (...args: any[]) => any } +> = { + at: number + event: T +} & OptionalPayload diff --git a/p2pt.d.ts b/p2pt.d.ts new file mode 100644 index 0000000000..d108cbe192 --- /dev/null +++ b/p2pt.d.ts @@ -0,0 +1,44 @@ +import EventEmitter from 'events' +import Peer from 'simple-peer' + +type TrackerStats = { + connected: number + total: number +} + +type WebTorrentEventType = + | 'trackerconnect' + | 'peerconnect' + | 'connect' + | 'error' + | 'data' + | 'track' + | 'stream' + | 'close' + +export default class P2PT extends EventEmitter { + announceURLs: string[] + trackers: { [key: string]: any } + peers: { [key: string]: Peer.Instance } + msgChunks: { [key: string]: any } + responseWaiting: { [key: string]: any } + + _peerIdBuffer: Buffer + _peerId: string + _peerIdBinary: string + + identifierString?: string + infoHash?: string + _infoHashBuffer?: Buffer + _infoHashBinary?: string + + constructor(announceURLs: string[], identifierString: string) + setIdentifier: (identifierString: string) => void + start: () => void + addTracker: (announceURL: string) => void + removeTracker: (announceURL: string) => void + send: (peer: Peer.Instance, msg: {}, msgID: string) => Promise<[any, any]> + requestMorePeers: () => Promise + getTrackerStats: () => TrackerStats + destroy: () => void +} diff --git a/package.json b/package.json index 99b1316051..8d4b75673c 100644 --- a/package.json +++ b/package.json @@ -61,11 +61,13 @@ "lodash": "^4.17.21", "micro-base58": "^0.5.0", "mousetrap": "^1.6.5", + "net": "^1.0.2", "node-html-markdown": "^1.1.2", "nsfwjs": "^2.4.1", "nuxt-edge": "latest", "nuxt-i18n": "^6.26.0", "nuxt-mq": "^2.0.2", + "p2pt": "Satellite-im/P2PT#fixed-dependencies", "qrcode.vue": "^1.7.0", "remarkable": "^2.0.1", "satellite-lucide-icons": "Satellite-im/satellite-lucide-vue", @@ -124,6 +126,7 @@ "jest": "^26.6.3", "lint-staged": ">=10", "prettier": "^2.2.1", + "simple-peer": "^9.11.0", "ts-jest": "^27.0.7", "vue-jest": "^3.0.4" }, diff --git a/tsconfig.json b/tsconfig.json index e475fcdf10..be666581a6 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,11 +4,7 @@ "target": "ES2018", "module": "ESNext", "moduleResolution": "Node", - "lib": [ - "ESNext", - "ESNext.AsyncIterable", - "DOM" - ], + "lib": ["ESNext", "ESNext.AsyncIterable", "DOM"], "esModuleInterop": true, "allowJs": true, "sourceMap": true, @@ -17,12 +13,8 @@ "experimentalDecorators": true, "baseUrl": ".", "paths": { - "~/*": [ - "./*" - ], - "@/*": [ - "./*" - ] + "~/*": ["./*"], + "@/*": ["./*"] }, "types": [ "@nuxt/types", @@ -33,9 +25,5 @@ "jest", ] }, - "exclude": [ - "node_modules", - ".nuxt", - "dist" - ], + "exclude": ["node_modules", ".nuxt", "dist"] } diff --git a/types/webrtc/User.d.ts b/types/webrtc/User.d.ts index f77b8be6fd..8b13789179 100644 --- a/types/webrtc/User.d.ts +++ b/types/webrtc/User.d.ts @@ -1,5 +1 @@ -export type WebRTCUser = { - identifier: String - keepAlive: Number - lastHeartbeat: Number -} +