Skip to content

Commit fa1a9af

Browse files
committed
add: implement HLSHandler and PlaylistParser for HLS streaming support
1 parent c6e31ba commit fa1a9af

File tree

4 files changed

+368
-153
lines changed

4 files changed

+368
-153
lines changed

src/playback/hls/HLSHandler.js

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import { PassThrough } from 'node:stream'
2+
import { http1makeRequest, logger } from '../../utils.js'
3+
import PlaylistParser from './PlaylistParser.js'
4+
import SegmentFetcher from './SegmentFetcher.js'
5+
6+
export default class HLSHandler extends PassThrough {
7+
constructor(url, options = {}) {
8+
super({ highWaterMark: options.highWaterMark || 1024 * 1024 * 2 }) // 2MB default
9+
10+
this.currentUrl = url
11+
this.options = options
12+
this.headers = options.headers || {}
13+
this.localAddress = options.localAddress || null
14+
this.chunkReadahead = options.chunkReadahead || 3
15+
this.liveBuffer = options.liveBuffer || 20000
16+
17+
this.fetcher = new SegmentFetcher({
18+
headers: this.headers,
19+
localAddress: this.localAddress
20+
})
21+
22+
this.processedSegments = new Set()
23+
this.processedOrder = []
24+
this.MAX_HISTORY = 100
25+
26+
this.segmentQueue = []
27+
this.isFetching = false
28+
this.stop = false
29+
this.lastMapUri = null
30+
this.isLive = false
31+
this.playlistTimer = null
32+
33+
this.on('close', () => { this.destroy() })
34+
this.on('error', () => { this.destroy() })
35+
36+
this._start()
37+
}
38+
39+
async _start() {
40+
await this._playlistLoop()
41+
}
42+
43+
destroy(err) {
44+
if (this.stop) return
45+
this.stop = true
46+
47+
if (this.playlistTimer) {
48+
clearTimeout(this.playlistTimer)
49+
this.playlistTimer = null
50+
}
51+
52+
this.segmentQueue = []
53+
this.processedSegments.clear()
54+
this.processedOrder = []
55+
56+
super.destroy(err)
57+
}
58+
59+
_rememberSegment(url) {
60+
if (this.processedSegments.has(url)) return false
61+
this.processedSegments.add(url)
62+
this.processedOrder.push(url)
63+
if (this.processedOrder.length > this.MAX_HISTORY) {
64+
this.processedSegments.delete(this.processedOrder.shift())
65+
}
66+
return true
67+
}
68+
69+
async _playlistLoop() {
70+
if (this.stop) return
71+
72+
try {
73+
const { body: playlistContent, error, statusCode } = await http1makeRequest(this.currentUrl, {
74+
headers: this.headers,
75+
method: 'GET',
76+
localAddress: this.localAddress
77+
})
78+
79+
if (error || statusCode !== 200) {
80+
throw new Error(`Failed to fetch playlist: ${statusCode}`)
81+
}
82+
83+
const parsed = PlaylistParser.parse(playlistContent, this.currentUrl)
84+
85+
if (parsed.isMaster) {
86+
this.currentUrl = parsed.variants[0].url
87+
return setImmediate(() => this._playlistLoop())
88+
}
89+
90+
const isFirstLoad = !this.isLive && this.processedSegments.size === 0
91+
this.isLive = parsed.isLive
92+
93+
let segmentsToAdd = parsed.segments
94+
95+
if (isFirstLoad && this.isLive) {
96+
const segmentsToSkip = Math.max(0, parsed.segments.length - this.chunkReadahead)
97+
segmentsToAdd = parsed.segments.slice(segmentsToSkip)
98+
99+
for (let i = 0; i < segmentsToSkip; i++) {
100+
this.processedSegments.add(parsed.segments[i].url)
101+
this.processedOrder.push(parsed.segments[i].url)
102+
}
103+
}
104+
105+
for (const segment of segmentsToAdd) {
106+
if (this._rememberSegment(segment.url)) {
107+
this.segmentQueue.push(segment)
108+
}
109+
}
110+
111+
if (this.segmentQueue.length > 0 && !this.isFetching) {
112+
this._fetchSegments()
113+
}
114+
115+
if (this.isLive && !playlistContent.includes('#EXT-X-ENDLIST')) {
116+
this.playlistTimer = setTimeout(() => this._playlistLoop(), parsed.targetDuration * 1000)
117+
}
118+
} catch (err) {
119+
logger('error', 'HLSHandler', `Playlist error: ${err.message}`)
120+
if (!this.isLive) return this.destroy(err)
121+
122+
this.playlistTimer = setTimeout(() => this._playlistLoop(), 5000)
123+
}
124+
}
125+
126+
async _fetchSegments() {
127+
if (this.isFetching || this.stop) return
128+
this.isFetching = true
129+
130+
while (this.segmentQueue.length > 0 && !this.stop) {
131+
if (this.writableLength >= this.writableHighWaterMark) {
132+
await new Promise((resolve) => this.once('drain', resolve))
133+
if (this.stop) break
134+
}
135+
136+
const segment = this.segmentQueue.shift()
137+
138+
try {
139+
if (segment.map && segment.map.uri !== this.lastMapUri) {
140+
const mapData = await this.fetcher.fetchMap(segment.map)
141+
if (mapData && !this.stop) {
142+
this.write(mapData)
143+
this.lastMapUri = segment.map.uri
144+
}
145+
}
146+
147+
const data = await this.fetcher.fetchSegment(segment)
148+
149+
if (!this.stop) {
150+
this.write(data)
151+
}
152+
} catch (err) {
153+
logger('error', 'HLSHandler', `Segment error ${segment.sequence}: ${err.message}`)
154+
}
155+
}
156+
157+
this.isFetching = false
158+
159+
if (!this.isLive && this.segmentQueue.length === 0 && !this.stop) {
160+
this.emit('finishBuffering')
161+
this.end()
162+
}
163+
}
164+
}

