From 4ab1e80e8bca868fc2387cf74004cc44b6e4664a Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 16 Mar 2016 13:36:56 +0900
Subject: [PATCH] Parse JSON rows having an array type and a struct type in the same field.

---
 .../datasources/json/JSONRelation.scala  |  1 -
 .../datasources/json/JacksonParser.scala | 41 +++++++++++++++------
 .../datasources/json/JsonSuite.scala     | 19 +++++++++-
 .../datasources/json/TestJsonData.scala  |  5 +++
 4 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 05b44d1a2a04f..417d5928b6ffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2f5c1e96421d..3252b6c77f888 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -49,8 +49,35 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
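+   * For example, when the schema is a struct but the record's root token is
+   * a JSON array (SPARK-3308), every element in the array is taken as a row;
+   * when the schema is an array but the root token is a single JSON object,
+   * the parsed object is wrapped in a single-element list.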
    */
-  def convertField(
+  def convertRootField(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +184,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +281,7 @@ object JacksonParser {
 
       Utils.tryWithResource(factory.createParser(record)) { parser =>
         parser.nextToken()
-        convertField(factory, parser, schema) match {
+        convertRootField(factory, parser, schema) match {
           case null => failedRecord(record)
           case row: InternalRow => row :: Nil
           case array: ArrayData =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 097ece3525a26..4237b4f944dfc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
       parser.nextToken()
-      JacksonParser.convertField(factory, parser, dataType)
+      JacksonParser.convertRootField(factory, parser, dataType)
     }
   }
 
@@ -1426,6 +1426,23 @@
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a0836058d3c74..b2eff816ee65c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
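
Behavior sketch (illustrative note, not part of the patch): with this change, an
explicit schema can read records whose shared field holds a struct in one row and
an array in another, as exercised by the new test above. A minimal sketch that
mirrors the test data and schema, assuming an initialized `sqlContext` of this era:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Field "a" is declared as a struct, but the second record carries an array.
    val schema = StructType(
      StructField("a", StructType(
        StructField("b", StringType) :: Nil)) :: Nil)

    val records = sqlContext.sparkContext.parallelize(
      """{"a": {"b": 1}}""" :: """{"a": []}""" :: Nil)

    // Both records produce a row: the struct parses normally, and under the
    // default permissive handling the mismatched array is treated as a failed
    // value (null field) instead of failing the whole scan.
    val jsonDF = sqlContext.read.schema(schema).json(records)
    assert(jsonDF.count() == 2)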