Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoballekens committed Mar 28, 2024
1 parent cb05196 commit e614472
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package com.sksamuel.elastic4s.akka.streams

import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import com.sksamuel.elastic4s.ElasticApi.clearScroll
import com.sksamuel.elastic4s.ElasticDsl.searchScroll
import com.sksamuel.elastic4s.requests.searches.{SearchHandlers, SearchHit, SearchRequest, SearchResponse, SearchScrollHandlers, SearchScrollRequest}
import com.sksamuel.elastic4s.{ElasticClient, Executor, Functor, Handler, RequestFailure, RequestSuccess, Response}
import com.sksamuel.elastic4s.requests.searches._
import com.sksamuel.elastic4s._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
Expand All @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)

private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

Expand Down Expand Up @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)
maybeFetch()
}

override def postStop(): Unit = {
Option(scrollId) match {
case Some(id) => client.execute(clearScroll(id))
case _ => ()
}
}

setHandler(out, this)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.sksamuel.elastic4s.pekko.streams

import com.sksamuel.elastic4s.ElasticApi.clearScroll
import com.sksamuel.elastic4s.ElasticDsl.searchScroll
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.requests.searches._
Expand All @@ -26,6 +27,7 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)

private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler
private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler
private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler
private implicit val executor: Executor[Future] = Executor.FutureExecutor
private implicit val functor: Functor[Future] = Functor.FutureFunctor

Expand Down Expand Up @@ -87,6 +89,13 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings)
maybeFetch()
}

override def postStop(): Unit = {
Option(scrollId) match {
case Some(id) => client.execute(clearScroll(id))
case _ => ()
}
}

setHandler(out, this)
}
}
Expand Down

0 comments on commit e614472

Please sign in to comment.