Skip to content

Commit 152cee6

Browse files
committed
add: youtube live chat support via websocket
1 parent 8c577cb commit 152cee6

File tree

8 files changed

+389
-6
lines changed

8 files changed

+389
-6
lines changed

src/index.js

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -952,11 +952,16 @@ class NodelinkServer extends EventEmitter {
952952

953953
let eventName = '/v4/websocket'
954954
let guildId = null
955+
let liveId = null
955956
try {
956957
const url = new URL(ws.data.url)
957958
const voiceMatch = url.pathname.match(
958959
/^\/v4\/websocket\/voice\/([A-Za-z0-9]+)\/?$/
959960
)
961+
const liveMatch = url.pathname.match(
962+
/^\/v4\/websocket\/youtube\/live\/([^/]+)\/?$/
963+
)
964+
960965
if (voiceMatch) {
961966
if (!self.options.voiceReceive?.enabled) {
962967
try {
@@ -966,6 +971,9 @@ class NodelinkServer extends EventEmitter {
966971
}
967972
eventName = '/v4/websocket/voice'
968973
guildId = voiceMatch[1]
974+
} else if (liveMatch) {
975+
eventName = '/v4/websocket/youtube/live'
976+
liveId = liveMatch[1]
969977
}
970978
} catch {}
971979

@@ -975,7 +983,7 @@ class NodelinkServer extends EventEmitter {
975983
reqShim,
976984
clientInfo,
977985
sessionId,
978-
guildId
986+
guildId || liveId
979987
)
980988
},
981989
message(ws, message) {
@@ -1073,8 +1081,11 @@ class NodelinkServer extends EventEmitter {
10731081
const voiceMatch = pathname.match(
10741082
/^\/v4\/websocket\/voice\/([A-Za-z0-9]+)\/?$/
10751083
)
1084+
const liveMatch = pathname.match(
1085+
/^\/v4\/websocket\/youtube\/live\/([^/]+)\/?$/
1086+
)
10761087

1077-
if (pathname === '/v4/websocket' || voiceMatch) {
1088+
if (pathname === '/v4/websocket' || voiceMatch || liveMatch) {
10781089
if (!headers['user-id']) {
10791090
logger(
10801091
'warn',
@@ -1110,8 +1121,16 @@ class NodelinkServer extends EventEmitter {
11101121
} connected from ${clientAddress} | \x1b[33mURL:\x1b[0m ${request.url}`
11111122
)
11121123

1113-
const eventName = voiceMatch ? '/v4/websocket/voice' : '/v4/websocket'
1114-
const guildId = voiceMatch ? voiceMatch[1] : null
1124+
let eventName = '/v4/websocket'
1125+
let routeId = null
1126+
1127+
if (voiceMatch) {
1128+
eventName = '/v4/websocket/voice'
1129+
routeId = voiceMatch[1]
1130+
} else if (liveMatch) {
1131+
eventName = '/v4/websocket/youtube/live'
1132+
routeId = liveMatch[1]
1133+
}
11151134

11161135
if (isBun && !this._usingBunServer) {
11171136
this.socket.handleUpgrade(request, socket, head, (ws) => {
@@ -1121,7 +1140,7 @@ class NodelinkServer extends EventEmitter {
11211140
request,
11221141
clientInfo,
11231142
sessionId,
1124-
guildId
1143+
routeId
11251144
)
11261145
})
11271146
} else {
@@ -1132,7 +1151,7 @@ class NodelinkServer extends EventEmitter {
11321151
request,
11331152
clientInfo,
11341153
sessionId,
1135-
guildId
1154+
routeId
11361155
)
11371156
)
11381157
}
@@ -1169,6 +1188,65 @@ class NodelinkServer extends EventEmitter {
11691188
this.registerVoiceSocket(guildId, socket)
11701189
}
11711190
)
1191+
1192+
this.socket.on(
1193+
'/v4/websocket/youtube/live',
1194+
(socket, request, _clientInfo, _sessionId, id) => {
1195+
let videoId = id
1196+
1197+
if (/^\d{17,20}$/.test(id)) {
1198+
const player = this.sessions.getPlayer(id)
1199+
if (player?.track?.info?.sourceName?.includes('youtube')) {
1200+
videoId = player.track.info.identifier
1201+
}
1202+
}
1203+
else if (id.length > 50) {
1204+
try {
1205+
const decoded = decodeTrack(id)
1206+
if (decoded?.info?.sourceName?.includes('youtube')) {
1207+
videoId = decoded.info.identifier
1208+
}
1209+
} catch (_e) {}
1210+
}
1211+
1212+
if (!this.sourceWorkerManager) {
1213+
const yt = this.sources.getSource('youtube')
1214+
if (!yt) {
1215+
socket.close(1008, 'YouTube source not enabled')
1216+
return
1217+
}
1218+
yt.handleLiveChat(socket, videoId)
1219+
return
1220+
}
1221+
1222+
logger('info', 'YouTube-LiveChat', `Delegating live chat for video: ${videoId} to worker`)
1223+
1224+
const resShim = {
1225+
headersSent: false,
1226+
send: (data) => {
1227+
const payload = Buffer.isBuffer(data) ? data : Buffer.from(String(data))
1228+
socket.sendFrame(payload, { len: payload.length, fin: true, opcode: Buffer.isBuffer(data) ? 0x02 : 0x01 })
1229+
},
1230+
writeHead: (status) => {
1231+
if (status !== 200) socket.close(1011, 'Worker failed')
1232+
},
1233+
write: (data) => {
1234+
const payload = Buffer.isBuffer(data) ? data : Buffer.from(String(data))
1235+
socket.sendFrame(payload, { len: payload.length, fin: true, opcode: Buffer.isBuffer(data) ? 0x02 : 0x01 })
1236+
},
1237+
end: () => socket.close(1000, 'Finished'),
1238+
on: (event, cb) => socket.on(event, cb)
1239+
}
1240+
1241+
this.sourceWorkerManager.delegate(
1242+
request,
1243+
resShim,
1244+
'loadLiveChat',
1245+
{ videoId },
1246+
{ isWebSocket: true }
1247+
)
1248+
}
1249+
)
11721250
}
11731251

11741252
_listen() {

src/managers/sessionManager.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,12 @@ export default class SessionManager {
138138
values() {
139139
return this.activeSessions.values()
140140
}
141+
142+
getPlayer(guildId) {
143+
for (const session of this.activeSessions.values()) {
144+
const player = session.players.get(guildId)
145+
if (player) return player
146+
}
147+
return null
148+
}
141149
}

src/managers/sourceWorkerManager.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ class SourceWorkerManager {
5858
} else if (type === 1) {
5959
request.res.end()
6060
this._cleanupRequest(id, request)
61+
} else if (type === 3) {
62+
if (request.timeout) {
63+
clearTimeout(request.timeout)
64+
request.timeout = null
65+
}
66+
if (!request.res.headersSent && request.options?.isWebSocket) {
67+
request.res.send(payload)
68+
} else if (!request.res.headersSent) {
69+
request.res.setHeader('Content-Type', 'application/json')
70+
request.res.writeHead(200)
71+
request.res.write(payload)
72+
} else {
73+
request.res.write(payload)
74+
}
6175
} else if (type === 2) {
6276
const errorMsg = payload.toString('utf8')
6377
if (!request.res.headersSent) {
@@ -150,6 +164,20 @@ class SourceWorkerManager {
150164
if (!request || request.cleaned) return
151165
request.cleaned = true
152166
if (request.timeout) clearTimeout(request.timeout)
167+
168+
if (request.task === 'loadLiveChat') {
169+
const worker = this.workers.find((w) => w.id === request.workerId)
170+
if (worker) {
171+
worker.send({
172+
type: 'sourceTask',
173+
payload: {
174+
task: 'cancelLiveChat',
175+
payload: { id }
176+
}
177+
})
178+
}
179+
}
180+
153181
this._decrementLoad(request.workerId)
154182
this.requests.delete(id)
155183
}
@@ -173,6 +201,7 @@ class SourceWorkerManager {
173201
const request = {
174202
req,
175203
res,
204+
task,
176205
timeout: null,
177206
workerId: bestWorker.id,
178207
options,

src/managers/workerManager.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,12 @@ export default class WorkerManager {
410410
100
411411
)
412412
}
413+
} else if (type === 9) {
414+
if (global.nodelink)
415+
global.nodelink.handleIPCMessage({
416+
type: 'liveChatAction',
417+
payload: data
418+
})
413419
}
414420
} catch (e) {
415421
logger('error', 'Cluster', `Socket event parse error: ${e.message}`)

src/sourceWorker.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ if (isMainThread) {
6464
processNextTask()
6565
} else if (msg.type === 'stream') {
6666
sendStreamChunk(msg.socketPath, msg.id, msg.chunk)
67+
} else if (msg.type === 'chatAction') {
68+
sendChatAction(msg.socketPath, msg.id, msg.data)
6769
} else if (msg.type === 'end') {
6870
sendStreamEnd(msg.socketPath, msg.id)
6971
worker.load = Math.max(0, worker.load - 1)
@@ -134,6 +136,11 @@ if (isMainThread) {
134136
withSocket(socketPath, (socket) => sendFrame(socket, id, 0, payload))
135137
}
136138

139+
function sendChatAction(socketPath, id, data) {
140+
const payload = Buffer.from(JSON.stringify(data), 'utf8')
141+
withSocket(socketPath, (socket) => sendFrame(socket, id, 3, payload))
142+
}
143+
137144
function sendStreamEnd(socketPath, id) {
138145
withSocket(socketPath, (socket) =>
139146
sendFrame(socket, id, 1, Buffer.alloc(0))
@@ -228,6 +235,8 @@ if (isMainThread) {
228235
await nodelink.sources.loadFolder()
229236
await nodelink.lyrics.loadFolder()
230237

238+
const activeChats = new Map()
239+
231240
parentPort.postMessage({ type: 'ready' })
232241

233242
const sendStreamChunkFromWorker = (id, socketPath, chunk) => {
@@ -247,6 +256,48 @@ if (isMainThread) {
247256
})
248257
}
249258

259+
const handleLiveChat = async (id, socketPath, payload) => {
260+
const videoId = payload.videoId
261+
const yt = nodelink.sources.getSource('youtube')
262+
if (!yt) throw new Error('YouTube source not available in worker')
263+
264+
activeChats.set(id, true)
265+
266+
try {
267+
const chat = await yt.liveChat.getLiveChat(videoId)
268+
if (!chat) throw new Error('Could not initialize live chat')
269+
270+
const pollLoop = async () => {
271+
while (activeChats.has(id)) {
272+
try {
273+
const result = await chat.poll()
274+
if (!result) break
275+
276+
const { actions, timeoutMs } = result
277+
278+
if (actions.length > 0 && activeChats.has(id)) {
279+
utils.logger('debug', 'SourceWorker', `[${id}] Sending ${actions.length} actions for ${videoId}`)
280+
parentPort.postMessage({ type: 'chatAction', id, socketPath, data: { op: 'actions', actions } })
281+
}
282+
283+
await new Promise(resolve => setTimeout(resolve, timeoutMs || 5000))
284+
} catch (e) {
285+
utils.logger('error', 'SourceWorker', `[${id}] Polling exception for ${videoId}: ${e.message}`)
286+
break
287+
}
288+
}
289+
}
290+
291+
await pollLoop()
292+
293+
parentPort.postMessage({ type: 'end', id, socketPath })
294+
} catch (e) {
295+
sendStreamErrorFromWorker(id, socketPath, e.message)
296+
} finally {
297+
activeChats.delete(id)
298+
}
299+
}
300+
250301
const handleLoadStream = async (id, socketPath, payload) => {
251302
let fetched = null
252303
let pcmStream = null
@@ -329,6 +380,20 @@ if (isMainThread) {
329380
return
330381
}
331382

383+
if (task === 'loadLiveChat') {
384+
try {
385+
await handleLiveChat(id, socketPath, payload)
386+
} catch (e) {
387+
sendStreamErrorFromWorker(id, socketPath, e.message || e)
388+
}
389+
return
390+
}
391+
392+
if (task === 'cancelLiveChat') {
393+
activeChats.delete(payload.id)
394+
return
395+
}
396+
332397
try {
333398
let result
334399
switch (task) {

0 commit comments

Comments
 (0)