Skip to content

Commit

Permalink
Support Update By Query with slices auto
Browse files Browse the repository at this point in the history
  • Loading branch information
flavienbert committed Jun 18, 2024
1 parent 316985a commit c48b9bb
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.sksamuel.elastic4s.requests.common

sealed trait Slices

case object AutoSlices extends Slices
case class NumericSlices(slices: Int) extends Slices
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.requests.update

import com.sksamuel.elastic4s.Indexes
import com.sksamuel.elastic4s.ext.OptionImplicits._
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice}
import com.sksamuel.elastic4s.requests.common.{AutoSlices, NumericSlices, RefreshPolicy, Slice, Slices}
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.queries.Query

Expand All @@ -20,7 +20,7 @@ case class UpdateByQueryAsyncRequest(indexes: Indexes,
retryBackoffInitialTime: Option[FiniteDuration] = None,
scroll: Option[String] = None,
scrollSize: Option[Int] = None,
slices: Option[Int] = None,
slices: Option[Slices] = None,
slice: Option[Slice] = None,
timeout: Option[FiniteDuration] = None,
shouldStoreResult: Option[Boolean] = None,
Expand All @@ -40,7 +40,8 @@ case class UpdateByQueryAsyncRequest(indexes: Indexes,

def scrollSize(scrollSize: Int): UpdateByQueryAsyncRequest = copy(scrollSize = scrollSize.some)
def slice(slice: Slice): UpdateByQueryAsyncRequest = copy(slice = slice.some)
def slices(slices: Int): UpdateByQueryAsyncRequest = copy(slices = slices.some)
def slices(slices: Int): UpdateByQueryAsyncRequest = copy(slices = Some(NumericSlices(slices)))
def slicesAuto(): UpdateByQueryAsyncRequest = copy(slices = Some(AutoSlices))

def requestsPerSecond(requestsPerSecond: Float): UpdateByQueryAsyncRequest =
copy(requestsPerSecond = requestsPerSecond.some)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sksamuel.elastic4s.requests.update

import com.sksamuel.elastic4s.Indexes
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice}
import com.sksamuel.elastic4s.requests.common.{ AutoSlices, NumericSlices, RefreshPolicy, Slice, Slices}
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.ext.OptionImplicits._
Expand Down Expand Up @@ -33,7 +33,7 @@ trait BaseUpdateByQueryRequest {

val scrollSize: Option[Int]

val slices: Option[Int]
val slices: Option[Slices]

val slice: Option[Slice]

Expand All @@ -59,7 +59,7 @@ case class UpdateByQueryRequest(indexes: Indexes,
retryBackoffInitialTime: Option[FiniteDuration] = None,
scroll: Option[String] = None,
scrollSize: Option[Int] = None,
slices: Option[Int] = None,
slices: Option[Slices] = None,
slice: Option[Slice] = None,
timeout: Option[FiniteDuration] = None,
shouldStoreResult: Option[Boolean] = None,
Expand All @@ -79,7 +79,8 @@ case class UpdateByQueryRequest(indexes: Indexes,

def scrollSize(scrollSize: Int): UpdateByQueryRequest = copy(scrollSize = scrollSize.some)
def slice(slice: Slice): UpdateByQueryRequest = copy(slice = slice.some)
def slices(slices: Int): UpdateByQueryRequest = copy(slices = slices.some)
def slices(slices: Int): UpdateByQueryRequest = copy(slices = Some(NumericSlices(slices)))
def slicesAuto(): UpdateByQueryRequest = copy(slices = Some(AutoSlices))

def requestsPerSecond(requestsPerSecond: Float): UpdateByQueryRequest =
copy(requestsPerSecond = requestsPerSecond.some)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.sksamuel.elastic4s.handlers.common.FetchSourceContextQueryParameterFn
import com.sksamuel.elastic4s.handlers.script.ScriptBuilderFn
import com.sksamuel.elastic4s.handlers.searches.queries
import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory}
import com.sksamuel.elastic4s.requests.common.RefreshPolicyHttpValue
import com.sksamuel.elastic4s.requests.common.{AutoSlices, NumericSlices, RefreshPolicyHttpValue}
import com.sksamuel.elastic4s.requests.task.GetTask
import com.sksamuel.elastic4s.requests.update.{BaseUpdateByQueryRequest, UpdateByQueryAsyncRequest, UpdateByQueryAsyncResponse, UpdateByQueryRequest, UpdateByQueryResponse, UpdateByQueryTask, UpdateRequest, UpdateResponse}
import com.sksamuel.elastic4s.{BulkIndexByScrollFailure, ElasticError, ElasticRequest, ElasticUrlEncoder, Handler, HttpEntity, HttpResponse, ResponseHandler}
Expand Down Expand Up @@ -87,7 +87,10 @@ trait UpdateHandlers {
request.scrollSize.foreach(params.put("scroll_size", _))
request.waitForActiveShards.foreach(params.put("wait_for_active_shards", _))
request.waitForCompletion.foreach(params.put("wait_for_completion", _))
request.slices.foreach(params.put("slices", _))
request.slices.foreach {
case AutoSlices => params.put("slices", "auto")
case NumericSlices(slices) => params.put("slices", slices)
}

val body = UpdateByQueryBodyFn(request)
logger.debug(s"Update by query ${body.string}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ class UpdateByQueryTest
}.await.result.count shouldBe 3
}

it should "support slices auto" in {
client.execute {
updateByQuery("pop", matchAllQuery()).script(script("ctx._source.foo = 'd'").lang("painless")).slicesAuto().refresh(RefreshPolicy.IMMEDIATE)
}.await.result.updated shouldBe 3

client.execute {
count("pop").query(termQuery("foo", "d"))
}.await.result.count shouldBe 3
}

it should "support RefreshPolicy.NONE" in {
client.execute {
updateByQuery("pop", matchAllQuery()).script(script("ctx._source.foo = 'c'").lang("painless")).refresh(RefreshPolicy.NONE)
Expand Down

0 comments on commit c48b9bb

Please sign in to comment.