Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Added couchbase bucket replace implementation + tests #1975

Merged
merged 1 commit into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions couchbase/src/main/mima-filters/1.1.1.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Added replace to CouchbaseSession
ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.scaladsl.CouchbaseSession.replace*")
ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.javadsl.CouchbaseSession.replace*")
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ final private[couchbase] class CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
TimeUnit.MILLISECONDS),
document.id)

def replace(document: JsonDocument): Future[JsonDocument] = replaceDoc(document)

def replaceDoc[T <: Document[_]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.replace(document), document.id)

def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument] =
replaceDoc(document, writeSettings)

def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.replace(document,
writeSettings.persistTo,
writeSettings.replicateTo,
writeSettings.timeout.toMillis,
TimeUnit.MILLISECONDS),
document.id)

def remove(id: String): Future[Done] =
singleObservableToFuture(asyncBucket.remove(id), id)
.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ private[couchbase] final class CouchbaseSessionJavaAdapter(delegate: scaladsl.Co
override def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T] =
delegate.upsertDoc(document, writeSettings).toJava

override def replace(document: JsonDocument): CompletionStage[JsonDocument] = delegate.replace(document).toJava

override def replaceDoc[T <: Document[_]](document: T): CompletionStage[T] = delegate.replaceDoc(document).toJava

override def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): CompletionStage[JsonDocument] =
delegate.replace(document, writeSettings).toJava

override def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T] =
delegate.replaceDoc(document, writeSettings).toJava

override def remove(id: String): CompletionStage[Done] = delegate.remove(id).toJava

override def remove(id: String, writeSettings: CouchbaseWriteSettings): CompletionStage[Done] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ object CouchbaseFlow {
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
scaladsl.CouchbaseFlow.upsertDocWithResult(sessionSettings, writeSettings, bucketName).asJava

/**
* Create a flow to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]].
*/
def replace(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] =
scaladsl.CouchbaseFlow.replace(sessionSettings, writeSettings, bucketName).asJava

/**
* Create a flow to replace a Couchbase document of the given class.
*/
def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
scaladsl.CouchbaseFlow.replaceDoc(sessionSettings, writeSettings, bucketName).asJava

/**
* Create a flow to replace a Couchbase document of the given class and emit a result so that write failures
* can be handled in-stream.
*/
def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
scaladsl.CouchbaseFlow.replaceDocWithResult(sessionSettings, writeSettings, bucketName).asJava

/**
* Create a flow to delete documents from Couchbase by `id`. Emits the same `id`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,41 @@ abstract class CouchbaseSession {
*/
def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]

/**
* Replace using the default write settings
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a CompletionStage that completes when the replace is done
*/
def replace(document: JsonDocument): CompletionStage[JsonDocument]

/**
* Replace using the default write settings.
* Separate from `replace` to make the most common case smoother with the type inference
*
* @return a CompletionStage that completes when the replace is done
*/
def replaceDoc[T <: Document[_]](document: T): CompletionStage[T]

/**
* Replace using the given write settings.
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a CompletionStage that completes when the replace done
*/
def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): CompletionStage[JsonDocument]

/**
* Replace using the given write settings.
*
* Separate from `replace` to make the most common case smoother with the type inference
*
* @return a CompletionStage that completes when the replace is done
*/
def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]

/**
* Remove a document by id using the default write settings.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ object CouchbaseSink {
.upsertDoc[T](sessionSettings, writeSettings, bucketName)
.toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]])

/**
* Create a sink to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]].
*/
def replace(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Sink[JsonDocument, CompletionStage[Done]] =
CouchbaseFlow
.replace(sessionSettings, writeSettings, bucketName)
.toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]])

/**
* Create a sink to replace a Couchbase document of the given class.
*/
def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Sink[T, CompletionStage[Done]] =
CouchbaseFlow
.replaceDoc[T](sessionSettings, writeSettings, bucketName)
.toMat(Sink.ignore(), Keep.right[NotUsed, CompletionStage[Done]])

/**
* Create a sink to delete documents from Couchbase by `id`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,63 @@ object CouchbaseFlow {
}
.mapMaterializedValue(_ => NotUsed)

/**
* Create a flow to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]].
*/
def replace(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] =
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[JsonDocument]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.replace(doc, writeSettings))(materializer.system.dispatcher)
)
}
.mapMaterializedValue(_ => NotUsed)

/**
* Create a flow to replace a Couchbase document of the given class.
*/
def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.replaceDoc(doc, writeSettings))(materializer.system.dispatcher)
)
}
.mapMaterializedValue(_ => NotUsed)

/**
* Create a flow to replace a Couchbase document of the given class and emit a result so that write failures
* can be handled in-stream.
*/
def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.replaceDoc(doc, writeSettings))
.map(_ => CouchbaseWriteSuccess(doc))
.recover {
case exception => CouchbaseWriteFailure(doc, exception)
}
}
)
}
.mapMaterializedValue(_ => NotUsed)

/**
* Create a flow to delete documents from Couchbase by `id`. Emits the same `id`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,42 @@ trait CouchbaseSession {
*/
def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]

/**
* Replace using the default write settings.
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a future that completes when the replace is done
*/
def replace(document: JsonDocument): Future[JsonDocument]

/**
* Replace using the default write settings.
*
* Separate from `replace` to make the most common case smoother with the type inference
*
* @return a future that completes when the replace is done
*/
def replaceDoc[T <: Document[_]](document: T): Future[T]

/**
* Replace using the given write settings
*
* For replacing other types of documents see `replaceDoc`.
*
* @return a future that completes when the replace is done
*/
def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): Future[JsonDocument]

/**
* Replace using the given write settings
*
* Separate from `replace` to make the most common case smoother with the type inference
*
* @return a future that completes when the replace is done
*/
def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]

/**
* Remove a document by id using the default write settings.
*
Expand Down
Loading