New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-701] Support Avro write compression #1038
Conversation
68ceef1
to
172c112
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some design feedback. Thoughts?
@@ -443,13 +445,24 @@ private Read() {} | |||
} | |||
|
|||
/** | |||
* Returns a {@link PTransform} that writes Avro file(s) using specified codec. | |||
* Codec cna be a string value drawn from those in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop L449 and 450? This function seems like it does not take a string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(typo: can
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
*/ | ||
|
||
/** Defines utilities that ease working with IO in Apache Beam. */ | ||
package org.apache.beam.sdk.io.util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally trying to avoid util
packages. Any reason not to inline this code into AvroIO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My reasoning: AvroIO is already pretty big and complex class, adding another class would complicate it even further, making it even less human readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dhalperi therefor, is it fine if we keep this as a separate class, it could be in a different package than util
if needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move SerializableAvroCodecFactory
inside of the io
package and make it package-private? That would probably be easiest.
I've found that anything that appears to be part of the public API ends up having users that depend on it, even if we mark it as for internal use only. So the best solution is to use Java visibility to make sure it's a private utility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
* A wrapper allows {@link org.apache.avro.file.CodecFactory}s to be serialized using Java's | ||
* standard serialization mechanisms. | ||
*/ | ||
public class SerializableAvroCodecFactory implements Externalizable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am generally saddened that Avro's CodecFactory
is so unusable.
Suppose I proposed we made AvroIO
simply accept a String codecName
and that the code simply uses CodecFactory.fromString(codecName)
at execution time? I guess you'd argue that this would break xz
and deflate
outside of the default levels.
- Is that important?
- If so, can we move the list of valid strings / etc. inside of AvroIO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point. I would argue using String for configuration is error prone. Furthermore while setting or retrieving codec, from user perspective she can only see CodecFactory
- it's only inside AvroIO
that we use SerializableAvroCodecFactory
- so the whole thing is hidden from the user. Also it's not that big of an overhead - both logic and code wise.
What do you mean by If so, can we move the list of valid strings / etc. inside of AvroIO?
?
|
||
public SerializableAvroCodecFactory(CodecFactory codecFactory) { | ||
checkNotNull(codecFactory, "Codec can't be null"); | ||
checkState(checkIsSupportedCodec(codecFactory), "%s is not supported", codecFactory.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no .toString()
on stuff using checkState
-- it will do that automatically in the case that it prints the message, but will also save cycles by not toStringing in the success case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
172c112
to
a167bca
Compare
@dhalperi thanks for review. Added comments + some fixes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM modulo moving the utility into the IO package for proper visibility.
* A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or | ||
* multiple Avro files matching a sharding pattern). | ||
* | ||
* @param <T> the type of each of the elements of the input PCollection | ||
*/ | ||
public static class Bound<T> extends PTransform<PCollection<T>, PDone> { | ||
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; | ||
private static final SerializableAvroCodecFactory DEFAULT_CODEC = | ||
new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the current default, or a behavior change? (I assume the actual current default is that we use the null
codec?) If so, put this behavior change in the commit message :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
*/ | ||
|
||
/** Defines utilities that ease working with IO in Apache Beam. */ | ||
package org.apache.beam.sdk.io.util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move SerializableAvroCodecFactory
inside of the io
package and make it package-private? That would probably be easiest.
I've found that anything that appears to be part of the public API ends up having users that depend on it, even if we mark it as for internal use only. So the best solution is to use Java visibility to make sure it's a private utility.
AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write); | ||
|
||
assertTrue(serdeWrite.getCodec().toString().equals(CodecFactory.xzCodec(9).toString())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is _completely optional. I'm only including this nitpick because there were other actual comments_
We'll get slightly better error message if the code was written as:
assertEquals(CodecFactory.xzCodec(9), serdWrite.getCodec())`
(with toString()
as it is now, if those equality checks don't work).
There are a few other optional instances of opportunities for improvement in this file and the other test file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. What are the other improvements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping @dhalperi
eab8d8e
to
7d47d37
Compare
BEHAVIOUR CHANGE: prior to this change Avro output would not use compression. Starting from this commit, by default Avro output is compressed using deflate codec (level 6).
1a6a1db
to
9c954c7
Compare
9c954c7
to
449561c
Compare
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.