diff --git a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/reindex/ReindexBuilderFnTest.scala b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/reindex/ReindexBuilderFnTest.scala index 67ad5553a..6e225245f 100644 --- a/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/reindex/ReindexBuilderFnTest.scala +++ b/elastic4s-core/src/test/scala/com/sksamuel/elastic4s/requests/reindex/ReindexBuilderFnTest.scala @@ -14,4 +14,11 @@ class ReindexBuilderFnTest extends AnyFunSuite with Matchers { ReindexBuilderFn(req).string shouldBe """{"source":{"index":["source"]},"dest":{"index":"target","version_type":"external_gte"}}""".stripMargin } + + test("reindex content builder should support pipeline") { + val req = reindex("source", "target").pipeline("my_pipeline") + + ReindexBuilderFn(req).string shouldBe + """{"source":{"index":["source"]},"dest":{"index":"target","pipeline":"my_pipeline"}}""".stripMargin + } } diff --git a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/reindex/ReindexRequest.scala b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/reindex/ReindexRequest.scala index 16e7c9329..952a7189c 100644 --- a/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/reindex/ReindexRequest.scala +++ b/elastic4s-domain/src/main/scala/com/sksamuel/elastic4s/requests/reindex/ReindexRequest.scala @@ -30,7 +30,8 @@ case class ReindexRequest(sourceIndexes: Indexes, createOnly: Option[Boolean] = None, slices: Option[Int] = None, slice: Option[Slice] = None, - versionType: Option[VersionType] = None) { + versionType: Option[VersionType] = None, + pipeline: Option[String] = None) { def remote(uri: String): ReindexRequest = copy(remoteHost = Option(uri)) def remote(uri: String, user: String, pass: String): ReindexRequest = @@ -76,4 +77,6 @@ case class ReindexRequest(sourceIndexes: Indexes, def versionType(versionType: String): ReindexRequest = this.versionType(VersionType.valueOf(versionType)) def versionType(versionType: VersionType): ReindexRequest = copy(versionType = versionType.some) + + def pipeline(pipeline: String): ReindexRequest = copy(pipeline = pipeline.some) } diff --git a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/reindex/ReindexBuilderFn.scala b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/reindex/ReindexBuilderFn.scala index 1153fddad..b41c5683f 100644 --- a/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/reindex/ReindexBuilderFn.scala +++ b/elastic4s-handlers/src/main/scala/com/sksamuel/elastic4s/handlers/reindex/ReindexBuilderFn.scala @@ -56,6 +56,8 @@ object ReindexBuilderFn { case false => builder.field("op_type", "index") } + request.pipeline.foreach(builder.field("pipeline", _)) + // end dest builder.endObject() }