-
Notifications
You must be signed in to change notification settings - Fork 34
[volume - 8] Decoupling with Kafka #204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[volume - 8] Decoupling with Kafka #204
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis PR implements an event-driven architecture using the outbox pattern with Kafka integration. It introduces domain events (StockDepletedEvent), outbox entities for reliable event publishing, a relay component for Kafka distribution, and a consumer for processing events. It also adds ProductMetrics tracking and integrates Kafka infrastructure via Docker Compose. Changes
Sequence Diagram(s)sequenceDiagram
participant DS as ProductDomainService
participant EP as ApplicationEventPublisher
participant OEH as OutboxEventHandler
participant OB as Outbox Repository
participant OR as OutboxRelay
participant KT as Kafka Template
participant KC as CatalogEventConsumer
participant PMR as ProductMetrics Repository
participant PCS as ProductCacheService
participant EHR as EventHandledRepository
rect rgb(200, 220, 255)
note over DS,EP: Event Publishing
DS->>EP: publishEvent(StockDepletedEvent)
EP->>OEH: onStockDepletedEvent()
end
rect rgb(220, 240, 200)
note over OEH,OB: Outbox Persistence
OEH->>OB: save(Outbox)
OB-->>OEH: โ saved with PENDING status
end
rect rgb(255, 240, 200)
note over OR,KT: Relay & Kafka Publishing
OR->>OB: findByStatus(PENDING, pageable)
OB-->>OR: List<Outbox>
OR->>KT: send(ProducerRecord with headers)
KT-->>OR: โ SendResult
OR->>OB: update(Outbox.markProcessed())
OB-->>OR: โ PROCESSED
end
rect rgb(240, 200, 220)
note over KC,EHR: Consumer Processing & Idempotency
KC->>EHR: existsByEventId(eventId)
EHR-->>KC: false (new event)
KC->>PMR: findByProductId(productId)
PMR-->>KC: Optional<ProductMetrics>
KC->>PMR: save(metrics.updateLikeIfNewer(...))
KC->>PCS: invalidate(productId)
PCS-->>KC: โ cache cleared
KC->>EHR: save(EventHandled.create(eventId))
EHR-->>KC: โ idempotency tracked
end
Estimated code review effort๐ฏ 4 (Complex) | โฑ๏ธ ~60 minutes Areas requiring extra attention:
Possibly related PRs
Suggested labels
Poem
Pre-merge checks and finishing touchesโ Failed checks (1 warning)
โ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and canโt be posted inline due to platform limitations.
โ ๏ธ Outside diff range comments (1)
apps/commerce-api/src/main/java/com/loopers/domain/product/ProductDomainService.java (1)
52-70: @transactional ์ ๋ ธํ ์ด์ ๋๋ฝ์ผ๋ก ์ธํ ํธ๋์ญ์ ์ผ๊ด์ฑ ๋ฌธ์
decreaseStock๋ฉ์๋์@Transactional์ ๋ ธํ ์ด์ ์ด ์์ด์ transactional outbox ํจํด์ ๋ณด์ฅ์ด ๊นจ์ง ์ ์์ต๋๋ค.๋ฌธ์ ์ :
- Line 66์์
eventPublisher.publishEvent()๋ฅผ ํธ์ถํ๋ฉดOutboxEventHandler๊ฐ ์ฆ์ ์คํ๋์ด outbox ์ํธ๋ฆฌ๋ฅผ ์ ์ฅํฉ๋๋ค- ํ์ง๋ง ์ด ๋ฉ์๋๊ฐ ํธ๋์ญ์ ๋ฒ์๊ฐ ์๋๋ฏ๋ก, ์ด๋ฒคํธ ํธ๋ค๋ฌ์ outbox ์ ์ฅ๊ณผ ์ฌ๊ณ ๊ฐ์๊ฐ ๋ณ๋์ ํธ๋์ญ์ ์์ ์คํ๋ฉ๋๋ค
- ์ฌ๊ณ ๊ฐ์๋ ์ฑ๊ณตํ์ง๋ง outbox ์ ์ฅ์ด ์คํจํ๊ฑฐ๋, ๊ทธ ๋ฐ๋์ ๊ฒฝ์ฐ ๋ฐ์ดํฐ ๋ถ์ผ์น๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค
์ฐธ๊ณ : Line 75์
increaseStock๋ฉ์๋๋@Transactional์ด ์ฌ๋ฐ๋ฅด๊ฒ ์ ์ฉ๋์ด ์์ต๋๋ค.๐ ์ ์ํ๋ ์์ ์
/** * ์ฌ๊ณ ์ฐจ๊ฐ */ +@Transactional public Product decreaseStock(Long productId, Long quantity) { Product product = getProductWithLock(productId); if (!product.hasEnoughStock(quantity)) {
๐งน Nitpick comments (7)
docker-compose.yml (1)
61-61: ํ๋ก๋์ ํ๊ฒฝ์์๋ replication factor๋ฅผ ์กฐ์ ํ์ธ์.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1์ ๋ก์ปฌ ๊ฐ๋ฐ ํ๊ฒฝ์ ์ ํฉํ์ง๋ง, ํ๋ก๋์ ํ๊ฒฝ์์๋ ์ต์ 3์ผ๋ก ์ค์ ํ์ฌ ๋ฐ์ดํฐ ๋ด๊ตฌ์ฑ๊ณผ ๊ณ ๊ฐ์ฉ์ฑ์ ๋ณด์ฅํด์ผ ํฉ๋๋ค.apps/commerce-api/src/test/java/com/loopers/infrastructure/idempotent/EventHandledTest.java (1)
10-17: ํ ์คํธ ์ปค๋ฒ๋ฆฌ์ง ํ์ฅ์ ๊ณ ๋ คํด๋ณด์ธ์.๊ธฐ๋ณธ์ ์ธ ์์ฑ ํ ์คํธ๋ ์ ์์ฑ๋์์ต๋๋ค. ํฅํ ๋ค์๊ณผ ๊ฐ์ ์ถ๊ฐ ํ ์คํธ๋ฅผ ๊ณ ๋ คํด๋ณด์ธ์:
- null ๋๋ ๋น ๋ฌธ์์ด eventId ์ฒ๋ฆฌ
- ์ค๋ณต eventId ์ ์ฅ ์๋ (๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ์ฝ ์กฐ๊ฑด ๊ฒ์ฆ)
- handledAt ํ์์คํฌํ์ ์ ํ์ฑ
ํ์ฌ ๊ตฌํ์ผ๋ก๋ ์ถฉ๋ถํ์ง๋ง, ์ฃ์ง ์ผ์ด์ค ์ปค๋ฒ๋ฆฌ์ง๋ฅผ ๋์ด๋ฉด ๋ ๊ฒฌ๊ณ ํ idempotency ๋ณด์ฅ์ด ๊ฐ๋ฅํฉ๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java (1)
56-62: ์ง๋ ฌํ ์คํจ ์ ๋ ๊ตฌ์ฒด์ ์ธ ์์ธ ํ์ ์ฌ์ฉ ๊ณ ๋ ค.ํ์ฌ ๊ตฌํ์ ์ ์ ๋์ํ์ง๋ง, ๋ ๋ช ํํ ์๋ฌ ์ถ์ ์ ์ํด ์ปค์คํ ์์ธ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๊ณ ๋ คํด๋ณด์ธ์.
๐ ์์ธ ํ์ ๊ฐ์ ์ ์
private String toJson(Object event) { try { return objectMapper.writeValueAsString(event); } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize event", e); + throw new IllegalStateException("Failed to serialize event: " + event.getClass().getSimpleName(), e); } }apps/commerce-api/src/main/java/com/loopers/domain/product/event/StockDepletedEvent.java (1)
10-13: productId์ ๋ํ null ๊ฒ์ฆ ์ถ๊ฐ ๊ณ ๋ ค.๋๋ฉ์ธ ์ด๋ฒคํธ์ ๋ฌด๊ฒฐ์ฑ์ ๋ณด์ฅํ๊ธฐ ์ํด ์์ฑ์์์ null ๊ฒ์ฆ์ ์ถ๊ฐํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
๐ null ๊ฒ์ฆ ์ถ๊ฐ ์ ์
private StockDepletedEvent(Long productId) { + if (productId == null) { + throw new IllegalArgumentException("productId must not be null"); + } this.productId = productId; this.occurredAt = LocalDateTime.now(); }apps/commerce-api/src/main/java/com/loopers/infrastructure/idempotent/EventHandled.java (1)
20-21: eventId ์ปฌ๋ผ์ ์ธ๋ฑ์ค ์ถ๊ฐ๋ฅผ ๊ถ์ฅํฉ๋๋ค.
EventHandledRepository.existsByEventId()๊ฐ ์์ฃผ ํธ์ถ๋ ๊ฒ์ด๋ฏ๋ก,eventId์ปฌ๋ผ์ ์ธ๋ฑ์ค๋ฅผ ์ถ๊ฐํ๋ฉด ์กฐํ ์ฑ๋ฅ์ด ํฅ์๋ฉ๋๋ค.unique = true๊ฐ ์ผ๋ถ DB์์ ์๋์ผ๋ก ์ธ๋ฑ์ค๋ฅผ ์์ฑํ์ง๋ง, ๋ช ์์ ์ผ๋ก ์ ์ธํ๋ ๊ฒ์ด ์ข์ต๋๋ค.๐ ์ธ๋ฑ์ค ์ถ๊ฐ ์ ์
@Entity -@Table(name = "event_handled") +@Table(name = "event_handled", indexes = { + @Index(name = "idx_event_handled_event_id", columnList = "eventId") +}) public class EventHandled {apps/commerce-api/src/test/java/com/loopers/infrastructure/consumer/CatalogEventConsumerTest.java (1)
99-120: consumeTest4์ ํ ์คํธ ์๋๋ฅผ ๋ ๋ช ํํ ํ๋ฉด ์ข๊ฒ ์ต๋๋ค.ํ์ฌ ํ ์คํธ์์
newRecord๊ฐ ๋จผ์ ์ฒ๋ฆฌ๋๋ฉดmetrics๊ฐ ์์ฑ๋๊ณ , ์ดํoldRecord๋ ์๊ฐ ๋น๊ต๋ก ๋ฌด์๋ฉ๋๋ค. ํ์ง๋ง mock ์ค์ ์์productMetricsRepository.findByProductId(1L)๊ฐ ์ฒซ ํธ์ถ์Optional.empty()๋ฅผ ๋ฐํํ๊ณ , ๋ ๋ฒ์งธ ํธ์ถ์ ๋ฏธ๋ฆฌ ์ ๋ฐ์ดํธ๋metrics๋ฅผ ๋ฐํํ๋๋ฐ, ์ด๋ ์ฒซ ๋ฒ์งธ ํธ์ถ์์ ์ ์ฅ๋ ๊ฐ์ฒด๊ฐ ์๋๋๋ค.์ค์ ๋์์ ๋ ์ ํํ ๋ฐ์ํ๋ ค๋ฉด
ArgumentCaptor๋ฅผ ์ฌ์ฉํ๊ฑฐ๋ mock ์ค์ ์ ์กฐ์ ํ๋ ๊ฒ์ ๊ณ ๋ คํด๋ณด์ธ์.apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/Outbox.java (1)
14-35: ์ํฐํฐ ์ค๊ณ ์ ์ , ์ปฌ๋ผ ์ ์ฝ ์ถ๊ฐ ๊ณ ๋ คOutbox ์ํฐํฐ ์ค๊ณ๊ฐ ์ ์ ํฉ๋๋ค. ๋ค๋ง, ์๋ ์ฌํญ์ ์ ํ์ ์ผ๋ก ๊ณ ๋ คํด๋ณผ ์ ์์ต๋๋ค:
- ์ปฌ๋ผ ๊ธธ์ด ๋ช ์:
aggregateType,aggregateId,eventType,topicํ๋์@Column(length = ...)๋ฅผ ์ง์ ํ๋ฉด DB ์คํค๋ง๊ฐ ๋ ๋ช ํํด์ง๋๋ค.- ๋๊ด์ ๋ฝํน: ๋ค์ค ์ธ์คํด์ค ํ๊ฒฝ์์ ๋์ผ ๋ ์ฝ๋ ๋์ ์ ๋ฐ์ดํธ ๊ฐ์ง๋ฅผ ์ํด
@Versionํ๋๋ฅผ ๊ณ ๋ คํ ์ ์์ต๋๋ค. ๋จ, Consumer ์ธก idempotency๊ฐ ์์ผ๋ฏ๋ก ํ์ฌ ๊ตฌ์กฐ์์๋ ํ์๋ ์๋๋๋ค.๐ ์ ํ์ ๊ฐ์ ์
+ @Version + private Long version; + + @Column(length = 50) private String aggregateType; + @Column(length = 100) private String aggregateId; + @Column(length = 100) private String eventType; + @Column(length = 100) private String topic;
๐ Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
๐ Files selected for processing (23)
apps/commerce-api/build.gradle.kts(1 hunks)apps/commerce-api/src/main/java/com/loopers/domain/product/ProductDomainService.java(3 hunks)apps/commerce-api/src/main/java/com/loopers/domain/product/event/StockDepletedEvent.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/idempotent/EventHandled.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/idempotent/EventHandledRepository.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetrics.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepository.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/Outbox.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelay.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepository.java(1 hunks)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxStatus.java(1 hunks)apps/commerce-api/src/main/resources/application.yml(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/consumer/CatalogEventConsumerTest.java(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/idempotent/EventHandledTest.java(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/metrics/ProductMetricsTest.java(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxEventHandlerTest.java(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxRelayTest.java(1 hunks)apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxTest.java(1 hunks)apps/commerce-api/src/test/resources/application.yml(1 hunks)docker-compose.yml(1 hunks)modules/kafka/src/main/resources/kafka.yml(1 hunks)
๐งฐ Additional context used
๐ง Learnings (5)
๐ Common learnings
Learnt from: kilian-develop
Repo: Loopers-dev-lab/loopers-spring-java-template PR: 190
File: apps/commerce-streamer/src/main/java/com/loopers/applications/streamer/consumer/product/IncreaseProductViewKafkaConsumer.java:25-35
Timestamp: 2025-12-18T13:24:54.339Z
Learning: In this codebase, Kafka consumers delegate error handling and event tracking to the service layer via EventInboxAspect. Service methods annotated with InboxEvent are intercepted by the aspect, which handles failures by saving failed EventInbox entries and logging errors. This centralized approach avoids duplicating error handling logic across multiple consumers.
๐ Learning: 2025-12-18T01:01:55.894Z
Learnt from: jikimee64
Repo: Loopers-dev-lab/loopers-spring-kotlin-template PR: 65
File: modules/kafka/src/main/resources/kafka.yml:25-32
Timestamp: 2025-12-18T01:01:55.894Z
Learning: In Spring Boot Kafka configuration YAML, properties inside spring.kafka.producer.properties and spring.kafka.consumer.properties maps must use exact Kafka client property names with dot notation, and must be quoted with bracket notation like "[enable.idempotence]": true and "[enable.auto.commit]": false to prevent YAML from parsing dots as nested keys. Spring Boot's relaxed binding only applies to top-level Spring Kafka properties, not to the properties map.
Applied to files:
apps/commerce-api/src/main/resources/application.ymlapps/commerce-api/src/test/resources/application.ymlmodules/kafka/src/main/resources/kafka.yml
๐ Learning: 2025-12-18T13:24:54.339Z
Learnt from: kilian-develop
Repo: Loopers-dev-lab/loopers-spring-java-template PR: 190
File: apps/commerce-streamer/src/main/java/com/loopers/applications/streamer/consumer/product/IncreaseProductViewKafkaConsumer.java:25-35
Timestamp: 2025-12-18T13:24:54.339Z
Learning: In this codebase, Kafka consumers delegate error handling and event tracking to the service layer via EventInboxAspect. Service methods annotated with InboxEvent are intercepted by the aspect, which handles failures by saving failed EventInbox entries and logging errors. This centralized approach avoids duplicating error handling logic across multiple consumers.
Applied to files:
apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/consumer/CatalogEventConsumerTest.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java
๐ Learning: 2025-11-27T09:09:24.961Z
Learnt from: sky980221
Repo: Loopers-dev-lab/loopers-spring-java-template PR: 121
File: apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductJpaRepository.java:22-24
Timestamp: 2025-11-27T09:09:24.961Z
Learning: Product ์ํฐํฐ (apps/commerce-api/src/main/java/com/loopers/domain/product/Product.java)๋ ์ ์ฆ์ผ์ด์ค๋ณ๋ก ์๋์ ์ผ๋ก ๋ค๋ฅธ ๋ฝ ์ ๋ต์ ์ฌ์ฉํ๋ค: ์ข์์ ๊ธฐ๋ฅ์๋ ๋น๊ด์ ๋ฝ(findByIdForUpdate)์, ์ฌ๊ณ ์ฐจ๊ฐ์๋ ๋๊ด์ ๋ฝ(Version + ์ฌ์๋)์ ์ฌ์ฉํ๋ค.
Applied to files:
apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetrics.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/metrics/ProductMetricsTest.java
๐ Learning: 2025-11-09T10:41:39.297Z
Learnt from: ghojeong
Repo: Loopers-dev-lab/loopers-spring-kotlin-template PR: 25
File: apps/commerce-api/src/main/kotlin/com/loopers/domain/product/ProductRepository.kt:1-12
Timestamp: 2025-11-09T10:41:39.297Z
Learning: In this codebase, domain repository interfaces are allowed to use Spring Data's org.springframework.data.domain.Page and org.springframework.data.domain.Pageable types. This is an accepted architectural decision and should not be flagged as a DIP violation.
Applied to files:
apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepository.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepository.java
๐งฌ Code graph analysis (6)
apps/commerce-api/src/main/java/com/loopers/domain/product/ProductDomainService.java (1)
apps/commerce-api/src/main/java/com/loopers/domain/product/event/StockDepletedEvent.java (1)
StockDepletedEvent(5-26)
apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java (2)
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelay.java (1)
Slf4j(16-69)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java (1)
Component(13-63)
apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxEventHandlerTest.java (2)
apps/commerce-api/src/main/java/com/loopers/domain/like/event/ProductLikedEvent.java (1)
ProductLikedEvent(5-42)apps/commerce-api/src/main/java/com/loopers/domain/order/event/OrderCompletedEvent.java (1)
OrderCompletedEvent(7-52)
apps/commerce-api/src/test/java/com/loopers/infrastructure/consumer/CatalogEventConsumerTest.java (1)
apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxRelayTest.java (1)
ExtendWith(27-102)
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java (3)
apps/commerce-api/src/main/java/com/loopers/domain/like/event/ProductLikedEvent.java (1)
ProductLikedEvent(5-42)apps/commerce-api/src/main/java/com/loopers/domain/order/event/OrderCompletedEvent.java (1)
OrderCompletedEvent(7-52)apps/commerce-api/src/main/java/com/loopers/domain/product/event/StockDepletedEvent.java (1)
StockDepletedEvent(5-26)
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelay.java (2)
apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java (1)
Slf4j(23-108)apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java (1)
Component(13-63)
๐ Additional comments (26)
modules/kafka/src/main/resources/kafka.yml (2)
17-18: ํ๋ก๋์ ์์ ์ฑ ์ค์ ์ด ์ ์ ํฉ๋๋ค.
acks: all๊ณผretries: 3์ค์ ์ ๋ฉ์์ง ์์ค ๋ฐฉ์ง๋ฅผ ์ํ ์ ์ ํ ๊ตฌ์ฑ์ ๋๋ค. idempotence์ ํจ๊ป ์ฌ์ฉํ๋ฉด ์ ํํ ํ ๋ฒ(exactly-once) ์ ์ก ์๋งจํฑ์ ๋ณด์ฅํฉ๋๋ค.
24-24: ByteArrayDeserializer ์ค์ ์ ByteArrayJsonMessageConverter์ ํจ๊ป ์๋ํ๋ฉฐ ์๋์ผ๋ก JSON ์ญ์ง๋ ฌํ๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค. ์ปจ์๋จธ๋ ์์ ๋ฐ์ดํธ ๋ฐฐ์ด์ด ์๋ String ํํ๋ก ๋ฉ์์ง๋ฅผ ์์ ํ๋ฏ๋ก ์ถ๊ฐ ์ค์ ์ด ๋ถํ์ํฉ๋๋ค.Likely an incorrect or invalid review comment.
apps/commerce-api/src/test/resources/application.yml (1)
8-8: LGTM!ํ ์คํธ ํ๊ฒฝ์ Kafka ์ค์ ์ ์ํฌํธํ๋ ๊ฒ์ด ์ ์ ํฉ๋๋ค.
apps/commerce-api/src/main/resources/application.yml (1)
24-24: LGTM!๋ฉ์ธ ์ ํ๋ฆฌ์ผ์ด์ ์ค์ ์ Kafka ์ค์ ์ ์ํฌํธํ๋ ๊ฒ์ด ์ ์ ํฉ๋๋ค.
apps/commerce-api/build.gradle.kts (1)
5-5: LGTM!Kafka ๋ชจ๋ ์์กด์ฑ ์ถ๊ฐ๊ฐ ์ ์ ํ๋ฉฐ, ํ๋ก์ ํธ ๊ตฌ์กฐ์ ์ผ๊ด์ฑ์ด ์์ต๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxStatus.java (1)
3-6: LGTM!Outbox ํจํด์ ์๋ช ์ฃผ๊ธฐ๋ฅผ ํํํ๋ ๊ฐ๊ฒฐํ๊ณ ๋ช ํํ enum ์ ์์ ๋๋ค. PENDING๊ณผ PROCESSED ๋ ๊ฐ์ง ์ํ๋ก transactional outbox ํจํด์ ์ถฉ๋ถํ ๊ตฌํํ ์ ์์ต๋๋ค.
apps/commerce-api/src/main/java/com/loopers/domain/product/ProductDomainService.java (1)
3-3: LGTM!ApplicationEventPublisher๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ฃผ์ ํ์ฌ ๋๋ฉ์ธ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ ์ ์๋๋ก ๊ตฌ์ฑํ์ต๋๋ค.
Also applies to: 8-8, 20-20
apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxTest.java (1)
12-64: LGTM!Outbox ์ํฐํฐ์ ์๋ช ์ฃผ๊ธฐ๋ฅผ ํฌ๊ด์ ์ผ๋ก ๊ฒ์ฆํ๋ ์ฐ์ํ ํ ์คํธ์ ๋๋ค:
- ์์ฑ ์ PENDING ์ํ ์ด๊ธฐํ
- markProcessed() ํธ์ถ ์ PROCESSED ์ํ ์ ํ ๋ฐ processedAt ํ์์คํฌํ ์ค์
- ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ: ์ค๋ณต markProcessed() ํธ์ถ ์์๋ ์ต์ด processedAt ์ ์ง
Transactional outbox ํจํด์ ํต์ฌ ๋์์ ์ ๊ฒ์ฆํ๊ณ ์์ต๋๋ค.
apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxEventHandlerTest.java (1)
29-53: LGTM!ProductLikedEvent๊ฐ Outbox์ ์ฌ๋ฐ๋ฅด๊ฒ ์ ์ฅ๋๋์ง ๊ฒ์ฆํ๋ ์ ์์ฑ๋ ํ ์คํธ์ ๋๋ค. ObjectMapper ์ค์ (JavaTimeModule, WRITE_DATES_AS_TIMESTAMPS ๋นํ์ฑํ)์ด ํ๋ก๋์ ํ๊ฒฝ๊ณผ ์ผ์นํ๋ฉฐ, ๋ชจ๋ ์ค์ ํ๋(aggregateType, aggregateId, eventType, topic, status)๋ฅผ ์ ์ ํ ๊ฒ์ฆํฉ๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsRepository.java (1)
7-10: LGTM!ProductMetrics๋ฅผ productId๋ก ์กฐํํ๋ ๊ฐ๊ฒฐํ๊ณ ๋ช ํํ ๋ฆฌํฌ์งํ ๋ฆฌ ์ธํฐํ์ด์ค์ ๋๋ค. Spring Data JPA์ ๋ฉ์๋ ๋ค์ด๋ฐ ์ปจ๋ฒค์ ์ ์ ๋ฐ๋ฅด๊ณ ์์ต๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/idempotent/EventHandledRepository.java (1)
5-8: LGTM!์ด๋ฒคํธ ๋ฉฑ๋ฑ์ฑ ์ฒดํฌ๋ฅผ ์ํ ํจ์จ์ ์ธ ๋ฆฌํฌ์งํ ๋ฆฌ ์ธํฐํ์ด์ค์ ๋๋ค.
existsByEventId๋ฉ์๋๋ boolean ๋ฐํ์ผ๋ก ๋ถํ์ํ ์ํฐํฐ ๋ก๋ฉ ์์ด ์ค๋ณต ์ฒ๋ฆฌ๋ฅผ ๋ฐฉ์งํ ์ ์์ต๋๋ค.apps/commerce-api/src/test/java/com/loopers/infrastructure/outbox/OutboxRelayTest.java (4)
54-76: ํฌ๊ด์ ์ธ ์ฑ๊ณต ์ผ์ด์ค ํ ์คํธ์ ๋๋ค.PENDING ์ํ์ Outbox๊ฐ Kafka๋ก ๋ฐํ๋๊ณ PROCESSED๋ก ์ ํ๋๋ ์ ์ฒด ํ๋ฆ์ ์ ๊ฒ์ฆํฉ๋๋ค:
- Kafka ProducerRecord์ topic, key, value ๊ฒ์ฆ
- HEADER_OUTBOX_ID, HEADER_EVENT_TYPE ํค๋ ์กด์ฌ ํ์ธ
- ์ํ ์ ํ ๋ฐ ์ ์ฅ ํธ์ถ ๊ฒ์ฆ
Line 69์ partition key ํ์
"1:100"(aggregateId:userId)์ ์ด๋ฒคํธ ์์ ๋ณด์ฅ์ ์ํ ์ข์ ์ค๊ณ์ ๋๋ค.
78-87: LGTM!PENDING ์ํ์ Outbox๊ฐ ์์ ๋ ๋ถํ์ํ Kafka ํธ์ถ์ ํ์ง ์๋์ง ๊ฒ์ฆํ๋ ์ค์ํ ์ฃ์ง ์ผ์ด์ค์ ๋๋ค.
89-101: ์ ๋ขฐ์ฑ ๋ณด์ฅ์ ์ํ ์ค์ํ ์คํจ ์ผ์ด์ค ํ ์คํธ์ ๋๋ค.Kafka ๋ฐํ ์คํจ ์ Outbox ์ํ๋ฅผ PENDING์ผ๋ก ์ ์งํ์ฌ ๋ค์ relay ์ฌ์ดํด์์ ์ฌ์๋ํ ์ ์๋๋ก ๋ณด์ฅํฉ๋๋ค. At-least-once delivery ํจํด์ ํต์ฌ์ ์ธ ๋์์ ๊ฒ์ฆํฉ๋๋ค.
69-69: partition key ์์ฑ ๋ก์ง์ ์ฌ๋ฐ๋ฅด๊ฒ ๊ตฌํ๋์์ต๋๋ค.OutboxRelay์
buildPartitionKey()๋ฉ์๋๊ฐ ๋ค์ํ ์ด๋ฒคํธ ํ์ ์ payload ๊ตฌ์กฐ์์ partition key๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ถ์ถํฉ๋๋ค:
- ์ ์ ํ๋ฆ: userId๊ฐ payload JSON์ ์กด์ฌํ๋ฉด
aggregateId:userIdํ์์ผ๋ก ๋ฐํ (์: "1:100")- StockDepletedEvent (userId ์์): userId๊ฐ null์ด๋ฉด aggregateId๋ง ๋ฐํ
- ํ์ฑ ์คํจ ์: ๋ชจ๋ ์์ธ๋ฅผ catchํ๊ณ aggregateId๋ก fallback
๊ตฌํ๋ fallback ๋ก์ง์ผ๋ก ์ธํด event type๊ณผ ๋ฌด๊ดํ๊ฒ ์์ ํ๊ฒ ์๋ํฉ๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepository.java (1)
8-11: LGTM!๋ฆฌํฌ์งํ ๋ฆฌ ์ธํฐํ์ด์ค๊ฐ ๊น๋ํ๊ฒ ๊ตฌํ๋์์ต๋๋ค. Spring Data JPA ์ฟผ๋ฆฌ ๋ฉ์๋ ๋ค์ด๋ฐ ๊ท์น์ ์ ๋ฐ๋ฅด๊ณ ์์ผ๋ฉฐ,
Pageableํ๋ผ๋ฏธํฐ๋ฅผ ํตํด OutboxRelay์์ ๋ฐฐ์น ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํฉ๋๋ค.apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxEventHandler.java (1)
20-30: LGTM - Transactional Outbox ํจํด์ด ์ฌ๋ฐ๋ฅด๊ฒ ๊ตฌํ๋์์ต๋๋ค.
TransactionPhase.BEFORE_COMMIT์ ์ฌ์ฉํ์ฌ ๋น์ฆ๋์ค ํธ๋์ญ์ ๊ณผ Outbox ์ ์ฅ์ด ์์์ ์ผ๋ก ์ฒ๋ฆฌ๋ฉ๋๋ค. ์ด๋ ์ด๋ฒคํธ ์ ์ค์ ๋ฐฉ์งํ๋ ํต์ฌ ํจํด์ ๋๋ค.apps/commerce-api/src/test/java/com/loopers/infrastructure/metrics/ProductMetricsTest.java (1)
10-123: LGTM - ํฌ๊ด์ ์ธ ํ ์คํธ ์ผ์ด์ค๊ฐ ์ ์์ฑ๋์์ต๋๋ค!
ProductMetrics์ ํต์ฌ ๋์๋ค์ด ์ ํ ์คํธ๋์์ต๋๋ค:
- ์ด๊ธฐํ, ์ฆ๊ฐ/๊ฐ์, ๊ฒฝ๊ณ๊ฐ(0 ๋ฏธ๋ง ๋ฐฉ์ง)
- ์๊ฐ ๊ธฐ๋ฐ ์ด๋ฒคํธ ์์ ์ฒ๋ฆฌ (
updateLikeIfNewer)- ์ข์์/์ทจ์ ์๋๋ฆฌ์ค
ํนํ
updateLikeIfNewerTest2์์ ์ค๋๋ ์ด๋ฒคํธ ๋ฌด์ ๋ก์ง ๊ฒ์ฆ์ด ์ ๋์ด์์ต๋๋ค.apps/commerce-api/src/test/java/com/loopers/infrastructure/consumer/CatalogEventConsumerTest.java (1)
137-156: LGTM - ๋ฉฑ๋ฑ์ฑ ํ ์คํธ๊ฐ ์ ๊ตฌํ๋์์ต๋๋ค!๋์ผํ
eventId๋ก ์ค๋ณต ๋ฉ์์ง๊ฐ ์ ์ก๋ ๋ ํ ๋ฒ๋ง ์ฒ๋ฆฌ๋๊ณ , ์์ชฝ ๋ชจ๋ acknowledge๋๋ ๊ฒ์ ์ฌ๋ฐ๋ฅด๊ฒ ๊ฒ์ฆํ๊ณ ์์ต๋๋ค.apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java (1)
103-107: LGTM - ์ฌ๊ณ ์์ง ์ด๋ฒคํธ ์ฒ๋ฆฌ๊ฐ ์ ์ ํฉ๋๋ค.์บ์ ๋ฌดํจํ ๋ก์ง์ด ๋ช ํํ๊ณ , ์ํ ์์ธ ๋ฐ ๋ชฉ๋ก ์บ์๋ฅผ ๋ชจ๋ ๋ฌดํจํํ์ฌ ์ผ๊ด์ฑ์ ๋ณด์ฅํฉ๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelay.java (2)
29-54: ๋์ ์คํ ์ ์ค๋ณต ๋ฐํ ๊ฐ๋ฅ์ฑ ๊ฒํ ํ์
relay()๋ฉ์๋์@Transactional์ด ์์ด์, ์ฌ๋ฌ ์ธ์คํด์ค๊ฐ ๋์์ ์คํ๋ ๊ฒฝ์ฐ ๊ฐ์ PENDING ๋ ์ฝ๋๋ฅผ ์ค๋ณต์ผ๋ก ๊ฐ์ ธ์ ๋ฐํํ ์ ์์ต๋๋ค. Consumer ์ธก์์ idempotency ์ฒ๋ฆฌ(eventId ๊ธฐ๋ฐ)๊ฐ ๋์ด ์์ผ๋ฏ๋ก ๊ธฐ๋ฅ์ ๋ฌธ์ ๋ ์์ง๋ง, ๋ถํ์ํ ์ค๋ณต ๋ฐํ์ด ๋ฐ์ํ ์ ์์ต๋๋ค.๋ํ, ๋ฐํ ์คํจ ์ ์ฌ์๋ ํ์ ์ ํ์ด๋ DLQ(Dead Letter Queue) ์ฒ๋ฆฌ๊ฐ ์์ด์ ์๊ตฌ์ ์ผ๋ก ์คํจํ๋ ๋ฉ์์ง๊ฐ ๋ฌดํ ์ฌ์๋๋ ์ ์์ต๋๋ค.
PR objectives์์ "Operational concerns (DLQ, monitoring, traceId propagation) are acknowledged but currently only at a conceptual level"๋ก ์ธ๊ธ๋์ด ์์ผ๋ฏ๋ก, ํ์ฌ ๋ฒ์์์๋ ์ธ์ง๋ ์ ํ์ฌํญ์ผ๋ก ๋ณด์ ๋๋ค. ํฅํ ๊ฐ์ ์ ๋ค์ ์ฌํญ์ ๊ณ ๋ คํ์ธ์:
SELECT ... FOR UPDATE SKIP LOCKED๋ฅผ ์ฌ์ฉํ ๋ถ์ฐ ๋ฝ- ์ฌ์๋ ํ์ ํ๋ ์ถ๊ฐ ๋ฐ ์๊ณ๊ฐ ์ด๊ณผ ์ FAILED ์ํ๋ก ์ ํ
57-68: LGTM!ํํฐ์ ํค ์์ฑ ๋ก์ง์ด ์ ์ ํฉ๋๋ค. userId๊ฐ ์์ ๊ฒฝ์ฐ aggregateId์ ์กฐํฉํ์ฌ ์ฌ์ฉ์๋ณ ์์ ๋ณด์ฅ์ ๊ฐํํ๊ณ , ํ์ฑ ์คํจ ์ aggregateId๋ก ์์ ํ๊ฒ ํด๋ฐฑํฉ๋๋ค.
apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/Outbox.java (1)
49-59: LGTM!ํฉํ ๋ฆฌ ๋ฉ์๋์
markProcessed()๊ตฌํ์ด ์ ์ ํฉ๋๋ค. ํนํmarkProcessed()๊ฐ idempotentํ๊ฒ ๊ตฌํ๋์ด ์ค๋ณต ํธ์ถ์๋ ์์ ํฉ๋๋ค.apps/commerce-api/src/main/java/com/loopers/infrastructure/metrics/ProductMetrics.java (3)
57-68: ํ์์คํฌํ ๊ธฐ๋ฐ ๋ฉฑ๋ฑ์ฑ ์ฒ๋ฆฌ ์ ๊ตฌํ๋จ
updateLikeIfNewer๋ฉ์๋๊ฐ ์ด๋ฒคํธ ํ์์คํฌํ๋ฅผ ๋น๊ตํ์ฌ ์์๊ฐ ๋ค๋ฐ๋ ์ด๋ฒคํธ๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ๋ฌด์ํฉ๋๋ค. ์ด๋ Kafka ํํฐ์ ๊ฐ ์์ ๋ณด์ฅ์ด ์ ๋๋ ์ํฉ์์ ํจ๊ณผ์ ์ธ ๋ฐฉ์ด ๋ก์ง์ ๋๋ค.
16-26: ์ํฐํฐ ๊ตฌ์กฐ ์ ์ ํจproductId์ unique constraint๊ฐ ์ ์ ํ ์ค์ ๋์ด ์๊ณ , ํ๋ ์ด๊ธฐํ๊ฐ ์ฌ๋ฐ๋ฆ ๋๋ค.
๋์์ฑ ์ ์ด๊ฐ ํ์ํ ๊ฒฝ์ฐ
@Versionํ๋ ์ถ๊ฐ๋ฅผ ๊ณ ๋ คํ ์ ์์ต๋๋ค.
42-55: Kafka ํํฐ์ ํค ์ ๋ต์ผ๋ก ์ธํ ๋์์ฑ ๋ณดํธ - ์ถ๊ฐ ๋ฝ ๋ถํ์
incrementLikeCount,decrementLikeCount,addOrder๋ฉ์๋๋ ์ง์ ์นด์ดํฐ๋ฅผ ์ฆ๊ฐํ์ง๋ง, ์ค์ ๋ก ๋์์ฑ ๋ฌธ์ ๋ ๋ฐ์ํ์ง ์์ต๋๋ค.OutboxRelay๋
aggregateId(productId)๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํํฐ์ ํค๋ฅผ ์์ฑํ๋ฏ๋ก, ๋์ผํ productId์ ๋ํ ๋ชจ๋ ์ด๋ฒคํธ๋ ํญ์ ๋์ผํ Kafka ํํฐ์ ์ผ๋ก ๋ผ์ฐํ ๋ฉ๋๋ค. CatalogEventConsumer๋ ๋จ์ผgroupId="catalog-consumer"๋ฅผ ์ฌ์ฉํ๋ฏ๋ก, Kafka์ ์๋น์ ๊ทธ๋ฃน ๋ชจ๋ธ์ ์ํด ๊ฐ ํํฐ์ ์ ์ ํํ ํ๋์ ์๋น์ ์ธ์คํด์ค์ ์ํด ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌ๋ฉ๋๋ค. ๋ฐ๋ผ์ ๊ฐ์ productId์ ProductMetrics ์ํฐํฐ๋ ๋์์ ์ ๋ฐ์ดํธ๋ ์ ์์ต๋๋ค.ProductMetrics๋ ๋น์ ๊ทํ๋ ๋ฉํธ๋ฆญ ํ ์ด๋ธ์ด๋ฉฐ, ์ค์ ์ข์์ ๋ฐ์ดํฐ์ ์๋ณธ์ธ Product ์ํฐํฐ๋ ๋น๊ด์ ๋ฝ์ผ๋ก ๋ณดํธ๋๊ณ ์์ต๋๋ค. ํ์ฌ ์ํคํ ์ฒ๋ ์๋์ ์ผ๋ก metrics์ ๋ํด ์ต์ข ์ผ๊ด์ฑ์ ํ์ฉํ๊ณ ์์ผ๋ฏ๋ก, ProductMetricsRepository์ ์ถ๊ฐ ๋ฝ์ ๊ตฌํํ ํ์๊ฐ ์์ต๋๋ค.
Likely an incorrect or invalid review comment.
| @KafkaListener(topics = "catalog-events", groupId = "catalog-consumer") | ||
| @Transactional | ||
| public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { | ||
| String eventId = extractEventId(record); | ||
| if (eventId == null) { | ||
| log.warn("outbox-id ํค๋๊ฐ ์๋ ๋ฉ์์ง ์์ : {}", record); | ||
| acknowledgment.acknowledge(); | ||
| return; | ||
| } | ||
|
|
||
| if (eventHandledRepository.existsByEventId(eventId)) { | ||
| log.info("์ด๋ฏธ ์ฒ๋ฆฌ๋ ์ด๋ฒคํธ ๋ฌด์: eventId={}", eventId); | ||
| acknowledgment.acknowledge(); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| String eventType = extractEventType(record); | ||
| processEvent(eventType, record.value()); | ||
| eventHandledRepository.save(EventHandled.create(eventId)); | ||
| acknowledgment.acknowledge(); | ||
| log.info("์ด๋ฒคํธ ์ฒ๋ฆฌ ์๋ฃ: eventId={}, eventType={}", eventId, eventType); | ||
| } catch (Exception e) { | ||
| log.error("์ด๋ฒคํธ ์ฒ๋ฆฌ ์คํจ, ์ฌ์ฒ๋ฆฌ ์์ : eventId={}", eventId, e); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ํธ๋์ญ์ ๊ณผ acknowledgment ์์์ ์ฃผ์๊ฐ ํ์ํฉ๋๋ค.
ํ์ฌ ๊ตฌํ์์ @Transactional ๋ด์์ acknowledgment.acknowledge()๊ฐ ํธ์ถ๋ฉ๋๋ค. ๋ง์ฝ acknowledge ํ ํธ๋์ญ์
์ปค๋ฐ ์ ์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด, ๋ฉ์์ง๋ ack๋์์ง๋ง DB์๋ ์ ์ฅ๋์ง ์์ ๋ฉ์์ง๊ฐ ์ ์ค๋ ์ ์์ต๋๋ค.
๋ํ, ๋์์ ๊ฐ์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ existsByEventId ์ฒดํฌ์ save ์ฌ์ด์ race condition์ด ๋ฐ์ํ ์ ์์ต๋๋ค. EventHandled์ unique ์ ์ฝ์กฐ๊ฑด์ผ๋ก ์ธํด DataIntegrityViolationException์ด ๋ฐ์ํ๊ณ , ์ด๋ ์ฌ์ฒ๋ฆฌ๋ก ์ด์ด์ง๋๋ค (์์ ํ์ง๋ง ๋ช
์์ ์ฒ๋ฆฌ๊ฐ ์ข์ต๋๋ค).
๐ ๊ฐ์ ์ ์
@KafkaListener(topics = "catalog-events", groupId = "catalog-consumer")
@Transactional
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
// ... existing code ...
try {
String eventType = extractEventType(record);
processEvent(eventType, record.value());
eventHandledRepository.save(EventHandled.create(eventId));
- acknowledgment.acknowledge();
log.info("์ด๋ฒคํธ ์ฒ๋ฆฌ ์๋ฃ: eventId={}, eventType={}", eventId, eventType);
+ } catch (DataIntegrityViolationException e) {
+ log.info("์ด๋ฏธ ๋ค๋ฅธ ์ธ์คํด์ค์์ ์ฒ๋ฆฌ ์ค์ธ ์ด๋ฒคํธ: eventId={}", eventId);
} catch (Exception e) {
log.error("์ด๋ฒคํธ ์ฒ๋ฆฌ ์คํจ, ์ฌ์ฒ๋ฆฌ ์์ : eventId={}", eventId, e);
+ throw e; // rethrow to prevent ack
}
+ acknowledgment.acknowledge();
}| private void processEvent(String eventType, String payload) { | ||
| try { | ||
| JsonNode node = objectMapper.readTree(payload); | ||
| Long productId = node.get("productId").asLong(); | ||
|
|
||
| switch (eventType) { | ||
| case "ProductLikedEvent" -> processProductLikedEvent(node, productId); | ||
| case "StockDepletedEvent" -> processStockDepletedEvent(productId); | ||
| default -> log.warn("์ ์ ์๋ ์ด๋ฒคํธ ํ์ : {}", eventType); | ||
| } | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("์ด๋ฒคํธ ํ์ฑ ์คํจ", e); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ํ์ด๋ก๋ ํ์ฑ ์ null ์์ ์ฑ์ด ํ์ํฉ๋๋ค.
node.get() ๋ฉ์๋๋ ํ๋๊ฐ ์์ผ๋ฉด null์ ๋ฐํํฉ๋๋ค. ์๋ชป๋ ํ์์ ํ์ด๋ก๋๊ฐ ๋ค์ด์ค๋ฉด NullPointerException์ด ๋ฐ์ํ ์ ์์ต๋๋ค.
๐ null ์์ ์ฑ ๊ฐ์ ์ ์
private void processEvent(String eventType, String payload) {
try {
JsonNode node = objectMapper.readTree(payload);
- Long productId = node.get("productId").asLong();
+ JsonNode productIdNode = node.get("productId");
+ if (productIdNode == null || productIdNode.isNull()) {
+ log.warn("productId๊ฐ ์๋ ์ด๋ฒคํธ: {}", payload);
+ return;
+ }
+ Long productId = productIdNode.asLong();
switch (eventType) {
case "ProductLikedEvent" -> processProductLikedEvent(node, productId);๋๋ path() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด MissingNode๋ฅผ ๋ฐํํ์ฌ ๋ ์์ ํฉ๋๋ค:
Long productId = node.path("productId").asLong(-1L);
if (productId == -1L) {
log.warn("productId๊ฐ ์๋ ์ด๋ฒคํธ: {}", payload);
return;
}๐ค Prompt for AI Agents
In
apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java
around lines 76-89, the payload parsing uses node.get(...) which can return null
and cause NPEs; replace node.get("productId").asLong() with
node.path("productId").asLong(-1L), check for the sentinel (-1L) and log a
warning with the payload and return early if missing/invalid, and apply the same
path()/default-check pattern for any other potentially missing fields before
processing; keep the existing exception handling for unexpected errors.
| private void processProductLikedEvent(JsonNode node, Long productId) { | ||
| boolean liked = node.get("liked").asBoolean(); | ||
| LocalDateTime occurredAt = LocalDateTime.parse(node.get("occurredAt").asText()); | ||
|
|
||
| ProductMetrics metrics = productMetricsRepository.findByProductId(productId) | ||
| .orElseGet(() -> ProductMetrics.create(productId)); | ||
|
|
||
| if (metrics.updateLikeIfNewer(liked, occurredAt)) { | ||
| productMetricsRepository.save(metrics); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
processProductLikedEvent์์๋ null ์ฒดํฌ๊ฐ ํ์ํฉ๋๋ค.
liked์ occurredAt ํ๋ ์ ๊ทผ ์์๋ ๋์ผํ NPE ์ํ์ด ์์ต๋๋ค.
๐ null ์์ ์ฑ ๊ฐ์ ์ ์
private void processProductLikedEvent(JsonNode node, Long productId) {
- boolean liked = node.get("liked").asBoolean();
- LocalDateTime occurredAt = LocalDateTime.parse(node.get("occurredAt").asText());
+ JsonNode likedNode = node.get("liked");
+ JsonNode occurredAtNode = node.get("occurredAt");
+ if (likedNode == null || occurredAtNode == null) {
+ log.warn("ํ์ ํ๋ ๋๋ฝ - liked ๋๋ occurredAt: productId={}", productId);
+ return;
+ }
+ boolean liked = likedNode.asBoolean();
+ LocalDateTime occurredAt = LocalDateTime.parse(occurredAtNode.asText());
ProductMetrics metrics = productMetricsRepository.findByProductId(productId)
.orElseGet(() -> ProductMetrics.create(productId));๐ Committable suggestion
โผ๏ธ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private void processProductLikedEvent(JsonNode node, Long productId) { | |
| boolean liked = node.get("liked").asBoolean(); | |
| LocalDateTime occurredAt = LocalDateTime.parse(node.get("occurredAt").asText()); | |
| ProductMetrics metrics = productMetricsRepository.findByProductId(productId) | |
| .orElseGet(() -> ProductMetrics.create(productId)); | |
| if (metrics.updateLikeIfNewer(liked, occurredAt)) { | |
| productMetricsRepository.save(metrics); | |
| } | |
| } | |
| private void processProductLikedEvent(JsonNode node, Long productId) { | |
| JsonNode likedNode = node.get("liked"); | |
| JsonNode occurredAtNode = node.get("occurredAt"); | |
| if (likedNode == null || occurredAtNode == null) { | |
| log.warn("ํ์ ํ๋ ๋๋ฝ - liked ๋๋ occurredAt: productId={}", productId); | |
| return; | |
| } | |
| boolean liked = likedNode.asBoolean(); | |
| LocalDateTime occurredAt = LocalDateTime.parse(occurredAtNode.asText()); | |
| ProductMetrics metrics = productMetricsRepository.findByProductId(productId) | |
| .orElseGet(() -> ProductMetrics.create(productId)); | |
| if (metrics.updateLikeIfNewer(liked, occurredAt)) { | |
| productMetricsRepository.save(metrics); | |
| } | |
| } |
๐ค Prompt for AI Agents
In
apps/commerce-api/src/main/java/com/loopers/infrastructure/consumer/CatalogEventConsumer.java
around lines 91 to 101, add null and type checks for node.get("liked") and
node.get("occurredAt") before accessing them: verify the "liked" field exists
and is a boolean (or use asBoolean(default) only if a sensible default applies),
verify the "occurredAt" field exists and is textual, wrap LocalDateTime.parse in
a try/catch to handle parse errors, and if any check fails log a warning and
return early instead of proceeding; then only call metrics.updateLikeIfNewer and
save when both values are validated and parsed successfully.
| zookeeper: | ||
| image: confluentinc/cp-zookeeper:7.5.0 | ||
| container_name: loopers-zookeeper | ||
| ports: | ||
| - "2181:2181" | ||
| environment: | ||
| ZOOKEEPER_CLIENT_PORT: 2181 | ||
| ZOOKEEPER_TICK_TIME: 2000 | ||
|
|
||
| kafka: | ||
| image: confluentinc/cp-kafka:7.5.0 | ||
| container_name: loopers-kafka | ||
| ports: | ||
| - "19092:9092" | ||
| environment: | ||
| KAFKA_BROKER_ID: 1 | ||
| KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 | ||
| KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092 | ||
| KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
| depends_on: | ||
| - zookeeper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confluent Platform 7.5.0์ ์ต์ ๋ฒ์ ์ด ์๋๋ฉฐ ๋ณด์ ํจ์น ์ ๋ฐ์ดํธ๊ฐ ํ์ํฉ๋๋ค.
Docker Compose ์ค์ ์ด ๋ก์ปฌ ๊ฐ๋ฐ ํ๊ฒฝ์ ์ ํฉํ๊ฒ ๊ตฌ์ฑ๋์ด ์์ต๋๋ค. ํฌํธ ๋งคํ๊ณผ ์์กด์ฑ ๊ด๊ณ๊ฐ ์ฌ๋ฐ๋ฅด๊ฒ ์ค์ ๋์์ต๋๋ค.
๋ค๋ง ํ์ฌ ์ต์ Confluent Platform ๋ฒ์ ์ 8.1์ด๋ฉฐ, 7.9.x๋ ์ต๊ทผ ์์ ํ ๋ฒ์ ์ ๋๋ค. ์ฌ์ฉ ์ค์ธ 7.5.0์ 2023๋ ๋ฒ์ ์ผ๋ก ์๋นํ ์ค๋๋ ๋ฒ์ ์ ๋๋ค. ๋ณด์ ๊ด์ ์์ ํญ์ ์ต์ ํจ์น ๋ฒ์ ์ ์ฌ์ฉํด์ผ ํฉ๋๋ค. ์ต์ํ 7.5.10๊ณผ ๊ฐ์ ์ต์ ํจ์น ๋ฒ์ ์ผ๋ก ์ ๊ทธ๋ ์ด๋ํ๊ฑฐ๋, ๋ ๋์ ๋ฐฉ๋ฒ์ 7.9.x ์ด์์ ์ต์ ๋ฒ์ ์ ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅํฉ๋๋ค.
๐ค Prompt for AI Agents
In docker-compose.yml around lines 43 to 63, the Confluent images are pinned to
an outdated 7.5.0 tag; update the kafka and zookeeper image tags to a currently
supported patched version (at minimum the latest 7.5.x patch, e.g. 7.5.10) or
preferably to a more recent stable release such as 7.9.x or 8.1, then
rebuild/test the compose stack to ensure compatibility (adjust any
connector/config differences between versions if needed) and update any project
documentation/CI that references these image tags.
๐ Summary
event_handledํ ์ด๋ธ ๊ธฐ๋ฐ ๋ฉฑ๋ฑ ์ฒ๋ฆฌ๋ก ์ค๋ณต ๋ฐฉ์ง๐ฌ Review Points
Kafka ํ์ต ๊น์ด์ ๋ํ ์กฐ์ธ ๋ถํ๋๋ฆฝ๋๋ค
Kafka๋ฅผ ์ด๋ฒ์ ์ฒ์ ์ ํ๊ณ , Outbox ํจํด + Consumer ๋ฉฑ๋ฑ ์ฒ๋ฆฌ๊น์ง๋ ๊ตฌํํ์ต๋๋ค.
DLQ, ๋ชจ๋ํฐ๋ง, traceId ์ ํ ๊ฐ์ ์ด์ ๊ด์ ์ ์์ง ๊ฐ๋ ๋ง ์๋ ์ํ์ ๋๋ค.
ํ์ฌ ํ์ฌ์์๋ Kafka๋ฅผ ์ธ ์ผ์ด ์๋๋ฐ, ๋ฉด์ ๋๋น๋ก ์ง๊ธ ๋ ๊น์ด ๊ณต๋ถํด์ผ ํ ์ง ๊ณ ๋ฏผ์ ๋๋ค.
์๋๋ฉด ์ด์ง ํ ํ์ฌ์์ ์ฐ๊ฒ ๋๋ค๋ฉด ๊ทธ๋ ๋ ๊น์ด ๊ณต๋ถํด๋ ๊ด์ฐฎ์์ง ์กฐ์ธ ๋ถํ๋๋ฆฝ๋๋ค.
โ Checklist
๐พ Producer
โพ Consumer
event_handledํ ์ด๋ธ์ ํตํ ๋ฉฑ๋ฑ ์ฒ๋ฆฌ ๊ตฌํ๐ References
Summary by CodeRabbit
๋ฆด๋ฆฌ์ค ๋ ธํธ
์ ๊ธฐ๋ฅ
Chores
Tests
โ๏ธ Tip: You can customize this high-level summary in your review settings.