Backport TextIO ValueProvider changes [BEAM-551] #499
8757614 to a178bcc
Rebased
/**
 * Returns the base output filename for this file based sink.
 */
public String getBaseOutputFilename() {
Need to keep this in Dataflow; removing it is a breaking change.
Done
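For illustration, one way to keep the existing accessor while the filename becomes a deferred value is sketched below. This is a minimal sketch, not the code in this PR: `SketchSink` and `getBaseOutputFilenameProvider` are hypothetical names, and `java.util.function.Supplier` stands in for the SDK's ValueProvider type.

```java
import java.util.function.Supplier;

/** Hypothetical sink used only to illustrate the compatibility concern. */
final class SketchSink {
  // Stand-in for a ValueProvider<String>; the real SDK type is not used here.
  private final Supplier<String> baseOutputFilename;

  /** Original-style constructor: the filename is known at construction time. */
  SketchSink(String filename) {
    this(() -> filename);
  }

  /** New-style constructor: the filename may only be known later. */
  SketchSink(Supplier<String> filenameProvider) {
    this.baseOutputFilename = filenameProvider;
  }

  /** Existing accessor, kept so code written against the old API still compiles. */
  public String getBaseOutputFilename() {
    return baseOutputFilename.get();
  }

  /** Hypothetical new accessor that exposes the deferred value. */
  public Supplier<String> getBaseOutputFilenameProvider() {
    return baseOutputFilename;
  }
}
```

Callers that only ever passed a plain `String` keep working unchanged, which is the point of not removing the old method.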
 * implies {@code #getMaxEndOffSet()}.
 */
public FileBasedSource(String fileName, long minBundleSize,
    long startOffset, long endOffset) {
Noting that there is an API gap here: you can't use a known offset with a ValueProvider (VP) filename.
This is probably okay, because if you don't know which file you're reading, how could you know which positions you need to read?
Agree
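The gap discussed above can be pictured with a small sketch. It assumes a hypothetical `SketchOffsetSource` class (not the SDK's `FileBasedSource`), uses `Supplier` as a stand-in for a ValueProvider, and assumes that an end offset of `Long.MAX_VALUE` means "read to the end of the file".

```java
import java.util.function.Supplier;

/** Hypothetical source illustrating which constructor accepts explicit offsets. */
final class SketchOffsetSource {
  private final Supplier<String> fileName;
  private final long startOffset;
  private final long endOffset;

  /** Deferred filename: no offsets can be given, so the whole file is covered. */
  SketchOffsetSource(Supplier<String> deferredFileName) {
    this.fileName = deferredFileName;
    this.startOffset = 0L;
    this.endOffset = Long.MAX_VALUE; // assumed to mean "up to the end of the file"
  }

  /** Known filename: an explicit sub-range is allowed. */
  SketchOffsetSource(String knownFileName, long startOffset, long endOffset) {
    if (startOffset < 0 || endOffset < startOffset) {
      throw new IllegalArgumentException(
          "Invalid range [" + startOffset + ", " + endOffset + ")");
    }
    this.fileName = () -> knownFileName;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }

  String getFileName() { return fileName.get(); }
  long getStartOffset() { return startOffset; }
  long getEndOffset() { return endOffset; }
}
```

There is deliberately no constructor taking both a deferred filename and offsets, which mirrors the reasoning in the thread: offsets are only meaningful once the file is known.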
}

if (validate) {
  checkState(filepattern.isAccessible(), "Cannot validate with a RVP.");
I do not think this is a useful error message. How does a user know what an RVP is?
Done
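As an illustration of the kind of message the reviewer is asking for, here is a sketch that avoids the abbreviation and tells the user what to do. The wording, the `SketchValidation` class, and the `SketchProvider` interface are assumptions for this example; this is not the message that was actually committed.

```java
/** Hypothetical helper showing a more descriptive validation error. */
final class SketchValidation {
  /** Minimal stand-in for a value that may or may not be available yet. */
  interface SketchProvider<T> {
    T get();
    boolean isAccessible();
  }

  /** Fails with a user-facing explanation when the value is not yet available. */
  static void checkCanValidate(SketchProvider<String> filepattern) {
    if (!filepattern.isAccessible()) {
      throw new IllegalStateException(
          "Cannot validate the filepattern because its value is only available at run time. "
              + "Disable validation or supply the filepattern at construction time.");
    }
  }
}
```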
(Did a base review, but now I will also start diffing)
sammcveety
left a comment
Done
dhalperi
left a comment
Most diffs look good. (This review is against original PR, without any fixups since my first comments.)
One minor change to improve diffs.
/**
 * Creates a {@code CompressedSource} from a delegate file based source and a decompressing
 * channel factory.
 */
Diff LGTM https://www.diffchecker.com/cLLJyJJs
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.coders.Coder;
Lots of divergence here, but LGTM: https://www.diffchecker.com/kEkSAaQW
package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Diff LGTM: https://www.diffchecker.com/bv2jAfVJ
 * the License.
 */

package com.google.cloud.dataflow.sdk.io;
Diff LGTM: https://www.diffchecker.com/pZ072CV9
/**
 * Returns an XmlSink that writes objects of the class specified as XML elements.
 *
 * <p>The specified class must be able to be used to create a JAXB context.
Diff LGTM: https://www.diffchecker.com/k0x3fdKk
import static com.google.cloud.dataflow.sdk.util.Structs.addLong;

import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.cloud.dataflow.sdk.io.FileBasedSource;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.TextIO.CompressionType;
import com.google.cloud.dataflow.sdk.io.TextIO.TextSource;
XmlSink.write()
    .toFilenamePrefix(testFilePrefix)
    .ofRecordClass(testClass)
    .withRootElement(testRootElement);
  p.run();
}

/** Options for testing. */
Can you move this to where it is in Beam? https://www.diffchecker.com/e0gOweZQ
At L317, after testRun.
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO.PubsubTopic;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
Done
R: @dhalperi
apache/beam#1545
apache/beam#1475
apache/beam#1134