diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/ElasticError.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/ElasticError.scala index dbc38a00a..8db4a5b1c 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/ElasticError.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/ElasticError.scala @@ -15,7 +15,8 @@ case class ElasticError(`type`: String, grouped: Option[Boolean] = None, @JsonProperty("failed_shards") failedShards: Seq[FailedShard] = Seq() ) { - def asException: Exception = causedBy.fold(new RuntimeException(s"${`type`} $reason"))(cause => new RuntimeException(s"${`type`} $reason", new RuntimeException(cause.toString))) + def asException: Exception = + causedBy.fold(new RuntimeException(s"${`type`} $reason", Option(rootCause).flatMap(_.headOption).map(_.asException).orNull))(cause => new RuntimeException(s"${`type`} $reason", new RuntimeException(cause.toString))) } case class FailedShard( diff --git a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala index 9f9dadb1c..4fd986990 100644 --- a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala +++ b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala @@ -2,9 +2,10 @@ package com.sksamuel.elastic4s.akka.streams import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} import akka.stream.{Attributes, Outlet, SourceShape} +import com.sksamuel.elastic4s.ElasticApi.clearScroll import com.sksamuel.elastic4s.ElasticDsl.searchScroll -import com.sksamuel.elastic4s.requests.searches.{SearchHandlers, SearchHit, SearchRequest, SearchResponse, SearchScrollHandlers, SearchScrollRequest} -import com.sksamuel.elastic4s.{ElasticClient, Executor, Functor, Handler, RequestFailure, RequestSuccess, Response} +import com.sksamuel.elastic4s.requests.searches._ +import com.sksamuel.elastic4s._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler + private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler private implicit val executor: Executor[Future] = Executor.FutureExecutor private implicit val functor: Functor[Future] = Functor.FutureFunctor @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) maybeFetch() } + override def postStop(): Unit = { + Option(scrollId) match { + case Some(id) => client.execute(clearScroll(id)) + case _ => () + } + } + setHandler(out, this) } } diff --git a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala index 49303a44d..be0d494a0 100644 --- a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala +++ b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala @@ -1,5 +1,6 @@ package com.sksamuel.elastic4s.pekko.streams +import com.sksamuel.elastic4s.ElasticApi.clearScroll import com.sksamuel.elastic4s.ElasticDsl.searchScroll import com.sksamuel.elastic4s._ import com.sksamuel.elastic4s.requests.searches._ @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler + private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler private implicit val executor: Executor[Future] = Executor.FutureExecutor private implicit val functor: Functor[Future] = Functor.FutureFunctor @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) maybeFetch() } + override def postStop(): Unit = { + Option(scrollId) match { + case Some(id) => client.execute(clearScroll(id)) + case _ => () + } + } + setHandler(out, this) } }