[SPARK-28698][SQL] Support user-specified output schema in to_avro #25419

Closed · wants to merge 5 commits · Changes from 1 commit
external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
@@ -19,19 +19,24 @@ package org.apache.spark.sql.avro

 import java.io.ByteArrayOutputStream

+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericDatumWriter
 import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

 import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{BinaryType, DataType}

-case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
+case class CatalystDataToAvro(
+    child: Expression,
+    jsonFormatSchema: Option[String]) extends UnaryExpression {
Member:
Can we have a default value None?

-   jsonFormatSchema: Option[String]) extends UnaryExpression {
+   jsonFormatSchema: Option[String] = None) extends UnaryExpression {

Member Author:

Here I am trying to avoid a parameter with a default value; the result is quite different with and without a specified schema. Also, this is consistent with AvroDataToCatalyst.

Contributor:

Unless the default value is used a lot in tests, I don't think we should add a default value to internal classes. We should force the caller side to specify the parameter when instantiating the internal class.

Member:

Got it, @gengliangwang and @cloud-fan.


   override def dataType: DataType = BinaryType

   @transient private lazy val avroType =
-    SchemaConverters.toAvroType(child.dataType, child.nullable)
+    jsonFormatSchema
+      .map(new Schema.Parser().parse)
+      .getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable))

   @transient private lazy val serializer =
     new AvroSerializer(child.dataType, avroType, child.nullable)
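The new `avroType` logic boils down to one rule: parse the user-specified JSON schema when one is given, otherwise derive a schema from the Catalyst type. A minimal standalone sketch of that rule, using only the Avro library (the object and method names here are illustrative, not from the PR, and the string schema stands in for `SchemaConverters.toAvroType`):

```scala
import org.apache.avro.Schema

// Hedged sketch of the avroType resolution above; `resolve` and `derived`
// are illustrative names, not part of the PR.
object SchemaResolutionSketch {
  def resolve(jsonFormatSchema: Option[String], derived: => Schema): Schema =
    jsonFormatSchema
      .map(new Schema.Parser().parse)   // user-specified schema wins
      .getOrElse(derived)               // otherwise derive from the Catalyst type

  def main(args: Array[String]): Unit = {
    val enumSchema =
      """{"type": "enum", "name": "Suit",
        | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}""".stripMargin
    // Stand-in for SchemaConverters.toAvroType(StringType, nullable = false).
    val derived = Schema.create(Schema.Type.STRING)

    println(resolve(Some(enumSchema), derived)) // prints the parsed enum schema
    println(resolve(None, derived))             // prints "string"
  }
}
```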
external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala
@@ -72,6 +72,19 @@ object functions {
    */
   @Experimental
   def to_avro(data: Column): Column = {
-    new Column(CatalystDataToAvro(data.expr))
+    new Column(CatalystDataToAvro(data.expr, None))
   }
Member:

If we have the default value None, we don't need to touch this function.

Member Author:

See my comment in #25419 (comment)

+
+  /**
+   * Converts a column into binary of avro format.
+   *
+   * @param data the data column.
+   * @param jsonFormatSchema user-specified output avro schema in JSON string format.
+   *
+   * @since 3.0.0
+   */
+  @Experimental
+  def to_avro(data: Column, jsonFormatSchema: String): Column = {
+    new Column(CatalystDataToAvro(data.expr, Some(jsonFormatSchema)))
+  }
 }
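For context, a hedged usage sketch of the two `to_avro` overloads from application code, mirroring the enum example in the tests below. The session setup is illustrative, and the expected byte patterns come from the Python doctest later in this diff:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.to_avro

// Illustrative driver; only to_avro(Column) and to_avro(Column, String)
// are the APIs touched by this PR.
object ToAvroUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("to_avro sketch")
      .getOrCreate()
    import spark.implicits._

    val jsonFormatSchema =
      """["null", {"type": "enum", "name": "value",
        |  "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]""".stripMargin

    val df = Seq("SPADES").toDF("value")

    // Without a schema, to_avro derives one from the Catalyst type, so the
    // whole string is written length-prefixed: b'\x00\x0cSPADES'.
    df.select(to_avro($"value").alias("suite")).show(truncate = false)

    // With the user-specified union-of-enum schema, only the union branch and
    // the enum index are written: b'\x02\x00', which is far more compact.
    df.select(to_avro($"value", jsonFormatSchema).alias("suite")).show(truncate = false)

    spark.stop()
  }
}
```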
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro

 import org.apache.avro.Schema

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
@@ -38,12 +38,12 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite

   private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
     checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema, Map.empty),
+      AvroDataToCatalyst(CatalystDataToAvro(data, None), schema, Map.empty),
       prepareExpectedResult(expected))
   }

   protected def checkUnsupportedRead(data: Literal, schema: String): Unit = {
-    val binary = CatalystDataToAvro(data)
+    val binary = CatalystDataToAvro(data, None)
Member:

Also, if we have the default value, we don't need to change lines 41 and 46.

Member Author:

See my comment in #25419 (comment)

     intercept[Exception] {
       AvroDataToCatalyst(binary, schema, Map("mode" -> "FAILFAST")).eval()
     }
@@ -209,4 +209,31 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       checkUnsupportedRead(input, avroSchema)
     }
   }
+
+  test("user-specified schema") {
+    val data = Literal("SPADES")
+    val jsonFormatSchema =
+      """
+        |{ "type": "enum",
+        |  "name": "Suit",
+        |  "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+        |}
+      """.stripMargin
+    checkEvaluation(
+      AvroDataToCatalyst(
+        CatalystDataToAvro(
+          data,
+          Some(jsonFormatSchema)),
+        jsonFormatSchema,
+        options = Map.empty),
+      data.eval())
+    intercept[SparkException] {
Contributor:
what's the error message?

+      AvroDataToCatalyst(
+        CatalystDataToAvro(
+          data,
+          None),
+        jsonFormatSchema,
+        options = Map.empty).eval()
+    }
+  }
 }
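On the reviewer's question about the error: the `intercept[SparkException]` covers decoding bytes that were written with the derived string schema against the enum schema. The standalone sketch below reproduces that writer/reader mismatch in plain Avro (all names illustrative). It mirrors the mismatch itself, not Spark's exact decode path; under FAILFAST mode Spark wraps such decode failures in a `SparkException`.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Illustrative sketch of the schema mismatch the test exercises.
object SchemaMismatchSketch {
  def main(args: Array[String]): Unit = {
    // Writer side: the schema to_avro would derive for a non-null string.
    val writerSchema = Schema.create(Schema.Type.STRING)
    // Reader side: the user-specified enum schema.
    val readerSchema = new Schema.Parser().parse(
      """{"type": "enum", "name": "Suit",
        | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}""".stripMargin)

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[AnyRef](writerSchema).write("SPADES", encoder)
    encoder.flush()

    val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    val reader = new GenericDatumReader[AnyRef](writerSchema, readerSchema)
    // Fails at read time with an AvroTypeException: a string writer schema
    // cannot be resolved to an enum reader schema.
    reader.read(null, decoder)
  }
}
```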
python/pyspark/sql/avro/functions.py (21 changes: 15 additions, 6 deletions)
@@ -69,26 +69,35 @@ def from_avro(data, jsonFormatSchema, options={}):

 @ignore_unicode_prefix
 @since(3.0)
-def to_avro(data):
+def to_avro(data, jsonFormatSchema=""):
     """
     Converts a column into binary of avro format.

     Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
     application as per the deployment section of "Apache Avro Data Source Guide".

     :param data: the data column.
+    :param jsonFormatSchema: user-specified output avro schema in JSON string format.

     >>> from pyspark.sql import Row
     >>> from pyspark.sql.avro.functions import to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(to_avro(df.value).alias("avro")).collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    >>> data = ['SPADES']
+    >>> df = spark.createDataFrame(data, "string")
+    >>> df.select(to_avro(df.value).alias("suite")).collect()
+    [Row(suite=bytearray(b'\\x00\\x0cSPADES'))]
+    >>> jsonFormatSchema='''["null", {"type": "enum", "name": "value",
+    ...     "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]'''
+    >>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect()
+    [Row(suite=bytearray(b'\\x02\\x00'))]
     """

     sc = SparkContext._active_spark_context
     try:
-        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+        if jsonFormatSchema == "":
+            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+        else:
+            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(
+                _to_java_column(data), jsonFormatSchema)
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
             _print_missing_jar("Avro", "avro", "avro", sc.version)