[SPARK-28698][SQL] Support user-specified output schema in to_avro
#25419
Conversation
Test build #108970 has finished for PR 25419 at commit
```scala
    jsonFormatSchema,
    options = Map.empty),
  data.eval())
intercept[SparkException] {
```
what's the error message?
makes sense to me, let's fix the build first.
Test build #108974 has finished for PR 25419 at commit
Test build #108977 has finished for PR 25419 at commit
cc @dbtsai
```diff
-case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
+case class CatalystDataToAvro(
+    child: Expression,
+    jsonFormatSchema: Option[String]) extends UnaryExpression {
```
Can we have a default value `None`?
```diff
-    jsonFormatSchema: Option[String]) extends UnaryExpression {
+    jsonFormatSchema: Option[String] = None) extends UnaryExpression {
```
Here I am trying to avoid a parameter with a default value. The result is quite different with/without a specified schema. Also, this is consistent with `AvroDataToCatalyst`.
Unless the default value is used a lot in tests, I don't think we should add a default value to internal classes. We should force the caller side to specify the parameter when they instantiate the internal class.
Got it, @gengliangwang and @cloud-fan .
```diff
@@ -72,6 +72,19 @@ object functions {
    */
   @Experimental
   def to_avro(data: Column): Column = {
-    new Column(CatalystDataToAvro(data.expr))
+    new Column(CatalystDataToAvro(data.expr, None))
   }
```
If we have the default value `None`, we don't need to touch this function.
See my comment in #25419 (comment)
```diff
       prepareExpectedResult(expected))
   }

   protected def checkUnsupportedRead(data: Literal, schema: String): Unit = {
-    val binary = CatalystDataToAvro(data)
+    val binary = CatalystDataToAvro(data, None)
```
Also, if we have the default value, we don't need to change lines 41 and 46.
See my comment in #25419 (comment)
```scala
    jsonFormatSchema,
    options = Map.empty).eval()
}.getMessage
assert(message.contains("Malformed records are detected in record parsing."))
```
In this PR, `CatalystDataToAvro` ignores the given schema in case of `None`, doesn't it? For me, this error seems to come from `AvroDataToCatalyst` instead of `CatalystDataToAvro`.
If this error comes from `AvroDataToCatalyst`, this test coverage is misleading. For example, we had better have test coverage for:

- a test whether `CatalystDataToAvro(data, None)` successfully ignores `None` without any exception.
- a test whether `CatalystDataToAvro(data, "")` fails with that error message (?)

How do you think about that, @gengliangwang ?
Here `AvroDataToCatalyst` is just to check the Avro schema of `CatalystDataToAvro`.

- When `jsonFormatSchema` is provided in `CatalystDataToAvro`, the output Avro schema is `enum` type, and we validate it with `AvroDataToCatalyst`. This proves that the provided schema works.
- When the `jsonFormatSchema` is `None`, the output Avro schema is `string` type, and it can't be parsed as `enum` type.
I will change the order of the two checks in the test case and add a new test case for an invalid user-specified schema.
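The enum-vs-string distinction this thread turns on can be illustrated without a Spark session. Below is a minimal Python sketch (the schema contents are hypothetical examples, not taken from the PR's test suite) contrasting the default Avro schema Spark derives for a string column with a user-specified enum schema:

```python
import json

# Default mapping: a Spark "string" column is written as the Avro "string" type.
default_schema = json.dumps({"type": "string"})

# Hypothetical user-specified schema: write the same column as an Avro enum.
enum_schema = json.dumps({
    "type": "enum",
    "name": "Suit",  # illustrative name, not from the PR
    "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"],
})

# The two schemas are structurally incompatible, which is why bytes produced
# under one schema cannot be read back under the other.
assert json.loads(default_schema)["type"] == "string"
assert json.loads(enum_schema)["type"] == "enum"
```

This is why the test can use `AvroDataToCatalyst` with an `enum` read schema to distinguish the two write paths: only the output produced with the user-specified schema parses successfully.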
+1. Thanks, @gengliangwang .
Test build #109016 has finished for PR 25419 at commit
thanks, merging to master!
is it possible to get this enhancement back-ported to 2.4.4?
@dirrao in general only bug fixes can go to earlier branches.
What changes were proposed in this pull request?

The mapping of Spark schema to Avro schema is many-to-many. (See https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion)

The default schema mapping might not be exactly what users want. For example, by default, a "string" column is always written as "string" Avro type, but users might want to output the column as "enum" Avro type.

With PR #21847, Spark supports user-specified schema in the batch writer. For the function `to_avro`, we should support user-specified output schema as well.

How was this patch tested?

Unit test.
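As a usage sketch of the feature described above: the caller passes an Avro schema as a JSON string alongside the column. The Spark call itself needs a running session, so it appears only as a comment here; the column name, enum name, and symbols are made-up examples. The runnable part just builds and sanity-checks that JSON with the standard library.

```python
import json

# An Avro enum schema a user might pass as the second argument to to_avro
# (names are illustrative, not from the PR).
json_format_schema = json.dumps({
    "type": "enum",
    "name": "Color",
    "symbols": ["RED", "GREEN", "BLUE"],
})

# Hypothetical Spark usage (requires a live SparkSession; not run here).
# In Scala:   df.select(to_avro($"color", jsonFormatSchema))
# In PySpark (3.0+, pyspark.sql.avro.functions):
#   df.select(to_avro(df["color"], json_format_schema))

parsed = json.loads(json_format_schema)
assert parsed["type"] == "enum"
assert "RED" in parsed["symbols"]
```

Without the second argument, the column would fall back to the default mapping (a "string" column written as Avro "string").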