[SPARK-24768][SQL] Have a built-in AVRO data source implementation #21742
Conversation
.avro(AvroFileGenerator.outputDir)
  .select("string")
  .count()
val endTime = System.nanoTime
Could you use the Benchmark API for the benchmarks:
private[spark] class Benchmark(
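For illustration, a minimal sketch of what that could look like, assuming the org.apache.spark.util.Benchmark utility of that era; spark, outputDir, and the row count are placeholders, not names from this PR:

import org.apache.spark.util.Benchmark

// Sketch only: numRows drives the Rate(M/s) and Per Row(ns) columns in the report.
val numRows = 1000000L
val benchmark = new Benchmark("Avro read", numRows)
benchmark.addCase("select one string column and count") { _ =>
  spark.read.format("avro").load(outputDir).select("string").count()
}
benchmark.run()  // prints Best/Avg Time(ms), Rate(M/s), Per Row(ns), Relative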
This PR is for the initial import. I have created a sub-task:
https://issues.apache.org/jira/browse/SPARK-24777
In that case, shall we add the whole benchmark separately?
makes sense to me.
val endTime = System.nanoTime
val executionTime = TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS)

println(s"\n\n\nFinished benchmark test - result was $executionTime seconds\n\n\n")
Could you leave the benchmark results here, like in JsonBenchmarks for example:
spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala
Lines 76 to 79 in bd14da6
JSON schema inferring:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------
No encoding                              38902 / 39282          2.6         389.0       1.0X
UTF-8 is set                             56959 / 57261          1.8         569.6       0.7X
This PR is for the initial import. I have created a sub-task:
https://issues.apache.org/jira/browse/SPARK-24777
"is empty. First you should generate some files to run a benchmark with (see README)") | ||
} | ||
|
||
val spark = SparkSession.builder().master("local[2]").appName("AvroReadBenchmark") |
I don't think it makes sense to read from 2 tasks in parallel. That just introduces unnecessary fluctuations in the results. I would do the measurements in one task.
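A sketch of the suggested change, reusing the names from the snippet above:

// Single task: avoids scheduling noise in the measurements.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("AvroReadBenchmark")
  .getOrCreate()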
 * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
 * the DataFileReade
 */
implicit class AvroDataFrameReader(reader: DataFrameReader) {
I think it is better to extend DataFrameReader with a new method avro, as for the other supported data sources.
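For illustration, a hypothetical sketch of that shape, mirroring how orc()/csv() are defined on DataFrameReader (not code from this PR):

// Hypothetical method on DataFrameReader itself, following the orc()/csv() pattern:
def avro(path: String): DataFrame = format("avro").load(path)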
Test build #92824 has finished for PR 21742 at commit
/**
 * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
 * the DataFileReade
typo: DataFileReade
 * the DataFileWriter
 */
implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
  def avro: String => Unit = writer.format("avro").save
Does this mean avro must be at the end of the call chain of DataFrameWriter?
Yes, this is the same as orc, csv, etc.
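A usage sketch of the implicit; the import path and df are assumed here, not taken from this thread:

import org.apache.spark.sql.avro._  // assumed import path for the implicits

// avro(...) is format("avro").save, so it both sets the format and triggers
// the write, which is why it has to terminate the call chain:
df.write.mode("overwrite").avro("/tmp/avro-out")
// equivalent to: df.write.mode("overwrite").format("avro").save("/tmp/avro-out")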
But it doesn't support Java and Python though.
I think in most cases, users will directly use df.write.format("avro"), which should be good enough.
Yup, in that case, I think we wouldn't even need this on the Scala side.
I am not sure about this. This has existed in spark-avro for a long time, and it should be used by some users. I can't see any downside of keeping it.
In that case, can we move this into DataFrameReader and DataFrameWriter for Python and Java usage too?
I think this was just a workaround to resemble Spark 2.0.0's API shape. spark-avro as a third party would probably keep source and binary compatibility, but I don't think we keep them here, although we will probably keep the behaviours. So, I think it's better to minimise the exposed APIs when we are in doubt.
On the other hand, I can't see any particular advantage of keeping it as an implicit here.
For instance, CSV didn't bring in all of its other APIs at the initial implementation, such as its parser API: https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/CsvParser.scala
I'd remove the Scala API.
/**
 * This object runs a simple benchmark test on the avro files in benchmarkFilesDir. It measures
 * how long does it take to convert them into DataFrame and run count() method on them. See
 * README on how to invoke it.
ditto.
/**
 * This object allows you to generate large avro files that can be used for speed benchmarking.
 * See README on how to use it.
Is this README located in the original spark-avro project? Shall we copy it here?
 * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
 * the DataFileWriter
 */
implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
I think this was just ported as-is from spark-avro. Shall we expose these in Spark's DataFrameReader and Writer?
I am not sure about this. If we put the package under /external, I don't think we should expose it in DataFrameReader and Writer.
I think we could do that separately, though, like we did in CSV.
@@ -170,6 +170,16 @@ def __hash__(self):
    ]
)

avro = Module(
Why does this need a separate module, unlike other data sources?
This is much cleaner, like what we did for kafka, which is also a built-in data source. Ideally, we should separate parquet, orc, and the other built-in data sources from the sql module. We can do the refactoring in the future, if needed.
  dataFileWriter.append(avroRec)
}

dataFileWriter.close()
nit: can we put this in finally?
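i.e., roughly (a sketch; avroRecords stands in for whatever the loop above appends):

try {
  avroRecords.foreach { rec => dataFileWriter.append(rec) }
} finally {
  dataFileWriter.close()  // close the writer even if append throws
}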
  write(internalRowConverter(row))
}

// api in spark 2.0 - 2.1
This will probably be new to Spark 2.4.0. Do we need this?
@@ -0,0 +1,35 @@
{
Seems we missed deleting it.
Nice catch!
Test build #92848 has finished for PR 21742 at commit
Test build #92854 has finished for PR 21742 at commit
since we decided to follow sql-kafka here, I think we should not add
}.getRecordWriter(context)

def write(internalRow: InternalRow): Unit = {
nit: add override
Test build #92937 has finished for PR 21742 at commit
LGTM. Thanks! Merged to master. @gengliangwang Please submit the follow-up PRs to resolve the sub-tasks and the good comments from the PR review.
…roDataFrameReader
As per Reynold's comment: apache#21742 (comment)
It makes sense to remove the implicit classes AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is an external module.
Unit test
Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes apache#21841 from gengliangwang/removeImplicit.
(cherry picked from commit f59de52)
What changes were proposed in this pull request?
Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines. Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write Avro data. Making spark-avro built-in can provide a better experience for first-time users of Spark SQL and Structured Streaming. We expect the built-in Avro data source to further improve the adoption of Structured Streaming.
The proposal is to inline the code from the spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.
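With the data source built in, reading and writing Avro needs no external package (a usage sketch; paths are placeholders):

val df = spark.read.format("avro").load("/path/to/input")
df.write.format("avro").save("/path/to/output")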
Built-in AVRO Data Source In Spark 2.4.pdf
How was this patch tested?
Unit test