From 02ac489b480a225d17b25e77bf049b28d481fd73 Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 12:44:48 -0700 Subject: [PATCH 1/6] Cursor API fix --- src/main/resources/application.conf | 9 +++++ .../api/v2/search/ElasticSearchClient.scala | 36 ++++++++++++++++--- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 601df04..ee760c8 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -11,6 +11,15 @@ akka { unhandled = on } } + http { + host-connection-pool { + # Increased from defaults (4/32) to handle burst traffic to ES + max-connections = 16 + max-connections = ${?AKKA_HTTP_MAX_CONNECTIONS} + max-open-requests = 128 + max-open-requests = ${?AKKA_HTTP_MAX_OPEN_REQUESTS} + } + } } dispatchers { emailDispatcher { diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 46cdb97..6629cd8 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -9,13 +9,31 @@ import dpla.api.v2.search.SearchProtocol._ import dpla.api.v2.search.paramValidators.{FetchParams, RandomParams, SearchParams} import spray.json.JsValue -import scala.concurrent.Future +import java.util.concurrent.Semaphore +import scala.concurrent.{ExecutionContext, Future} /** * Sends requests to Elastic Search. */ object ElasticSearchClient { + // Concurrency limiter to prevent overwhelming the ES cluster and Akka HTTP pool. + // This caps the number of concurrent in-flight ES requests per API instance. + // Default 32 permits; can be tuned via environment variable. + private val maxConcurrentEsRequests: Int = + sys.env.getOrElse("ES_MAX_CONCURRENT_REQUESTS", "32").toInt + private val semaphore = new Semaphore(maxConcurrentEsRequests) + + /** + * Wraps an ES request Future with concurrency limiting. + * Acquires a permit before making the request and releases it when complete. + */ + private def withConcurrencyLimit[T](f: => Future[T]) + (implicit ec: ExecutionContext): Future[T] = { + semaphore.acquire() + f.andThen { case _ => semaphore.release() }(ec) + } + def apply( endpoint: String, nextPhase: ActorRef[IntermediateSearchResult] @@ -81,6 +99,7 @@ object ElasticSearchClient { Behaviors.setup { context => implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -89,8 +108,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch search QUERY: {}: {}", @@ -134,11 +154,13 @@ object ElasticSearchClient { Behaviors.setup { context => implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(HttpRequest(uri = fetchUri)) + } context.log.info("ElasticSearch fetch QUERY: {}", fetchUri) @@ -184,6 +206,7 @@ object ElasticSearchClient { Behaviors.setup { context => implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -192,8 +215,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch multi-fetch QUERY: {}: {}", @@ -236,6 +260,7 @@ object ElasticSearchClient { Behaviors.setup { context => implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -244,8 +269,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch random QUERY: {}: {}", From bdda195c495a1cefb7861fcea2d8e7d0548c9277 Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 13:03:01 -0700 Subject: [PATCH 2/6] fix bugs --- .../api/v2/search/ElasticSearchClient.scala | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 6629cd8..6d66ef2 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -9,7 +9,7 @@ import dpla.api.v2.search.SearchProtocol._ import dpla.api.v2.search.paramValidators.{FetchParams, RandomParams, SearchParams} import spray.json.JsValue -import java.util.concurrent.Semaphore +import java.util.concurrent.{Semaphore, TimeUnit} import scala.concurrent.{ExecutionContext, Future} /** @@ -24,14 +24,34 @@ object ElasticSearchClient { sys.env.getOrElse("ES_MAX_CONCURRENT_REQUESTS", "32").toInt private val semaphore = new Semaphore(maxConcurrentEsRequests) + // Timeout for acquiring a semaphore permit (seconds). + // If exceeded, the request fails fast rather than blocking indefinitely. + // Keep this well below askTimeout (30s) to leave time for the actual ES query. + private val semaphoreTimeoutSeconds: Long = + sys.env.getOrElse("ES_SEMAPHORE_TIMEOUT_SECONDS", "5").toLong + /** * Wraps an ES request Future with concurrency limiting. - * Acquires a permit before making the request and releases it when complete. + * Uses tryAcquire with timeout to avoid blocking actor threads indefinitely. + * Ensures permit is released even if Future construction fails. */ private def withConcurrencyLimit[T](f: => Future[T]) (implicit ec: ExecutionContext): Future[T] = { - semaphore.acquire() - f.andThen { case _ => semaphore.release() }(ec) + if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) { + Future.failed(new RuntimeException( + s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + + s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" + )) + } else { + try { + val future = f + future.andThen { case _ => semaphore.release() }(ec) + } catch { + case e: Throwable => + semaphore.release() + Future.failed(e) + } + } } def apply( From 93ec8857d70e7bba5607f6880484f7b3f5368daf Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 13:04:49 -0700 Subject: [PATCH 3/6] Update src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala Parameter validation Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../dpla/api/v2/search/ElasticSearchClient.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 6d66ef2..6e977a9 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -20,8 +20,19 @@ object ElasticSearchClient { // Concurrency limiter to prevent overwhelming the ES cluster and Akka HTTP pool. // This caps the number of concurrent in-flight ES requests per API instance. // Default 32 permits; can be tuned via environment variable. - private val maxConcurrentEsRequests: Int = - sys.env.getOrElse("ES_MAX_CONCURRENT_REQUESTS", "32").toInt + private val maxConcurrentEsRequests: Int = { + val value = sys.env.getOrElse("ES_MAX_CONCURRENT_REQUESTS", "32") + try { + val parsed = value.toInt + if (parsed <= 0) { + throw new IllegalArgumentException(s"ES_MAX_CONCURRENT_REQUESTS must be positive, got: $parsed") + } + parsed + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"ES_MAX_CONCURRENT_REQUESTS must be a valid integer, got: $value") + } + } private val semaphore = new Semaphore(maxConcurrentEsRequests) // Timeout for acquiring a semaphore permit (seconds). From b506ad526c7edb404cf75de69363d2c5eb9a83e9 Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 13:13:10 -0700 Subject: [PATCH 4/6] More input validation --- .../api/v2/search/ElasticSearchClient.scala | 236 +++++++++++------- 1 file changed, 151 insertions(+), 85 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 6e977a9..17b22cc 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -3,34 +3,61 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.{Behaviors, LoggerOps} import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse} -import dpla.api.v2.search.ElasticSearchResponseHandler.{ElasticSearchResponseHandlerCommand, ProcessElasticSearchResponse} +import akka.http.scaladsl.model.{ + ContentTypes, + HttpEntity, + HttpMethods, + HttpRequest, + HttpResponse +} +import dpla.api.v2.search.ElasticSearchResponseHandler.{ + ElasticSearchResponseHandlerCommand, + ProcessElasticSearchResponse +} import dpla.api.v2.search.SearchProtocol._ -import dpla.api.v2.search.paramValidators.{FetchParams, RandomParams, SearchParams} +import dpla.api.v2.search.paramValidators.{ + FetchParams, + RandomParams, + SearchParams +} import spray.json.JsValue import java.util.concurrent.{Semaphore, TimeUnit} import scala.concurrent.{ExecutionContext, Future} +import org.slf4j.LoggerFactory -/** - * Sends requests to Elastic Search. - */ +/** Sends requests to Elastic Search. + */ object ElasticSearchClient { + private val log = LoggerFactory.getLogger(getClass) + + // Default values for configuration + private val DefaultMaxConcurrentRequests: Int = 32 + private val DefaultSemaphoreTimeoutSeconds: Long = 5L + // Concurrency limiter to prevent overwhelming the ES cluster and Akka HTTP pool. // This caps the number of concurrent in-flight ES requests per API instance. - // Default 32 permits; can be tuned via environment variable. private val maxConcurrentEsRequests: Int = { - val value = sys.env.getOrElse("ES_MAX_CONCURRENT_REQUESTS", "32") - try { - val parsed = value.toInt - if (parsed <= 0) { - throw new IllegalArgumentException(s"ES_MAX_CONCURRENT_REQUESTS must be positive, got: $parsed") - } - parsed - } catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"ES_MAX_CONCURRENT_REQUESTS must be a valid integer, got: $value") + val envVar = "ES_MAX_CONCURRENT_REQUESTS" + sys.env.get(envVar) match { + case Some(value) => + value.toIntOption match { + case Some(parsed) if parsed > 0 => parsed + case Some(parsed) => + log.error( + s"Invalid value for $envVar: '$value' (must be positive integer). " + + s"Using default: $DefaultMaxConcurrentRequests" + ) + DefaultMaxConcurrentRequests + case None => + log.error( + s"Invalid value for $envVar: '$value' (not a valid integer). " + + s"Using default: $DefaultMaxConcurrentRequests" + ) + DefaultMaxConcurrentRequests + } + case None => DefaultMaxConcurrentRequests } } private val semaphore = new Semaphore(maxConcurrentEsRequests) @@ -38,21 +65,43 @@ object ElasticSearchClient { // Timeout for acquiring a semaphore permit (seconds). // If exceeded, the request fails fast rather than blocking indefinitely. // Keep this well below askTimeout (30s) to leave time for the actual ES query. - private val semaphoreTimeoutSeconds: Long = - sys.env.getOrElse("ES_SEMAPHORE_TIMEOUT_SECONDS", "5").toLong - - /** - * Wraps an ES request Future with concurrency limiting. - * Uses tryAcquire with timeout to avoid blocking actor threads indefinitely. - * Ensures permit is released even if Future construction fails. - */ - private def withConcurrencyLimit[T](f: => Future[T]) - (implicit ec: ExecutionContext): Future[T] = { + private val semaphoreTimeoutSeconds: Long = { + val envVar = "ES_SEMAPHORE_TIMEOUT_SECONDS" + sys.env.get(envVar) match { + case Some(value) => + value.toLongOption match { + case Some(parsed) if parsed > 0 => parsed + case Some(parsed) => + log.error( + s"Invalid value for $envVar: '$value' (must be positive integer). " + + s"Using default: $DefaultSemaphoreTimeoutSeconds" + ) + DefaultSemaphoreTimeoutSeconds + case None => + log.error( + s"Invalid value for $envVar: '$value' (not a valid integer). " + + s"Using default: $DefaultSemaphoreTimeoutSeconds" + ) + DefaultSemaphoreTimeoutSeconds + } + case None => DefaultSemaphoreTimeoutSeconds + } + } + + /** Wraps an ES request Future with concurrency limiting. Uses tryAcquire with + * timeout to avoid blocking actor threads indefinitely. Ensures permit is + * released even if Future construction fails. + */ + private def withConcurrencyLimit[T]( + f: => Future[T] + )(implicit ec: ExecutionContext): Future[T] = { if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) { - Future.failed(new RuntimeException( - s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + - s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" - )) + Future.failed( + new RuntimeException( + s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + + s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" + ) + ) } else { try { val future = f @@ -66,12 +115,11 @@ object ElasticSearchClient { } def apply( - endpoint: String, - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[IntermediateSearchResult] = { + endpoint: String, + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[IntermediateSearchResult] = { Behaviors.setup { context => - val responseHandler: ActorRef[ElasticSearchResponseHandlerCommand] = context.spawn( ElasticSearchResponseHandler(), @@ -82,28 +130,52 @@ object ElasticSearchClient { case SearchQuery(params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processSearch(params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processSearch( + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case FetchQuery(id, params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processFetch(id, params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processFetch( + id, + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case MultiFetchQuery(query, replyTo) => - val sessionChildActor = processMultiFetch(query, endpoint, replyTo, - responseHandler, nextPhase) + val sessionChildActor = processMultiFetch( + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case RandomQuery(params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processRandom(params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processRandom( + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -113,22 +185,20 @@ object ElasticSearchClient { } } - /** - * Per session actor behavior for handling a search request. - * The session actor has its own internal state and its own ActorRef for - * sending/receiving messages. - */ + /** Per session actor behavior for handling a search request. The session + * actor has its own internal state and its own ActorRef for + * sending/receiving messages. + */ private def processSearch( - params: SearchParams, - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + params: SearchParams, + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system implicit val ec: ExecutionContext = system.executionContext @@ -173,17 +243,16 @@ object ElasticSearchClient { } private def processFetch( - id: String, - params: Option[FetchParams], - query: Option[JsValue], - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + id: String, + params: Option[FetchParams], + query: Option[JsValue], + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system implicit val ec: ExecutionContext = system.executionContext @@ -221,21 +290,19 @@ object ElasticSearchClient { } } - /** - * Per session actor behavior for handling a multi-fetch request. - * The session actor has its own internal state and its own ActorRef for - * sending/receiving messages. - */ + /** Per session actor behavior for handling a multi-fetch request. The session + * actor has its own internal state and its own ActorRef for + * sending/receiving messages. + */ private def processMultiFetch( - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system implicit val ec: ExecutionContext = system.executionContext @@ -280,16 +347,15 @@ object ElasticSearchClient { } private def processRandom( - params: RandomParams, - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + params: RandomParams, + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system implicit val ec: ExecutionContext = system.executionContext From 939ef26df80d72659486c1f6f6579c1738c64f34 Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 13:50:47 -0700 Subject: [PATCH 5/6] More input validation --- .../api/v2/search/ElasticSearchClient.scala | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 17b22cc..8a22d29 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -22,7 +22,6 @@ import dpla.api.v2.search.paramValidators.{ } import spray.json.JsValue -import java.util.concurrent.{Semaphore, TimeUnit} import scala.concurrent.{ExecutionContext, Future} import org.slf4j.LoggerFactory @@ -60,7 +59,6 @@ object ElasticSearchClient { case None => DefaultMaxConcurrentRequests } } - private val semaphore = new Semaphore(maxConcurrentEsRequests) // Timeout for acquiring a semaphore permit (seconds). // If exceeded, the request fails fast rather than blocking indefinitely. @@ -88,31 +86,11 @@ object ElasticSearchClient { } } - /** Wraps an ES request Future with concurrency limiting. Uses tryAcquire with - * timeout to avoid blocking actor threads indefinitely. Ensures permit is - * released even if Future construction fails. - */ - private def withConcurrencyLimit[T]( - f: => Future[T] - )(implicit ec: ExecutionContext): Future[T] = { - if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) { - Future.failed( - new RuntimeException( - s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + - s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" - ) - ) - } else { - try { - val future = f - future.andThen { case _ => semaphore.release() }(ec) - } catch { - case e: Throwable => - semaphore.release() - Future.failed(e) - } - } - } + // Concurrency limiter instance used to wrap ES requests + private val concurrencyLimiter = new ConcurrencyLimiter( + maxConcurrent = maxConcurrentEsRequests, + timeoutSeconds = semaphoreTimeoutSeconds + ) def apply( endpoint: String, @@ -209,7 +187,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = withConcurrencyLimit { + val futureResp: Future[HttpResponse] = concurrencyLimiter { Http().singleRequest(request) } @@ -258,7 +236,7 @@ object ElasticSearchClient { // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" - val futureResp: Future[HttpResponse] = withConcurrencyLimit { + val futureResp: Future[HttpResponse] = concurrencyLimiter { Http().singleRequest(HttpRequest(uri = fetchUri)) } @@ -313,7 +291,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = withConcurrencyLimit { + val futureResp: Future[HttpResponse] = concurrencyLimiter { Http().singleRequest(request) } @@ -366,7 +344,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = withConcurrencyLimit { + val futureResp: Future[HttpResponse] = concurrencyLimiter { Http().singleRequest(request) } From 42aec0428cc0d69a2f8d6c0c272dc65738b29a61 Mon Sep 17 00:00:00 2001 From: scott Date: Mon, 8 Dec 2025 13:59:03 -0700 Subject: [PATCH 6/6] Fix broken test --- .../api/v2/search/ElasticSearchClient.scala | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 8a22d29..17b22cc 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -22,6 +22,7 @@ import dpla.api.v2.search.paramValidators.{ } import spray.json.JsValue +import java.util.concurrent.{Semaphore, TimeUnit} import scala.concurrent.{ExecutionContext, Future} import org.slf4j.LoggerFactory @@ -59,6 +60,7 @@ object ElasticSearchClient { case None => DefaultMaxConcurrentRequests } } + private val semaphore = new Semaphore(maxConcurrentEsRequests) // Timeout for acquiring a semaphore permit (seconds). // If exceeded, the request fails fast rather than blocking indefinitely. @@ -86,11 +88,31 @@ object ElasticSearchClient { } } - // Concurrency limiter instance used to wrap ES requests - private val concurrencyLimiter = new ConcurrencyLimiter( - maxConcurrent = maxConcurrentEsRequests, - timeoutSeconds = semaphoreTimeoutSeconds - ) + /** Wraps an ES request Future with concurrency limiting. Uses tryAcquire with + * timeout to avoid blocking actor threads indefinitely. Ensures permit is + * released even if Future construction fails. + */ + private def withConcurrencyLimit[T]( + f: => Future[T] + )(implicit ec: ExecutionContext): Future[T] = { + if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) { + Future.failed( + new RuntimeException( + s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + + s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" + ) + ) + } else { + try { + val future = f + future.andThen { case _ => semaphore.release() }(ec) + } catch { + case e: Throwable => + semaphore.release() + Future.failed(e) + } + } + } def apply( endpoint: String, @@ -187,7 +209,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = concurrencyLimiter { + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) } @@ -236,7 +258,7 @@ object ElasticSearchClient { // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" - val futureResp: Future[HttpResponse] = concurrencyLimiter { + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(HttpRequest(uri = fetchUri)) } @@ -291,7 +313,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = concurrencyLimiter { + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) } @@ -344,7 +366,7 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = concurrencyLimiter { + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) }