src/playback/hls/PlaylistParser.js

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
export default class PlaylistParser {
2+
static parse(content, baseUrl) {
3+
const lines = content.split('\n').map((l) => l.trim()).filter(Boolean)
4+
5+
// 4.3.4: Detect Master Playlist
6+
if (lines.some(l => l.startsWith('#EXT-X-STREAM-INF'))) {
7+
return { isMaster: true, variants: this.parseMaster(lines, baseUrl) }
8+
}
9+
10+
const result = {
11+
isMaster: false,
12+
mediaSequence: 0,
13+
targetDuration: 5,
14+
isLive: !content.includes('#EXT-X-ENDLIST'),
15+
segments: []
16+
}
17+
18+
let currentKey = null
19+
let currentMap = null
20+
let mediaSequence = 0
21+
let lastByteRange = null
22+
23+
const mediaSequenceLine = lines.find((l) => l.startsWith('#EXT-X-MEDIA-SEQUENCE:'))
24+
if (mediaSequenceLine) {
25+
mediaSequence = parseInt(mediaSequenceLine.split(':')[1], 10)
26+
result.mediaSequence = mediaSequence
27+
}
28+
29+
const targetDurationLine = lines.find((l) => l.startsWith('#EXT-X-TARGETDURATION:'))
30+
if (targetDurationLine) {
31+
result.targetDuration = parseInt(targetDurationLine.split(':')[1], 10)
32+
}
33+
34+
let segmentIndex = 0
35+
for (let i = 0; i < lines.length; i++) {
36+
const line = lines[i]
37+
38+
if (line.startsWith('#EXT-X-KEY:')) {
39+
currentKey = this.parseKey(line, baseUrl)
40+
} else if (line.startsWith('#EXT-X-MAP:')) {
41+
currentMap = this.parseMap(line, baseUrl)
42+
} else if (line.startsWith('#EXT-X-BYTERANGE:')) {
43+
lastByteRange = this.parseByteRange(line, lastByteRange)
44+
} else if (line.startsWith('#EXTINF:')) {
45+
const segmentUrl = lines[++i]
46+
if (segmentUrl && !segmentUrl.startsWith('#')) {
47+
const absoluteUrl = new URL(segmentUrl, baseUrl).toString()
48+
const sequence = mediaSequence + segmentIndex
49+
50+
result.segments.push({
51+
url: absoluteUrl,
52+
key: currentKey,
53+
map: currentMap,
54+
byteRange: lastByteRange,
55+
sequence
56+
})
57+
segmentIndex++
58+
lastByteRange = null
59+
}
60+
}
61+
}
62+
63+
return result
64+
}
65+
66+
static parseMaster(lines, baseUrl) {
67+
const variants = []
68+
for (let i = 0; i < lines.length; i++) {
69+
if (lines[i].startsWith('#EXT-X-STREAM-INF:')) {
70+
const attrLine = lines[i]
71+
const url = new URL(lines[++i], baseUrl).toString()
72+
const bandwidthMatch = attrLine.match(/BANDWIDTH=(\d+)/)
73+
const codecsMatch = attrLine.match(/CODECS="([^"]+)"/)
74+
75+
variants.push({
76+
url,
77+
bandwidth: bandwidthMatch ? parseInt(bandwidthMatch[1], 10) : 0,
78+
codecs: codecsMatch ? codecsMatch[1] : ''
79+
})
80+
}
81+
}
82+
83+
return variants.sort((a, b) => b.bandwidth - a.bandwidth)
84+
}
85+
86+
static parseKey(line, baseUrl) {
87+
const methodMatch = line.match(/METHOD=([^,]+)/)
88+
const method = methodMatch ? methodMatch[1] : 'NONE'
89+
if (method === 'NONE') return null
90+
91+
const uriMatch = line.match(/URI="([^"]+)"/)
92+
const ivMatch = line.match(/IV=0x([0-9a-fA-F]+)/)
93+
if (!uriMatch) return null
94+
95+
return {
96+
method,
97+
uri: new URL(uriMatch[1], baseUrl).toString(),
98+
iv: ivMatch ? Buffer.from(ivMatch[1], 'hex') : null
99+
}
100+
}
101+
102+
static parseMap(line, baseUrl) {
103+
const uriMatch = line.match(/URI="([^"]+)"/)
104+
const rangeMatch = line.match(/BYTERANGE="([^"]+)"/)
105+
if (!uriMatch) return null
106+
107+
return {
108+
uri: new URL(uriMatch[1], baseUrl).toString(),
109+
byteRange: rangeMatch ? this.parseByteRange(`#EXT-X-BYTERANGE:${rangeMatch[1]}`, null) : null
110+
}
111+
}
112+
113+
static parseByteRange(line, lastRange) {
114+
const match = line.match(/:(\d+)(?:@(\d+))?/)
115+
if (!match) return null
116+
117+
const length = parseInt(match[1], 10)
118+
let offset = match[2] ? parseInt(match[2], 10) : null
119+
120+
if (offset === null && lastRange) {
121+
offset = lastRange.offset + lastRange.length
122+
}
123+
124+
return { length, offset: offset || 0 }
125+
}
126+
}

