Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.ott.transcoder;

import com.ott.transcoder.inspection.Inspector;
import com.ott.transcoder.inspection.probe.ProbeResult;
import com.ott.transcoder.inspection.validation.DiskSpaceGuard;
import com.ott.transcoder.pipeline.CommandPipeline;
import com.ott.transcoder.queue.TranscodeMessage;
import com.ott.transcoder.storage.VideoStorage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;

/**
* 작업 전체 흐름 조율
* diskSpaceGuard → workDir 생성 → download → inspect → pipeline 실행 → cleanup
*/
@Slf4j
@RequiredArgsConstructor
@Component
public class JobOrchestrator {

private final DiskSpaceGuard diskSpaceGuard;
private final VideoStorage videoStorage;
private final Inspector inspector;
private final CommandPipeline pipeline;

@Value("${transcoder.ffmpeg.temp-dir:#{systemProperties['java.io.tmpdir'] + '/ott-transcode'}}")
private String tempDir;

public void handle(TranscodeMessage message) throws Exception {
Long mediaId = message.mediaId();
// TODO: 0. DB 확인 필요

Path workDir = Path.of(tempDir, "media-" + mediaId);

// 1. 디스크 공간 확인
diskSpaceGuard.check(Path.of(message.originUrl()));

try {
// 2. workDir 생성
Files.createDirectories(workDir);

// 3. 원본 다운로드
Path inputFile = videoStorage.download(message.originUrl(), workDir);

// 4. 검사 (FileValidator → Probe → StreamValidator)
ProbeResult probeResult = inspector.inspect(inputFile);

// TODO: 5. 커맨드 생성 -> 각 커맨드 파이프라인 실행

// 6. 파이프라인 실행
pipeline.execute(mediaId, inputFile, workDir, probeResult);

} finally {
cleanUp(workDir);
}
}

private void cleanUp(Path workDir) {
try {
if (Files.exists(workDir)) {
Files.walk(workDir)
.sorted(Comparator.reverseOrder())
.forEach(path -> {
try { Files.deleteIfExists(path); } catch (IOException ignored) {}
});
log.info("작업 디렉토리 정리 완료 - {}", workDir);
}
} catch (IOException e) {
log.warn("작업 디렉토리 정리 실패 - {}", workDir, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 설정.
*
* Exchange → Binding → Queue 구조로 메시지 라우팅
* Producer가 transcode.exchange에 routing key "transcode.request"로 메시지를 발행하면,
* Binding을 통해 transcode.queue로 전달되고, RabbitTranscodeListener가 소비
*
* transcoder.messaging.provider=rabbit 일 때만 활성화 (SQS 등 전환 시 비활성화)
*/
/** RabbitMQ Exchange/Queue/Binding 설정. transcoder.messaging.provider=rabbit 일 때 활성화. */
@Configuration
@ConditionalOnProperty(name = "transcoder.messaging.provider", havingValue = "rabbit")
public class RabbitConfig {
Expand Down Expand Up @@ -49,11 +41,7 @@ public Binding transcodeBinding(Queue transcodeQueue, DirectExchange transcodeEx
.with(ROUTING_KEY);
}

/**
* JSON 메시지를 TranscodeMessage로 역직렬화할 때 사용할 기본 타입 지정
* 메시지 헤더에 __TypeId__가 없어도 TranscodeMessage로 변환
* (Management UI 등 외부에서 직접 발행한 메시지 처리를 위해 필요)
*/
/** __TypeId__ 헤더 없는 메시지도 역직렬화 가능하도록 기본 타입 지정 */
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.ott.transcoder.ffmpeg;

import com.ott.domain.video_profile.domain.Resolution;

/**
* 단일 해상도에 대한 트랜스코딩 설정 묶음
*
* 현재는 Resolution enum 기반의 고정 프리셋이지만,
* 향후 TranscodePlanner가 ProbeResult를 분석하여 동적으로 생성
*
* @param resolution 대상 해상도 (DB 저장용)
* @param height 출력 높이 (px). 너비는 FFmpeg -2 옵션으로 자동 계산
* @param videoBitrate 비디오 비트레이트 (예: "800k", "2400k")
* @param audioBitrate 오디오 비트레이트 (예: "96k", "128k")
* @param videoCodec 비디오 인코더 (예: "libx264")
* @param audioCodec 오디오 인코더 (예: "aac")
* @param preset 인코딩 프리셋 (예: "fast", "medium")
*/
public record TranscodeProfile(
Resolution resolution,
int height,
String videoBitrate,
String audioBitrate,
String videoCodec,
String audioCodec,
String preset
) {
/** 기존 하드코딩 값과 동일한 기본 프리셋 */
public static TranscodeProfile defaultFor(Resolution resolution) {
return switch (resolution) {
case P360 -> new TranscodeProfile(resolution, 360, "800k", "96k", "libx264", "aac", "fast");
case P720 -> new TranscodeProfile(resolution, 720, "2400k", "128k", "libx264", "aac", "fast");
case P1080 -> new TranscodeProfile(resolution, 1080, "4800k", "192k", "libx264", "aac", "fast");
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.ott.transcoder.ffmpeg.execution;

import com.ott.transcoder.ffmpeg.TranscodeProfile;

import java.io.IOException;
import java.nio.file.Path;

/**
* FFmpeg 실행 추상화 인터페이스
*
* FFmpeg를 호출하는 방식(ProcessBuilder, Jaffree 등)에 독립적으로
* 단일 해상도에 대한 HLS 트랜스코딩을 수행
*/
public interface FfmpegExecutor {

/**
* 단일 프로파일에 대해 HLS 트랜스코딩을 수행
*
* @param inputFile 원본 영상 파일 경로
* @param outputDir 출력 디렉토리 (하위에 360p/, 720p/, 1080p/ 폴더가 생성됨)
* @param profile 트랜스코딩 설정 (해상도, 비트레이트, 코덱 등)
* @return 생성된 미디어 플레이리스트(media.m3u8) 경로
*/
Path execute(Path inputFile, Path outputDir, TranscodeProfile profile) throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.ott.transcoder.ffmpeg.execution.processbuilder;

import com.ott.transcoder.ffmpeg.execution.FfmpegExecutor;
import com.ott.transcoder.ffmpeg.TranscodeProfile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* ProcessBuilder 기반 FFmpeg CLI 래퍼
* 단일 해상도에 대해 HLS 트랜스코딩을 수행
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "transcoder.ffmpeg.engine", havingValue = "processbuilder")
public class ProcessBuilderFfmpegExecutor implements FfmpegExecutor {

@Value("${transcoder.ffmpeg.path:ffmpeg}")
private String ffmpegPath;

@Value("${transcoder.ffmpeg.segment-duration:10}")
private int segmentDuration;

@Override
public Path execute(Path inputFile, Path outputDir, TranscodeProfile profile) throws IOException, InterruptedException {
String resolutionKey = profile.resolution().getKey().toLowerCase();

// 해상도별 하위 디렉토리 생성 (예: workDir/360p/)
Path resolutionDir = outputDir.resolve(resolutionKey);
Files.createDirectories(resolutionDir);

Path playlistPath = resolutionDir.resolve("media.m3u8");
String segmentPattern = resolutionDir.resolve("segment_%03d.ts").toString();

// FFmpeg 명령어 조립 — TranscodeProfile에서 설정값을 가져옴
// TODO: FFmpeg Filter Chain 구성 로직 추가 필요
List<String> command = List.of(
ffmpegPath, "-i", inputFile.toString(),
"-vf", "scale=-2:" + profile.height(),
"-c:v", profile.videoCodec(), "-preset", profile.preset(),
"-c:a", profile.audioCodec(), "-b:a", profile.audioBitrate(),
"-b:v", profile.videoBitrate(),
"-f", "hls",
"-hls_time", String.valueOf(segmentDuration),
"-hls_list_size", "0",
"-hls_segment_filename", segmentPattern,
playlistPath.toString()
);

log.info("FFmpeg 실행 - resolution: {}, command: {}", resolutionKey, String.join(" ", command));

ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.redirectErrorStream(true);

Process process = processBuilder.start();

try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
log.debug("[FFmpeg] {}", line);
}
}

boolean finished = process.waitFor(30, TimeUnit.MINUTES);
if (!finished) {
process.destroyForcibly();
throw new RuntimeException("FFmpeg 타임아웃 - resolution: " + resolutionKey);
}
Comment on lines +64 to +77
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

프로세스 리소스 누수 가능성

출력 읽기 중 예외 발생 시 Process가 정리되지 않아 좀비 프로세스가 남을 수 있습니다. try-finally로 감싸서 예외 상황에서도 프로세스가 종료되도록 해야 합니다.

🔒️ 제안된 수정
         Process process = processBuilder.start();
 
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
-            String line;
-            while ((line = reader.readLine()) != null) {
-                log.debug("[FFmpeg] {}", line);
+        try {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    log.debug("[FFmpeg] {}", line);
+                }
             }
-        }
 
-        boolean finished = process.waitFor(30, TimeUnit.MINUTES);
-        if (!finished) {
-            process.destroyForcibly();
-            throw new RuntimeException("FFmpeg 타임아웃 - resolution: " + resolutionKey);
+            boolean finished = process.waitFor(30, TimeUnit.MINUTES);
+            if (!finished) {
+                process.destroyForcibly();
+                throw new RuntimeException("FFmpeg 타임아웃 - resolution: " + resolutionKey);
+            }
+        } catch (Exception e) {
+            process.destroyForcibly();
+            throw e;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/transcoder/src/main/java/com/ott/transcoder/ffmpeg/execution/processbuilder/ProcessBuilderFfmpegExecutor.java`
around lines 64 - 77, The code in ProcessBuilderFfmpegExecutor that starts the
Process (processBuilder.start()) must ensure the Process is always cleaned up if
an exception occurs while reading output or waiting; wrap the process lifecycle
in try { start, read from process.getInputStream() } finally { ensure process
streams are closed and call process.destroyForcibly() if still alive and
waitFor() to reap it } so that any IOException during the BufferedReader loop
cannot leave a zombie process; update the block around process, reader, and
process.waitFor(...) to use try/finally (or try-with-resources for streams) and
perform safe destroy/wait in the finally.

int exitCode = process.exitValue();
if (exitCode != 0) {
throw new RuntimeException("FFmpeg 실패 - resolution: " + resolutionKey + ", exitCode: " + exitCode);
}

log.info("FFmpeg 완료 - resolution: {}, output: {}", resolutionKey, playlistPath);
return playlistPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.ott.transcoder.inspection;

import com.ott.transcoder.inspection.probe.ProbeResult;
import com.ott.transcoder.inspection.probe.execution.FfprobeExecutor;
import com.ott.transcoder.inspection.validation.FileValidator;
import com.ott.transcoder.inspection.validation.StreamValidator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.nio.file.Path;

/**
* 입력 파일 검사
* FileValidator → Probe → StreamValidator 순서로 실행
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class Inspector {

private final FileValidator fileValidator;
private final FfprobeExecutor ffprobeExecutor;
private final StreamValidator streamValidator;

public ProbeResult inspect(Path inputFile) {
fileValidator.validate(inputFile);
ProbeResult probeResult = ffprobeExecutor.probe(inputFile);
streamValidator.validate(probeResult);

return probeResult;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.ott.transcoder.inspection.probe;

/**
* ffprobe 실행 결과를 담는 불변 레코드
*
* @param width 영상 너비 (px)
* @param height 영상 높이 (px)
* @param durationSeconds 전체 재생 시간 (초)
* @param videoCodec 비디오 코덱 (예: h264, hevc, vp9)
* @param audioCodec 오디오 코덱 (예: aac, opus, "none")
* @param fps 프레임레이트
* @param videoBitrate 비디오 비트레이트 (bps)
* @param audioBitrate 오디오 비트레이트 (bps)
* @param audioChannels 오디오 채널 수 (예: 2=stereo, 6=5.1ch)
* @param pixelFormat 픽셀 포맷 (예: yuv420p, yuv422p)
* @param rotation 회전 각도 (0, 90, 180, 270). 스마트폰 세로 촬영 시 90 또는 270
*/
public record ProbeResult(
int width,
int height,
double durationSeconds,
String videoCodec,
String audioCodec,
double fps,
long videoBitrate,
long audioBitrate,
int audioChannels,
String pixelFormat,
int rotation
) {
/**
* 회전을 고려한 실제 영상 높이.
* 90° 또는 270° 회전된 영상은 width와 height가 뒤바뀐다.
* 예: 1080x1920(세로 촬영, rotation=90) → 실제 출력은 1920x1080 → effectiveHeight = 1080
*/
public int effectiveHeight() {
return isRotated() ? this.width : this.height;
}

public int effectiveWidth() {
return isRotated() ? this.height : this.width;
}

public boolean isRotated() {
return rotation == 90 || rotation == 270;
}

// 회전을 고려하여 업스케일 여부 판단
public boolean isUpscaleFor(int targetHeight) {
return targetHeight > effectiveHeight();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ott.transcoder.inspection.probe.execution;

import com.ott.transcoder.inspection.probe.ProbeResult;

import java.nio.file.Path;

/**
* ffprobe 실행 추상화 인터페이스
*
* 입력 파일의 미디어 메타데이터 추출
*/
public interface FfprobeExecutor {

/**
* 입력 파일에 대해 ffprobe를 실행하여 메타데이터 추출
*
* @param inputFile 분석 대상 파일 경로
* @return 추출된 메타데이터
*/
ProbeResult probe(Path inputFile);
}
Loading