Skip to content

Commit

Permalink
feat(webrtc): add low level abstractions for handling p2p connections…
Browse files Browse the repository at this point in the history
… and audio/video calls (#324)

* wip: WebRTC implementation

* chore: better structure

* chore: documented all methods

* fix: prettier config

* fix: replaced p2pt dependency with the internal one

* Update encoders.ts

* fix: typos in comments and comma dangle

Co-authored-by: Matt Wisniewski <contact.mattdylan@gmail.com>
Co-authored-by: Matt Wisniewski <retropronghorn@gmail.com>
  • Loading branch information
3 people committed Nov 29, 2021
1 parent 2dbf690 commit 5d0cea1
Show file tree
Hide file tree
Showing 13 changed files with 812 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .prettierrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"semi": false,
"singleQuote": true,
"endOfLine": "auto",
"trailingComma": "none"
"trailingComma": "all"
}
4 changes: 2 additions & 2 deletions libraries/Textile/IdentityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export default class IdentityManager {
this.client = await Client.withKeyInfo({ key: Config.textile.key })

this.users = await Users.withKeyInfo({
key: Config.textile.key,
key: Config.textile.key
})

await this.users.getToken(identity)
Expand All @@ -142,7 +142,7 @@ export default class IdentityManager {
return {
client: this.client,
token,
users: this.users,
users: this.users
}
}

Expand Down
9 changes: 1 addition & 8 deletions libraries/Textile/encoders.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import * as t from 'io-ts'
// import * as D from 'io-ts/Decoder'
// import { MessageFromThread } from '~/types/textile/mailbox'

const isBase64 = (s: string) =>
/^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$/gm.test(s)
Expand All @@ -12,7 +10,7 @@ interface Base64Brand {
const base64 = t.brand(
t.string,
(s: string): s is t.Branded<string, Base64Brand> => isBase64(s),
'Base64'
'Base64',
)

export type Base64 = t.TypeOf<typeof base64>
Expand Down Expand Up @@ -52,11 +50,6 @@ export const messageFromThread = t.intersection([
t.partial({ read_at: t.number }),
])

// export const messageFromThread: D.Decoder<unknown, MessageFromThread> = {
// decode: (u) =>
// typeof u === 'string' ? D.success(u) : D.failure(u, 'string'),
// }

const baseMessage = t.intersection([
t.type({
id: t.string,
Expand Down
284 changes: 284 additions & 0 deletions libraries/WebRTC/Call.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
import Peer, { SignalData } from 'simple-peer'
import Emitter from './Emitter'
import { CallEventListeners } from './types'
import { Wire } from './Wire'

/**
* @class Call
* @description The Call class manages a p2p connection for audio/video calls
* It makes use of a Wire as a communication bus for signaling.
*/
export class Call extends Emitter<CallEventListeners> {
communicationBus: Wire

peer?: Peer.Instance // The Simple Peer instance for the active call
signalingBuffer?: Peer.SignalData // A variable to store the signaling data before the answer

stream?: MediaStream // MediaStream for the active call

/**
* @constructor
* @param communicationBus The Wire instance used for signaling (it happens through p2p connection)
*/
constructor(communicationBus: Wire) {
super()

this.communicationBus = communicationBus

this._bindBusListeners()
}

/**
* @method start
* @description It's used for initiate a call
* @param stream MediaStream object containing the audio/video tracks
* @example
* const call = new Call(wireInstance)
* call.start(mediaStream)
*/
start(stream: MediaStream) {
// A new Simple Peer instance is created with the initiator flag set to true
this.peer = new Peer({ initiator: true, trickle: false, stream })
this._bindPeerListeners()

// Store the stream of the active call to destroy it after hang up
this.stream = stream
}

/**
* @method answer
* @description It's used for answering a call. A call request must be active before
* to call this method
* @param stream MediaStream object containing the audio/video stream
* @example
* const call = new Call(wireInstance)
* call.answer(mediaStream)
*/
answer(stream: MediaStream) {
if (!this.signalingBuffer) {
throw new Error('No call to answer')
}

// A new Simple Peer instance is created with the initiator flag set to false
this.peer = new Peer({ initiator: false, trickle: false, stream })
this._bindPeerListeners()

// Store the stream of the active call to destroy it after hang up
this.stream = stream

// Signal to the peer with previously received Signaling Data
this.peer.signal(this.signalingBuffer)
}

/**
* @method hangUp
* @description It's used to close the call
* @example
* const call = new Call(wireInstance)
* call.hangUp()
*/
hangUp() {
console.log('hangup')

this.peer?.destroy()
this.stream?.getTracks().forEach((track) => track.stop())

delete this.peer
delete this.stream
}

/**
* @method addStream
* @description Adds a stream to the call
* @param stream MediaStream to add
* @example
* const call = new Call(wireInstance)
* call.addStream(mediaStream)
*/
addStream(stream: MediaStream) {
this.peer?.addStream(stream)
}

/**
* @method removeStream
* @description Removes a stream from the call
* @param stream MediaStream to remove
* @example
* const call = new Call(wireInstance)
* call.removeStream(mediaStream)
*/
removeStream(stream: MediaStream) {
this.peer?.removeStream(stream)
}

/**
* @method addTrack
* @description Adds a track to the call
* @param track MediaStreamTrack to add
* @param stream Related stream
* @example
* const call = new Call(wireInstance)
* call.addTrack(newTrack, mediaStream)
*/
addTrack(track: MediaStreamTrack, stream: MediaStream) {
this.peer?.addTrack(track, stream)
}

/**
* @method removeTrack
* @description Removes a track from the call
* @param track MediaStreamTrack to remove
* @param stream Related stream
* @example
* const call = new Call(wireInstance)
* call.removeTrack(trackToRemove, mediaStream)
*/
removeTrack(track: MediaStreamTrack, stream: MediaStream) {
this.peer?.removeTrack(track, stream)
}

/**
* @method replaceTrack
* @description Replaces a track with a new one
* @param oldTrack old MediaStreamTrack to remove
* @param newTrack new MediaStreamTrack to add
* @param stream Related stream
* @example
* const call = new Call(wireInstance)
* call.replaceTrack(trackToRemove, trackToAdd, mediaStream)
*/
replaceTrack(
oldTrack: MediaStreamTrack,
newTrack: MediaStreamTrack,
stream: MediaStream
) {
this.peer?.replaceTrack(oldTrack, newTrack, stream)
}

/**
* @method _bindPeerListeners
* @description Internal function to bind listeners to the communiciationBus events
* @example
* this._bindBusListeners()
*/
protected _bindBusListeners() {
this.communicationBus?.on('SIGNAL', this._onBusSignal.bind(this))
// this.communicationBus?.on('REFUSE', this.)
}

/**
* @method _bindPeerListeners
* @description Internal function to bind listeners to the main peer events
* @example
* this._bindPeerListeners()
*/
protected _bindPeerListeners() {
this.peer?.on('signal', this._onSignal.bind(this))
this.peer?.on('connect', this._onConnect.bind(this))
this.peer?.on('error', this._onError.bind(this))
this.peer?.on('track', this._onTrack.bind(this))
this.peer?.on('stream', this._onStream.bind(this))
this.peer?.on('close', this._onClose.bind(this))
}

/**
* @method _onSignal
* @description Callback for the Simple Peer signal event
* @param data Simple Peer signaling data
*/
protected _onSignal(data: Peer.SignalData) {
this._sendSignal(data)
}

/**
* @method _sendSignal
* @description Sends the signaling data through the communication bus
* @param data Signaling data to send
*/
protected _sendSignal(data: Peer.SignalData) {
if (!this.communicationBus) {
throw new Error('Communication bus not found')
}

this.communicationBus.send({
type: 'SIGNAL',
payload: {
peerId: this.communicationBus.identifier,
data,
},
sentAt: Date.now(),
})
}

/**
* @method _onConnect
* @description Callback for the Simple Peer signal event
*/
protected _onConnect() {
this.emit('CONNECTED', { peerId: this.communicationBus.identifier })
}

/**
* @method _onError
* @description Callback for the Simple Peer error event
*/
protected _onError(error: Error) {
this.emit('ERROR', { peerId: this.communicationBus.identifier, error })
}

/**
* @method _onTrack
* @description Callback for the Simple Peer track event
* @param track MediaStreamTrack object for the audio/video tracks
* @param stream MediaStream object for the audio/video stream
*/
protected _onTrack(track: MediaStreamTrack, stream: MediaStream) {
this.emit('TRACK', {
peerId: this.communicationBus.identifier,
track,
stream,
})
}

/**
* @method _onStream
* @description Callback for the Simple Peer stream event
* @param stream MediaStream object for the audio/video stream
*/
protected _onStream(stream: MediaStream) {
this.emit('STREAM', { peerId: this.communicationBus.identifier, stream })
}

/**
* @method _onClose
* @description Callback for the Simple Peer close event
*/
protected _onClose() {
this.hangUp()
this.emit('HANG_UP', { peerId: this.communicationBus.identifier })
}

/**
* @method _onBusSignal
* @description Callback for the Wire on signal event
* @param message Wire Message containing the signal data
*/
protected _onBusSignal(message: { peerId: string; data: SignalData }) {
this.signalingBuffer = message.data

if (message.data.type === 'offer') {
this.emit('INCOMING_CALL', { peerId: this.communicationBus.identifier })
} else {
this.peer?.signal(this.signalingBuffer)
}
}

/**
* @method _onBusRefuse
* @description Callback for the Wire on refuse event. Used for the hang up
* before the call started
*/
protected _onBusRefuse() {
this._onClose()
}
}
Loading

0 comments on commit 5d0cea1

Please sign in to comment.