diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 601df04..ee760c8 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -11,6 +11,15 @@ akka { unhandled = on } } + http { + host-connection-pool { + # Increased from defaults (4/32) to handle burst traffic to ES + max-connections = 16 + max-connections = ${?AKKA_HTTP_MAX_CONNECTIONS} + max-open-requests = 128 + max-open-requests = ${?AKKA_HTTP_MAX_OPEN_REQUESTS} + } + } } dispatchers { emailDispatcher { diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 46cdb97..17b22cc 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -3,26 +3,123 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.{Behaviors, LoggerOps} import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse} -import dpla.api.v2.search.ElasticSearchResponseHandler.{ElasticSearchResponseHandlerCommand, ProcessElasticSearchResponse} +import akka.http.scaladsl.model.{ + ContentTypes, + HttpEntity, + HttpMethods, + HttpRequest, + HttpResponse +} +import dpla.api.v2.search.ElasticSearchResponseHandler.{ + ElasticSearchResponseHandlerCommand, + ProcessElasticSearchResponse +} import dpla.api.v2.search.SearchProtocol._ -import dpla.api.v2.search.paramValidators.{FetchParams, RandomParams, SearchParams} +import dpla.api.v2.search.paramValidators.{ + FetchParams, + RandomParams, + SearchParams +} import spray.json.JsValue -import scala.concurrent.Future +import java.util.concurrent.{Semaphore, TimeUnit} +import scala.concurrent.{ExecutionContext, Future} +import org.slf4j.LoggerFactory -/** - * Sends requests to Elastic Search. - */ +/** Sends requests to Elastic Search. + */ object ElasticSearchClient { + private val log = LoggerFactory.getLogger(getClass) + + // Default values for configuration + private val DefaultMaxConcurrentRequests: Int = 32 + private val DefaultSemaphoreTimeoutSeconds: Long = 5L + + // Concurrency limiter to prevent overwhelming the ES cluster and Akka HTTP pool. + // This caps the number of concurrent in-flight ES requests per API instance. + private val maxConcurrentEsRequests: Int = { + val envVar = "ES_MAX_CONCURRENT_REQUESTS" + sys.env.get(envVar) match { + case Some(value) => + value.toIntOption match { + case Some(parsed) if parsed > 0 => parsed + case Some(parsed) => + log.error( + s"Invalid value for $envVar: '$value' (must be positive integer). " + + s"Using default: $DefaultMaxConcurrentRequests" + ) + DefaultMaxConcurrentRequests + case None => + log.error( + s"Invalid value for $envVar: '$value' (not a valid integer). " + + s"Using default: $DefaultMaxConcurrentRequests" + ) + DefaultMaxConcurrentRequests + } + case None => DefaultMaxConcurrentRequests + } + } + private val semaphore = new Semaphore(maxConcurrentEsRequests) + + // Timeout for acquiring a semaphore permit (seconds). + // If exceeded, the request fails fast rather than blocking indefinitely. + // Keep this well below askTimeout (30s) to leave time for the actual ES query. + private val semaphoreTimeoutSeconds: Long = { + val envVar = "ES_SEMAPHORE_TIMEOUT_SECONDS" + sys.env.get(envVar) match { + case Some(value) => + value.toLongOption match { + case Some(parsed) if parsed > 0 => parsed + case Some(parsed) => + log.error( + s"Invalid value for $envVar: '$value' (must be positive integer). " + + s"Using default: $DefaultSemaphoreTimeoutSeconds" + ) + DefaultSemaphoreTimeoutSeconds + case None => + log.error( + s"Invalid value for $envVar: '$value' (not a valid integer). " + + s"Using default: $DefaultSemaphoreTimeoutSeconds" + ) + DefaultSemaphoreTimeoutSeconds + } + case None => DefaultSemaphoreTimeoutSeconds + } + } + + /** Wraps an ES request Future with concurrency limiting. Uses tryAcquire with + * timeout to avoid blocking actor threads indefinitely. Ensures permit is + * released even if Future construction fails. + */ + private def withConcurrencyLimit[T]( + f: => Future[T] + )(implicit ec: ExecutionContext): Future[T] = { + if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) { + Future.failed( + new RuntimeException( + s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " + + s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit" + ) + ) + } else { + try { + val future = f + future.andThen { case _ => semaphore.release() }(ec) + } catch { + case e: Throwable => + semaphore.release() + Future.failed(e) + } + } + } + def apply( - endpoint: String, - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[IntermediateSearchResult] = { + endpoint: String, + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[IntermediateSearchResult] = { Behaviors.setup { context => - val responseHandler: ActorRef[ElasticSearchResponseHandlerCommand] = context.spawn( ElasticSearchResponseHandler(), @@ -33,28 +130,52 @@ object ElasticSearchClient { case SearchQuery(params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processSearch(params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processSearch( + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case FetchQuery(id, params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processFetch(id, params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processFetch( + id, + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case MultiFetchQuery(query, replyTo) => - val sessionChildActor = processMultiFetch(query, endpoint, replyTo, - responseHandler, nextPhase) + val sessionChildActor = processMultiFetch( + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same case RandomQuery(params, query, replyTo) => // Create a session child actor to process the request. - val sessionChildActor = processRandom(params, query, endpoint, - replyTo, responseHandler, nextPhase) + val sessionChildActor = processRandom( + params, + query, + endpoint, + replyTo, + responseHandler, + nextPhase + ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -64,23 +185,22 @@ object ElasticSearchClient { } } - /** - * Per session actor behavior for handling a search request. - * The session actor has its own internal state and its own ActorRef for - * sending/receiving messages. - */ + /** Per session actor behavior for handling a search request. The session + * actor has its own internal state and its own ActorRef for + * sending/receiving messages. + */ private def processSearch( - params: SearchParams, - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + params: SearchParams, + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -89,8 +209,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch search QUERY: {}: {}", @@ -122,23 +243,24 @@ object ElasticSearchClient { } private def processFetch( - id: String, - params: Option[FetchParams], - query: Option[JsValue], - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + id: String, + params: Option[FetchParams], + query: Option[JsValue], + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(HttpRequest(uri = fetchUri)) + } context.log.info("ElasticSearch fetch QUERY: {}", fetchUri) @@ -168,22 +290,21 @@ object ElasticSearchClient { } } - /** - * Per session actor behavior for handling a multi-fetch request. - * The session actor has its own internal state and its own ActorRef for - * sending/receiving messages. - */ + /** Per session actor behavior for handling a multi-fetch request. The session + * actor has its own internal state and its own ActorRef for + * sending/receiving messages. + */ private def processMultiFetch( - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -192,8 +313,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch multi-fetch QUERY: {}: {}", @@ -225,17 +347,17 @@ object ElasticSearchClient { } private def processRandom( - params: RandomParams, - query: JsValue, - endpoint: String, - replyTo: ActorRef[SearchResponse], - responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] - ): Behavior[ElasticSearchResponse] = { + params: RandomParams, + query: JsValue, + endpoint: String, + replyTo: ActorRef[SearchResponse], + responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], + nextPhase: ActorRef[IntermediateSearchResult] + ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => - implicit val system: ActorSystem[Nothing] = context.system + implicit val ec: ExecutionContext = system.executionContext // Make an HTTP request to elastic search. val searchUri: String = s"$endpoint/_search" @@ -244,8 +366,9 @@ object ElasticSearchClient { uri = searchUri, entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) - val futureResp: Future[HttpResponse] = + val futureResp: Future[HttpResponse] = withConcurrencyLimit { Http().singleRequest(request) + } context.log.info2( "ElasticSearch random QUERY: {}: {}",