-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add atLeastOnce in SlickProjection, #24 #69
Conversation
@@ -99,6 +99,4 @@ import akka.stream.scaladsl.Source | |||
composedSource | |||
} | |||
|
|||
// FIXME | |||
def processElement(elt: StreamElement)(implicit ec: ExecutionContext): Future[Done] = ??? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this because it was in the way and will be replaced by the approach in #53
@ApiMayChange | ||
def transactional[Offset, StreamElement, P <: JdbcProfile: ClassTag]( | ||
|
||
def exactlyOnce[Offset, StreamElement, P <: JdbcProfile: ClassTag]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed this
@@ -39,16 +42,94 @@ object SlickProjectionSpec { | |||
} | |||
} | |||
""") | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this chunk was just moved from the end of the class to the companion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Left a few comments. I think there are some small improvements on the Slick DBIO usage.
.map(elt => offsetExtractor(elt) -> elt) | ||
.mapAsync(1) { | ||
case (offset, element) => processElement(element).map(_ => offset) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why creating a tuple?
.map(elt => offsetExtractor(elt) -> elt) | |
.mapAsync(1) { | |
case (offset, element) => processElement(element).map(_ => offset) | |
} | |
.mapAsync(1) { elt => | |
processElement(elt).map(_ => offsetExtractor(elt)) | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For slick, we can take the two DBIOs, combine them and run without using transactionally
.
It may give a small performance improvements as we take one thread from Slick thread pool, one connection and execute two SQL statements each in its own transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to test the following though:
User returns in event handler:
dbio1.flatMap(_ => dbio2).transactionaly
we do:
userHandlerDbio // <- the one above
.flatMap(_ => offsetStore.saveOffset(projectionId, offset))
I'm 99% sure, but we need to confirm this behavior. If I recall correctly, user DBIO will be run in one transaction and offset store in another. That would be obvious, but I want to confirm this.
eventually { | ||
dbConfig.db.run(repository.findById(entityId)).futureValue.value.text should include("elem-22") | ||
} | ||
offsetStore.readOffset[Long](projectionId).futureValue.value shouldBe 20L |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, TestSink support in ProjectionTestkit to the rescue.
* offset not stored in same transaction as user handler * usage would be to benefit from batching of offsets * which is probably a very small advantage for jdbc * it can be more interesting when we add support for user handler that is not storing in db but the offset should be stored in db
5cce46b
to
69cb1e8
Compare
@renatocaval take a look at d5505db to see if I understood that dbio transactional righ I still have to verify that they are separate. |
@patriknw, yes that correct. |
I'll verify the transaction boundaries afterwards |
that is not storing in db but the offset should be stored in db, created issue SlickProjection + handler that is not using db #68 for that
References #24