[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field #11752

Closed · wants to merge 1 commit
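What the change enables, in one example: a JSON field that is a struct in some rows and an array in others can be read with an explicit schema, and every row survives. The sketch below mirrors the new JsonSuite test but reads the records straight from an RDD[String] instead of a temporary file; a SQLContext named sqlContext is assumed, as in the test suite.

import org.apache.spark.sql.types._

// The two records added to TestJsonData in this PR: "a" is a struct in the
// first row and an array in the second.
val records = sqlContext.sparkContext.parallelize(
  """{"a": {"b": 1}}""" ::
  """{"a": []}""" :: Nil)

// Read them with an explicit struct schema for "a", as the new test does.
val schema = StructType(
  StructField("a", StructType(
    StructField("b", StringType) :: Nil)) :: Nil)

val jsonDF = sqlContext.read.schema(schema).json(records)
assert(jsonDF.count() == 2)  // both rows come back, which is what the new test asserts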
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -49,8 +49,31 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +277,7 @@ object JacksonParser {
             Utils.tryWithResource(factory.createParser(record)) { parser =>
               parser.nextToken()
 
-              convertField(factory, parser, schema) match {
+              convertRootField(factory, parser, schema) match {
                 case null => failedRecord(record)
                 case row: InternalRow => row :: Nil
                 case array: ArrayData =>
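The two cases moved out of convertField only make sense for the row-level value, which is why they now live in convertRootField. A minimal sketch of the SPARK-3308 behavior they preserve, assuming a SQLContext named sqlContext; the variable names are illustrative:

import org.apache.spark.sql.types._

// One input line holding a top-level JSON array: per the SPARK-3308 comment above,
// every element of the array is taken as a separate row.
val topLevelArray = sqlContext.sparkContext.parallelize(
  """[{"a": 1}, {"a": 2}]""" :: Nil)

val schema = StructType(StructField("a", LongType) :: Nil)
val df = sqlContext.read.schema(schema).json(topLevelArray)
assert(df.count() == 2)  // the (START_ARRAY, StructType) root case yields one row per element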
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
       parser.nextToken()
-      JacksonParser.convertField(factory, parser, dataType)
+      JacksonParser.convertRootField(factory, parser, dataType)
     }
   }
 
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
Contributor:
For this case, we will first throw an exception, catch it, and finally set the value to null, right?

Member (PR author):
Yes. It will be set to null in failedRecord().

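A minimal sketch of what this exchange describes, reusing arrayAndStructRecords from the diff above and the schema from the new JsonSuite test; a SQLContext named sqlContext is assumed, and the row order is not:

import org.apache.spark.sql.types._

val schema = StructType(
  StructField("a", StructType(
    StructField("b", StringType) :: Nil)) :: Nil)

val rows = sqlContext.read.schema(schema).json(arrayAndStructRecords).collect()
assert(rows.length == 2)            // both records are kept
assert(rows.exists(_.isNullAt(0)))  // the {"a": []} record comes back with a null for "a",
                                    // set by failedRecord() after the conversion throws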

   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())