-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Google Cloud Pub/Sub gRPC: acknowledge flow #2422
Conversation
0b0757c
to
39c7ad2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left some comments.
/** | ||
* Create a sink that accepts consumed message acknowledgements. | ||
* | ||
* The materialized value completes on stream completion. | ||
* | ||
* @param parallelism controls how many acknowledgements can be in-flight at any given time | ||
*/ | ||
def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, Future[Done]] = | ||
def acknowledge(parallelism: Int): Sink[AcknowledgeRequest, Future[Done]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this method be deprecated like its javadsl counterpart?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was in the non-gRPC version.
def acknowledgeFlow(): Flow[AcknowledgeRequest, AcknowledgeRequest, NotUsed] = | ||
Flow | ||
.setup { (mat, attr) => | ||
Flow[AcknowledgeRequest] | ||
.mapAsync(1)( | ||
req => | ||
subscriber(mat, attr).client | ||
.acknowledge(req) | ||
.map(_ => req)(mat.executionContext) | ||
) | ||
} | ||
.mapMaterializedValue(_ => NotUsed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe there's an opportunity here to compose all the acknowledge sink/flow and java/scala DSL methods from this to DRY up the impls. It could be refactored into a private method that takes parallelism
to support the deprecated overloads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The gRPC generated types are different for the Java DSL and the Scala DSL. That's not easy to see.
project/Dependencies.scala
Outdated
@@ -238,7 +238,7 @@ object Dependencies { | |||
"com.google.auth" % "google-auth-library-oauth2-http" % "0.20.0", // BSD 3-clause | |||
// pull in Akka Discovery for our Akka version | |||
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion | |||
) ++ Silencer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Curious as to why the Silencer
dep usage was moved to build.sbt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My idea was that it is not a dependency. It's just build infrastructure. I'll move it back.
.setup { (mat, attr) => | ||
Flow | ||
.create[AcknowledgeRequest] | ||
.mapAsyncUnordered(1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] I suppose this could just be mapAsync
if parallelism always equals 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, mapAsync
with parallelism 1 falls back to the mapAsyncUnordered
implementation anyway.
acknowledge
flow (see Google Pub/Sub gRPC - Expose acknowledge Sink as a Flow #1459)References #1459