Skip to content

Commit a00fe31

Browse files
authored
feat: sip streaming api (#88)
This includes some performance improvements in camera live streams. The time to initiate a stream has been reduced and multiple streams should be able to function at the same time. BREAKING CHANGE: `SipSession` api has changed and now exposes `Observable`s for RTP packets on `audioStream` and `videoStream`
1 parent 907fc73 commit a00fe31

File tree

10 files changed

+497
-308
lines changed

10 files changed

+497
-308
lines changed

api/api.ts

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,22 @@ export class RingApi {
6161
cameraStatusPollingSeconds,
6262
cameraDingsPollingSeconds
6363
} = this.options,
64-
camerasRequestUpdate$ = merge(
64+
onCamerasRequestUpdate = merge(
6565
...cameras.map(camera => camera.onRequestUpdate)
66-
).pipe(throttleTime(500)),
66+
),
67+
onCamerasRequestActiveDings = merge(
68+
...cameras.map(camera => camera.onRequestActiveDings)
69+
),
6770
onUpdateReceived = new Subject(),
71+
onActiveDingsReceived = new Subject(),
6872
onPollForStatusUpdate = cameraStatusPollingSeconds
6973
? onUpdateReceived.pipe(debounceTime(cameraStatusPollingSeconds * 1000))
7074
: EMPTY,
75+
onPollForActiveDings = cameraDingsPollingSeconds
76+
? onActiveDingsReceived.pipe(
77+
debounceTime(cameraDingsPollingSeconds * 1000)
78+
)
79+
: EMPTY,
7180
camerasById = cameras.reduce(
7281
(byId, camera) => {
7382
byId[camera.id] = camera
@@ -80,7 +89,7 @@ export class RingApi {
8089
return
8190
}
8291

83-
merge(camerasRequestUpdate$, onPollForStatusUpdate)
92+
merge(onCamerasRequestUpdate, onPollForStatusUpdate)
8493
.pipe(
8594
throttleTime(500),
8695
switchMap(async () => {
@@ -107,32 +116,26 @@ export class RingApi {
107116
onUpdateReceived.next() // kick off polling
108117
}
109118

110-
if (cameraDingsPollingSeconds) {
111-
const onPollForActiveDings = new Subject()
112-
113-
onPollForActiveDings
114-
.pipe(
115-
debounceTime(cameraDingsPollingSeconds * 1000),
116-
switchMap(() => {
117-
return this.fetchActiveDings().catch(() => null)
118-
})
119-
)
120-
.subscribe(activeDings => {
121-
onPollForActiveDings.next()
119+
merge(onCamerasRequestActiveDings, onPollForActiveDings).subscribe(
120+
async () => {
121+
const activeDings = await this.fetchActiveDings().catch(() => null)
122+
onActiveDingsReceived.next()
122123

123-
if (!activeDings || !activeDings.length) {
124-
return
125-
}
124+
if (!activeDings || !activeDings.length) {
125+
return
126+
}
126127

127-
activeDings.forEach(activeDing => {
128-
const camera = camerasById[activeDing.doorbot_id]
129-
if (camera) {
130-
camera.processActiveDing(activeDing)
131-
}
132-
})
128+
activeDings.forEach(activeDing => {
129+
const camera = camerasById[activeDing.doorbot_id]
130+
if (camera) {
131+
camera.processActiveDing(activeDing)
132+
}
133133
})
134+
}
135+
)
134136

135-
onPollForActiveDings.next() // kick off polling
137+
if (cameraDingsPollingSeconds) {
138+
onActiveDingsReceived.next() // kick off polling
136139
}
137140
}
138141

api/ring-camera.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ import {
1818
share,
1919
take
2020
} from 'rxjs/operators'
21+
import { createSocket } from 'dgram'
22+
import { bindToRandomPort, getPublicIp } from './rtp-utils'
2123
import { delay, logError } from './util'
22-
import { RtpOptions, SipSession } from './sip-session'
24+
import { SipSession, SrtpOptions } from './sip-session'
25+
26+
const getPort = require('get-port')
2327

2428
const snapshotRefreshDelay = 500,
2529
maxSnapshotRefreshSeconds = 30,
@@ -49,6 +53,7 @@ export class RingCamera {
4953

5054
onData = new BehaviorSubject<CameraData>(this.initialData)
5155
onRequestUpdate = new Subject()
56+
onRequestActiveDings = new Subject()
5257

5358
onNewDing = new Subject<ActiveDing>()
5459
onActiveDings = new BehaviorSubject<ActiveDing[]>([])
@@ -160,6 +165,7 @@ export class RingCamera {
160165
)
161166
.toPromise()
162167
await this.startVideoOnDemand()
168+
this.onRequestActiveDings.next()
163169
return vodPromise
164170
}
165171

@@ -271,18 +277,46 @@ export class RingCamera {
271277
.find(x => !this.sipUsedDingIds.includes(x.id_str)),
272278
targetDing = existingDing || (await this.getSipConnectionDetails())
273279

280+
this.sipUsedDingIds.push(targetDing.id_str)
281+
274282
return {
275283
to: targetDing.sip_to,
276284
from: targetDing.sip_from,
277285
dingId: targetDing.id_str
278286
}
279287
}
280288

281-
async createSipSession(rtpOptions: RtpOptions) {
282-
const sipOptions = await this.getSipOptions()
283-
284-
this.sipUsedDingIds.push(sipOptions.dingId)
289+
async createSipSession(
290+
srtpOption: { audio?: SrtpOptions; video?: SrtpOptions } = {}
291+
) {
292+
const videoSocket = createSocket('udp4'),
293+
audioSocket = createSocket('udp4'),
294+
[sipOptions, publicIpPromise, videoPort, audioPort] = await Promise.all([
295+
this.getSipOptions(),
296+
getPublicIp(),
297+
bindToRandomPort(videoSocket),
298+
bindToRandomPort(audioSocket)
299+
]),
300+
rtpOptions = {
301+
address: await publicIpPromise,
302+
audio: {
303+
port: audioPort,
304+
...srtpOption.audio
305+
},
306+
video: {
307+
port: videoPort,
308+
...srtpOption.video
309+
}
310+
}
285311

286-
return new SipSession(sipOptions, rtpOptions)
312+
return new SipSession(
313+
{
314+
...sipOptions,
315+
tlsPort: await getPort() // get a random port, this can still cause race conditions.
316+
},
317+
rtpOptions,
318+
videoSocket,
319+
audioSocket
320+
)
287321
}
288322
}

api/rtp-utils.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { Socket } from 'dgram'
2+
import { AddressInfo } from 'net'
3+
import { v4 as fetchPublicIp } from 'public-ip'
4+
const stun = require('stun')
5+
6+
export function getPublicIpViaStun() {
7+
return new Promise<string>((resolve, reject) => {
8+
reject(new Error('test'))
9+
stun.request('stun.l.google.com:19302', (err: Error, response: any) => {
10+
if (err) {
11+
return reject(err)
12+
}
13+
14+
resolve(response.getXorAddress().address)
15+
})
16+
})
17+
}
18+
19+
export function getPublicIp() {
20+
return fetchPublicIp()
21+
.catch(() => fetchPublicIp({ https: true }))
22+
.catch(() => getPublicIpViaStun())
23+
.catch(() => {
24+
throw new Error('Failed to retrieve public ip address')
25+
})
26+
}
27+
28+
function isRtpMessage(message: Buffer) {
29+
const payloadType = message.readUInt8(1) & 0x7f
30+
return payloadType > 90 || payloadType === 0
31+
}
32+
33+
export function getSsrc(message: Buffer) {
34+
try {
35+
const isRtp = isRtpMessage(message)
36+
return message.readUInt32BE(isRtp ? 8 : 4)
37+
} catch (_) {
38+
return null
39+
}
40+
}
41+
42+
export function bindToRandomPort(socket: Socket) {
43+
return new Promise<number>(resolve => {
44+
// 0 means select a random open port
45+
socket.bind(0, () => {
46+
const { port } = socket.address() as AddressInfo
47+
resolve(port)
48+
})
49+
})
50+
}
51+
52+
export function sendUdpHolePunch(
53+
socket: Socket,
54+
port: number,
55+
address: string
56+
) {
57+
socket.send('', port, address)
58+
}

0 commit comments

Comments
 (0)