Conversation
|
Caution Review failedThe pull request is closed. Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds Akka-HTTP host-connection-pool settings and a semaphore-based concurrency limiter around ElasticSearch HTTP requests, configurable via environment variables and applied to all internal ES request paths without changing public APIs. Changes
Sequence Diagram(s)mermaid Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR addresses production connection exhaustion issues by increasing Akka HTTP connection pool capacity and implementing semaphore-based concurrency limiting for Elasticsearch requests. The changes aim to prevent BufferOverflowException errors during traffic bursts by expanding pool capacity from 4 to 16 connections and max open requests from 32 to 128, while also capping concurrent ES requests at 32 per instance.
Key Changes:
- Increased Akka HTTP pool settings (
max-connections: 4→16,max-open-requests: 32→128) with environment variable overrides - Added Java Semaphore-based concurrency limiter wrapping all ES HTTP requests
- Made concurrency limit configurable via
ES_MAX_CONCURRENT_REQUESTSenvironment variable (default: 32)
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
src/main/resources/application.conf |
Configures increased Akka HTTP connection pool capacity with environment variable overrides |
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala |
Implements semaphore-based concurrency limiting for all ES request types (search, fetch, multi-fetch, random) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/main/resources/application.conf (1)
14-22: Akka HTTP pool config and env overrides look consistentThe new
akka.http.host-connection-poolblock follows the same “default then optional env override” pattern used elsewhere in this file, and the 16/128 defaults match the PR description. No issues from a config/syntax standpoint; just ensure ops tunesAKKA_HTTP_MAX_CONNECTIONS/AKKA_HTTP_MAX_OPEN_REQUESTSbased on ES and JVM capacity in each environment.src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (2)
20-35: Semaphore limiter works but consider guarding synchronous failures and avoiding blocking on the dispatcherThe semaphore-based limiter is a simple way to cap in‑flight ES requests per JVM, but there are two caveats:
Blocking acquire on the default dispatcher
semaphore.acquire()blocks the calling thread. Under sustained overload, many session actors can end up blocked here, tying up dispatcher threads and reducing overall throughput. It’s not a correctness bug, but it does work against Akka’s non‑blocking design and can become a bottleneck at high QPS.No protection if
fthrows synchronously
If the by‑namefthrows before returning aFuture(even if unlikely forHttp().singleRequest), the permit is never released.You can make the helper more robust with minimal change by guarding
fand still keep the current design:private def withConcurrencyLimit[T](f: => Future[T]) (implicit ec: ExecutionContext): Future[T] = { - semaphore.acquire() - f.andThen { case _ => semaphore.release() }(ec) + semaphore.acquire() + val future = + try f + catch { + case scala.util.control.NonFatal(e) => + semaphore.release() + throw e + } + future.andThen { case _ => semaphore.release() }(ec) }Longer‑term, if this limiter starts engaging frequently, consider a non‑blocking approach (e.g., acquiring permits on a dedicated blocking dispatcher, or using an async/stream‑based limiter) to avoid occupying default dispatcher threads while waiting for capacity.
12-13: Consistent concurrency wiring around ES calls looks goodThe added imports, per‑session
implicit val ec = system.executionContext, and use ofwithConcurrencyLimit { Http().singleRequest(...) }in all four paths (search, fetch, multi‑fetch, random) are cohesive and keep ES concurrency control centralized in one helper. This keeps behavior unchanged from the caller’s perspective while giving you a single place to tune concurrency.If you find the
implicit ecboilerplate repetitive across theprocess*methods, you could later factor a small shared helper that takesActorContextand anHttpRequestand returns theFuture[HttpResponse]under the limiter, but that’s optional and not required for this fix.Also applies to: 101-103, 111-113, 157-158, 161-163, 209-210, 218-220, 263-264, 272-274
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/main/resources/application.conf(1 hunks)src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala(8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Agent
Parameter validation Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala(8 hunks)
🔇 Additional comments (5)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (5)
12-13: LGTM!The imports are appropriate for the semaphore-based concurrency limiting feature.
20-35: Past review concerns addressed.The environment variable parsing now includes proper error handling for invalid numeric values and validation for positive integers. The implementation correctly addresses the concerns raised in previous reviews.
44-66: Past review concerns fully addressed.The
withConcurrencyLimitimplementation now correctly handles all previously flagged issues:
- Uses
tryAcquirewith timeout instead of blockingacquire(), preventing actor thread blocking- Includes try-catch to ensure permit release even if Future construction fails
- Provides clear error messaging when concurrency limit is exceeded
The implementation is robust and addresses concurrency concerns appropriately.
133-133: LGTM!The ExecutionContext is correctly added as an implicit parameter (required by
withConcurrencyLimit), and the ES HTTP request is properly wrapped with the concurrency limiter.Also applies to: 142-144
188-188: LGTM!The concurrency limiter is consistently applied across all ES request paths (fetch, multi-fetch, and random), with the necessary ExecutionContext implicit parameter added to each method. The implementation maintains consistency with the search request pattern.
Also applies to: 192-194, 240-240, 249-251, 294-294, 303-305
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (1)
74-74: Fix error message type inconsistency.The error message states "must be positive integer" but
semaphoreTimeoutSecondsis declared asLong. This inconsistency was flagged in previous reviews but remains uncorrected.Apply this diff:
- s"Invalid value for $envVar: '$value' (must be positive integer). " + + s"Invalid value for $envVar: '$value' (must be positive long). " +Same issue exists at line 80:
- s"Invalid value for $envVar: '$value' (not a valid integer). " + + s"Invalid value for $envVar: '$value' (not a valid long). " +
🧹 Nitpick comments (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (1)
90-92: Add test coverage for concurrency limiting functionality.The new concurrency limiting logic lacks test coverage. Given that this repository has comprehensive automated testing (as noted in past reviews), consider adding tests to verify:
- Permit acquisition and release under normal operation
- Timeout behavior when concurrency limit is exceeded
- Proper permit release on both successful and failed requests
- Configuration parsing with invalid/missing environment variables
- Behavior under concurrent load (stress testing)
While manual/production testing may suffice for Phase 1 remediation, automated tests would:
- Prevent regressions during Phase 2 query optimizations
- Verify the ConcurrencyLimiter implementation is correct
- Ensure consistent behavior across the codebase
Would you like me to generate a test skeleton for the ConcurrencyLimiter class once it's implemented?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala(9 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (2)
src/main/scala/dpla/api/v2/search/SearchProtocol.scala (15)
search(61-64)search(66-70)search(72-75)search(77-80)search(82-86)search(88-91)search(93-97)search(99-104)search(106-109)search(111-115)search(117-121)search(123-127)search(129-132)search(134-138)SearchProtocol(8-139)src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala (2)
ElasticSearchResponseHandler(29-120)ProcessElasticSearchResponse(33-36)
🪛 GitHub Actions: Scala CI
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
[error] 90-92: sbt clean coverage test coverageReport failed. Compilation errors: not found: type ConcurrencyLimiter; not found: value maxConcurrent; not found: value timeoutSeconds in ElasticSearchClient.scala (lines 90-92).
🔇 Additional comments (7)
src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala (7)
25-26: LGTM: Imports support concurrency changes.The
ExecutionContextimport is necessary for the implicitecparameters added to each handler method, and theLoggerFactoryimport supports the error logging added for configuration validation.
32-32: LGTM: Logger properly initialized.The SLF4J logger is correctly initialized and used appropriately for configuration validation errors.
40-61: LGTM: Configuration parsing is robust.The environment variable parsing correctly handles invalid inputs with safe fallbacks. The validation ensures positive values and provides clear error messages when falling back to defaults.
190-192: Verify ConcurrencyLimiter API supports this usage pattern.The code uses
concurrencyLimiter { ... }syntax, which requiresConcurrencyLimiterto either:
- Have an
applymethod:def apply[T](f: => Future[T]): Future[T]- Be assigned to a function value rather than a class instance
When implementing the missing
ConcurrencyLimiterclass, ensure its API supports this by-name parameter pattern. Based on past reviews, this should wrap the HTTP request with semaphore-based concurrency control.The implementation should handle:
- Acquiring permits with timeout (using
semaphoreTimeoutSeconds)- Releasing permits on both success and failure
- Avoiding permit leaks if Future construction fails
- Non-blocking operation to avoid starving Akka threads
181-181: LGTM: ExecutionContext properly added to all handlers.Each handler method correctly adds an implicit
ExecutionContextderived from the actor system. This is necessary for Future operations and follows Akka best practices by usingsystem.executionContext.Also applies to: 235-235, 285-285, 338-338
166-169: LGTM: Documentation improvements.The added Scaladoc comments correctly describe the per-session actor pattern, clarifying that each session has its own internal state and ActorRef.
Also applies to: 271-274
90-92: Ensure ConcurrencyLimiter addresses past review concerns.Previous reviews raised several critical concerns about the concurrency limiting implementation (semaphore blocking, permit leaks, thread starvation). Those details are no longer visible in this file, suggesting they were refactored into the missing
ConcurrencyLimiterclass.When providing the
ConcurrencyLimiterimplementation, ensure it addresses:
- Non-blocking operation: Avoid
acquire()which blocks Akka threads; usetryAcquire()with timeout- Permit leak prevention: Ensure permits are released even if Future construction fails, not just in
andThencallback- Thread safety: Properly handle concurrent access to the semaphore
- Timeout handling: Fail fast when
semaphoreTimeoutSecondsis exceeded rather than blocking indefinitely- Akka integration: Consider using Akka's native backpressure mechanisms or a dedicated blocking dispatcher
Past reviews suggested patterns like:
def apply[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = { if (!semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) { Future.failed(new TimeoutException("ES concurrency limit exceeded")) } else { try { val future = f future.onComplete(_ => semaphore.release())(ec) future } catch { case e: Throwable => semaphore.release() Future.failed(e) } } }
Opus 4.5 work to try and fix part of the connection exhaustion through changes to AKKA and query concurrency.
PR: Increase Akka HTTP pool capacity and add ES request concurrency limiting
Problem
Production outages caused by
BufferOverflowExceptionwhen heavy search traffic overwhelms the Akka HTTP client pool. The default pool settings (max-connections=4,max-open-requests=32) are too small for burst traffic, causing requests to queue up and overflow even though Elasticsearch is healthy and responding (albeit with 300-700ms latency on complex queries).Solution
1. Increase Akka HTTP connection pool capacity (
application.conf)max-connections: 4 → 16max-open-requests: 32 → 1282. Add semaphore-based concurrency limiting (
ElasticSearchClient.scala)Http().singleRequest()callsConfiguration
All values are tunable via environment variables without code changes:
AKKA_HTTP_MAX_CONNECTIONSAKKA_HTTP_MAX_OPEN_REQUESTSES_MAX_CONCURRENT_REQUESTSRollback
akka.http.host-connection-poolblock fromapplication.confwithConcurrencyLimitwrapper, revert to directHttp().singleRequest()callsTesting
BufferOverflowExceptionRelated
This is Phase 1 of ES performance remediation. Phase 2 (query optimizations) will follow once production is stabilized.
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.