From f47483951e12d563b7696940a2cfc2fdc3b27ab2 Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Tue, 28 Aug 2018 01:00:17 -0700 Subject: [PATCH 1/2] [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType --- .../spark/sql/avro/SchemaConverters.scala | 18 ++++++--------- .../org/apache/spark/sql/avro/AvroSuite.scala | 22 ++++++++++++++++++- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 3a15e8d087fa4..bd1576587d7fa 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -123,7 +123,7 @@ object SchemaConverters { catalystType: DataType, nullable: Boolean = false, recordName: String = "topLevelRecord", - prevNameSpace: String = "") + nameSpace: String = "") : Schema = { val builder = SchemaBuilder.builder() @@ -143,29 +143,25 @@ object SchemaConverters { val avroType = LogicalTypes.decimal(d.precision, d.scale) val fixedSize = minBytesForPrecision(d.precision) // Need to avoid naming conflict for the fixed fields - val name = prevNameSpace match { + val name = nameSpace match { case "" => s"$recordName.fixed" - case _ => s"$prevNameSpace.$recordName.fixed" + case _ => s"$nameSpace.$recordName.fixed" } avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize)) case BinaryType => builder.bytesType() case ArrayType(et, containsNull) => builder.array() - .items(toAvroType(et, containsNull, recordName, prevNameSpace)) + .items(toAvroType(et, containsNull, recordName, nameSpace)) case MapType(StringType, vt, valueContainsNull) => builder.map() - .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace)) + .values(toAvroType(vt, valueContainsNull, recordName, nameSpace)) case st: StructType => - val nameSpace = prevNameSpace match { - case "" => recordName - case _ => s"$prevNameSpace.$recordName" - } - + val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() st.foreach { f => val fieldAvroType = - toAvroType(f.dataType, f.nullable, f.name, nameSpace) + toAvroType(f.dataType, f.nullable, f.name, childNameSpace) fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() } fieldsAssembler.endRecord() diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 72bef9e3aed41..1b31986943fd6 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1082,7 +1082,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val schema = getAvroSchemaStringFromFiles(dir.toString) assert(schema.contains("\"namespace\":\"topLevelRecord\"")) assert(schema.contains("\"namespace\":\"topLevelRecord.data\"")) - assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\"")) } } @@ -1099,6 +1098,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("check namespace - toAvroType") { + val sparkSchema = StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("address", StructType(Seq( + StructField("city", StringType, nullable = false), + StructField("state", StringType, nullable = false))), + nullable = false))) + val employeeType = SchemaConverters.toAvroType(sparkSchema, + recordName = "employee", + nameSpace = "foo.bar") + + assert("foo.bar.employee" == employeeType.getFullName) + assert("employee" == employeeType.getName) + assert("foo.bar" == employeeType.getNamespace) + + val addressType = employeeType.getField("address").schema() + assert("foo.bar.employee.address" == addressType.getFullName) + assert("address" == addressType.getName) + assert("foo.bar.employee" == addressType.getNamespace) + } + case class NestedMiddleArray(id: Int, data: Array[NestedBottom]) case class NestedTopArray(id: Int, data: NestedMiddleArray) From 2153428f62dca7d462d17c85576426ecca52d1cb Mon Sep 17 00:00:00 2001 From: Arun Mahadevan Date: Tue, 28 Aug 2018 10:33:07 -0700 Subject: [PATCH 2/2] Added test case for toAvroType nested with empty namespace --- .../org/apache/spark/sql/avro/AvroSuite.scala | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 1b31986943fd6..9ad4388414eaa 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1109,14 +1109,34 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { recordName = "employee", nameSpace = "foo.bar") - assert("foo.bar.employee" == employeeType.getFullName) - assert("employee" == employeeType.getName) - assert("foo.bar" == employeeType.getNamespace) + assert(employeeType.getFullName == "foo.bar.employee") + assert(employeeType.getName == "employee") + assert(employeeType.getNamespace == "foo.bar") val addressType = employeeType.getField("address").schema() - assert("foo.bar.employee.address" == addressType.getFullName) - assert("address" == addressType.getName) - assert("foo.bar.employee" == addressType.getNamespace) + assert(addressType.getFullName == "foo.bar.employee.address") + assert(addressType.getName == "address") + assert(addressType.getNamespace == "foo.bar.employee") + } + + test("check empty namespace - toAvroType") { + val sparkSchema = StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("address", StructType(Seq( + StructField("city", StringType, nullable = false), + StructField("state", StringType, nullable = false))), + nullable = false))) + val employeeType = SchemaConverters.toAvroType(sparkSchema, + recordName = "employee") + + assert(employeeType.getFullName == "employee") + assert(employeeType.getName == "employee") + assert(employeeType.getNamespace == null) + + val addressType = employeeType.getField("address").schema() + assert(addressType.getFullName == "employee.address") + assert(addressType.getName == "address") + assert(addressType.getNamespace == "employee") } case class NestedMiddleArray(id: Int, data: Array[NestedBottom])