Skip to content

Commit

Permalink
ElasticSource should clear the scroll (#3031)
Browse files Browse the repository at this point in the history
* fixes #3030

* Also expose root_cause if present
  • Loading branch information
brunoballekens committed May 4, 2024
1 parent 657bb98 commit cf30663
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ case class ElasticError(`type`: String,
grouped: Option[Boolean] = None,
@JsonProperty("failed_shards") failedShards: Seq[FailedShard] = Seq()
) {
def asException: Exception = causedBy.fold(new RuntimeException(s"${`type`} $reason"))(cause => new RuntimeException(s"${`type`} $reason", new RuntimeException(cause.toString)))
def asException: Exception =
causedBy.fold(new RuntimeException(s"${`type`} $reason", Option(rootCause).flatMap(_.headOption).map(_.asException).orNull))(cause => new RuntimeException(s"${`type`} $reason", new RuntimeException(cause.toString)))
}

case class FailedShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package com.sksamuel.elastic4s.akka.streams

import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.sksamuel.elastic4s.ElasticApi.clearScroll
import com.sksamuel.elastic4s.ElasticDsl.searchScroll
import com.sksamuel.elastic4s.requests.searches.{SearchHandlers, SearchHit, SearchRequest, SearchResponse, SearchScrollHandlers, SearchScrollRequest}
import com.sksamuel.elastic4s.{ElasticClient, Executor, Functor, Handler, RequestFailure, RequestSuccess, Response}
import com.sksamuel.elastic4s.requests.searches._
import com.sksamuel.elastic4s._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
Expand All @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)

private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

Expand Down Expand Up @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)
maybeFetch()
}

override def postStop(): Unit = {
Option(scrollId) match {
case Some(id) => client.execute(clearScroll(id))
case _ => ()
}
}

setHandler(out, this)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.elastic4s.pekko.streams

import com.sksamuel.elastic4s.ElasticApi.clearScroll
import com.sksamuel.elastic4s.ElasticDsl.searchScroll
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.requests.searches._
Expand All @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)

private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

Expand Down Expand Up @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)
maybeFetch()
}

override def postStop(): Unit = {
Option(scrollId) match {
case Some(id) => client.execute(clearScroll(id))
case _ => ()
}
}

setHandler(out, this)
}
}
Expand Down

0 comments on commit cf30663

Please sign in to comment.