Skip to content

Commit 97e885f

Browse files
committed
feat(media): support cloud codec via ffmpeg
1 parent f891d3d commit 97e885f

3 files changed

Lines changed: 82 additions & 8 deletions

File tree

internal/transcode/manager.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ func (m *Manager) maintenance() {
111111
}
112112

113113
// sweepIdleJobs 扫描所有活跃任务,超过空闲超时时间无播放端请求则自动取消并 kill FFmpeg
114+
// 注意:这里只 cancel job + kill ffmpeg 释放 CPU,**不删除已转好的切片缓存**。
115+
// 当用户继续播放时,TCSegment / TCPlaylist 会通过 Scheduler.Reactivate 复用同一 jobID/token
116+
// 重新启动转码;已存在的切片可以直接命中,不需要再次转码。
114117
func (m *Manager) sweepIdleJobs() {
115118
idleTimeoutSec := setting.GetInt(conf.TranscodeIdleTimeoutSec, 90)
116119
if idleTimeoutSec <= 0 {
@@ -124,15 +127,16 @@ func (m *Manager) sweepIdleJobs() {
124127
lastAccess := j.GetLastAccess()
125128
idle := now.Sub(lastAccess)
126129
if idle > timeout {
127-
utils.Log.Infof("[transcode] job %s idle for %.0fs (threshold %ds), auto-cancelling",
130+
utils.Log.Infof("[transcode] job %s idle for %.0fs (threshold %ds), pausing ffmpeg (cache kept for resume)",
128131
j.ID, idle.Seconds(), idleTimeoutSec)
129-
// 先取消所有 chunk ffmpeg,再调度器层面取消,最后清理缓存
132+
// 先取消所有 chunk ffmpeg,再调度器层面取消
130133
if m.Chunks != nil {
131134
m.Chunks.CancelAllForJob(j.ID)
132135
}
133136
m.Scheduler.Cancel(j.ID)
134-
// 清理缓存文件释放磁盘空间
135-
m.Cache.Cleanup(j.ID)
137+
// 【重要】不再调用 m.Cache.Cleanup —— 已转好的切片必须保留,
138+
// 这样浏览器恢复播放时旧的 seg-N.ts 仍然能命中,避免 404 和重新转码。
139+
// 真正的磁盘清理由更长周期的 PurgeBefore + Cache.EnforceLRU 负责。
136140
}
137141
}
138142
}

internal/transcode/scheduler.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,61 @@ func (s *Scheduler) ListJobs() []*Job {
265265
return out
266266
}
267267

268+
// Reactivate 把一个已 Cancelled / Failed / Finished 的 job 重新激活回 Pending 状态,
269+
// 让 worker 能再次领取并跑转码。用于"播放端重新请求时按需重启转码"的场景。
270+
//
271+
// 设计点:
272+
// 1. 复用原 jobID 和 callback_token,前端持有的 master.m3u8 中的 URL 不需要变更
273+
// 2. 重置 readyCh / Status / WorkerID 等运行时字段,但保留 Path / SourceURL / Profiles / Probe
274+
// 3. 重新加入 pending 队列并唤醒等待的 worker
275+
//
276+
// 返回被激活的 Job;如果 job 不存在返回 nil 和 false。
277+
func (s *Scheduler) Reactivate(jobID string) (*Job, bool) {
278+
s.mu.Lock()
279+
j, ok := s.jobs[jobID]
280+
if !ok {
281+
s.mu.Unlock()
282+
return nil, false
283+
}
284+
j.mu.Lock()
285+
// 只有终止态才需要重新激活;Running/Ready 直接返回即可
286+
switch j.Status {
287+
case JobPending, JobRunning, JobReady:
288+
j.mu.Unlock()
289+
s.mu.Unlock()
290+
return j, true
291+
}
292+
// 重置运行时状态
293+
j.Status = JobPending
294+
j.WorkerID = ""
295+
j.Error = ""
296+
j.FinishedAt = time.Time{}
297+
j.LastAccessAt = time.Now()
298+
// 重置 readyCh:原 channel 已 closed,需要全新一份给后续 WaitReady
299+
j.readyCh = make(chan struct{})
300+
j.readyOnce = sync.Once{}
301+
j.mu.Unlock()
302+
303+
// 重新入 pending 队列(先确认不在队列中以免重复)
304+
inQueue := false
305+
for _, pj := range s.pending {
306+
if pj.ID == jobID {
307+
inQueue = true
308+
break
309+
}
310+
}
311+
if !inQueue {
312+
s.pending = append(s.pending, j)
313+
}
314+
// 唤醒等待的 worker
315+
for _, ch := range s.waiters {
316+
close(ch)
317+
}
318+
s.waiters = nil
319+
s.mu.Unlock()
320+
return j, true
321+
}
322+
268323
// PurgeBefore 删除 finishedAt 早于某时刻的任务,避免 jobs map 无限增长
269324
// 返回被清理的 job ID 列表,调用方需联动清理 Cache
270325
func (s *Scheduler) PurgeBefore(t time.Time) []string {

server/handles/transcode.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func TCPlaylist(c *gin.Context) {
147147
jobID := c.Param("job")
148148
token := c.Param("token")
149149
profile := c.Param("profile")
150-
job, ok := transcode.Default().Scheduler.Get(jobID)
150+
mgr := transcode.Default()
151+
job, ok := mgr.Scheduler.Get(jobID)
151152
if !ok {
152153
c.String(http.StatusNotFound, "job not found")
153154
return
@@ -156,6 +157,13 @@ func TCPlaylist(c *gin.Context) {
156157
c.String(http.StatusForbidden, "bad token")
157158
return
158159
}
160+
// 【关键】如果 job 之前因 idle 被 cancel,这里重新激活:复用 jobID/token 重新入队
161+
// 让 worker 再次启动 ffmpeg 转码,避免播放器看到 404 后无法恢复
162+
if st := job.GetStatus(); st == transcode.JobCancelled || st == transcode.JobFinished || st == transcode.JobFailed {
163+
if _, ok := mgr.Scheduler.Reactivate(jobID); ok {
164+
fmt.Printf("[transcode] reactivate job %s on playlist request (was %s)\n", jobID, st)
165+
}
166+
}
159167
job.Touch()
160168
// 等首切片
161169
select {
@@ -193,7 +201,8 @@ func TCSegment(c *gin.Context) {
193201
token := c.Param("token")
194202
profile := c.Param("profile")
195203
segName := c.Param("seg")
196-
job, ok := transcode.Default().Scheduler.Get(jobID)
204+
mgr := transcode.Default()
205+
job, ok := mgr.Scheduler.Get(jobID)
197206
if !ok {
198207
c.String(http.StatusNotFound, "job not found")
199208
return
@@ -202,17 +211,23 @@ func TCSegment(c *gin.Context) {
202211
c.String(http.StatusForbidden, "bad token")
203212
return
204213
}
214+
// 【关键】job 之前因 idle 被 cancel 时,复用同一 jobID/token 重新入队启动转码。
215+
// 这样浏览器持有的旧 master.m3u8 + 切片 URL 仍然有效,暂停再继续也不会 404。
216+
if st := job.GetStatus(); st == transcode.JobCancelled || st == transcode.JobFinished || st == transcode.JobFailed {
217+
if _, ok := mgr.Scheduler.Reactivate(jobID); ok {
218+
fmt.Printf("[transcode] reactivate job %s on segment request (was %s)\n", jobID, st)
219+
}
220+
}
205221
// seg-N.ts
206222
name := strings.TrimSuffix(strings.TrimPrefix(segName, "seg-"), ".ts")
207223
seq, err := strconv.Atoi(name)
208224
if err != nil {
209225
c.String(http.StatusBadRequest, "bad segment name")
210226
return
211227
}
212-
cache := transcode.Default().Cache
228+
cache := mgr.Cache
213229
// 【智能调度】触发对应 chunk 的 ffmpeg 启动(如已运行则只更新 LastAccess)
214230
// 这是用户拖动进度条到任意位置时能快速响应的关键
215-
mgr := transcode.Default()
216231
if mgr.Chunks != nil {
217232
chunkIdx := mgr.Chunks.EnsureChunkRunningForSeg(jobID, profile, seq)
218233
fmt.Printf("[tc-segment] req seq=%d chunk=%d job=%s\n", seq, chunkIdx, jobID)

0 commit comments

Comments
 (0)