Add Akka circuit breaker for ElasticSearch calls#105
Conversation
After maxFailures consecutive call timeouts, the breaker opens and immediately returns ElasticSearchResponseFailure for resetTimeout, giving ES breathing room to recover without continued request pressure. All three parameters are overrideable via env vars for production tuning. Resurrects #34 (shelved 2022 for performance testing), ported to the current dpla.api.v2 package structure. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
WalkthroughAdds an Akka CircuitBreaker for ElasticSearch HTTP calls, a new Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ESClient as ElasticSearchClient
participant Breaker as CircuitBreaker
participant HTTP as Akka-HTTP
participant ES as ElasticSearch
Client->>ESClient: send search request
ESClient->>Breaker: withCircuitBreaker { perform request }
Breaker->>HTTP: singleRequest(...)
HTTP->>ES: HTTP/REST call
ES-->>HTTP: response
HTTP-->>Breaker: Future completes
Breaker-->>ESClient: Future result or short-circuit failure
ESClient-->>Client: ElasticSearchResponse
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala (1)
75-82:⚠️ Potential issue | 🟠 MajorCircuit breaker wraps an already-started Future, preventing it from protecting ElasticSearch.
The
futureHttpResponsepassed inProcessElasticSearchResponseis initiated inElasticSearchClient(lines 212-223) viaHttp().singleRequest(request)insidewithConcurrencyLimit. By the time this Future reaches the handler, the HTTP request is already in-flight and the semaphore permit is already held.In the handler (line 76),
breaker.withCircuitBreaker(futureHttpResponse)receives an already-evaluated Future. AlthoughCircuitBreaker.withCircuitBreakeruses call-by-name semantics (body: => Future[T]), it can only prevent execution of code that hasn't run yet. Once the Future is evaluated and the HTTP call is underway, the circuit breaker can only:
- Fail the Future immediately if the breaker is open (but the request is already sent to ES)
- Track the timeout/success for future decisions
This means ES still receives and processes requests even when the circuit breaker is open, defeating its core purpose: reducing pressure on a struggling cluster.
To properly protect ES, the circuit breaker must wrap the
Http().singleRequest()call directly inElasticSearchClient, preventing the HTTP request from being initiated when the breaker is open. This requires either making the breaker accessible from the client or restructuring the code so the HTTP call happens inside the handler after the circuit breaker check.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala` around lines 75 - 82, The circuit breaker is being applied to an already-started Future in ProcessElasticSearchResponse, so it cannot prevent the HTTP request from being sent; change the design so the breaker wraps the actual Http().singleRequest(...) call inside ElasticSearchClient (where the Future is created in withConcurrencyLimit) instead of wrapping futureHttpResponse in ElasticSearchResponseHandler; either (a) make the CircuitBreaker instance available to ElasticSearchClient and call breaker.withCircuitBreaker around Http().singleRequest(request) before returning the Future, or (b) move the HTTP invocation into the handler so ProcessElasticSearchResponse executes breaker.withCircuitBreaker { Http().singleRequest(request) } directly, ensuring the request is not initiated when the breaker is open. Ensure references: ProcessElasticSearchResponse, ElasticSearchClient, withConcurrencyLimit, Http().singleRequest, and breaker.withCircuitBreaker are updated accordingly.
🧹 Nitpick comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala (1)
119-127: Consider distinguishing circuit breaker rejections in logs.When the breaker is open, the error will be a
CircuitBreakerOpenException. Logging this the same as actual ES failures may cause confusion during incident response. Consider logging breaker rejections at a different level or with distinct messaging.💡 Suggested improvement
case ReturnFinalResponse(response, replyTo, error) => // Log error if there is one error match { + case Some(_: akka.pattern.CircuitBreakerOpenException) => + context.log.warn("Request rejected: ElasticSearch circuit breaker is open") case Some(e) => context.log.error( "Failed to process ElasticSearch response:", e ) case None => // no-op }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala` around lines 119 - 127, Logs currently treat all errors in the ReturnFinalResponse handler the same; update the error handling in ElasticSearchResponseHandler so that when error is a CircuitBreakerOpenException (or matches its fully qualified type) you log a distinct message and/or lower severity (e.g., context.log.warn or context.log.info) indicating a circuit-breaker rejection, otherwise keep the existing context.log.error for real ES failures; locate the match on error in the case ReturnFinalResponse(response, replyTo, error) and branch on error.get (or pattern-match Some(e: CircuitBreakerOpenException) vs Some(e)) to implement the different messages and levels.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala`:
- Around line 75-82: The circuit breaker is being applied to an already-started
Future in ProcessElasticSearchResponse, so it cannot prevent the HTTP request
from being sent; change the design so the breaker wraps the actual
Http().singleRequest(...) call inside ElasticSearchClient (where the Future is
created in withConcurrencyLimit) instead of wrapping futureHttpResponse in
ElasticSearchResponseHandler; either (a) make the CircuitBreaker instance
available to ElasticSearchClient and call breaker.withCircuitBreaker around
Http().singleRequest(request) before returning the Future, or (b) move the HTTP
invocation into the handler so ProcessElasticSearchResponse executes
breaker.withCircuitBreaker { Http().singleRequest(request) } directly, ensuring
the request is not initiated when the breaker is open. Ensure references:
ProcessElasticSearchResponse, ElasticSearchClient, withConcurrencyLimit,
Http().singleRequest, and breaker.withCircuitBreaker are updated accordingly.
---
Nitpick comments:
In `@src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala`:
- Around line 119-127: Logs currently treat all errors in the
ReturnFinalResponse handler the same; update the error handling in
ElasticSearchResponseHandler so that when error is a CircuitBreakerOpenException
(or matches its fully qualified type) you log a distinct message and/or lower
severity (e.g., context.log.warn or context.log.info) indicating a
circuit-breaker rejection, otherwise keep the existing context.log.error for
real ES failures; locate the match on error in the case
ReturnFinalResponse(response, replyTo, error) and branch on error.get (or
pattern-match Some(e: CircuitBreakerOpenException) vs Some(e)) to implement the
different messages and levels.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8c42f750-9bbe-4d65-ac46-51adb7600039
📒 Files selected for processing (2)
src/main/resources/application.confsrc/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala
Move CircuitBreaker creation from ElasticSearchResponseHandler to ElasticSearchClient so it wraps Http().singleRequest() directly inside withConcurrencyLimit. Previously the breaker wrapped an already-started Future, so it could not prevent requests from being sent to ES when open. Also distinguish CircuitBreakerOpenException in error log (warn vs error). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Fixed both issues: Major (outside-diff): Moved Nitpick: Added a |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Summary
CircuitBreakerinElasticSearchResponseHandlermaxFailuresconsecutive call timeouts, the breaker opens and immediately returnsElasticSearchResponseFailureforresetTimeout, giving ES breathing room without continued request pressureConfiguration (with defaults)
ES_CIRCUIT_BREAKER_MAX_FAILURES3ES_CIRCUIT_BREAKER_CALL_TIMEOUT10sES_CIRCUIT_BREAKER_RESET_TIMEOUT20sBackground
Resurrects #34 (shelved in 2022 pending performance testing), ported to the current
dpla.api.v2package. Motivated by recurring ES flakiness under expensive bot-driven queries — the breaker prevents a struggling ES from being hammered further during recovery.The
callTimeoutis set to 10s (vs. the original 3s) to avoid tripping on legitimately slow-but-valid queries under normal load.Test plan
sbt test)ES_CIRCUIT_BREAKER_CALL_TIMEOUTif needed based on observed p99 ES latency🤖 Generated with Claude Code
Add Akka circuit breaker for ElasticSearch calls
Wraps ElasticSearch calls with an Akka CircuitBreaker to protect ES from cascading failures and reduce pressure during ES slowness/outages.
Changes
Configuration (src/main/resources/application.conf):
Implementation:
Concurrency behavior:
Environment Variables / Secrets
Deployment & Operational Notes
Testing
Impact Summary