-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Google Pub Sub connector built on Akka Grpc #1162
Conversation
Code is ready to be reviewed. Docs still need an update, since those have been copied from the current Google Pub Sub connector. |
...-grpc/src/main/scala/akka/stream/alpakka/googlecloud/pubsub/grpc/impl/AkkaGrpcSettings.scala
Show resolved
Hide resolved
Updated the docs to reflect the configuration of this connector. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Impressive how little is needed with Akka gRPC's support!
The module would improve a lot with a bit of Scaladoc. Generating Scaladoc fails for me with the generated code being the culprit. Didn't dig into that, but it may be because of multiple older Protobuf jars on the Alpakka classpath.
The generated code is created in c.g.pubsub.v1
for Java and c.g.pubsub.v1.pubsub
for Scala. Is there a way to control that? Make it javadsl
and scaladsl
?
Sprinkle a few @ApiMayChange
over this as it depends on the 0.2 version of Akka gRPC.
@@ -11,6 +11,7 @@ lazy val modules: Seq[ProjectReference] = Seq( | |||
geode, | |||
googleCloudPubSub, | |||
googleFcm, | |||
googleCloudPubSubGrpc, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something with this sort algorithm looks broken.
new PubSubConfig(host, port) | ||
|
||
def apply(conf: Config): PubSubConfig = { | ||
val c = conf.getConfig("alpakka.google.cloud.pubsub.grpc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather make this expect to have selected the config scope already. That allows creating multiple settings from config.
class ExampleUsage { | ||
|
||
//#init-mat | ||
implicit val system = ActorSystem() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Annotate with types, please.
//#init-client | ||
|
||
//#publish-single | ||
val publishMessage: PubsubMessage = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the relevant imports show in the snippet.
// #publish-fast | ||
Source<PubsubMessage, NotUsed> messageSource = Source.single(publishMessage); | ||
messageSource | ||
.groupedWithin(1000, FiniteDuration.apply(1, "min")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use java.time.Duration
instead.
.single(request) | ||
.concat( | ||
Source | ||
.tick(Duration.ZERO, pollInterval, ()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be pollInterval
as initial delay, as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Nice catch.
.concat( | ||
Source | ||
.tick(Duration.ZERO, pollInterval, ()) | ||
.map(_ => subsequentRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use subsequentRequest
in tick
?
|
||
The google cloud pub/sub connector provides a way to connect to google clouds managed pub/sub https://cloud.google.com/pubsub/. | ||
|
||
This connector communicates to PubSub via gRPC protocol. The integration between Akka Stream and gRPC is handled by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly add a callout with Google's explanation of what Pub/Sub offers. Good for googleability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And a note on the difference to the other Alpakka connector...
And vice versa.
Pushed commits that address feedback comments. I had no problem generating api docs when running I see that scalapb has support for generating scala files under a custom package. However those scalapb specific options need to be defined in the Added |
Looked a bit at generating Scala files into a custom package, but didn't see a way to achieve this. Submitted scalapb/ScalaPB#495 . Of course to have scaladsl/javadsl, we'd need the same for the protoc Java code generation as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly spelling.
Further information at the official [Google Cloud documentation website](https://cloud.google.com/pubsub/docs/overview). | ||
@@@ | ||
|
||
This connector communicates to Pub/Sub via gRPC protocol. The integration between Akka Stream and gRPC is handled by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/via gRPC/via the gRPC/
## Configuration | ||
|
||
The connector comes with the default settings configured to work with the Google Pub Sub endpoint and uses the default way of | ||
locating credetianls by looking at the `GOOGLE_APPLICATION_CREDENTIAL` environment variable. Please check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/credetianls/credentials/
Java | ||
: @@snip (/google-cloud-pub-sub-grpc/src/test/java/docs/javadsl/ExampleUsageJava.java) { #publish-fast } | ||
|
||
Finally, to automatically acknowledge messages and send messages to your own sink, once can do the following: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/once/one/
## Subscribing | ||
|
||
To receive message from a subscription, first we create a `StreamingPullRequest` with a FQRS of a subscription and | ||
a deadline for acknowledgemts in seconds. Google requires that only the first `StreamingPullRequest` has the subscription |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/acknowledgemts/acknowledgements/
Java | ||
: @@snip (/google-cloud-pub-sub-grpc/src/test/java/docs/javadsl/ExampleUsageJava.java) { #subscribe } | ||
|
||
Here `pollInterval` is the time between `StreamingPullRequest` are sent when there are no messages in the subscription. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/StreamingPullRequest
/StreamingPullRequest
s/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
port = 443 | ||
|
||
# Set to "none" to disable TLS | ||
rootCa = "GoogleInternetAuthorityG3.crt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better to put the certificate in a sub-directory, so that it works well even with über-jars?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would having the certificate in the top directory not work with uber-jars?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works. Just thought it might be good to show context to where it is used. Maybe irrelevant.
5dadb0d
to
3c303ad
Compare
This reverts commit 200afb8
f77a82a
to
b8001b6
Compare
I just realized, that there is no akka-grpc for 2.11 released currently (akka/akka-grpc#345). But we need to discuss if we really need it. Maybe Alpakka 1.0 will drop 2.11 and be released for Scala 2.12 and 2.13. |
Is there a way to build just this connector with Scala 2.12, only? |
Refactored compiled-only tests that are used in the documentation snippets to runnable ones. |
* test using Google PubSub emulator and client docker containers
33e26b7
to
2233374
Compare
Everything is green. This is ready to go in! |
Thank you @CremboC for the initial implementation of this. :) I hope you can try it out and see how it works in your use-case. |
This is an alternative connector to Google Pub Sub that uses gRPC protocol.
This is a continuation of #759 with the difference that it uses Akka Grpc instead of Google Java gRPC libraries.
This is still WIP, as it needs the following: