Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ elasticSearch {
itemsUrl = ${?ITEM_ELASTICSEARCH_URL}
pssUrl = ""
pssUrl = ${?PSS_ELASTICSEARCH_URL}
circuitBreaker {
# Trip after this many consecutive call timeouts
maxFailures = 3
maxFailures = ${?ES_CIRCUIT_BREAKER_MAX_FAILURES}
# How long a single ES call may take before counting as a failure
callTimeout = 10s
callTimeout = ${?ES_CIRCUIT_BREAKER_CALL_TIMEOUT}
# How long to keep the breaker open before attempting ES again
resetTimeout = 20s
resetTimeout = ${?ES_CIRCUIT_BREAKER_RESET_TIMEOUT}
}
}
awsSes {
emailFrom = "info@dp.la"
Expand Down
49 changes: 37 additions & 12 deletions src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dpla.api.v2.search

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.{Behaviors, LoggerOps}
import akka.actor.typed.scaladsl.adapter.TypedSchedulerOps
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{
ContentTypes,
Expand All @@ -10,6 +11,7 @@ import akka.http.scaladsl.model.{
HttpRequest,
HttpResponse
}
import akka.pattern.CircuitBreaker
import dpla.api.v2.search.ElasticSearchResponseHandler.{
ElasticSearchResponseHandlerCommand,
ProcessElasticSearchResponse
Expand All @@ -23,7 +25,9 @@ import dpla.api.v2.search.paramValidators.{
import spray.json.JsValue

import java.util.concurrent.{Semaphore, TimeUnit}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.DurationConverters._
import org.slf4j.LoggerFactory

/** Sends requests to Elastic Search.
Expand Down Expand Up @@ -126,6 +130,19 @@ object ElasticSearchClient {
"ElasticSearchResponseHandler"
)

val cfg = context.system.settings.config.getConfig("elasticSearch.circuitBreaker")
val maxFailures: Int = cfg.getInt("maxFailures")
val callTimeout: FiniteDuration = cfg.getDuration("callTimeout").toScala.toCoarsest
val resetTimeout: FiniteDuration = cfg.getDuration("resetTimeout").toScala.toCoarsest
val breaker = CircuitBreaker(
scheduler = context.system.scheduler.toClassic,
maxFailures = maxFailures,
callTimeout = callTimeout,
resetTimeout = resetTimeout
).onOpen(() => context.log.warn("ElasticSearch circuit breaker opened"))
.onClose(() => context.log.info("ElasticSearch circuit breaker closed"))
.onHalfOpen(() => context.log.info("ElasticSearch circuit breaker half-open, testing ES"))

Behaviors.receiveMessage[IntermediateSearchResult] {

case SearchQuery(params, query, replyTo) =>
Expand All @@ -136,7 +153,8 @@ object ElasticSearchClient {
endpoint,
replyTo,
responseHandler,
nextPhase
nextPhase,
breaker
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same
Expand All @@ -150,7 +168,8 @@ object ElasticSearchClient {
endpoint,
replyTo,
responseHandler,
nextPhase
nextPhase,
breaker
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same
Expand All @@ -161,7 +180,8 @@ object ElasticSearchClient {
endpoint,
replyTo,
responseHandler,
nextPhase
nextPhase,
breaker
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same
Expand All @@ -174,7 +194,8 @@ object ElasticSearchClient {
endpoint,
replyTo,
responseHandler,
nextPhase
nextPhase,
breaker
)
context.spawnAnonymous(sessionChildActor)
Behaviors.same
Expand All @@ -195,7 +216,8 @@ object ElasticSearchClient {
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
nextPhase: ActorRef[IntermediateSearchResult],
breaker: CircuitBreaker
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>
Expand All @@ -210,7 +232,7 @@ object ElasticSearchClient {
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
breaker.withCircuitBreaker { Http().singleRequest(request) }
}

context.log.info2(
Expand Down Expand Up @@ -252,7 +274,8 @@ object ElasticSearchClient {
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
nextPhase: ActorRef[IntermediateSearchResult],
breaker: CircuitBreaker
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>
Expand All @@ -262,7 +285,7 @@ object ElasticSearchClient {
// Make an HTTP request to elastic search.
val fetchUri = s"$endpoint/_doc/$id"
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(HttpRequest(uri = fetchUri))
breaker.withCircuitBreaker { Http().singleRequest(HttpRequest(uri = fetchUri)) }
}

context.log.info("ElasticSearch fetch QUERY: {}", fetchUri)
Expand Down Expand Up @@ -302,7 +325,8 @@ object ElasticSearchClient {
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
nextPhase: ActorRef[IntermediateSearchResult],
breaker: CircuitBreaker
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>
Expand All @@ -317,7 +341,7 @@ object ElasticSearchClient {
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
breaker.withCircuitBreaker { Http().singleRequest(request) }
}

context.log.info2(
Expand Down Expand Up @@ -355,7 +379,8 @@ object ElasticSearchClient {
endpoint: String,
replyTo: ActorRef[SearchResponse],
responseHandler: ActorRef[ElasticSearchResponseHandlerCommand],
nextPhase: ActorRef[IntermediateSearchResult]
nextPhase: ActorRef[IntermediateSearchResult],
breaker: CircuitBreaker
): Behavior[ElasticSearchResponse] = {

Behaviors.setup { context =>
Expand All @@ -370,7 +395,7 @@ object ElasticSearchClient {
entity = HttpEntity(ContentTypes.`application/json`, query.toString)
)
val futureResp: Future[HttpResponse] = withConcurrencyLimit {
Http().singleRequest(request)
breaker.withCircuitBreaker { Http().singleRequest(request) }
}

context.log.info2(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.model.{HttpResponse, StatusCode}
import akka.http.scaladsl.model.HttpMessage.DiscardedEntity
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.pattern.CircuitBreakerOpenException

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -48,10 +49,10 @@ object ElasticSearchResponseHandler {

def apply(): Behavior[ElasticSearchResponseHandlerCommand] = {
Behaviors.setup { context =>

Behaviors.receiveMessage[ElasticSearchResponseHandlerCommand] {

case ProcessElasticSearchResponse(futureHttpResponse, replyTo) =>
// Map the Future value to a message, handled by this actor.
context.pipeToSelf(futureHttpResponse) {
case Success(httpResponse) =>
ProcessHttpResponse(httpResponse, replyTo)
Expand Down Expand Up @@ -96,12 +97,11 @@ object ElasticSearchResponseHandler {
}

case ReturnFinalResponse(response, replyTo, error) =>
// Log error if there is one
error match {
case Some(_: CircuitBreakerOpenException) =>
context.log.warn("Request rejected: ElasticSearch circuit breaker is open")
case Some(e) =>
context.log.error(
"Failed to process ElasticSearch response:", e
)
context.log.error("Failed to process ElasticSearch response:", e)
case None => // no-op
}
// Send fully processed reply to original requester.
Expand Down
Loading