…T-375-feature/message
Walkthrough
Adds heartbeat-based job claiming (CAS) and a RabbitMQ delay-queue (retry) mechanism. The scheduler … the database's …
Changes
Sequence Diagram(s)
sequenceDiagram
participant RL as RabbitTranscodeListener
participant JO as JobOrchestrator
participant SM as IngestJobStatusManager
participant DB as Database
participant HS as HeartbeatScheduler
participant HW as HeartbeatWriter
participant DQP as DelayQueuePublisher
RL->>JO: handle(message, delayed=false)
alt already in a terminal state
JO->>SM: isTerminal(jobId)
SM->>DB: select status
DB-->>SM: SUCCESS/FAILED
SM-->>JO: true
JO-->>RL: ACK (skip)
else non-terminal
JO->>SM: startProcessing(jobId)
SM->>DB: tryPreempt(jobId, timeout)
alt preemption succeeded (affected=1)
DB-->>SM: 1
SM-->>JO: true
JO->>HS: start(jobId)
HS->>HS: create ScheduledExecutorService
HS-->>JO: Heartbeat
loop every HEARTBEAT_INTERVAL_SEC
HS->>HW: updateHeartbeat(jobId)
HW->>DB: update heartbeat_at = NOW()
end
JO->>JO: executeTranscoding(...)
JO->>HS: close() (stop scheduler)
else preemption failed
DB-->>SM: 0
SM-->>JO: false
alt delayed == false
JO->>DQP: publishToDelay(message)
DQP->>DB: publish to delay exchange (x-delayed=true)
JO-->>RL: ACK (re-queued)
else delayed == true
JO-->>RL: ACK (drop)
end
end
end
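The claim step in the diagram (tryPreempt returning 1 or 0) can be illustrated with a small in-memory sketch. This is not the PR's repository code: the class PreemptSketch, the JobRow type, and the exact claim condition (non-terminal status and a missing or expired heartbeat) are assumptions made for illustration. The real implementation is presumably a single conditional UPDATE whose affected-row count plays the role of the return value here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the ingest_job table; names are illustrative only.
public class PreemptSketch {
    public enum Status { PENDING, PROCESSING, SUCCESS, FAILED }

    static final class JobRow {
        final Status status;
        final Instant heartbeatAt; // null means no worker has claimed the job yet
        JobRow(Status status, Instant heartbeatAt) {
            this.status = status;
            this.heartbeatAt = heartbeatAt;
        }
    }

    private final Map<Long, JobRow> table = new ConcurrentHashMap<>();

    public void insert(long jobId, Status status, Instant heartbeatAt) {
        table.put(jobId, new JobRow(status, heartbeatAt));
    }

    /** Returns 1 if the caller claimed the job, 0 otherwise (like an UPDATE's affected-row count). */
    public int tryPreempt(long jobId, Duration timeout, Instant now) {
        int[] affected = {0};
        // computeIfPresent runs atomically per key, so only one caller can win the claim.
        table.computeIfPresent(jobId, (id, row) -> {
            boolean terminal = row.status == Status.SUCCESS || row.status == Status.FAILED;
            boolean stale = row.heartbeatAt == null
                    || row.heartbeatAt.isBefore(now.minus(timeout));
            if (!terminal && stale) {
                affected[0] = 1;
                return new JobRow(Status.PROCESSING, now); // claim and stamp the heartbeat
            }
            return row; // someone else holds a live claim, or the job is finished
        });
        return affected[0];
    }

    public static void main(String[] args) {
        PreemptSketch db = new PreemptSketch();
        Duration timeout = Duration.ofSeconds(30);
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        db.insert(1L, Status.PENDING, null);
        System.out.println(db.tryPreempt(1L, timeout, t0));                 // 1: first worker claims
        System.out.println(db.tryPreempt(1L, timeout, t0.plusSeconds(10))); // 0: heartbeat still fresh
        System.out.println(db.tryPreempt(1L, timeout, t0.plusSeconds(45))); // 1: heartbeat expired
    }
}
```

A caller that gets 0 back would follow the "preemption failed" branch of the diagram: publish to the delay queue once, or drop the message if it was already delayed.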
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
Actionable comments posted: 4
🧹 Nitpick comments (4)
apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/HeartbeatScheduler.java (1)
30-41: [P2] Recommend naming the scheduler thread and marking it as a daemon
Executors.newSingleThreadScheduledExecutor() creates a non-daemon thread by default, and the thread name follows the pool-N-thread-1 pattern, which is inconvenient for debugging and shutdown. If Heartbeat#close() is not called because of an exceptional situation, JVM shutdown can be delayed, and it is hard to tell from the logs which job a heartbeat thread belongs to.
♻️ Suggestion
- public Heartbeat start(Long jobId) {
-     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ public Heartbeat start(Long jobId) {
+     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+         Thread t = new Thread(r, "heartbeat-job-" + jobId);
+         t.setDaemon(true);
+         return t;
+     });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/HeartbeatScheduler.java` around lines 30 - 41, The scheduler created in HeartbeatScheduler.start currently uses Executors.newSingleThreadScheduledExecutor() which creates a non-daemon thread with an opaque name; change it to create the ScheduledExecutorService with a custom ThreadFactory that sets threads as daemon and names them with the jobId (e.g., "heartbeat-job-%d" or similar) so threads are identifiable and won't block JVM shutdown, e.g., build the ThreadFactory inside HeartbeatScheduler.start (or a private helper) and call Executors.newSingleThreadScheduledExecutor(yourThreadFactory); also verify Heartbeat.close properly shuts down the executor (shutdownNow/awaitTermination) to avoid leaks.
modules/infra-db/src/main/resources/db/migration/V13__add_heartbeat_to_ingest_job.sql (1)
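3-4: [P2] Consider a standalone index on heartbeat_at (optional)
heartbeat_at appears in the WHERE clause of the tryPreempt CAS condition, but rows are generally looked up by primary key (id), so most cases are fine. However, if an "expired job scan" use case (for example, a batch that cleans up dead workers) may be added later, consider adding a heartbeat_at index in a future migration. It is not required within the current change scope.
🤖 Prompt for AI Agents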
Verify each finding against the current code and only fix it if needed. In `@modules/infra-db/src/main/resources/db/migration/V13__add_heartbeat_to_ingest_job.sql` around lines 3 - 4, Add a non-unique index on ingest_job.heartbeat_at to support future "expired job scan" queries referenced by tryPreempt: create a new migration (or extend the V13 migration) that issues an ALTER TABLE ingest_job ADD INDEX ingest_job_heartbeat_at_idx (heartbeat_at); ensure the index name is unique in the schema, that it handles NULL values, and run migrations to validate there are no conflicts with existing indices.
apps/transcoder/src/main/java/com/ott/transcoder/constant/IngestJobConstant.java (1)
51-63: [P2] Recommend guarding the timing invariants with a test
The comments state the assumptions "Delay Queue TTL > HEARTBEAT_TIMEOUT" and "TIMEOUT = INTERVAL * 3". If someone changes the constants individually, the whole cascade (10s → 30s → 40s) breaks and subtle bugs can appear in the preemption/recovery logic. Pinning the invariants explicitly, with a unit test or a check in a static initializer block as shown here, prevents regressions.
♻️ Suggestion: invariant assertion
  public static final class HeartbeatConstant {
      private HeartbeatConstant() { }
      public static final int HEARTBEAT_INTERVAL_SEC = 10;
      public static final int HEARTBEAT_TIMEOUT_SEC = 30;
      public static final int DELAY_QUEUE_TTL_MS = 40_000;
+
+     static {
+         if (HEARTBEAT_TIMEOUT_SEC <= HEARTBEAT_INTERVAL_SEC
+                 || DELAY_QUEUE_TTL_MS <= HEARTBEAT_TIMEOUT_SEC * 1000L) {
+             throw new IllegalStateException(
+                 "Heartbeat timing invariant broken: must satisfy INTERVAL < TIMEOUT < DELAY_TTL");
+         }
+     }
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/transcoder/src/main/java/com/ott/transcoder/constant/IngestJobConstant.java` around lines 51 - 63, Add explicit runtime assertions in the HeartbeatConstant class to lock the documented invariants: assert that HEARTBEAT_TIMEOUT_SEC == HEARTBEAT_INTERVAL_SEC * 3 and assert that DELAY_QUEUE_TTL_MS > HEARTBEAT_TIMEOUT_SEC * 1000; put these checks in a static initialization block inside the HeartbeatConstant nested class so they run at class load time and will fail fast if someone changes HEARTBEAT_INTERVAL_SEC, HEARTBEAT_TIMEOUT_SEC, or DELAY_QUEUE_TTL_MS.
apps/transcoder/src/main/java/com/ott/transcoder/queue/rabbit/DelayQueuePublisher.java (1)
25-37: [P2] The "x-delayed" header key must be extracted into a constant
The listener (RabbitTranscodeListener.java:30) and the publisher (DelayQueuePublisher.java:31) both use the same string "x-delayed". RabbitConsumerConfig currently does not define this constant, so a typo could cause a silent failure. Extract it into a shared constant and reference it from both sides.
Suggested change
Add to RabbitConsumerConfig.java:
public static final String HEADER_X_DELAYED = "x-delayed";
Change in DelayQueuePublisher.java:
- msg.getMessageProperties().setHeader("x-delayed", true);
+ msg.getMessageProperties().setHeader(RabbitConsumerConfig.HEADER_X_DELAYED, true);
Change in RabbitTranscodeListener.java:
- @Header(name = "x-delayed", required = false, defaultValue = "false") boolean delayed
+ @Header(name = RabbitConsumerConfig.HEADER_X_DELAYED, required = false, defaultValue = "false") boolean delayed
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/transcoder/src/main/java/com/ott/transcoder/queue/rabbit/DelayQueuePublisher.java` around lines 25 - 37, Add a shared constant for the header key and use it in both publisher and listener: define public static final String HEADER_X_DELAYED = "x-delayed" in RabbitConsumerConfig, then replace the literal "x-delayed" in DelayQueuePublisher.publishToDelay (the msg.getMessageProperties().setHeader call) and the corresponding usage in RabbitTranscodeListener to reference RabbitConsumerConfig.HEADER_X_DELAYED so both sides use the same constant.
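The IngestJobConstant comment above mentions a unit test as an alternative to the static initializer. One way to make the invariants testable is to express them as a pure function, sketched here. The class HeartbeatTimingCheck and its violation method are hypothetical names, and the sketch uses the stricter "TIMEOUT = INTERVAL * 3" form from the code comments rather than the looser ordering check in the suggested diff.

```java
import java.util.Optional;

// Hypothetical helper; not part of the PR. It only encodes the two documented invariants.
public class HeartbeatTimingCheck {

    /** Returns a description of the first broken invariant, or empty when both hold. */
    public static Optional<String> violation(int intervalSec, int timeoutSec, long delayTtlMs) {
        if (timeoutSec != intervalSec * 3) {
            return Optional.of("TIMEOUT must equal INTERVAL * 3");
        }
        if (delayTtlMs <= timeoutSec * 1000L) {
            return Optional.of("Delay Queue TTL must exceed HEARTBEAT_TIMEOUT");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Values from the review: 10s interval, 30s timeout, 40s delay-queue TTL.
        System.out.println(violation(10, 30, 40_000).isPresent()); // false: invariants hold
        // Shrinking the TTL to the timeout breaks the second invariant.
        System.out.println(violation(10, 30, 30_000).orElse("ok"));
        // Changing the interval alone breaks the first invariant.
        System.out.println(violation(20, 30, 40_000).orElse("ok"));
    }
}
```

A unit test could then call violation with the real constants and fail if the result is present, which keeps the check out of class initialization.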
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/Heartbeat.java`:
- Around line 17-20: The current close() only calls scheduler.shutdown(), which
allows already-scheduled or in-flight updateHeartbeat tasks to run and possibly
update heartbeat_at after JobOrchestrator transitions; change close() to call
scheduler.shutdownNow() to cancel queued tasks, then call
scheduler.awaitTermination(...) with a short timeout to wait for in-flight
updateHeartbeat to finish, and handle InterruptedException by restoring the
thread interrupt; ensure references to scheduler, close(), updateHeartbeat, and
JobOrchestrator/heartbeat_at are used when updating the method.
In
`@apps/transcoder/src/main/java/com/ott/transcoder/job/IngestJobStatusManager.java`:
- Around line 48-52: The isTerminal method in IngestJobStatusManager incorrectly
treats a missing ingest job as terminal by using orElse(true); change this to
surface missing jobs instead of swallowing them: update isTerminal (or its
caller) so that when ingestJobRepository.findById(ingestJobId) returns empty it
does not return true but either throws a JobNotFoundException (or another
runtime exception) or returns an Optional/boolean that clearly indicates
"missing" so JobOrchestrator can route the message to DLQ/retry handling instead
of acknowledging; reference the isTerminal method in IngestJobStatusManager and
the consumer logic in JobOrchestrator to implement and wire this behavior.
In `@apps/transcoder/src/main/java/com/ott/transcoder/job/JobOrchestrator.java`:
- Line 18: JobOrchestrator currently depends directly on the Rabbit-specific
DelayQueuePublisher which breaks bean creation when the property
transcoder.messaging.provider != rabbit; introduce a domain port interface
(e.g., DelayMessagePublisher) and change JobOrchestrator to depend on that
interface instead of DelayQueuePublisher, then provide two conditional
implementations: the existing RabbitDelayMessagePublisher (wraps/renames
DelayQueuePublisher logic and is
`@ConditionalOnProperty`(name="transcoder.messaging.provider",
havingValue="rabbit")) and a NoOpDelayMessagePublisher (registered when the
rabbit bean is absent, e.g., `@ConditionalOnMissingBean` or `@ConditionalOnProperty`
with inverse condition) so the application context can always create
JobOrchestrator regardless of the messaging provider.
In `@apps/transcoder/src/main/java/com/ott/transcoder/queue/MessageListener.java`:
- Around line 10-15: Move the misplaced `@param` delayed from the interface-level
Javadoc into a proper method-level Javadoc for
MessageListener.listen(TranscodeMessage message, boolean delayed), add a `@param`
entry documenting the message parameter (type TranscodeMessage) and clarify what
delayed means, and then add an identical method-level Javadoc to the
implementing class RabbitTranscodeListener.listen(...) so the interface contract
is documented consistently across MessageListener.listen and
RabbitTranscodeListener.listen.
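The Heartbeat.close() comment above, together with the thread-naming nitpick, can be sketched in plain Java as follows. HeartbeatSketch is a hypothetical stand-in for the PR's Heartbeat and HeartbeatScheduler classes, the Runnable replaces the real HeartbeatWriter call, and the 2-second wait is an arbitrary choice for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the real classes in the PR may be structured differently.
public class HeartbeatSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler;

    private HeartbeatSketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    public static HeartbeatSketch start(long jobId, long intervalMs, Runnable updateHeartbeat) {
        // Named daemon thread: identifiable in logs and unable to block JVM exit.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-job-" + jobId);
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(updateHeartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
        return new HeartbeatSketch(scheduler);
    }

    @Override
    public void close() {
        // shutdownNow() drops queued runs and interrupts a run that is in flight.
        scheduler.shutdownNow();
        try {
            // Wait briefly so no heartbeat write can land after the caller moves on.
            if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                System.err.println("heartbeat thread did not stop within 2s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
        }
    }

    public boolean isStopped() {
        return scheduler.isTerminated();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger beats = new AtomicInteger();
        try (HeartbeatSketch hb = start(42L, 50, beats::incrementAndGet)) {
            Thread.sleep(180); // stands in for executeTranscoding(...)
        }
        int atClose = beats.get();
        Thread.sleep(150);
        System.out.println("beats after close: " + (beats.get() - atClose));
    }
}
```

Using try-with-resources means close() runs even when the work throws, which addresses the concern about Heartbeat#close() being skipped on exceptional paths.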
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 90642bb5-f745-408b-8788-5c7f9c7fed98
📒 Files selected for processing (13)
apps/transcoder/src/main/java/com/ott/transcoder/config/RabbitConsumerConfig.java
apps/transcoder/src/main/java/com/ott/transcoder/constant/IngestJobConstant.java
apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/Heartbeat.java
apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/HeartbeatScheduler.java
apps/transcoder/src/main/java/com/ott/transcoder/heartbeat/HeartbeatWriter.java
apps/transcoder/src/main/java/com/ott/transcoder/job/IngestJobStatusManager.java
apps/transcoder/src/main/java/com/ott/transcoder/job/JobOrchestrator.java
apps/transcoder/src/main/java/com/ott/transcoder/queue/MessageListener.java
apps/transcoder/src/main/java/com/ott/transcoder/queue/rabbit/DelayQueuePublisher.java
apps/transcoder/src/main/java/com/ott/transcoder/queue/rabbit/RabbitTranscodeListener.java
modules/domain/src/main/java/com/ott/domain/ingest_job/domain/IngestJob.java
modules/domain/src/main/java/com/ott/domain/ingest_job/repository/IngestJobRepository.java
modules/infra-db/src/main/resources/db/migration/V13__add_heartbeat_to_ingest_job.sql
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@apps/api-admin/src/main/java/com/ott/api_admin/publish/RabbitTranscodePublisher.java`:
- Around line 18-26: The publish() currently calls rabbitTemplate.invoke and
waitForConfirmsOrDie(5_000) per message which can block
OutboxPoller.pollAndPublish() (scheduled fixedDelay=10000) for up to 50×5s;
change to a non-blocking/batched confirm strategy: either collect messages in
the poll cycle and call rabbitTemplate.invoke once to send all and then a single
operations.waitForConfirmsOrDie(...) (define and document the batch rollback
semantics in publish()/OutboxPoller), or reduce the per-message timeout to 1–2s
and switch to asynchronous confirms with correlation and Outbox state updates;
also audit DelayQueuePublisher.publishToDelay() for symmetry (it uses
convertAndSend without confirms) and align its reliability strategy with
publish() to avoid asymmetric loss. Ensure references: publish(),
OutboxPoller.pollAndPublish(), rabbitTemplate.invoke(...),
operations.waitForConfirmsOrDie(...), and DelayQueuePublisher.publishToDelay()
when implementing the chosen approach.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 588b11b6-ce8c-4f2b-8201-76e02bad79d8
📒 Files selected for processing (4)
apps/api-admin/src/main/java/com/ott/api_admin/publish/RabbitTranscodePublisher.java
apps/api-admin/src/main/resources/application.yml
apps/transcoder/src/main/java/com/ott/transcoder/ffmpeg/execution/processbuilder/ProcessBuilderFfmpegExecutor.java
apps/transcoder/src/main/java/com/ott/transcoder/job/IngestJobStatusManager.java
✅ Files skipped from review due to trivial changes (2)
- apps/api-admin/src/main/resources/application.yml
- apps/transcoder/src/main/java/com/ott/transcoder/ffmpeg/execution/processbuilder/ProcessBuilderFfmpegExecutor.java
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/transcoder/src/main/java/com/ott/transcoder/job/IngestJobStatusManager.java
rabbitTemplate.invoke(operations -> {
    operations.convertAndSend(
        TranscodeConstants.EXCHANGE_NAME,
        TranscodeConstants.ROUTING_KEY,
        message
    );
    operations.waitForConfirmsOrDie(5_000);
    return null;
});
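P1: Scheduler thread blocking risk (synchronous 5-second wait per message)
This publish() is called from OutboxPoller.pollAndPublish() (@Scheduled(fixedDelay=10000), processing up to 50 messages sequentially per cycle). Because it waits synchronously on waitForConfirmsOrDie(5_000) for every message, a temporarily slow broker can block a single cycle for up to 50 × 5s = 250s in the worst case, which effectively defeats fixedDelay=10s and approaches lockAtMostFor=5m. Under normal conditions confirms arrive within milliseconds, so this is not a big problem, but the impact is amplified when the broker degrades.
One of the following is recommended.
- On the poller side, publish all 50 messages within a single invoke and call waitForConfirmsOrDie once at the end (batch confirm). In this case, the batch handling/rollback policy for a single failed message must be defined.
- Alternatively, use a shorter timeout (for example, 1 to 2 seconds) and switch to correlated asynchronous confirms plus Outbox status updates.
- At a minimum, consider partial polling/short-circuit handling when a large backlog builds up.
In addition, apps/transcoder/.../DelayQueuePublisher.publishToDelay() uses only convertAndSend without confirms (see context snippet 2), so the publishing reliability policy is asymmetric between publishers. If message loss on the retry/delay path affects this PR's goal of "minimizing duplicate message execution", consider applying the same publisher-confirm pattern there as well.
Flagged from a publishing-reliability and scheduler-throughput perspective, in line with the coding guideline item "P0/P1 checks: ... validate RabbitMQ/DB state transitions and DLQ/ack behavior correctness (avoid duplicate execution regressions)".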
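The worst-case figure in this comment can be checked with a few lines of arithmetic. The sketch below is only a calculation, not publisher code: ConfirmWaitBudget and its methods are hypothetical names, and the batch figure assumes a single confirm wait per cycle while ignoring the time spent sending.

```java
import java.time.Duration;

// Hypothetical calculator for the confirm-wait budget discussed in the review.
public class ConfirmWaitBudget {

    /** Worst case when every message waits for its own confirm, one after another. */
    public static Duration perMessageWorstCase(int batchSize, Duration confirmTimeout) {
        return confirmTimeout.multipliedBy(batchSize);
    }

    /** Worst case when the whole batch shares one confirm wait (send time ignored). */
    public static Duration batchWorstCase(Duration confirmTimeout) {
        return confirmTimeout;
    }

    public static void main(String[] args) {
        Duration confirmTimeout = Duration.ofSeconds(5);
        Duration lockAtMostFor = Duration.ofMinutes(5);

        Duration perMessage = perMessageWorstCase(50, confirmTimeout);
        System.out.println(perMessage.getSeconds());                      // 250
        System.out.println(lockAtMostFor.minus(perMessage).getSeconds()); // 50 seconds of headroom
        System.out.println(batchWorstCase(confirmTimeout).getSeconds());  // 5
    }
}
```

With per-message confirms the cycle leaves only 50 seconds before lockAtMostFor expires, whereas a single batch wait stays well under the 10-second fixedDelay.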
📝 Work summary
📷 Screenshots
☑️ Checklist
#️⃣ Related issues
closes #227
💬 Review requests
See the 4/22 entry in Notion.
Summary by CodeRabbit
New Features
Improvements
Chores