Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Couchbase based on akka-persistence-couchbase work #1411

Merged
merged 7 commits into from
Jan 16, 2019

Conversation

ennru
Copy link
Member

@ennru ennru commented Jan 11, 2019

This PR turns the newly merged but not yet released Couchbase connector around based o the work done for Akka Persistence Couchbase.

Compared to the earlier implementation by @dannylesnik it

  • offers a future-based API via CouchbaseSession which is appropriate for one-off requests to Couchbase
  • contains an Akka extension to cache (and later close) CouchbaseSession instances
  • reduces the number of operations offered in CouchbaseSource, CouchbaseFlow and CouchbaseSink to essential ones
  • relies on Scalatest for testing

@ennru ennru added this to the 1.0-M2 milestone Jan 11, 2019
Copy link
Member

@2m 2m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff! Just some documentation nitpicks.

docs/src/main/paradox/couchbase.md Outdated Show resolved Hide resolved
@@ -20,164 +30,117 @@ The table below shows direct dependencies of this module and the second tab show

@@dependencies { projectId="couchbase" }

# Overview
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above in the @@dependency change $scalaBinaryVersion$ to $scala.binary.version$ and $version$ to $project.version$.

docs/src/main/paradox/couchbase.md Outdated Show resolved Hide resolved

We will need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).
* `CouchbaseSession` offers a direct API for one-off operations
* `CouchbaseSessionRegistry` is a Akka extension to keep track and share `CouchbaseSession`s within an `ActorSystem`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/a Akka/an Akka

docs/src/main/paradox/couchbase.md Outdated Show resolved Hide resolved

Creating flow with single upsert operation
These default values are not recommended for real use, as they do not observe persistence on disk for any node.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/real/production

s/observe peristence on disk/persist to disk


Scala
: @@snip (/couchbase/src/test/scala/docs/scaladsl/CouchbaseSupport.scala) { #delete-BulkFlow }
Read more about durability settings in the [Couchbase documentation](https://docs.couchbase.com/java-sdk/2.6/durability.html#configuring-durability).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We depend on couchbase 2.7.2 but here link to the 2.6 docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed all the links to use extref now.

Copy link
Member

@2m 2m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dannylesnik
Copy link
Contributor

Looks very interesting and a lot to learn from. However I'm a little concerned about how do failures in in stage flow are now handled.

if I have a couchbase cluster with 3 nodes and you have stream of key from 0 to 9 they will be distributed in by its hash among all three master replicas. If for any reason one of the replicas fails, stream will fail and I will loose documents of this master replica and some of the documents that are currently travelled in predecessor stages of the stream. Even if I use recover in this stage, I'll not get failed document and propagate it to next stage.

I really would like to prevent situation when I send 10k of docs and got only 99995 docs inserted.
If I propagate failed document to next stage I can take care of it by persisting it in cache, or any other database, or log it somewhere and raise alert, but not loose this document.

For my point of view this is most important feature of this plugin and this is why I propagated Failed document and its exception to the next stage.

@ennru what do you think, How this functionality can be achieved now?

@johanandren
Copy link
Member

For reference this spawned this idea/issue over in Akka akka/akka#26250

@ennru
Copy link
Member Author

ennru commented Jan 16, 2019

Thank you @dannylesnik for looking into this PR.
You are right, you can't handle write failures with this API in the way you describe. I will add support for it before we release it, in a new PR.

@ennru ennru merged commit 42033c0 into akka:master Jan 16, 2019
@ennru ennru deleted the couchbase-from-persistence branch January 16, 2019 11:17
@2m 2m mentioned this pull request Jan 17, 2019
@He-Pin
Copy link
Member

He-Pin commented Sep 4, 2022

@ennru Hi, the the issue mentioned by @dannylesnik addressed ,thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants