[SPARK-746][CORE] Added Avro Serialization to Kryo #7004
Conversation
… network IO involved during a shuffle. This compresses the schema and allows users to register their schemas ahead of time to further reduce traffic. Changed how the records are serialized to reduce both the time and memory used; removed an unused variable.
Jenkins, this is ok to test
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>${hadoop.deps.scope}</scope>
isn't this included already?
avro and avro-mapred are already in the dependencyManagement of the parent POM, so nothing more than groupId, artifactId and perhaps scope should be included here. In particular, version should not generally be specified when it is already in dependencyManagement. This is a frequent error in the Spark POMs.
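Concretely, the child module's declaration would then shrink to something like this (a sketch; the parent POM's dependencyManagement is assumed to supply the version and scope):

```xml
<!-- version and scope are inherited from the parent POM's dependencyManagement -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
</dependency>
```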
+1 to Mark's comments.
Hi @JDrit I left a few comments on the code. I know this is still a WIP, so I didn't go through it in much detail, but there are some style and scoping issues (more things should be scoped more tightly). Since the point of this is to be more efficient, can you give some example numbers of the savings from this? But it seems like a great idea. (Incidentally, @massie is also working on something to get shuffle to use Parquet.)
One high-level question -- what happens if a user does not register their schemas ahead of time? My instinct is that it's not worth it for you to do any extra compression for the schema. There is already compression applied to shuffle output, controlled by configuration.
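To make the trade-off concrete, here is a minimal sketch (plain JDK, in Java for illustration; not Spark's actual code) of compressing a schema's JSON text with a Deflater, which is roughly what compressing the schema before shipping it amounts to. The sample schema string is made up for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

public class SchemaCompressDemo {
    // Compress a schema's JSON text and return the compressed bytes.
    public static byte[] compress(String schemaJson) {
        Deflater deflater = new Deflater();
        deflater.setInput(schemaJson.getBytes(StandardCharsets.UTF_8));
        deflater.finish();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            bos.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // A hypothetical record schema; real schemas are often much larger,
        // so the savings grow with schema size.
        String schema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
        byte[] compressed = compress(schema);
        System.out.println(schema.length() + " chars -> " + compressed.length + " bytes");
    }
}
```

Whether this extra step is worth it on top of the existing shuffle-output compression is exactly the reviewer's question.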
I wrote some benchmarking code for this and the results are recorded here. tl;dr:
Which of the new classes should be marked
Hey @JDrit, serialization performance is a big interest of mine and I'd be happy to help with review of this patch. One question first, though: I've noticed that this adds a new Avro dependency, but it seems to be scoped to compile-only. I assume that this is because we don't want to introduce a hard dependency on Avro to Spark itself, since doing so might create dependency conflicts with user code. However, I'm worried about what happens if we run Spark without any version of Avro on the classpath: will we get ClassNotFoundExceptions when KryoSerializer tries to create a GenericAvroSerializer?

If we can't come up with a clean way to handle this dependency issue in Spark, the next best solution might be to release this Avro serialization code as a third-party package (e.g. via http://spark-packages.org or your own preferred distribution channel). I think that this might be possible by packaging the AvroSerializer code into its own JAR, then writing a custom Kryo registrator and instructing users on how to configure it.
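One common way to guard against a missing optional dependency (a hedged sketch, not necessarily what Spark does here) is to probe the classpath reflectively before touching any of the optional classes, so a missing Avro produces a graceful fallback rather than a ClassNotFoundException:

```java
public class OptionalDependency {
    // Returns true if the named class can be loaded, false otherwise.
    // The 'false' argument avoids running static initializers during the probe.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, OptionalDependency.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Only register the Avro serializer when Avro is actually present.
        if (isOnClasspath("org.apache.avro.generic.GenericRecord")) {
            System.out.println("Avro present: register the generic Avro serializer");
        } else {
            System.out.println("Avro absent: skip Avro serializer registration");
        }
    }
}
```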
Also, to clarify: is this primarily intended to improve the performance of programs written against the Spark Core API? For Spark SQL + DataFrames, I think the spark-avro library will convert the Avro records into Spark SQL's internal Row representation, which should be more efficient to serialize and shuffle. I'd be curious to know whether you could get most of these benefits for simpler workflows by using DataFrames and leaving the serialization up to that.
This was intended as a performance improvement for spark-core with RDDs. It works separately from spark-avro. I realize that DataFrames could also be used, but the goal was to make it easier for users who just want to use RDDs.
Jenkins, this is ok to test.
Any comment on the classpath issues or on whether this introduces a new Avro dependency?
This would introduce Avro as a new dependency, but that would be similar to how Spark already has compile-time dependencies on Parquet, Flume, and other projects.
Actually, maybe @pwendell or @zsxwing can comment on the Avro dependency concerns (@zsxwing, because you mentioned Avro dependency issues in #6830 (comment)).
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${avro.version}</version>
As with the previous one, this should only contain groupId / artifactId (everything else is inherited).
The one thing you have to add here, though, is:
<classifier>${avro.mapred.classifier}</classifier>
Thanks for pointing that out, just fixed that.
I don't think it really changes anything.
The dependency looks good to me. @JoshRosen
@JoshRosen does that satisfy your concerns, and are there other changes you would want me to make?
bos.toByteArray
})
nit: delete extra newline
Jenkins, retest this please
private val schemaCache = new mutable.HashMap[Long, Schema]()

/** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */
private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
Just curious, where is this constructed when there isn't a SparkEnv?
Why not just accept a SparkConf in the GenericAvroSerializer constructor instead of getting it from SparkEnv?
Several of the tests involving block replication fail when this value is not lazily defined.
I originally had it accept SparkConf but I was getting serialization errors since SparkConf is not serializable. This way prevents having to send the configuration with the serializer.
Joe and I talked about this a bit offline -- the reason for this is that ShuffleRDD lets you set a Serializer directly, which is used in some tests, and that is why the serializer itself needs to be serializable. I'll add a comment here explaining why it's necessary when I merge.
For anybody that is curious -- I had gotten myself pretty confused about why this change would make the SparkConf serialized. KryoSerializer already had a conf argument to the constructor, and that wasn't changed. But the conf there was only accessed in field initialization, never in methods, so it wasn't stored. Through the wonders of Scala, when you access that conf in a method, it suddenly also becomes a member variable, and now you can no longer serialize the KryoSerializer.

In practice this means the lazy val codec here is fine in actual use, but it could be very confusing in a unit test where the SparkEnv hasn't been set. So I'll add a comment explaining this a bit.
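The underlying pattern translates to plain Java as well. Here is a small sketch (in Java for illustration; the class, field, and codec names are made up) of a Serializable object whose codec is resolved lazily from a process-global source at first use, so nothing non-serializable is captured as a field:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyCodecHolder implements Serializable {
    // transient: never written out; re-resolved on the deserializing side.
    private transient String codec;

    // Stand-in for an environment lookup like SparkEnv.get.conf:
    // read at first use rather than captured in the constructor.
    public static String globalCodecSetting = "lz4";

    public String codec() {
        if (codec == null) {
            codec = globalCodecSetting; // lazy: safe even if set up after construction
        }
        return codec;
    }

    // Helper that serializes and deserializes the holder, simulating a shuffle send.
    public static LazyCodecHolder roundTrip(LazyCodecHolder h) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(h); // works: no config captured
            return (LazyCodecHolder) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        LazyCodecHolder copy = roundTrip(new LazyCodecHolder());
        System.out.println(copy.codec()); // resolved on the "remote" side
    }
}
```

As in the PR, the catch is that the lookup fails if the global environment has not been initialized when codec() is first called, which is exactly the unit-test confusion described above.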
2 super minor comments, but otherwise LGTM! Also, as a bit of general housekeeping, can you put some of the benchmarks you have on the JIRA (since that serves as a better archive than GitHub)? @JoshRosen want to take another look at this?
}

/** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
def getAvroSchema: Map[Long, String] = {
What do the keys and values of this map denote?
The keys are longs, which represent a unique ID of the schema, and the values are the string representations of the schemas.
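That mapping can be sketched as follows (in Java for illustration; a plain CRC32 hash stands in for Avro's actual 64-bit schema fingerprinting, and the error wording loosely mirrors the message quoted later in this review):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

public class SchemaCache {
    private final Map<Long, String> cache = new HashMap<>();

    // Stand-in fingerprint: CRC32 of the schema text. Avro itself provides a
    // dedicated fingerprinting scheme, but any stable hash shows the idea.
    public static long fingerprint(String schemaJson) {
        CRC32 crc = new CRC32();
        crc.update(schemaJson.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Write side: register the schema and return its ID, so only the
    // fingerprint needs to travel with each record.
    public long register(String schemaJson) {
        long fp = fingerprint(schemaJson);
        cache.put(fp, schemaJson);
        return fp;
    }

    // Read side: recover the schema from the fingerprint.
    public String lookup(long fp) {
        String schema = cache.get(fp);
        if (schema == null) {
            throw new IllegalStateException("Unknown schema fingerprint: " + fp
                + "; did you register all schemas before starting the job?");
        }
        return schema;
    }
}
```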
Jenkins, retest this please
Test build #1196 has finished for PR 7004 at commit
@squito @JoshRosen Have I addressed all your issues, or is there anything else you would like me to do?
I'm going to defer to other reviewers who have been following this patch more closely.
"""Error reading attempting to read avro data --
|encountered an unknown fingerprint: $fingerprint, not sure what schema to use. | ||
|This could happen if you registered additional schemas after starting your | ||
|spark context.""".stripMargin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to put in line breaks here (as you get with the stripMargin effect), and you need to use s"..." to get $fingerprint interpreted correctly.
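The substance of the fix, shown here in Java for illustration (the PR itself is Scala, where the equivalent is an s-interpolated string without stripMargin): build the message as one line so the fingerprint is actually substituted and no stray line breaks end up in logs.

```java
public class ErrorMessageDemo {
    public static String unknownFingerprintMessage(long fingerprint) {
        // One line, with the fingerprint substituted rather than a
        // literal "$fingerprint" surviving into the message.
        return String.format(
            "Error reading avro data -- encountered an unknown fingerprint: %d, "
            + "not sure what schema to use. This could happen if you registered "
            + "additional schemas after starting your spark context.", fingerprint);
    }

    public static void main(String[] args) {
        System.out.println(unknownFingerprintMessage(42L));
    }
}
```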
Sorry, I am trying to understand this serialization thing a bit better ... something doesn't make sense to me, but mostly outside of these changes. Once I get a handle on that, I will merge this; just want to make sure I'm not missing something.
Merged to master, thanks @JDrit
Did @srowen look at the build change? Sean or I should be signing off on any dependency changes in the build.
There was a lot of up-thread discussion regarding the dependency change; my first comment on this PR was asking whether this introduced a new dependency, etc. One concern which might have been overlooked is whether the required dependencies will be packaged if Spark is built with the YARN profile disabled. It looks like the rationale upthread is that this dependency is already transitively included by our other dependencies, but I'm not sure that will always be the case if we're relying on the YARN profile to include it.
@pwendell @JoshRosen ah sorry, I thought @vanzin & @zsxwing gave it the thumbs up earlier -- just following the comments, I forgot to get the approval of Sean or you as well. Let me know if you think this needs an immediate revert. I will also check w/ Sean.
Re: the avro dependency, this is a net new dependency for core. Previously this came in via the hive module (the metastore dependency, to be specific). I suppose it relies on the Hive profile, therefore, not the YARN profile. In any event, the right thing to do is include the dependency if it's being used, of course. I suppose this is evidence that the Spark assemblies -- the Hive flavors -- have had this dep and have been fine. Avro doesn't bring in anything that we didn't already have, except Avro:
So, I think the net change here is only that Avro has been added to core. Unless there's an objection to adding Avro at all, I think this is OK from a build standpoint.
Okay sounds good. Thanks for looking at it Sean.
On Wed, Jul 29, 2015 at 1:37 PM, Sean Owen notifications@github.com wrote:
Added a custom Kryo serializer for generic Avro records to reduce the network IO
involved during a shuffle. This compresses the schema and allows users to
register their schemas ahead of time to further reduce traffic.
Currently Kryo tries to use its default serializer for generic records, which includes
a lot of unneeded data in each record.