Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ akka {
unhandled = on
}
}
http {
host-connection-pool {
# Increased from defaults (4/32) to handle burst traffic to ES
max-connections = 16
max-connections = ${?AKKA_HTTP_MAX_CONNECTIONS}
max-open-requests = 128
max-open-requests = ${?AKKA_HTTP_MAX_OPEN_REQUESTS}
Comment thread
moltude marked this conversation as resolved.
}
}
}
dispatchers {
emailDispatcher {
Expand Down
253 changes: 188 additions & 65 deletions src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,123 @@ package dpla.api.v2.search
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{Behaviors, LoggerOps}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, HttpResponse}
import dpla.api.v2.search.ElasticSearchResponseHandler.{ElasticSearchResponseHandlerCommand, ProcessElasticSearchResponse}
import akka.http.scaladsl.model.{
ContentTypes,
HttpEntity,
HttpMethods,
HttpRequest,
HttpResponse
}
import dpla.api.v2.search.ElasticSearchResponseHandler.{
ElasticSearchResponseHandlerCommand,
ProcessElasticSearchResponse
}
import dpla.api.v2.search.SearchProtocol._
import dpla.api.v2.search.paramValidators.{FetchParams, RandomParams, SearchParams}
import dpla.api.v2.search.paramValidators.{
FetchParams,
RandomParams,
SearchParams
}
import spray.json.JsValue

import scala.concurrent.Future
import java.util.concurrent.{Semaphore, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import org.slf4j.LoggerFactory

/**
* Sends requests to Elastic Search.
*/
/** Sends requests to Elastic Search.
*/
object ElasticSearchClient {

private val log = LoggerFactory.getLogger(getClass)

// Default values for configuration
private val DefaultMaxConcurrentRequests: Int = 32
private val DefaultSemaphoreTimeoutSeconds: Long = 5L

// Concurrency limiter to prevent overwhelming the ES cluster and Akka HTTP pool.
// This caps the number of concurrent in-flight ES requests per API instance.
private val maxConcurrentEsRequests: Int = {
val envVar = "ES_MAX_CONCURRENT_REQUESTS"
sys.env.get(envVar) match {
case Some(value) =>
value.toIntOption match {
case Some(parsed) if parsed > 0 => parsed
case Some(parsed) =>
log.error(
s"Invalid value for $envVar: '$value' (must be positive integer). " +
s"Using default: $DefaultMaxConcurrentRequests"
)
DefaultMaxConcurrentRequests
case None =>
log.error(
s"Invalid value for $envVar: '$value' (not a valid integer). " +
s"Using default: $DefaultMaxConcurrentRequests"
)
DefaultMaxConcurrentRequests
Comment thread
moltude marked this conversation as resolved.
}
case None => DefaultMaxConcurrentRequests
}
}
private val semaphore = new Semaphore(maxConcurrentEsRequests)

// Timeout for acquiring a semaphore permit (seconds).
// If exceeded, the request fails fast rather than blocking indefinitely.
// Keep this well below askTimeout (30s) to leave time for the actual ES query.
Comment thread
moltude marked this conversation as resolved.
private val semaphoreTimeoutSeconds: Long = {
val envVar = "ES_SEMAPHORE_TIMEOUT_SECONDS"
sys.env.get(envVar) match {
case Some(value) =>
value.toLongOption match {
case Some(parsed) if parsed > 0 => parsed
case Some(parsed) =>
log.error(
s"Invalid value for $envVar: '$value' (must be positive integer). " +
Comment thread
moltude marked this conversation as resolved.
s"Using default: $DefaultSemaphoreTimeoutSeconds"
)
DefaultSemaphoreTimeoutSeconds
case None =>
log.error(
s"Invalid value for $envVar: '$value' (not a valid integer). " +
Comment thread
moltude marked this conversation as resolved.
s"Using default: $DefaultSemaphoreTimeoutSeconds"
)
DefaultSemaphoreTimeoutSeconds
}
case None => DefaultSemaphoreTimeoutSeconds
}
}

/** Wraps an ES request Future with concurrency limiting. Uses tryAcquire with
* timeout to avoid blocking actor threads indefinitely. Ensures permit is
* released even if Future construction fails.
*/
private def withConcurrencyLimit[T](
f: => Future[T]
)(implicit ec: ExecutionContext): Future[T] = {
if (!semaphore.tryAcquire(semaphoreTimeoutSeconds, TimeUnit.SECONDS)) {
Future.failed(
new RuntimeException(
s"ES request rejected: concurrency limit ($maxConcurrentEsRequests) exceeded, " +
s"timed out after ${semaphoreTimeoutSeconds}s waiting for permit"
)
)
} else {
try {
val future = f
future.andThen { case _ => semaphore.release() }(ec)
} catch {
case e: Throwable =>
semaphore.release()
Future.failed(e)
}
}
}

def apply(
endpoint: String,
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[IntermediateSearchResult] = {
endpoint: String,
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[IntermediateSearchResult] = {

Behaviors.setup { context =>

val responseHandler: ActorRef[ElasticSearchResponseHandlerCommand] =
context.spawn(
ElasticSearchResponseHandler(),
Expand All @@ -33,28 +130,52 @@ object ElasticSearchClient {

case SearchQuery(params, query, replyTo) =>
// Create a session child actor to process the request.
val sessionChildActor = processSearch(params, query, endpoint,
replyTo, responseHandler, nextPhase)
val sessionChildActor = processSearch(
params,
query,
endpoint,
replyTo,
responseHandler,
nextPhase
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same

case FetchQuery(id, params, query, replyTo) =>
// Create a session child actor to process the request.
val sessionChildActor = processFetch(id, params, query, endpoint,
replyTo, responseHandler, nextPhase)
val sessionChildActor = processFetch(
id,
params,
query,
endpoint,
replyTo,
responseHandler,
nextPhase
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same

case MultiFetchQuery(query, replyTo) =>
val sessionChildActor = processMultiFetch(query, endpoint, replyTo,
responseHandler, nextPhase)
val sessionChildActor = processMultiFetch(
query,
endpoint,
replyTo,
responseHandler,
nextPhase
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same

case RandomQuery(params, query, replyTo) =>
// Create a session child actor to process the request.
val sessionChildActor = processRandom(params, query, endpoint,
replyTo, responseHandler, nextPhase)
val sessionChildActor = processRandom(
params,
query,
endpoint,
replyTo,
responseHandler,
nextPhase
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same

Expand All @@ -64,23 +185,22 @@ object ElasticSearchClient {
}
}

/**
* Per session actor behavior for handling a search request.
* The session actor has its own internal state and its own ActorRef for
* sending/receiving messages.
*/
/** Per session actor behavior for handling a search request. The session
* actor has its own internal state and its own ActorRef for
* sending/receiving messages.
*/
private def processSearch(
params: SearchParams,
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {
params: SearchParams,
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>

implicit val system: ActorSystem[Nothing] = context.system
implicit val ec: ExecutionContext = system.executionContext

// Make an HTTP request to elastic search.
val searchUri: String = s"$endpoint/_search"
Expand All @@ -89,8 +209,9 @@ object ElasticSearchClient {
uri = searchUri,
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] =
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
}

context.log.info2(
"ElasticSearch search QUERY: {}: {}",
Expand Down Expand Up @@ -122,23 +243,24 @@ object ElasticSearchClient {
}

private def processFetch(
id: String,
params: Option[FetchParams],
query: Option[JsValue],
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {
id: String,
params: Option[FetchParams],
query: Option[JsValue],
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>

implicit val system: ActorSystem[Nothing] = context.system
implicit val ec: ExecutionContext = system.executionContext

// Make an HTTP request to elastic search.
val fetchUri = s"$endpoint/_doc/$id"
val futureResp: Future[HttpResponse] =
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(HttpRequest(uri = fetchUri))
}

context.log.info("ElasticSearch fetch QUERY: {}", fetchUri)

Expand Down Expand Up @@ -168,22 +290,21 @@ object ElasticSearchClient {
}
}

/**
* Per session actor behavior for handling a multi-fetch request.
* The session actor has its own internal state and its own ActorRef for
* sending/receiving messages.
*/
/** Per session actor behavior for handling a multi-fetch request. The session
* actor has its own internal state and its own ActorRef for
* sending/receiving messages.
*/
private def processMultiFetch(
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>

implicit val system: ActorSystem[Nothing] = context.system
implicit val ec: ExecutionContext = system.executionContext

// Make an HTTP request to elastic search.
val searchUri: String = s"$endpoint/_search"
Expand All @@ -192,8 +313,9 @@ object ElasticSearchClient {
uri = searchUri,
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] =
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
}

context.log.info2(
"ElasticSearch multi-fetch QUERY: {}: {}",
Expand Down Expand Up @@ -225,17 +347,17 @@ object ElasticSearchClient {
}

private def processRandom(
params: RandomParams,
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {
params: RandomParams,
query: JsValue,
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>

implicit val system: ActorSystem[Nothing] = context.system
implicit val ec: ExecutionContext = system.executionContext

// Make an HTTP request to elastic search.
val searchUri: String = s"$endpoint/_search"
Expand All @@ -244,8 +366,9 @@ object ElasticSearchClient {
uri = searchUri,
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] =
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
}

context.log.info2(
"ElasticSearch random QUERY: {}: {}",
Expand Down
Loading