From 6cbe082cd3f41b479aa803a8d5410c91ed383496 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 16 Aug 2018 22:18:22 +0800 Subject: [PATCH 01/12] add avro-data-source-guide.md --- docs/avro-data-source-guide.md | 260 +++++++++++++++++++++++++++++++++ docs/sql-programming-guide.md | 3 + 2 files changed, 263 insertions(+) create mode 100644 docs/avro-data-source-guide.md diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md new file mode 100644 index 0000000000000..42ba74be0a9f9 --- /dev/null +++ b/docs/avro-data-source-guide.md @@ -0,0 +1,260 @@ +--- +layout: global +title: AVRO Data Source Guide +--- + +Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides support for reading and writing AVRO data. + +## Deploying +The spark-avro module is external and not included in `spark-submit` or `spark-shell` by default. + +As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` +and its dependencies can be directly added to `spark-submit` using `--packages`, such as, + + ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly, + + ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... + +See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. + +## Examples + +As `spark-avro` module is external, there is not such API as .avro in DataFrameReader or DataFrameWriter. +To load/save data in AVRO format, you need to specify the data source option format as short name avro or full name org.apache.spark.sql.avro. +
+
+{% highlight scala %} + +val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro") +usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} +
+
+{% highlight java %} + +Dataset usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro"); +usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro"); + +{% endhighlight %} +
+
+{% highlight python %} + +df = spark.read.format("avro").load("examples/src/main/resources/users.avro") +df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro") + +{% endhighlight %} +
+
+{% highlight r %} + +df <- read.df("examples/src/main/resources/users.avro", "avro") +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + +{% endhighlight %} +
+
+ +## Configuration + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Property NameDefaultMeaningScope
avroSchemaNoneOptional AVRO schema provided by an user in JSON format.read and write
recordNametopLevelRecordTop level record name in write result, which is required in AVRO spec.write
recordNamespace""Record namespace in write result.write
ignoreExtensionfalseThe option controls ignoring of files without .avro extensions in read. If the option is enabled, all files (with and without .avro extension) are loaded.read
compressionsnappyThe compression option allows to specify a compression codec used in write. Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz. If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.write
spark.sql.avro.outputTimestampTypeTIMESTAMP_MICROSSets which Avro timestamp type to use when Spark writes data to Avro files. Currently supported types are TIMESTAMP_MICROS and TIMESTAMP_MILLIS.write
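As a quick sketch of how these options are wired in (the paths and option values below are illustrative placeholders rather than part of the guide), they are passed through `.option` on `DataFrameReader` or `DataFrameWriter`:

{% highlight scala %}
// Read Avro data, also picking up files that lack the .avro suffix.
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("/path/to/input")

// Write Avro data with a custom record name/namespace and the deflate codec.
df.write
  .format("avro")
  .option("recordName", "person")
  .option("recordNamespace", "com.example")
  .option("compression", "deflate")
  .save("/path/to/output")
{% endhighlight %}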
+ +## Supported types for Avro -> Spark SQL conversion +Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of AVRO. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
AVRO typeSpark SQL type
booleanBooleanType
intIntegerType
longLongType
floatFloatType
doubleDoubleType
stringStringType
enumStringType
fixedBinaryType
bytesBinaryType
recordStructType
arrayArrayType
mapMapType
unionSee below
+ +In addition to the types listed above, it supports reading union types. The following three types are considered basic union types: + +1. union(int, long) will be mapped to LongType. +2. union(float, double) will be mapped to DoubleType. +3. union(something, null), where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. +All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. + +It also supports reading following AVRO [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types): + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
AVRO logical typeAvro typeSpark SQL type
dateintDateType
timestamp-millislongTimestampType
timestamp-microslongTimestampType
decimalbytesDecimalType
decimalbytesDecimalType
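For a concrete look at these mappings (a small sketch reusing the sample file from the examples above), loading a file and printing its schema shows the Spark SQL types that the Avro types were converted to:

{% highlight scala %}
// Inspect the Spark SQL schema derived from the Avro schema of the sample file.
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.printSchema()
{% endhighlight %}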
+At the moment, it ignores docs, aliases and other properties present in the Avro file. + +## Supported types for Spark SQL -> Avro conversion +Spark supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are listed below: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Spark SQL typeAvro typeAvro logical type
ByteTypeint
ShortTypeint
BinaryTypebytes
Dateintdate
TimestampTypelongtimestamp-micros
DecimalTypefixeddecimal
+ +You can also specify the whole output AVRO schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user provided AVRO schema: + + + + + + + + + + + + + + + + + + + + + + + +
Spark SQL typeAvro typeAvro logical type
BinaryTypefixed
StringTypeenum
TimestampTypelongtimestamp-millis
DecimalTypebytesdecimal
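As a hedged sketch of this (the schema, column name and output path below are made up for illustration), supplying the full output schema switches the conversion of a TimestampType column from the default timestamp-micros to timestamp-millis:

{% highlight scala %}
// Provide the complete output Avro schema so that the `ts` column is written as
// long/timestamp-millis; a nullable column would need a union with "null" instead.
val avroSchema = """
  |{
  |  "type": "record",
  |  "name": "topLevelRecord",
  |  "fields": [
  |    {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  |  ]
  |}""".stripMargin

df.select("ts")
  .write
  .format("avro")
  .option("avroSchema", avroSchema)
  .save("/path/to/output")
{% endhighlight %}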
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d9ebc3cfe4674..e41446b9b1824 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1482,6 +1482,9 @@ SELECT * FROM resultTable +## AVRO Files +See the [AVRO data source guide](avro-data-source-guide.html). + ## Troubleshooting * The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java's DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs. From bb842b91fbc2fef7c1aed55341bd5e449dadf919 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 17 Aug 2018 02:06:35 +0800 Subject: [PATCH 02/12] revise doc --- docs/avro-data-source-guide.md | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 42ba74be0a9f9..ec362a1fd2775 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -1,9 +1,9 @@ --- layout: global -title: AVRO Data Source Guide +title: Avro Data Source Guide --- -Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides support for reading and writing AVRO data. +Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides support for reading and writing Avro data. ## Deploying The spark-avro module is external and not included in `spark-submit` or `spark-shell` by default. @@ -21,8 +21,9 @@ See [Application Submission Guide](submitting-applications.html) for more detail ## Examples -As `spark-avro` module is external, there is not such API as .avro in DataFrameReader or DataFrameWriter. -To load/save data in AVRO format, you need to specify the data source option format as short name avro or full name org.apache.spark.sql.avro. +Since `spark-avro` module is external, there is not such API as .avro in +DataFrameReader or DataFrameWriter. +To load/save data in Avro format, you need to specify the data source option format as short name avro or full name org.apache.spark.sql.avro.
{% highlight scala %} @@ -64,13 +65,13 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") avroSchema None - Optional AVRO schema provided by an user in JSON format. + Optional Avro schema provided by an user in JSON format. read and write recordName topLevelRecord - Top level record name in write result, which is required in AVRO spec. + Top level record name in write result, which is required in Avro spec. write @@ -81,7 +82,7 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") ignoreExtension - false + true The option controls ignoring of files without .avro extensions in read. If the option is enabled, all files (with and without .avro extension) are loaded. read @@ -100,9 +101,9 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") ## Supported types for Avro -> Spark SQL conversion -Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of AVRO. +Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of Avro. - + @@ -164,10 +165,10 @@ In addition to the types listed above, it supports reading union ty 3. union(something, null), where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. -It also supports reading following AVRO [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types): +It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types):
AVRO typeSpark SQL type
Avro typeSpark SQL type
boolean BooleanType
- + @@ -233,7 +234,7 @@ Spark supports writing of all Spark SQL types into Avro. For most types, the map
AVRO logical typeAvro typeSpark SQL type
Avro logical typeAvro typeSpark SQL type
date int
-You can also specify the whole output AVRO schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user provided AVRO schema: +You can also specify the whole output Avro schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user specified Avro schema: From 9025940071ec7438b848c3b2b0ec9a7ea3cde6d0 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 17 Aug 2018 19:02:00 +0800 Subject: [PATCH 03/12] add conf spark.sql.avro.backwardCompatibility --- docs/avro-data-source-guide.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index ec362a1fd2775..c8da86680cde0 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -92,6 +92,12 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") + + + + + + From 7f3729363957bb2347cef0bf6630b709f3003795 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sat, 18 Aug 2018 03:15:54 +0800 Subject: [PATCH 04/12] address comments --- docs/avro-data-source-guide.md | 43 ++++++++++++++-------------------- docs/sql-programming-guide.md | 4 ++-- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index c8da86680cde0..e8461cbe3adde 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -1,12 +1,15 @@ --- layout: global -title: Avro Data Source Guide +title: Apache Avro Data Source Guide --- -Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides support for reading and writing Avro data. +* This will become a table of contents (this text will be scraped). +{:toc} + +Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data. ## Deploying -The spark-avro module is external and not included in `spark-submit` or `spark-shell` by default. +The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default. As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages`, such as, @@ -19,11 +22,11 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `org.ap See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. -## Examples +## Load/Save Functions -Since `spark-avro` module is external, there is not such API as .avro in -DataFrameReader or DataFrameWriter. -To load/save data in Avro format, you need to specify the data source option format as short name avro or full name org.apache.spark.sql.avro. +Since `spark-avro` module is external, there is not such API as `.avro` in +`DataFrameReader` or `DataFrameWriter`. +To load/save data in Avro format, you need to specify the data source option `format` as short name `avro` or full name `org.apache.spark.sql.avro`.
{% highlight scala %} @@ -59,7 +62,9 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
-## Configuration +## Data Source Options + +Data source options of Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`.
Spark SQL typeAvro typeAvro logical type
The compression option allows to specify a compression codec used in write. Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz. If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account. write
spark.sql.avro.backwardCompatibilitytrueIf it is set to true, the data source provider com.databricks.spark.avro is mapped to the built-in module org.apache.spark.sql.avro for backward compatibility.read and write
spark.sql.avro.outputTimestampType TIMESTAMP_MICROS
@@ -92,18 +97,6 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") - - - - - - - - - - - -
Property NameDefaultMeaningScope
The compression option allows to specify a compression codec used in write. Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz. If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account. write
spark.sql.avro.backwardCompatibilitytrueIf it is set to true, the data source provider com.databricks.spark.avro is mapped to the built-in module org.apache.spark.sql.avro for backward compatibility.read and write
spark.sql.avro.outputTimestampTypeTIMESTAMP_MICROSSets which Avro timestamp type to use when Spark writes data to Avro files. Currently supported types are TIMESTAMP_MICROS and TIMESTAMP_MILLIS.write
## Supported types for Avro -> Spark SQL conversion @@ -164,11 +157,11 @@ Currently Spark supports reading all [primitive types](https://avro.apache.org/d -In addition to the types listed above, it supports reading union types. The following three types are considered basic union types: +In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types: -1. union(int, long) will be mapped to LongType. -2. union(float, double) will be mapped to DoubleType. -3. union(something, null), where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. +1. `union(int, long)` will be mapped to LongType. +2. `union(float, double)` will be mapped to DoubleType. +3. `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types): @@ -240,7 +233,7 @@ Spark supports writing of all Spark SQL types into Avro. For most types, the map -You can also specify the whole output Avro schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user specified Avro schema: +You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user specified Avro schema: diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index e41446b9b1824..d6772a8e92389 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1482,8 +1482,8 @@ SELECT * FROM resultTable -## AVRO Files -See the [AVRO data source guide](avro-data-source-guide.html). +## Avro Files +See the [Apache Avro Data Source Guide](avro-data-source-guide.html). ## Troubleshooting From 515aa0a44f91e01aea590cd3d5636b674ddc0a9b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Sun, 19 Aug 2018 01:35:18 +0800 Subject: [PATCH 05/12] address comment --- docs/avro-data-source-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index e8461cbe3adde..d9e661e45dbc0 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -185,7 +185,7 @@ It also supports reading the following Avro [logical types](https://avro.apache. - + @@ -233,7 +233,7 @@ Spark supports writing of all Spark SQL types into Avro. For most types, the map
Spark SQL typeAvro typeAvro logical type
decimalbytesfixed DecimalType
-You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions is not by default and require user specified Avro schema: +You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require user specified Avro schema: From d2681ec51a7dbc0296800cdbedb3d46827bf2b6f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 22 Aug 2018 16:45:47 +0800 Subject: [PATCH 06/12] add more sections --- docs/avro-data-source-guide.md | 124 +++++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 4 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index d9e661e45dbc0..868f20b3b0b75 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -22,7 +22,7 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `org.ap See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies. -## Load/Save Functions +## Load and Save Functions Since `spark-avro` module is external, there is not such API as `.avro` in `DataFrameReader` or `DataFrameWriter`. @@ -62,7 +62,82 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") -## Data Source Options +## to_avro() and from_avro() +Spark SQL provides function `to_avro` to encode a struct as a string and `from_avro()` to retrieve the struct as a complex type. + +Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each +Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. +* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. +* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. + +Both methods are presently only available in Scala and Java. + +
+
+{% highlight scala %} +import org.apache.spark.sql.avro._ + +// `from_avro` requires Avro schema in JSON string format. +val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) + +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. +val output = df + .select(from_avro('value, jsonFormatSchema) as 'user) + .where("user.favorite_color == \"red\"") + .select(to_avro($"user.name") as 'value) + +val ds = output + .writeStream + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic2") + .start() + +{% endhighlight %} +
+
+{% highlight java %} +import org.apache.spark.sql.avro.* + +// `from_avro` requires Avro schema in JSON string format. +String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) + +Dataset df = spark + .readStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("subscribe", "topic1") + .load() + +// 1. Decode the Avro data into a struct; +// 2. Filter by column `favorite_color`; +// 3. Encode the column `name` in Avro format. +DataFrame output = df + .select(from_avro(col("value"), jsonFormatSchema).as("user")) + .where("user.favorite_color == \"red\"") + .select(to_avro(col("user.name")).as("value")) + +StreamingQuery ds = output + .writeStream() + .format("kafka") + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") + .option("topic", "topic2") + .start() + +{% endhighlight %} +
+
+ +## Data Source Option Data source options of Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`.
Spark SQL typeAvro typeAvro logical type
@@ -88,17 +163,58 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR - + - +
ignoreExtension trueThe option controls ignoring of files without .avro extensions in read. If the option is enabled, all files (with and without .avro extension) are loaded.The option controls ignoring of files without .avro extensions in read.
If the option is enabled, all files (with and without .avro extension) are loaded.
read
compression snappyThe compression option allows to specify a compression codec used in write. Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz. If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.The compression option allows to specify a compression codec used in write.
+ Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz.
If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.
write
+## Configuration +Configuration of Parquet can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL. + + + + + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.sql.legacy.replaceDatabricksSparkAvro.enabledtrueIf it is set to true, the data source provider com.databricks.spark.avro is mapped to the built-in but external Avro data source module for backward compatibility.
spark.sql.avro.compression.codecsnappyCompression codec used in writing of Avro files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.
spark.sql.avro.deflate.level-1Compression level for the deflate codec used in writing of Avro files. The valid value must be in the range from 1 to 9 inclusive or -1. The default value is -1, which corresponds to level 6 in the current implementation.
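For illustration (a sketch, not part of the original text), the same codec settings can be applied either on the session or through a SQL command:

{% highlight scala %}
// Use the deflate codec at level 5 for subsequent Avro writes.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

// Equivalent SQL form.
spark.sql("SET spark.sql.avro.compression.codec=deflate")
{% endhighlight %}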
+ +## Compatibility with Databricks spark-avro +This Avro data source module is originally from and compatible with Databricks's open source repository +[spark-avro](https://github.com/databricks/spark-avro). + +By default with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is +mapped to this built-in Avro module. For the Spark tables created with `Provider` property as `com.databricks.spark.avro` in +catalog meta store, the mapping is essential to load these tables if you are using this built-in Avro module. + +Note in Databricks's [spark-avro](https://github.com/databricks/spark-avro), implicit classes +`AvroDataFrameWriter` and `AvroDataFrameReader` were created for shortcut function `.avro()`. In this +built-in but external module, both implicit classes are removed. Please use `.format("avro")` in +`DataFrameWriter` or `DataFrameReader` instead, which should be clean and good enough. + +If you prefer using your own build of `spark-avro` jar file, you can simply disable the configuration +`spark.sql.legacy.replaceDatabricksSparkAvro.enabled`, and use the option `--jars` on deploying your +applications. Read the [Advanced Dependency Management](https://spark.apache +.org/docs/latest/submitting-applications.html#advanced-dependency-management) section in Application +Submission Guide for more details. + ## Supported types for Avro -> Spark SQL conversion Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of Avro. From d9c5352c8ffc70d271a8aa68c3ffec41b4158ece Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 22 Aug 2018 17:24:48 +0800 Subject: [PATCH 07/12] revise --- docs/avro-data-source-guide.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 868f20b3b0b75..cd7041c7e2a55 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -26,7 +26,8 @@ See [Application Submission Guide](submitting-applications.html) for more detail Since `spark-avro` module is external, there is not such API as `.avro` in `DataFrameReader` or `DataFrameWriter`. -To load/save data in Avro format, you need to specify the data source option `format` as short name `avro` or full name `org.apache.spark.sql.avro`. + +To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`).
{% highlight scala %} @@ -176,7 +177,7 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR
## Configuration -Configuration of Parquet can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL. +Configuration of Avro can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL. From 8da82506e06e36d63bf91fdda194a866f2d977ea Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 22 Aug 2018 22:29:12 +0800 Subject: [PATCH 08/12] address comment --- docs/avro-data-source-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index cd7041c7e2a55..74a3760d66011 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -24,7 +24,7 @@ See [Application Submission Guide](submitting-applications.html) for more detail ## Load and Save Functions -Since `spark-avro` module is external, there is not such API as `.avro` in +Since `spark-avro` module is external, there is no `.avro` API in `DataFrameReader` or `DataFrameWriter`. To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`). From 006ea40ce0d7a3939241c6e0126732e9cebb59ca Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 23 Aug 2018 01:11:06 +0800 Subject: [PATCH 09/12] address comments --- docs/avro-data-source-guide.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 74a3760d66011..4cf14f2b36c17 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -64,7 +64,8 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") ## to_avro() and from_avro() -Spark SQL provides function `to_avro` to encode a struct as a string and `from_avro()` to retrieve the struct as a complex type. +The Avro package provides function `to_avro` to encode a struct as binary and `from_avro()` to retrieve the +struct as a complex type. Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. @@ -107,17 +108,17 @@ val ds = output
{% highlight java %} -import org.apache.spark.sql.avro.* +import org.apache.spark.sql.avro.*; // `from_avro` requires Avro schema in JSON string format. -String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) +String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))); Dataset df = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") - .load() + .load(); // 1. Decode the Avro data into a struct; // 2. Filter by column `favorite_color`; @@ -125,14 +126,14 @@ Dataset df = spark DataFrame output = df .select(from_avro(col("value"), jsonFormatSchema).as("user")) .where("user.favorite_color == \"red\"") - .select(to_avro(col("user.name")).as("value")) + .select(to_avro(col("user.name")).as("value")); StreamingQuery ds = output .writeStream() .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic2") - .start() + .start(); {% endhighlight %}
@@ -217,7 +218,7 @@ applications. Read the [Advanced Dependency Management](https://spark.apache Submission Guide for more details. ## Supported types for Avro -> Spark SQL conversion -Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of Avro. +Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) under records of Avro.
Property NameDefaultMeaning
@@ -334,7 +335,7 @@ Spark supports writing of all Spark SQL types into Avro. For most types, the map - + From 581b7e60e70deac79a15e0a903a78deb10d4f7ac Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 23 Aug 2018 01:44:24 +0800 Subject: [PATCH 10/12] revise option doc --- docs/avro-data-source-guide.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 4cf14f2b36c17..5f8c675a8a11a 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -147,7 +147,8 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR - + From 824580684c05c2a3c1654517b77864ca5d504ee0 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 23 Aug 2018 12:59:39 +0800 Subject: [PATCH 11/12] revise --- docs/avro-data-source-guide.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 5f8c675a8a11a..5555eddea527c 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -64,8 +64,9 @@ write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro") ## to_avro() and from_avro() -The Avro package provides function `to_avro` to encode a struct as binary and `from_avro()` to retrieve the -struct as a complex type. +The Avro package provides function `to_avro` to encode a column as binary in Avro +format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column to +another column, and the input/output SQL data type can be complex type or primitive type. Using Avro record as columns are useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc. @@ -97,7 +98,7 @@ val output = df .where("user.favorite_color == \"red\"") .select(to_avro($"user.name") as 'value) -val ds = output +val query = output .writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") @@ -123,12 +124,12 @@ Dataset df = spark // 1. Decode the Avro data into a struct; // 2. Filter by column `favorite_color`; // 3. Encode the column `name` in Avro format. -DataFrame output = df +Dataset output = df .select(from_avro(col("value"), jsonFormatSchema).as("user")) .where("user.favorite_color == \"red\"") .select(to_avro(col("user.name")).as("value")); -StreamingQuery ds = output +StreamingQuery query = output .writeStream() .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") From 1f253bf536c3a7bd1c07ba5ea5600f661c8e106e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 23 Aug 2018 13:14:14 +0800 Subject: [PATCH 12/12] more revise --- docs/avro-data-source-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/avro-data-source-guide.md b/docs/avro-data-source-guide.md index 5555eddea527c..d3b81f029d377 100644 --- a/docs/avro-data-source-guide.md +++ b/docs/avro-data-source-guide.md @@ -73,7 +73,7 @@ Kafka key-value record will be augmented with some metadata, such as the ingesti * If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file. * `to_avro()` can be used to turn structs into Avro records. 
This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka. -Both methods are presently only available in Scala and Java. +Both functions are currently only available in Scala and Java.
Avro typeSpark SQL type
DateDateType int date
avroSchema NoneOptional Avro schema provided by an user in JSON format.Optional Avro schema provided by an user in JSON format. The date type and naming of record fields + should match the input Avro data or Catalyst data, otherwise the read/write action will fail. read and write