[BEAM-3405] Make maxNumRecords a long instead of an int for KinesisIO#4339
[BEAM-3405] Make maxNumRecords a long instead of an int for KinesisIO#4339iemejia merged 1 commit intoapache:masterfrom
Conversation
| PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded; | ||
|
|
||
| if (getMaxNumRecords() < Long.MAX_VALUE) { | ||
| transform = unbounded.withMaxNumRecords(getMaxNumRecords()); |
There was a problem hiding this comment.
The previous code supported a combination of both options, but the current code does not. Is this intentional?
There was a problem hiding this comment.
I changed it to make it consistent with all the other uses on unbounded sources e.g. KafkaIO, MqttIO, etc.
Notice that the with methods on Read.Unbounded seem to be exclusive. See org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords (and the method below), but they can be of course overwritten because BoundedReadFromUnboundedSource supports both parameters.
I suppose that a user that wants the combined options can just use use the time based one followed by Sample.any. What do you prefer? Should I let it like it was or make it consistent with the other IOs?
There was a problem hiding this comment.
Seems reasonable. Then can you just add code to verify that the user didn't specify both options at the same time, i.e. that we're not silently ignoring one of them?
ba0d99b to
99b1647
Compare
|
I decided to respect the semantics that it has before and add these semantics to the other Unbounded IOs on Beam in a future PR. I added an extra validation to see if a stream exists before continuing with the execution. PTAL @jkff |
| try { | ||
| DescribeStreamResult describeStreamResult = client.describeStream(streamName); | ||
| StreamDescription streamDescription = describeStreamResult.getStreamDescription(); | ||
| return streamName.equals(streamDescription.getStreamName()); |
There was a problem hiding this comment.
Is there any way this can return false?
There was a problem hiding this comment.
Well that's right that validation was quite silly. From looking at the official doc on the response of the Aws Kinesis API it actually is status code 200 or exception, so I updated to do it like that, for future ref.
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
I hope everything is ok now, thanks again and sorry for the silly validation.
99b1647 to
912fa56
Compare
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue.mvn clean verifyto make sure basic checks pass. A more thorough check will be performed on your pull request automatically.