diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/common/Slices.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/common/Slices.scala new file mode 100644 index 000000000..cb713c357 --- /dev/null +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/common/Slices.scala @@ -0,0 +1,6 @@ +package com.sksamuel.elastic4s.requests.common + +sealed trait Slices + +case object AutoSlices extends Slices +case class NumericSlices(slices: Int) extends Slices diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala index f6cf9df5d..759dcbee3 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryAsyncRequest.scala @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.requests.update import com.sksamuel.elastic4s.Indexes import com.sksamuel.elastic4s.ext.OptionImplicits._ -import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice} +import com.sksamuel.elastic4s.requests.common.{AutoSlices, NumericSlices, RefreshPolicy, Slice, Slices} import com.sksamuel.elastic4s.requests.script.Script import com.sksamuel.elastic4s.requests.searches.queries.Query @@ -20,7 +20,7 @@ case class UpdateByQueryAsyncRequest(indexes: Indexes, retryBackoffInitialTime: Option[FiniteDuration] = None, scroll: Option[String] = None, scrollSize: Option[Int] = None, - slices: Option[Int] = None, + slices: Option[Slices] = None, slice: Option[Slice] = None, timeout: Option[FiniteDuration] = None, shouldStoreResult: Option[Boolean] = None, @@ -40,7 +40,8 @@ case class UpdateByQueryAsyncRequest(indexes: Indexes, def scrollSize(scrollSize: Int): UpdateByQueryAsyncRequest = copy(scrollSize = scrollSize.some) def slice(slice: Slice): UpdateByQueryAsyncRequest = copy(slice = slice.some) - def slices(slices: Int): UpdateByQueryAsyncRequest = copy(slices = slices.some) + def slices(slices: Int): UpdateByQueryAsyncRequest = copy(slices = Some(NumericSlices(slices))) + def slicesAuto(): UpdateByQueryAsyncRequest = copy(slices = Some(AutoSlices)) def requestsPerSecond(requestsPerSecond: Float): UpdateByQueryAsyncRequest = copy(requestsPerSecond = requestsPerSecond.some) diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala index b9b5122eb..7deaf1a19 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryRequest.scala @@ -1,7 +1,7 @@ package com.sksamuel.elastic4s.requests.update import com.sksamuel.elastic4s.Indexes -import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice} +import com.sksamuel.elastic4s.requests.common.{ AutoSlices, NumericSlices, RefreshPolicy, Slice, Slices} import com.sksamuel.elastic4s.requests.script.Script import com.sksamuel.elastic4s.requests.searches.queries.Query import com.sksamuel.elastic4s.ext.OptionImplicits._ @@ -33,7 +33,7 @@ trait BaseUpdateByQueryRequest { val scrollSize: Option[Int] - val slices: Option[Int] + val slices: Option[Slices] val slice: Option[Slice] @@ -59,7 +59,7 @@ case class UpdateByQueryRequest(indexes: Indexes, retryBackoffInitialTime: Option[FiniteDuration] = None, scroll: Option[String] = None, scrollSize: Option[Int] = None, - slices: Option[Int] = None, + slices: Option[Slices] = None, slice: Option[Slice] = None, timeout: Option[FiniteDuration] = None, shouldStoreResult: Option[Boolean] = None, @@ -79,7 +79,8 @@ case class UpdateByQueryRequest(indexes: Indexes, def scrollSize(scrollSize: Int): UpdateByQueryRequest = copy(scrollSize = scrollSize.some) def slice(slice: Slice): UpdateByQueryRequest = copy(slice = slice.some) - def slices(slices: Int): UpdateByQueryRequest = copy(slices = slices.some) + def slices(slices: Int): UpdateByQueryRequest = copy(slices = Some(NumericSlices(slices))) + def slicesAuto(): UpdateByQueryRequest = copy(slices = Some(AutoSlices)) def requestsPerSecond(requestsPerSecond: Float): UpdateByQueryRequest = copy(requestsPerSecond = requestsPerSecond.some) diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala index 08709907a..33a90e2e4 100644 --- a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/update/UpdateHandlers.scala @@ -6,7 +6,7 @@ import com.sksamuel.elastic4s.handlers.common.FetchSourceContextQueryParameterFn import com.sksamuel.elastic4s.handlers.script.ScriptBuilderFn import com.sksamuel.elastic4s.handlers.searches.queries import com.sksamuel.elastic4s.json.{XContentBuilder, XContentFactory} -import com.sksamuel.elastic4s.requests.common.RefreshPolicyHttpValue +import com.sksamuel.elastic4s.requests.common.{AutoSlices, NumericSlices, RefreshPolicyHttpValue} import com.sksamuel.elastic4s.requests.task.GetTask import com.sksamuel.elastic4s.requests.update.{BaseUpdateByQueryRequest, UpdateByQueryAsyncRequest, UpdateByQueryAsyncResponse, UpdateByQueryRequest, UpdateByQueryResponse, UpdateByQueryTask, UpdateRequest, UpdateResponse} import com.sksamuel.elastic4s.{BulkIndexByScrollFailure, ElasticError, ElasticRequest, ElasticUrlEncoder, Handler, HttpEntity, HttpResponse, ResponseHandler} @@ -87,7 +87,10 @@ trait UpdateHandlers { request.scrollSize.foreach(params.put("scroll_size", _)) request.waitForActiveShards.foreach(params.put("wait_for_active_shards", _)) request.waitForCompletion.foreach(params.put("wait_for_completion", _)) - request.slices.foreach(params.put("slices", _)) + request.slices.foreach { + case AutoSlices => params.put("slices", "auto") + case NumericSlices(slices) => params.put("slices", slices) + } val body = UpdateByQueryBodyFn(request) logger.debug(s"Update by query ${body.string}") diff --git a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala index 808bf9e8d..7fc7ae2ef 100644 --- a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala +++ b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala @@ -65,6 +65,16 @@ class UpdateByQueryTest }.await.result.count shouldBe 3 } + it should "support slices auto" in { + client.execute { + updateByQuery("pop", matchAllQuery()).script(script("ctx._source.foo = 'd'").lang("painless")).slicesAuto().refresh(RefreshPolicy.IMMEDIATE) + }.await.result.updated shouldBe 3 + + client.execute { + count("pop").query(termQuery("foo", "d")) + }.await.result.count shouldBe 3 + } + it should "support RefreshPolicy.NONE" in { client.execute { updateByQuery("pop", matchAllQuery()).script(script("ctx._source.foo = 'c'").lang("painless")).refresh(RefreshPolicy.NONE)