[SPARK-24993] [SQL] Make Avro Fast Again #21952
Conversation
@dbtsai, thanks! I am a bot who has found some folks who might be able to help with the review: @cloud-fan
Test build #93922 has finished for PR 21952 at commit
This is serious and we should fix it before Spark 2.4. For the benchmark, I have 2 questions:
Maybe due to a test environment difference: I ran the benchmark code above but didn't notice a significant regression. Let's see if others can confirm the regression too.
One more improvement: if the element is a primitive type and not nullable, we can call arrayData.toBoolean/Int/...Array directly.
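For illustration, a minimal sketch of what that specialization could look like, assuming the conversion code has access to the Catalyst ArrayData plus the element DataType and its containsNull flag (the helper name toJavaArray is hypothetical; toBooleanArray/toIntArray/... are existing ArrayData methods):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._

// Hypothetical helper: when the element type is primitive and the array has no
// nulls, extract a primitive array in a single call instead of copying per element.
def toJavaArray(arrayData: ArrayData, elementType: DataType, containsNull: Boolean): AnyRef = {
  if (!containsNull) {
    elementType match {
      case BooleanType => arrayData.toBooleanArray()
      case IntegerType => arrayData.toIntArray()
      case LongType    => arrayData.toLongArray()
      case FloatType   => arrayData.toFloatArray()
      case DoubleType  => arrayData.toDoubleArray()
      case _           => arrayData.toObjectArray(elementType)
    }
  } else {
    // Nullable elements still need the per-element path (not shown here).
    arrayData.toObjectArray(elementType)
  }
}
```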
I tested this out, and this doesn't help much.
I guess the reason is that the Avro writer expects a boxed ArrayList, so even if we call the primitive APIs, Scala will still do the auto-boxing, which will not be much different from the current code.
@cloud-fan as you suggested, I benchmarked cache read performance, and the performance is the same. This makes sense, since cache read performance is unlikely to be so bad that we could see its impact on the Avro writing numbers.
Hi @dbtsai, nice catch! If the input data comes from a data source, I doubt the improvement of this PR, as the data schema is always nullable for data sources. Anyway, we should add these checks.
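For context, here is a small spark-shell illustration of that behavior, using Parquet as a stand-in file-based source (the path is made up): fields written from a non-nullable schema are reported as nullable when read back through the data source.

```scala
import spark.implicits._

val df = Seq((1L, 2.0)).toDF("id", "value")
df.schema.foreach(f => println(s"${f.name}: nullable = ${f.nullable}"))  // nullable = false

val path = "/tmp/nullable-check"  // made-up path
df.write.mode("overwrite").parquet(path)
spark.read.parquet(path).schema
  .foreach(f => println(s"${f.name}: nullable = ${f.nullable}"))         // typically nullable = true
```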
@dbtsai I didn't use Spark 2.3 when testing databricks-avro; I also used current master. But because of a recent change to schema verification ( So basically I tested both built-in avro and databricks-avro on current master. I think the difference between Spark 2.3 and current master may be the cause of the difference. Btw, in the following benchmark numbers I changed the array feature length from 16000 to 1600.
> "com.databricks.spark.avro"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 0.21102|
| stddev|0.010737435692590912|
| min| 0.195|
| max| 0.247|
+-------+--------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+--------------------+
|summary| readTimes|
+-------+--------------------+
| count| 100|
| mean| 0.09441999999999999|
| stddev|0.016021563751722395|
| min| 0.07|
| max| 0.134|
+-------+--------------------+
> "avro"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 0.21445|
| stddev|0.008952596824329237|
| min| 0.201|
| max| 0.25|
+-------+--------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+--------------------+
|summary| readTimes|
+-------+--------------------+
| count| 100|
| mean| 0.10792|
| stddev|0.015983375201386058|
| min| 0.08|
| max| 0.15|
+-------+--------------------+
Adding @beettlle. @viirya since you don't see the performance regression between 2.4 + the built-in reader and 2.4 + the databricks reader, do you think we have another regression somewhere else in Spark? Can you try the 2.3 branch to confirm my finding? Thanks.
@gengliangwang I think the regression is not from the nullability check. It is mostly from ArrayList, which has a default capacity of 10; when more elements are inserted, the ArrayList has to resize and copy over. Thanks.
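To make that suspicion concrete, here is a hedged sketch (not the PR's actual code) of the suspected hot path: converting a 16000-element feature array into the java.util.List the Avro writer consumes, with a default-sized ArrayList versus one pre-sized to the known element count.

```scala
import java.util.{ArrayList => JArrayList}

val len = 16000  // same order of magnitude as the benchmark's feature array

// Default capacity is 10, so appending 16000 boxed doubles forces the backing
// Object[] to grow and be copied over repeatedly.
val growing = new JArrayList[Any]()
var i = 0
while (i < len) { growing.add(scala.math.random); i += 1 }

// Pre-sizing (or filling a plain Array and wrapping it at the end) allocates
// the backing storage once, which is the kind of fix this PR aims for.
val presized = new JArrayList[Any](len)
i = 0
while (i < len) { presized.add(scala.math.random); i += 1 }
```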
@dbtsai I was thinking the same thing. I will run the test later, once I'm back at my laptop.
Test build #94062 has finished for PR 21952 at commit
Test build #94069 has finished for PR 21952 at commit
@dbtsai This is what I see when testing on Spark 2.3. Compared with the numbers above, it seems to me there is no significant difference like the one in your findings.
> "com.databricks.spark.avro - Spark 2.3"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+-------------------+
|summary| writeTimes|
+-------+-------------------+
| count| 100|
| mean|0.21722999999999998|
| stddev|0.04375479309963559|
| min| 0.176|
| max| 0.481|
+-------+-------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+-------------------+
|summary| readTimes|
+-------+-------------------+
| count| 100|
| mean|0.12025999999999999|
| stddev|0.04034638406438311|
| min| 0.072|
| max| 0.26|
+-------+-------------------+
Test build #94079 has finished for PR 21952 at commit
Can we just use new java.util.ArrayList[Any](len) here instead of creating an array and wrapping it with an array list?
My previous experience in the ML project taught me that ArrayList has slower setter performance due to one extra function call, so my preference is to use a plain array as much as possible and wrap it into the right container at the end.
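As a rough illustration of the two alternatives being discussed (the helper names are made up and this is not the PR's actual code): (a) pays one ArrayList.add call per element, while (b) uses plain indexed array writes and only wraps the result into a list at the end. Either way the primitive values still get boxed, which matches the earlier observation about the Avro writer expecting boxed elements.

```scala
import java.util.{ArrayList => JArrayList, Arrays => JArrays}

// (a) a pre-sized java.util.ArrayList, one add() call per element
def viaArrayList(values: Array[Double]): java.util.List[Any] = {
  val list = new JArrayList[Any](values.length)
  var i = 0
  while (i < values.length) { list.add(values(i)); i += 1 }
  list
}

// (b) a plain array filled with indexed writes, wrapped into a list at the end
def viaRawArray(values: Array[Double]): java.util.List[Any] = {
  val arr = new Array[Any](values.length)
  var i = 0
  while (i < values.length) { arr(i) = values(i); i += 1 }
  JArrays.asList(arr: _*)
}
```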
This only works when all the fields are not nullable; I don't think it's very useful.
I was addressing the feedback from @gengliangwang. We can remove it, since cases where all the fields are not nullable will probably be fairly rare.
Let's remove it. We can fix the issue that Spark always turns the schema to nullable later.
We can keep investigating the perf regression; this patch itself LGTM.
Ah, finally I can reproduce this. It needs to allocate the feature array with length 16000. I had reduced it to 1600, which largely hid the regression. Notice:
> "com.databricks.spark.avro - Spark 2.3"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+-------------------+
|summary| writeTimes|
+-------+-------------------+
| count| 100|
| mean| 0.9711099999999999|
| stddev|0.01940836797556013|
| min| 0.941|
| max| 1.037|
+-------+-------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+-------------------+
|summary| readTimes|
+-------+-------------------+
| count| 100|
| mean| 0.36022|
| stddev|0.05807476546520342|
| min| 0.287|
| max| 0.626|
+-------+-------------------+
> "com.databricks.spark.avro - current master branch"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 1.6455800000000003|
| stddev|0.024971976212538632|
| min| 1.6|
| max| 1.733|
+-------+--------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+-------------------+
|summary| readTimes|
+-------+-------------------+
| count| 100|
| mean|0.33659000000000006|
| stddev| 0.0431415558178404|
| min| 0.273|
| max| 0.462|
+-------+-------------------+
> "avro"
scala> spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
+-------+-------------------+
|summary| writeTimes|
+-------+-------------------+
| count| 100|
| mean| 1.7371699999999999|
| stddev|0.03504399976018602|
| min| 1.695|
| max| 1.886|
+-------+-------------------+
scala> spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
+-------+-------------------+
|summary| readTimes|
+-------+-------------------+
| count| 100|
| mean|0.32348999999999994|
| stddev|0.06235617714615632|
| min| 0.263|
| max| 0.781|
+-------+-------------------+
LGTM
Test build #94104 has finished for PR 21952 at commit
I noticed that the benchmark uses
The regression happens at writing. Looks like when benchmarking the write time, we don't use
ec17d58 to 2df8142
Merged into master. Thanks all for reviewing.
Test build #94117 has finished for PR 21952 at commit
## What changes were proposed in this pull request?

When @lindblombr at Apple developed [SPARK-24855](#21847) to support a specified schema on write, we found a performance regression in the Avro writer for our dataset. With this PR, the performance is improved, but it is still not as good as Spark 2.3 + the old Avro writer. There must be something we are missing that we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 2.95621|
| stddev|0.030895815479469294|
| min| 2.915|
| max| 3.049|
+-------+--------------------+

+-------+--------------------+
|summary| readTimes|
+-------+--------------------+
| count| 100|
| mean| 0.31072999999999995|
| stddev|0.054139709842390006|
| min| 0.259|
| max| 0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 2.5804300000000002|
| stddev|0.011175600225672079|
| min| 2.558|
| max| 2.62|
+-------+--------------------+

+-------+--------------------+
|summary| readTimes|
+-------+--------------------+
| count| 100|
| mean| 0.29922000000000004|
| stddev|0.058261961532514166|
| min| 0.251|
| max| 0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary| writeTimes|
+-------+--------------------+
| count| 100|
| mean| 1.7730500000000005|
| stddev|0.025199156230863575|
| min| 1.729|
| max| 1.833|
+-------+--------------------+

+-------+-------------------+
|summary| readTimes|
+-------+-------------------+
| count| 100|
| mean| 0.29715|
| stddev|0.05685643358850465|
| min| 0.258|
| max| 0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.

```scala
spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")

val sparkSession = spark
import sparkSession.implicits._

val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
  val features = Array.fill(16000)(scala.math.random)
  (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features)
}.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
val size = df.count()

// Write into ramdisk to rule out the disk IO impact
val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"

val n = 150
val writeTimes = new Array[Double](n)
var i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  df.write
    .format("com.databricks.spark.avro")
    .mode("overwrite")
    .save(tempSaveDir)
  val t2 = System.currentTimeMillis()
  writeTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

df.unpersist()

// The first 50 runs are for warm-up
val readTimes = new Array[Double](n)
i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
  assert(readDF.count() == size)
  val t2 = System.currentTimeMillis()
  readTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>
Author: Brian Lindblom <blindblom@apple.com>

Closes #21952 from dbtsai/avro-performance-fix.
Do we have the same regression for Parquet? Wondering if the regression comes from the