src/playback/hls/SegmentFetcher.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import crypto from 'node:crypto'
2+
import { http1makeRequest, logger } from '../../utils.js'
3+
4+
export default class SegmentFetcher {
5+
constructor(options = {}) {
6+
this.headers = options.headers || {}
7+
this.localAddress = options.localAddress || null
8+
this.keyMap = new Map()
9+
}
10+
11+
async _fetchResource(uri, byteRange = null) {
12+
const headers = { ...this.headers }
13+
if (byteRange) {
14+
const end = byteRange.offset + byteRange.length - 1
15+
headers.Range = `bytes=${byteRange.offset}-${end}`
16+
}
17+
18+
const { body, error, statusCode } = await http1makeRequest(uri, {
19+
headers,
20+
responseType: 'buffer',
21+
localAddress: this.localAddress,
22+
timeout: 15000
23+
})
24+
25+
if (error || (statusCode !== 200 && statusCode !== 206)) {
26+
throw new Error(`Failed to fetch resource from ${uri}: ${statusCode} ${error?.message || ''}`)
27+
}
28+
29+
return body
30+
}
31+
32+
async fetchKey(keyInfo) {
33+
if (!keyInfo || keyInfo.method === 'NONE') return null
34+
let keyData = this.keyMap.get(keyInfo.uri)
35+
if (keyData) return keyData
36+
37+
const body = await this._fetchResource(keyInfo.uri)
38+
this.keyMap.set(keyInfo.uri, body)
39+
return body
40+
}
41+
42+
async fetchMap(mapInfo) {
43+
if (!mapInfo) return null
44+
return await this._fetchResource(mapInfo.uri, mapInfo.byteRange)
45+
}
46+
47+
async fetchSegment(segment) {
48+
const body = await this._fetchResource(segment.url, segment.byteRange)
49+
50+
let data = body
51+
if (segment.key && segment.key.method === 'AES-128') {
52+
const keyData = await this.fetchKey(segment.key)
53+
if (keyData) {
54+
const iv = segment.key.iv || this._getIv(segment.sequence)
55+
const decipher = crypto.createDecipheriv('aes-128-cbc', keyData, iv)
56+
decipher.setAutoPadding(false)
57+
data = Buffer.concat([decipher.update(body), decipher.final()])
58+
}
59+
}
60+
61+
return data
62+
}
63+
64+
_getIv(sequence) {
65+
const iv = Buffer.alloc(16)
66+
iv.writeBigUInt64BE(BigInt(sequence), 8)
67+
return iv
68+
}
69+
}

0 commit comments

Comments
 (0)