[SPARK-27506][SQL] Allow deserialization of Avro data using compatible schemas #24405
Conversation
@giamo, what use case does this PR target? Most of these operations can already be done with existing Spark APIs.
@HyukjinKwon Imagine that every hour we run a Spark job that loads data containing Avro records from an external source into a Spark dataframe, uses the `from_avro` function to deserialize them, and then operates on the result. Without schema evolution, the job works fine. Now imagine that at some point the business requires us to evolve the schema by adding a new field, so the data loaded in the next run of the Spark job contains some events serialized with the old version of the schema and some serialized with the new one. Our job now breaks because it tries to read the older data with the new schema, and this is not currently supported by `from_avro`. A workaround using the Spark API would require detecting the differences between the schema versions and patching the older records to give them the same structure as the newer ones. This would be non-trivial to implement, less efficient, and actually unnecessary, since Avro already offers the functionality when both the reader schema and an optional writer schema are passed to a GenericDatumReader (which is what Spark uses underneath). This PR simply allows doing the same through the Spark interface.
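For reference, this is the mechanism in plain Avro that the PR builds on: a minimal sketch using the ordinary Avro Java API (no Spark), where `decode`, `writerSchema`, and `readerSchema` are illustrative names rather than anything from the PR itself.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Decode bytes that were written with writerSchema, resolving them against the
// compatible readerSchema; Avro fills in defaults for fields added by evolution.
def decode(bytes: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericRecord = {
  val reader  = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}
```

Passing both schemas to the `GenericDatumReader` constructor is what triggers Avro's schema-resolution rules; with the single-schema constructor, the reader assumes the data was written with the same schema it reads with.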
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (outdated, resolved)]
@HyukjinKwon @mgaido91 any thoughts on this?
I think the use case makes sense, especially in streaming apps. WDYT @HyukjinKwon?
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala (outdated, resolved)]
[Review thread on external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java (outdated, resolved)]
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala (outdated, resolved)]
@mgaido91 @HyukjinKwon changes pushed.
@mgaido91 @HyukjinKwon do you think we can move this forward?
I think this change makes sense. @HyukjinKwon, WDYT?
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (outdated, resolved)]
Overall LGTM.
Thanks! I think I need an "ok to test" from a committer to start the Jenkins build, right?
LGTM too, apart from minor comments.
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala (outdated, resolved)]
[Three review threads on external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala (outdated, resolved)]
ok to test
retest this please
Test build #108171 has finished for PR 24405 at commit …
Test build #108183 has finished for PR 24405 at commit …
[Two review threads on external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala (outdated, resolved)]
[Two review threads on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (outdated, resolved)]
@giamo, can you clarify the use case with code in the PR description? We talk in code.
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (resolved)]
cc @dbtsai @gengliangwang do we have this feature in the Avro data source? E.g., can we read Avro files with a different but compatible schema?
@cloud-fan yes, that's what the GenericDatumReader constructor I linked above allows you to do. There are plenty of examples online.
I mean the Spark Avro data source.
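For context, the file-based Avro source already accepts a user-supplied reader schema through its `avroSchema` option; a minimal sketch, where the path and `evolvedSchemaJson` are placeholders:

```scala
// Read Avro files, projecting them onto a user-provided (compatible) schema.
val df = spark.read
  .format("avro")
  .option("avroSchema", evolvedSchemaJson) // reader schema in JSON format
  .load("/path/to/events")
```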
+1 for having it in the options too, thanks @HyukjinKwon!
Test build #110593 has finished for PR 24405 at commit …
[Two review threads on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala (resolved; one outdated)]
A bit confused by the example provided:
So then …
ok to test
@giamo mind updating the PR? Sorry for my late response. Looks like we're getting closer to merging.
Test build #111933 has finished for PR 24405 at commit …
The example is correct, …
[Review thread on external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala (outdated, resolved)]
Test build #112144 has finished for PR 24405 at commit …
LGTM, two small suggestions.
<td>None</td>
<td>Optional Avro schema (in JSON format) that was used to serialize the data. This should be set if the schema provided
for deserialization is compatible with - but not the same as - the one used to originally convert the data to Avro.
</td>
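A minimal sketch of how the documented option would be passed to `from_avro`; `df` and the two schema strings are placeholders, and `writerSchema` is the option name at this point of the review (the discussion below considers renaming it):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro

// Deserialize with the (evolved) reader schema while declaring the schema
// the data was actually written with.
val decoded = df.select(
  from_avro(col("eventBytes"), readerSchemaJson,
    Map("writerSchema" -> writerSchemaJson).asJava).as("decoded"))
```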
Would it be possible to link to the Confluent documentation? They have an excellent document on schema compatibility and evolution: https://docs.confluent.io/current/schema-registry/avro.html
@@ -153,4 +153,45 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
    assert(df.collect().map(_.get(0)) === Seq(Row("one"), Row("two"), Row("three"), Row("four")))
  }
}

  test("SPARK-27506: roundtrip in to_avro and from_avro with different compatible schemas") {
I would also add a test with an incompatible schema, for example, changing a `string` to an `int`.
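A rough sketch of what such a negative test could look like; this is not the committed test. It assumes the test sits inside `AvroFunctionsSuite` (so `toDF`, `$`, and ScalaTest's `intercept` are in scope) and that `to_avro` names a top-level struct `topLevelRecord`, its default:

```scala
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.struct

test("from_avro fails when the reader schema is incompatible") {
  val df = Seq("foo").toDF("str").select(struct($"str").as("record"))
  val avroDF = df.select(to_avro($"record").as("avro"))

  // Reader schema changes "str" from string to int, which is not a legal Avro promotion.
  val incompatible =
    """{"type":"record","name":"topLevelRecord","fields":[{"name":"str","type":"int"}]}"""

  // The exact exception type depends on the parse mode, so assert broadly here.
  intercept[Exception] {
    avroDF.select(from_avro($"avro", incompatible)).collect()
  }
}
```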
retest this please
@@ -240,6 +240,14 @@ Data source options of Avro can be set via:
  </td>
  <td>function <code>from_avro</code></td>
</tr>
<tr>
  <td><code>writerSchema</code></td>
How about `actualSchema`? I think it is more straightforward.
I would stick to `writerSchema`, mostly because this is also the term used in Avro itself: https://avro.apache.org/docs/1.9.1/api/java/org/apache/avro/hadoop/io/AvroValueDeserializer.html
I came up with this because the implementation also uses the name `actual`: https://github.com/rdblue/avro-java/blob/master/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java#L67
But I am OK with `writerSchema` as well, since it is the name in the constructor method.
Yeah, actually the `writerSchema` name is super confusing to me. Can we use `actualSchema`? I would prefer that one.
Test build #114811 has finished for PR 24405 at commit …
@giamo, sorry it took so long. We're very close now. Can you address the comments?
Can any of you take this over (by picking up the commits here, so that I can make the original author a co-author) if @giamo is inactive? There are only a few rather minor comments left to address...
I'm happy to cherry-pick @giamo's work and fix the last few comments.
Please go ahead. I'll credit both of you as co-authors.
I've opened up a follow-up under #26780.
[SPARK-27506][SQL] Allow deserialization of Avro data using compatible schemas

Follow up of #24405

### What changes were proposed in this pull request?

The current implementation of _from_avro_ and _AvroDataToCatalyst_ doesn't allow doing schema evolution, since it requires the deserialization of an Avro record with the exact same schema with which it was serialized.

The proposed change is to add a new option `actualSchema` to allow passing the schema used to serialize the records. This allows using a different compatible schema for reading by passing both schemas to _GenericDatumReader_. If no writer's schema is provided, nothing changes from before.

### Why are the changes needed?

Consider the following example.

```
// schema ID: 1
val schema1 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"}
  ]
}
"""

// schema ID: 2
val schema2 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"},
    {"name": "col3", "type": "string", "default": ""}
  ]
}
"""
```

The two schemas are compatible - i.e. you can use `schema2` to deserialize events serialized with `schema1`, in which case there will be the field `col3` with the default value.

Now imagine that you have two dataframes (read from batch or streaming), one with Avro events from schema1 and the other with events from schema2. **We want to combine them into one dataframe** for storing or further processing.

With the current `from_avro` function we can only decode each of them with the corresponding schema:

```
scala> val df1 = ... // Avro events created with schema1
df1: org.apache.spark.sql.DataFrame = [eventBytes: binary]

scala> val decodedDf1 = df1.select(from_avro('eventBytes, schema1) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string>]

scala> val df2 = ... // Avro events created with schema2
df2: org.apache.spark.sql.DataFrame = [eventBytes: binary]

scala> val decodedDf2 = df2.select(from_avro('eventBytes, schema2) as "decoded")
decodedDf2: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

but then `decodedDf1` and `decodedDf2` have different Spark schemas and we can't union them. Instead, with the proposed change we can decode `df1` in the following way:

```
scala> import scala.collection.JavaConverters._

scala> val decodedDf1 = df1.select(from_avro(data = 'eventBytes, jsonFormatSchema = schema2, options = Map("actualSchema" -> schema1).asJava) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

so that both dataframes have the same schemas and can be merged.

### Does this PR introduce any user-facing change?

This PR allows users to pass a new configuration, but it doesn't affect current code.

### How was this patch tested?

A new unit test was added.

Closes #26780 from Fokko/SPARK-27506.

Lead-authored-by: Fokko Driesprong <fokko@apache.org>
Co-authored-by: Gianluca Amori <gianluca.amori@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
@gengliangwang can you close this one as well?
### What changes were proposed in this pull request?

The current implementation of `from_avro` and `AvroDataToCatalyst` doesn't allow doing schema evolution, since it requires the deserialization of an Avro record with the exact same schema with which it was serialized.

The proposed change is to add a new option to allow passing the schema used to serialize the records. This allows using a different compatible schema for reading by passing both schemas to `GenericDatumReader`. If no writer's schema is provided, nothing changes from before.

### Why are the changes needed?

Consider the following example.

```
// schema ID: 1
val schema1 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"}
  ]
}
"""

// schema ID: 2
val schema2 = """
{
  "type": "record",
  "name": "MySchema",
  "fields": [
    {"name": "col1", "type": "int"},
    {"name": "col2", "type": "string"},
    {"name": "col3", "type": "string", "default": ""}
  ]
}
"""
```

The two schemas are compatible - i.e. you can use `schema2` to deserialize events serialized with `schema1`, in which case there will be the field `col3` with the default value.

Now imagine that you have two dataframes (read from batch or streaming), one with Avro events from schema1 and the other with events from schema2. **We want to combine them into one dataframe** for storing or further processing.

With the current `from_avro` function we can only decode each of them with the corresponding schema:

```
scala> val df1 = ... // Avro events created with schema1
df1: org.apache.spark.sql.DataFrame = [eventBytes: binary]

scala> val decodedDf1 = df1.select(from_avro('eventBytes, schema1) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string>]

scala> val df2 = ... // Avro events created with schema2
df2: org.apache.spark.sql.DataFrame = [eventBytes: binary]

scala> val decodedDf2 = df2.select(from_avro('eventBytes, schema2) as "decoded")
decodedDf2: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

but then `decodedDf1` and `decodedDf2` have different Spark schemas and we can't union them. Instead, with the proposed change we can decode `df1` in the following way:

```
scala> import scala.collection.JavaConverters._

scala> val decodedDf1 = df1.select(from_avro(data = 'eventBytes, jsonFormatSchema = schema2, options = Map("actualSchema" -> schema1).asJava) as "decoded")
decodedDf1: org.apache.spark.sql.DataFrame = [decoded: struct<col1: int, col2: string, col3: string>]
```

so that both dataframes have the same schemas and can be merged.

### Does this PR introduce any user-facing change?

This PR allows users to pass a new configuration, but it doesn't affect current code.

### How was this patch tested?

A new unit test was added.