From 47be8830a95182a9523195571632526988d951e9 Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Tue, 10 Mar 2020 22:34:32 +0100 Subject: [PATCH 1/7] Support operation replaceOne (#2100). --- .../alpakka/mongodb/DocumentReplace.scala | 9 +++++ .../alpakka/mongodb/scaladsl/MongoFlow.scala | 24 ++++++++++- .../alpakka/mongodb/scaladsl/MongoSink.scala | 17 +++++++- .../scala/docs/scaladsl/MongoSinkSpec.scala | 40 ++++++++++++++++++- 4 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala new file mode 100644 index 0000000000..0a84621823 --- /dev/null +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala @@ -0,0 +1,9 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.stream.alpakka.mongodb + +import org.bson.conversions.Bson + +case class DocumentReplace[T](filter: Bson, replacement: T) diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoFlow.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoFlow.scala index 0677c5ab57..824714d06b 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoFlow.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoFlow.scala @@ -7,8 +7,8 @@ package akka.stream.alpakka.mongodb.scaladsl import akka.stream.scaladsl.{Flow, Source} import akka.NotUsed import akka.annotation.InternalApi -import akka.stream.alpakka.mongodb.DocumentUpdate -import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, UpdateOptions} +import akka.stream.alpakka.mongodb.{DocumentReplace, DocumentUpdate} +import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions} import com.mongodb.client.result.{DeleteResult, UpdateResult} import com.mongodb.reactivestreams.client.MongoCollection import org.bson.conversions.Bson @@ -29,6 +29,9 @@ object MongoFlow { /** Internal Api */ @InternalApi private[mongodb] val DefaultDeleteOptions = new DeleteOptions() + /** Internal Api */ + @InternalApi private[mongodb] val DefaultReplaceOptions = new ReplaceOptions() + /** * A [[akka.stream.scaladsl.Flow Flow]] that will insert documents into a collection. * @@ -103,4 +106,21 @@ object MongoFlow { def deleteMany[T](collection: MongoCollection[T], options: DeleteOptions = DefaultDeleteOptions): Flow[Bson, (DeleteResult, Bson), NotUsed] = Flow[Bson].flatMapConcat(bson => Source.fromPublisher(collection.deleteMany(bson, options)).map(_ -> bson)) + + /** + * A [[akka.stream.scaladsl.Flow Flow]] that will replace document as defined by a [[DocumentReplace]]. + * + * @param collection the mongo db collection to update. + * @param options options to apply to the operation + */ + def replaceOne[T]( + collection: MongoCollection[T], + options: ReplaceOptions = DefaultReplaceOptions + ): Flow[DocumentReplace[T], (UpdateResult, DocumentReplace[T]), NotUsed] = + Flow[DocumentReplace[T]].flatMapConcat( + documentReplace => + Source + .fromPublisher(collection.replaceOne(documentReplace.filter, documentReplace.replacement, options)) + .map(_ -> documentReplace) + ) } diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala index 518f299d6f..778ce4f50f 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala @@ -6,14 +6,15 @@ package akka.stream.alpakka.mongodb.scaladsl import akka.stream.scaladsl.{Keep, Sink} import akka.Done -import akka.stream.alpakka.mongodb.DocumentUpdate +import akka.stream.alpakka.mongodb.{DocumentReplace, DocumentUpdate} import akka.stream.alpakka.mongodb.scaladsl.MongoFlow.{ DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, + DefaultReplaceOptions, DefaultUpdateOptions } -import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, UpdateOptions} +import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions} import com.mongodb.reactivestreams.client.MongoCollection import org.bson.conversions.Bson @@ -63,6 +64,18 @@ object MongoSink { ): Sink[DocumentUpdate, Future[Done]] = MongoFlow.updateMany(collection, options).toMat(Sink.ignore)(Keep.right) + /** + * A [[akka.stream.scaladsl.Sink Sink]] that will replace document as defined by a [[akka.stream.alpakka.mongodb.DocumentReplace]]. + * + * @param collection the mongo db collection to update. + * @param options options to apply to the operation + */ + def replaceOne[T]( + collection: MongoCollection[T], + options: ReplaceOptions = DefaultReplaceOptions + ): Sink[DocumentReplace[T], Future[Done]] = + MongoFlow.replaceOne(collection, options).toMat(Sink.ignore)(Keep.right) + /** * A [[akka.stream.scaladsl.Sink Sink]] that will delete individual documents as defined by a [[org.bson.conversions.Bson Bson]] filter query. * diff --git a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala index 14950f295f..930f91d7e2 100644 --- a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala +++ b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala @@ -6,7 +6,7 @@ package docs.scaladsl import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import akka.stream.alpakka.mongodb.DocumentUpdate +import akka.stream.alpakka.mongodb.{DocumentReplace, DocumentUpdate} import akka.stream.alpakka.mongodb.scaladsl.MongoSink import akka.stream.alpakka.testkit.scaladsl.LogCapturing import akka.stream.scaladsl.{Sink, Source} @@ -35,8 +35,9 @@ class MongoSinkSpec // case class and codec for mongodb macros case class Number(_id: Int) + case class DomainObject(_id: Int, firstProperty: String, secondProperty: String) - val codecRegistry = fromRegistries(fromProviders(classOf[Number]), DEFAULT_CODEC_REGISTRY) + val codecRegistry = fromRegistries(fromProviders(classOf[Number], classOf[DomainObject]), DEFAULT_CODEC_REGISTRY) implicit val system = ActorSystem() implicit val mat = ActorMaterializer() @@ -48,6 +49,8 @@ class MongoSinkSpec private val db = client.getDatabase("MongoSinkSpec").withCodecRegistry(codecRegistry) private val numbersColl: MongoCollection[Number] = db.getCollection("numbersSink", classOf[Number]).withCodecRegistry(codecRegistry) + private val domainObjectsColl: MongoCollection[DomainObject] = + db.getCollection("domainObjectsSink", classOf[DomainObject]).withCodecRegistry(codecRegistry) private val numbersDocumentColl = db.getCollection("numbersSink") implicit val defaultPatience = @@ -67,6 +70,16 @@ class MongoSinkSpec .runWith(Sink.head) .futureValue + def insertDomainObjectsRange(): Unit = + Source + .fromPublisher( + domainObjectsColl.insertMany( + testRange.map(i => DomainObject(i, s"first-property-$i", s"second-property-$i")).asJava + ) + ) + .runWith(Sink.head) + .futureValue + "MongoSinkSpec" must { "save with insertOne" in assertAllStagesStopped { @@ -204,6 +217,29 @@ class MongoSinkSpec found mustBe empty } + + "replace with replaceOne and codec support" in assertAllStagesStopped { + insertDomainObjectsRange() + val updatedObjects = + testRange.map(i => DomainObject(i, s"updated-first-property-$i", s"updated-second-property-$i")) + + // #replace-one + val source = Source(testRange).map( + i => + DocumentReplace[DomainObject]( + filter = Filters.eq("_id", i), + replacement = DomainObject(i, s"updated-first-property-$i", s"updated-second-property-$i") + ) + ) + val completion = source.runWith(MongoSink.replaceOne[DomainObject](domainObjectsColl)) + // #replace-one + + completion.futureValue + + val found = Source.fromPublisher(domainObjectsColl.find()).runWith(Sink.seq).futureValue + + found must contain theSameElementsAs updatedObjects + } } } From 60893b364e87018cd7911c991fda2ea70b7d6331 Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 17:48:58 +0100 Subject: [PATCH 2/7] MongoDB: Add java API for replaceOne --- .../alpakka/mongodb/DocumentReplace.scala | 29 +++++- .../alpakka/mongodb/javadsl/MongoFlow.scala | 18 +++- .../alpakka/mongodb/javadsl/MongoSink.scala | 25 ++++- .../alpakka/mongodb/scaladsl/MongoSink.scala | 23 +++-- .../test/java/docs/javadsl/DomainObject.java | 78 +++++++++++++++ .../test/java/docs/javadsl/MongoSinkTest.java | 95 +++++++++++++++++-- 6 files changed, 241 insertions(+), 27 deletions(-) create mode 100644 mongodb/src/test/java/docs/javadsl/DomainObject.java diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala index 0a84621823..d3769d9b5c 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala @@ -6,4 +6,31 @@ package akka.stream.alpakka.mongodb import org.bson.conversions.Bson -case class DocumentReplace[T](filter: Bson, replacement: T) +/** + * + * @param filter a document describing the query filter, which may not be null. This can be of any type for which a { @code Codec} is registered + * @param replacement an object to replace the previous one, which may not be null. This can be of any type for which a { @code Codec} is registered + */ +final case class DocumentReplace[T] private (filter: Bson, replacement: T) { + + def withFilter(filter: Bson): DocumentReplace[T] = copy(filter = filter) + def withReplacement[T1](replacement: T1): DocumentReplace[T1] = copy(replacement = replacement) + + override def toString: String = + "DocumentReplace(" + + s"filter=$filter," + + s"replacement=$replacement" + + ")" + + private def copy[T1](filter: Bson = filter, replacement: T1 = replacement) = + new DocumentReplace[T1](filter, replacement) +} + +object DocumentReplace { + def apply[T](filter: Bson, replacement: T): DocumentReplace[T] = new DocumentReplace(filter, replacement) + + /** + * Java Api + */ + def create[T](filter: Bson, replacement: T): DocumentReplace[T] = DocumentReplace(filter, replacement) +} diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoFlow.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoFlow.scala index 5d2e62ad8e..2e7e5f55e9 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoFlow.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoFlow.scala @@ -5,16 +5,16 @@ package akka.stream.alpakka.mongodb.javadsl import akka.NotUsed -import akka.stream.alpakka.mongodb.DocumentUpdate -import akka.stream.alpakka.mongodb.scaladsl +import akka.stream.alpakka.mongodb.{scaladsl, DocumentReplace, DocumentUpdate} import akka.stream.alpakka.mongodb.scaladsl.MongoFlow.{ DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, + DefaultReplaceOptions, DefaultUpdateOptions } import akka.stream.javadsl.Flow -import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, UpdateOptions} +import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions} import com.mongodb.client.result.{DeleteResult, UpdateResult} import com.mongodb.reactivestreams.client.MongoCollection import org.bson.conversions.Bson @@ -143,5 +143,17 @@ object MongoFlow { options: DeleteOptions): Flow[Bson, akka.japi.Pair[DeleteResult, Bson], NotUsed] = scaladsl.MongoFlow.deleteMany(collection, options).map(fromTupleToPair).asJava + /** + * A [[akka.stream.javadsl.Flow Flow]] that will replace document as defined by a [[DocumentReplace]]. + * + * @param collection the mongo db collection to update. + * @param options options to apply to the operation + */ + def replaceOne[T]( + collection: MongoCollection[T], + options: ReplaceOptions = DefaultReplaceOptions + ): Flow[DocumentReplace[T], akka.japi.Pair[UpdateResult, DocumentReplace[T]], NotUsed] = + scaladsl.MongoFlow.replaceOne(collection, options).map(fromTupleToPair).asJava + private def fromTupleToPair[T, R] = (akka.japi.Pair.create[T, R] _).tupled } diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoSink.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoSink.scala index 5f7061dbcf..e160176aa6 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoSink.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/javadsl/MongoSink.scala @@ -7,15 +7,16 @@ package akka.stream.alpakka.mongodb.javadsl import java.util.concurrent.CompletionStage import akka.{Done, NotUsed} -import akka.stream.alpakka.mongodb.DocumentUpdate +import akka.stream.alpakka.mongodb.{DocumentReplace, DocumentUpdate} import akka.stream.alpakka.mongodb.scaladsl.MongoFlow.{ DefaultDeleteOptions, DefaultInsertManyOptions, DefaultInsertOneOptions, + DefaultReplaceOptions, DefaultUpdateOptions } import akka.stream.javadsl.{Keep, Sink} -import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, UpdateOptions} +import com.mongodb.client.model.{DeleteOptions, InsertManyOptions, InsertOneOptions, ReplaceOptions, UpdateOptions} import com.mongodb.reactivestreams.client.MongoCollection import org.bson.conversions.Bson @@ -126,4 +127,24 @@ object MongoSink { def deleteMany[T](collection: MongoCollection[T], options: DeleteOptions): Sink[Bson, CompletionStage[Done]] = MongoFlow.deleteMany(collection, options).toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]]) + /** + * A [[akka.stream.javadsl.Sink Sink]] that will replace document as defined by a [[akka.stream.alpakka.mongodb.DocumentReplace]]. + * + * @param collection the mongo db collection to update. + */ + def replaceOne[T](collection: MongoCollection[T]): Sink[DocumentReplace[T], CompletionStage[Done]] = + replaceOne(collection, DefaultReplaceOptions) + + /** + * A [[akka.stream.javadsl.Sink Sink]] that will replace document as defined by a [[akka.stream.alpakka.mongodb.DocumentReplace]]. + * + * @param collection the mongo db collection to update. + * @param options options to apply to the operation + */ + def replaceOne[T]( + collection: MongoCollection[T], + options: ReplaceOptions + ): Sink[DocumentReplace[T], CompletionStage[Done]] = + MongoFlow.replaceOne(collection, options).toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]]) + } diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala index 778ce4f50f..dee5e45cd7 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/scaladsl/MongoSink.scala @@ -64,18 +64,6 @@ object MongoSink { ): Sink[DocumentUpdate, Future[Done]] = MongoFlow.updateMany(collection, options).toMat(Sink.ignore)(Keep.right) - /** - * A [[akka.stream.scaladsl.Sink Sink]] that will replace document as defined by a [[akka.stream.alpakka.mongodb.DocumentReplace]]. - * - * @param collection the mongo db collection to update. - * @param options options to apply to the operation - */ - def replaceOne[T]( - collection: MongoCollection[T], - options: ReplaceOptions = DefaultReplaceOptions - ): Sink[DocumentReplace[T], Future[Done]] = - MongoFlow.replaceOne(collection, options).toMat(Sink.ignore)(Keep.right) - /** * A [[akka.stream.scaladsl.Sink Sink]] that will delete individual documents as defined by a [[org.bson.conversions.Bson Bson]] filter query. * @@ -96,4 +84,15 @@ object MongoSink { options: DeleteOptions = DefaultDeleteOptions): Sink[Bson, Future[Done]] = MongoFlow.deleteMany(collection, options).toMat(Sink.ignore)(Keep.right) + /** + * A [[akka.stream.scaladsl.Sink Sink]] that will replace document as defined by a [[akka.stream.alpakka.mongodb.DocumentReplace]]. + * + * @param collection the mongo db collection to update. + * @param options options to apply to the operation + */ + def replaceOne[T]( + collection: MongoCollection[T], + options: ReplaceOptions = DefaultReplaceOptions + ): Sink[DocumentReplace[T], Future[Done]] = + MongoFlow.replaceOne(collection, options).toMat(Sink.ignore)(Keep.right) } diff --git a/mongodb/src/test/java/docs/javadsl/DomainObject.java b/mongodb/src/test/java/docs/javadsl/DomainObject.java new file mode 100644 index 0000000000..e0dfa7167a --- /dev/null +++ b/mongodb/src/test/java/docs/javadsl/DomainObject.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package docs.javadsl; + +import java.util.Objects; + +// #pojo +public final class DomainObject { + private Integer id; + private String firstProperty; + private String secondProperty; + + public DomainObject() {} + + public DomainObject(Integer id, String firstProperty, String secondProperty) { + this.id = id; + this.firstProperty = firstProperty; + this.secondProperty = secondProperty; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getFirstProperty() { + return firstProperty; + } + + public void setFirstProperty(String firstProperty) { + this.firstProperty = firstProperty; + } + + public String getSecondProperty() { + return secondProperty; + } + + public void setSecondProperty(String secondProperty) { + this.secondProperty = secondProperty; + } + + // #pojo + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DomainObject that = (DomainObject) o; + return Objects.equals(id, that.id) + && Objects.equals(firstProperty, that.firstProperty) + && Objects.equals(secondProperty, that.secondProperty); + } + + @Override + public int hashCode() { + return Objects.hash(id, firstProperty, secondProperty); + } + + @Override + public String toString() { + return "DomainObject{" + + "id=" + + id + + ", firstProperty='" + + firstProperty + + '\'' + + ", secondProperty='" + + secondProperty + + '\'' + + '}'; + } + // #pojo +} +// #pojo diff --git a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java index e896b9d201..349ba068e2 100644 --- a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java +++ b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java @@ -10,6 +10,7 @@ import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; +import akka.stream.alpakka.mongodb.DocumentReplace; import akka.stream.alpakka.mongodb.DocumentUpdate; import akka.stream.alpakka.mongodb.javadsl.MongoSink; import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4; @@ -25,18 +26,20 @@ import com.mongodb.reactivestreams.client.MongoDatabase; import org.bson.Document; import org.bson.codecs.ValueCodecProvider; -import org.bson.codecs.configuration.CodecRegistries; import org.bson.codecs.configuration.CodecRegistry; import org.bson.codecs.pojo.PojoCodecProvider; import org.bson.conversions.Bson; import org.junit.*; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.bson.codecs.configuration.CodecRegistries.fromProviders; +import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; import static org.junit.Assert.assertEquals; public class MongoSinkTest { @@ -49,6 +52,8 @@ public class MongoSinkTest { private final MongoDatabase db; private final MongoCollection numbersDocumentColl; private final MongoCollection numbersColl; + private final MongoCollection domainObjectsDocumentColl; + private final MongoCollection domainObjectsColl; private final List testRange = IntStream.range(1, 10).boxed().collect(Collectors.toList()); @@ -57,14 +62,18 @@ public MongoSinkTest() { system = ActorSystem.create(); mat = ActorMaterializer.create(system); - PojoCodecProvider codecProvider = PojoCodecProvider.builder().register(Number.class).build(); + PojoCodecProvider codecProvider = + PojoCodecProvider.builder().register(Number.class, DomainObject.class).build(); CodecRegistry codecRegistry = - CodecRegistries.fromProviders(codecProvider, new ValueCodecProvider()); + fromRegistries(fromProviders(codecProvider, new ValueCodecProvider())); client = MongoClients.create("mongodb://localhost:27017"); db = client.getDatabase("MongoSinkTest"); numbersColl = db.getCollection("numbers", Number.class).withCodecRegistry(codecRegistry); numbersDocumentColl = db.getCollection("numbers"); + domainObjectsColl = + db.getCollection("domainObjects", DomainObject.class).withCodecRegistry(codecRegistry); + domainObjectsDocumentColl = db.getCollection("domainObjects"); } private void insertTestRange() throws Exception { @@ -75,20 +84,47 @@ private void insertTestRange() throws Exception { .get(5, TimeUnit.SECONDS); } - @Before - public void cleanDb() throws Exception { - Source.fromPublisher(numbersDocumentColl.deleteMany(new Document())) + private void insertDomainObjects() throws Exception { + Source.from(testRange) + .map( + i -> { + DomainObject domainObject = + new DomainObject( + i, + String.format("first-property-%s", i), + String.format("second-property-%s", i)); + System.out.println(String.format("%s inserting %s", i, domainObject)); + return domainObject; + }) + .runWith(MongoSink.insertOne(domainObjectsColl), mat) + .toCompletableFuture() + .get(5, TimeUnit.SECONDS); + } + + private void deleteCollection(MongoCollection collection) throws Exception { + Source.fromPublisher(collection.deleteMany(new Document())) .runWith(Sink.head(), mat) .toCompletableFuture() .get(5, TimeUnit.SECONDS); } - @After - public void checkForLeaks() throws Exception { - Source.fromPublisher(numbersDocumentColl.deleteMany(new Document())) + private void checkCollectionForLeaks(MongoCollection collection) throws Exception { + Source.fromPublisher(collection.deleteMany(new Document())) .runWith(Sink.head(), mat) .toCompletableFuture() .get(5, TimeUnit.SECONDS); + } + + @Before + public void cleanDb() throws Exception { + deleteCollection(numbersDocumentColl); + deleteCollection(domainObjectsDocumentColl); + } + + @After + public void checkForLeaks() throws Exception { + checkCollectionForLeaks(numbersDocumentColl); + checkCollectionForLeaks(domainObjectsDocumentColl); StreamTestKit.assertAllStagesStopped(mat); } @@ -287,4 +323,45 @@ public void deleteWithDeleteMany() throws Exception { assertEquals(true, found.toCompletableFuture().get(5, TimeUnit.SECONDS).isEmpty()); } + + @Test + public void replaceWithReplaceOne() throws Exception { + insertDomainObjects(); + + // #replace-one + final Source, NotUsed> source = + Source.from(testRange) + .map( + i -> + DocumentReplace.create( + Filters.eq("_id", i), + new DomainObject( + i, + String.format("updated-first-property-%s", i), + String.format("updated-second-property-%s", i)))); + final CompletionStage completion = + source.runWith(MongoSink.replaceOne(domainObjectsColl), mat); + // #replace-one + + completion.toCompletableFuture().get(5, TimeUnit.SECONDS); + + final List found = + new ArrayList<>( + Source.fromPublisher(domainObjectsColl.find()) + .runWith(Sink.seq(), mat) + .toCompletableFuture() + .get(5, TimeUnit.SECONDS)); + + final List expected = + testRange.stream() + .map( + i -> + new DomainObject( + i, + String.format("updated-first-property-%s", i), + String.format("updated-second-property-%s", i))) + .collect(Collectors.toList()); + + assertEquals(expected, found); + } } From 871c48bac56f155c5d5e4e40e4c38e97acb08d73 Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 17:52:35 +0100 Subject: [PATCH 3/7] MongoDB: Replace verbose Collectors.toList() with toList() --- .../test/java/docs/javadsl/MongoSinkTest.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java index 349ba068e2..f0cb4de182 100644 --- a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java +++ b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java @@ -35,9 +35,9 @@ import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.stream.Collectors.toList; import static org.bson.codecs.configuration.CodecRegistries.fromProviders; import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; import static org.junit.Assert.assertEquals; @@ -56,7 +56,7 @@ public class MongoSinkTest { private final MongoCollection domainObjectsColl; private final List testRange = - IntStream.range(1, 10).boxed().collect(Collectors.toList()); + IntStream.range(1, 10).boxed().collect(toList()); public MongoSinkTest() { system = ActorSystem.create(); @@ -148,14 +148,14 @@ public void saveWithInsertOne() throws Exception { testRange, found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) - .collect(Collectors.toList())); + .collect(toList())); } @Test public void saveWithInsertOneAndCodecSupport() throws Exception { // #insert-one List testRangeObjects = - testRange.stream().map(Number::new).collect(Collectors.toList()); + testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), mat); // #insert-one @@ -182,14 +182,14 @@ public void saveWithInsertMany() throws Exception { testRange, found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) - .collect(Collectors.toList())); + .collect(toList())); } @Test public void saveWithInsertManyAndCodecSupport() throws Exception { // #insert-many final List testRangeObjects = - testRange.stream().map(Number::new).collect(Collectors.toList()); + testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), mat); // #insert-many @@ -220,13 +220,13 @@ public void saveWithInsertManyWithOptions() throws Exception { testRange, found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(n -> n.getInteger("value")) - .collect(Collectors.toList())); + .collect(toList())); } @Test public void saveWithInsertManyWithOptionsAndCodecSupport() throws Exception { List testRangeObjects = - testRange.stream().map(Number::new).collect(Collectors.toList()); + testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects) .grouped(2) @@ -261,10 +261,10 @@ public void updateWithUpdateOne() throws Exception { Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq(), mat); assertEquals( - testRange.stream().map(i -> Pair.create(i, i * -1)).collect(Collectors.toList()), + testRange.stream().map(i -> Pair.create(i, i * -1)).collect(toList()), found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(d -> Pair.create(d.getInteger("value"), d.getInteger("updateValue"))) - .collect(Collectors.toList())); + .collect(toList())); } @Test @@ -284,10 +284,10 @@ public void updateWithUpdateMany() throws Exception { Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq(), mat); assertEquals( - testRange.stream().map(i -> Pair.create(i, 0)).collect(Collectors.toList()), + testRange.stream().map(i -> Pair.create(i, 0)).collect(toList()), found.toCompletableFuture().get(5, TimeUnit.SECONDS).stream() .map(d -> Pair.create(d.getInteger("value"), d.getInteger("updateValue"))) - .collect(Collectors.toList())); + .collect(toList())); } @Test @@ -360,7 +360,7 @@ public void replaceWithReplaceOne() throws Exception { i, String.format("updated-first-property-%s", i), String.format("updated-second-property-%s", i))) - .collect(Collectors.toList()); + .collect(toList()); assertEquals(expected, found); } From 417e4e4dc7eec4e94eaaefa26037305d553f38ef Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 17:55:37 +0100 Subject: [PATCH 4/7] MongoDB: Simplify assertions --- .../test/java/docs/javadsl/MongoSinkTest.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java index f0cb4de182..525c64ce70 100644 --- a/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java +++ b/mongodb/src/test/java/docs/javadsl/MongoSinkTest.java @@ -41,6 +41,7 @@ import static org.bson.codecs.configuration.CodecRegistries.fromProviders; import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class MongoSinkTest { @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4(); @@ -55,8 +56,7 @@ public class MongoSinkTest { private final MongoCollection domainObjectsDocumentColl; private final MongoCollection domainObjectsColl; - private final List testRange = - IntStream.range(1, 10).boxed().collect(toList()); + private final List testRange = IntStream.range(1, 10).boxed().collect(toList()); public MongoSinkTest() { system = ActorSystem.create(); @@ -154,8 +154,7 @@ public void saveWithInsertOne() throws Exception { @Test public void saveWithInsertOneAndCodecSupport() throws Exception { // #insert-one - List testRangeObjects = - testRange.stream().map(Number::new).collect(toList()); + List testRangeObjects = testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects).runWith(MongoSink.insertOne(numbersColl), mat); // #insert-one @@ -188,8 +187,7 @@ public void saveWithInsertMany() throws Exception { @Test public void saveWithInsertManyAndCodecSupport() throws Exception { // #insert-many - final List testRangeObjects = - testRange.stream().map(Number::new).collect(toList()); + final List testRangeObjects = testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects).grouped(2).runWith(MongoSink.insertMany(numbersColl), mat); // #insert-many @@ -225,8 +223,7 @@ public void saveWithInsertManyWithOptions() throws Exception { @Test public void saveWithInsertManyWithOptionsAndCodecSupport() throws Exception { - List testRangeObjects = - testRange.stream().map(Number::new).collect(toList()); + List testRangeObjects = testRange.stream().map(Number::new).collect(toList()); final CompletionStage completion = Source.from(testRangeObjects) .grouped(2) @@ -305,7 +302,7 @@ public void deleteWithDeleteOne() throws Exception { final CompletionStage> found = Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq(), mat); - assertEquals(true, found.toCompletableFuture().get(5, TimeUnit.SECONDS).isEmpty()); + assertTrue(found.toCompletableFuture().get(5, TimeUnit.SECONDS).isEmpty()); } @Test @@ -321,7 +318,7 @@ public void deleteWithDeleteMany() throws Exception { final CompletionStage> found = Source.fromPublisher(numbersDocumentColl.find()).runWith(Sink.seq(), mat); - assertEquals(true, found.toCompletableFuture().get(5, TimeUnit.SECONDS).isEmpty()); + assertTrue(found.toCompletableFuture().get(5, TimeUnit.SECONDS).isEmpty()); } @Test From 4763963cbd12dbb0f035138bfeede27c95eaebb7 Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 17:59:55 +0100 Subject: [PATCH 5/7] MongoDB: Remove all documents from collection domainObjectsSink after each test --- mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala index 930f91d7e2..77104a379a 100644 --- a/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala +++ b/mongodb/src/test/scala/docs/scaladsl/MongoSinkSpec.scala @@ -49,15 +49,18 @@ class MongoSinkSpec private val db = client.getDatabase("MongoSinkSpec").withCodecRegistry(codecRegistry) private val numbersColl: MongoCollection[Number] = db.getCollection("numbersSink", classOf[Number]).withCodecRegistry(codecRegistry) + private val numbersDocumentColl = db.getCollection("numbersSink") private val domainObjectsColl: MongoCollection[DomainObject] = db.getCollection("domainObjectsSink", classOf[DomainObject]).withCodecRegistry(codecRegistry) - private val numbersDocumentColl = db.getCollection("numbersSink") + private val domainObjectsDocumentColl = db.getCollection("domainObjectsSink") implicit val defaultPatience = PatienceConfig(timeout = 5.seconds, interval = 50.millis) - override def afterEach(): Unit = + override def afterEach(): Unit = { Source.fromPublisher(numbersDocumentColl.deleteMany(new Document())).runWith(Sink.head).futureValue + Source.fromPublisher(domainObjectsDocumentColl.deleteMany(new Document())).runWith(Sink.head).futureValue + } override def afterAll(): Unit = system.terminate().futureValue @@ -241,5 +244,4 @@ class MongoSinkSpec found must contain theSameElementsAs updatedObjects } } - } From 92364d0e712cd4e19d392fc354ce4b106abab462 Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 21:43:51 +0100 Subject: [PATCH 6/7] MongoDB: Fix non-unique comments --- mongodb/src/test/java/docs/javadsl/DomainObject.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mongodb/src/test/java/docs/javadsl/DomainObject.java b/mongodb/src/test/java/docs/javadsl/DomainObject.java index e0dfa7167a..d1d52cda85 100644 --- a/mongodb/src/test/java/docs/javadsl/DomainObject.java +++ b/mongodb/src/test/java/docs/javadsl/DomainObject.java @@ -6,7 +6,7 @@ import java.util.Objects; -// #pojo +// #pojo-domain-object public final class DomainObject { private Integer id; private String firstProperty; @@ -44,7 +44,7 @@ public void setSecondProperty(String secondProperty) { this.secondProperty = secondProperty; } - // #pojo + // #pojo-domain-object @Override public boolean equals(Object o) { if (this == o) return true; @@ -73,6 +73,6 @@ public String toString() { + '\'' + '}'; } - // #pojo + // #pojo-domain-object } -// #pojo +// #pojo-domain-object From 24ba61e24825182618e57e98a5662d5170d6766a Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Thu, 12 Mar 2020 21:56:42 +0100 Subject: [PATCH 7/7] MongoDB: Fix compilation error --- .../scala/akka/stream/alpakka/mongodb/DocumentReplace.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala index d3769d9b5c..edf938e8aa 100644 --- a/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala +++ b/mongodb/src/main/scala/akka/stream/alpakka/mongodb/DocumentReplace.scala @@ -11,7 +11,7 @@ import org.bson.conversions.Bson * @param filter a document describing the query filter, which may not be null. This can be of any type for which a { @code Codec} is registered * @param replacement an object to replace the previous one, which may not be null. This can be of any type for which a { @code Codec} is registered */ -final case class DocumentReplace[T] private (filter: Bson, replacement: T) { +final class DocumentReplace[T] private (val filter: Bson, val replacement: T) { def withFilter(filter: Bson): DocumentReplace[T] = copy(filter = filter) def withReplacement[T1](replacement: T1): DocumentReplace[T1] = copy(replacement = replacement)