[BEAM-53] Java-only Pubsub sink for streaming. #171
R: @kennknowles
Note the progression we're on is:
// ================================================================================

/**
 * Number of cores available for publishing.
What is a core in this context? My initial instinct is across machines, but it seems to be being used as the sharding factor.
This is now 'numShards' and needs to be chosen to balance the number of records per batch against Pubsub latency. Note that a random long does shard, but it results in most bundles having a single element, which kills apiary/grpc quota.
I currently (i.e., in a pending branch) hard-code this in PubsubIO. It is comparable to the heuristics we have baked in for calculating the initial splits for an UnboundedReader.
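To illustrate the trade-off described above, here is a minimal, self-contained sketch. It is not code from this PR: the class and method names are hypothetical, and it models a "bundle" simply as the set of elements that share a key, which is a simplification of how a runner actually groups elements. It compares the mean group size when keys are drawn from a fixed range of numShards values against the mean group size when every element gets a random long.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch: compares group sizes under two random-key strategies. */
class ShardKeySketch {

  /** Keys each element with a random int in [0, numShards) and returns the mean group size. */
  static double meanGroupSizeWithShards(int numElements, int numShards, long seed) {
    Random random = new Random(seed);
    Map<Integer, Integer> groupSizes = new HashMap<>();
    for (int i = 0; i < numElements; i++) {
      groupSizes.merge(random.nextInt(numShards), 1, Integer::sum);
    }
    return (double) numElements / groupSizes.size();
  }

  /** Keys each element with a random long and returns the mean group size. */
  static double meanGroupSizeWithRandomLong(int numElements, long seed) {
    Random random = new Random(seed);
    Map<Long, Integer> groupSizes = new HashMap<>();
    for (int i = 0; i < numElements; i++) {
      groupSizes.merge(random.nextLong(), 1, Integer::sum);
    }
    return (double) numElements / groupSizes.size();
  }

  public static void main(String[] args) {
    // Assumed workload for illustration only: 10,000 elements, 10 shards.
    System.out.println("numShards keys:   " + meanGroupSizeWithShards(10_000, 10, 42L));
    System.out.println("random long keys: " + meanGroupSizeWithRandomLong(10_000, 42L));
  }
}
```

With a bounded key range, many elements land under each key and can be published together in one request. With random longs, nearly every key is unique, so each publish call would carry roughly one message.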
Better to have the owner of I/O review this. Also just want to call out here that the follow-ups suggested on #120 should probably go first, but I'll leave that up to Dan.
ACK, working on a pubsub-apiary follow-up. Thanks! On Wed, Apr 13, 2016 at 7:16 PM, Kenn Knowles notifications@github.com
elementCounter.addValue(1L);
byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
long timestampMsSinceEpoch = c.timestamp().getMillis();
c.output(KV.of(ThreadLocalRandom.current().nextInt(numCores * SCALE_OUT),
Using a random int will allow most systems to choose the sharding to an even more arbitrary degree, and then you can remove numCores
See comment above.
Looking into this -- looks like you'll have to rebase. I see this code is pre-name-change.
PTAL
/**
 * Coder for conveying outgoing messages between internal stages.
 */
private static final Coder<PubsubClient.OutgoingMessage> CODER = new
Noting that this is a case we support quite poorly today: A library author offers a sink, requiring a user to convert to a particular datatype Foo. The library author writes a coder for Foo and would like the user to get this benefit automatically.
Not necessarily directly applicable here, but putting it out there.
Note this is entirely internal to the Sink / PubsubClient interface. I could put the coder in PubsubClient but since it has no other PCollection/Coder/etc dependencies it felt better leaving it outside.
Added a couple of initial comments to share the review load.
elementCounter.addValue(1L);
byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
long timestampMsSinceEpoch = c.timestamp().getMillis();
c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
It is preferable (in terms of serialization overhead, which I understand is important for streaming) to make these static inner classes and then pass in constants in constructors. But up to you.
Yeah, good catch.
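For readers unfamiliar with the serialization point raised above, here is a minimal sketch using plain Java serialization. It is not the PR's code: ShardFn, Outer, and StaticShardFn are hypothetical names, and the 64 KiB array merely stands in for whatever state an enclosing transform might hold. An anonymous inner class that reads a field of its enclosing instance keeps a reference to that instance, so the whole outer object is serialized with it. A static nested class that receives the constant through its constructor serializes only its own fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical sketch: serialized size of an anonymous inner class vs. a static nested class. */
class SerializationSketch {

  interface ShardFn extends Serializable {
    int apply(int value);
  }

  /** Stands in for an enclosing transform that carries a lot of state. */
  static class Outer implements Serializable {
    private final byte[] largeState = new byte[64 * 1024];
    private final int numShards;

    Outer(int numShards) {
      this.numShards = numShards;
    }

    /** Anonymous inner class: reads Outer.this.numShards, so it drags Outer along. */
    ShardFn anonymousFn() {
      return new ShardFn() {
        @Override
        public int apply(int value) {
          return Math.floorMod(value, numShards);
        }
      };
    }

    /** Static nested class: only the constant passed to the constructor is kept. */
    ShardFn staticFn() {
      return new StaticShardFn(numShards);
    }
  }

  static class StaticShardFn implements ShardFn {
    private final int numShards;

    StaticShardFn(int numShards) {
      this.numShards = numShards;
    }

    @Override
    public int apply(int value) {
      return Math.floorMod(value, numShards);
    }
  }

  /** Returns the number of bytes Java serialization writes for the given object. */
  static int serializedSize(Object object) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  public static void main(String[] args) {
    Outer outer = new Outer(8);
    System.out.println("anonymous: " + serializedSize(outer.anonymousFn()) + " bytes");
    System.out.println("static:    " + serializedSize(outer.staticFn()) + " bytes");
  }
}
```

Both functions compute the same result. The difference is only in what travels with them when they are serialized and shipped to workers.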
PTAL.
 * BLOCKING
 * Send {@code messages} as a batch to Pubsub.
 */
private void publishBatch(List<PubsubClient.OutgoingMessage> messages, int bytes)
I don't much care, but you write this as both OutgoingMessage and PubsubClient.OutgoingMessage in this file. Probably could pick one and stick with it.
Done
Generally looks good to me. Let's sync tomorrow over any remaining comments.
PTAL
throws IOException {
long nowMsSinceEpoch = System.currentTimeMillis();
int n = pubsubClient.publish(topic, messages);
Preconditions.checkState(n == messages.size());
checkState(n == messages.size(), "Attempted to publish %d messaged but %d were successful",
typo: messages
done
PTAL