Parse JSON rows having an array type and a struct type in the same field.
HyukjinKwon committed Mar 16, 2016
Commit 4ab1e80 (parent 1840852)
Showing 4 changed files with 48 additions and 14 deletions.
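Before the diff itself, a quick sketch of the case this commit targets, assembled from the schema and the arrayAndStructRecords data introduced below. It is illustrative only: the local SparkContext/SQLContext setup and the app name are assumptions, not part of the commit, and the RDD-based read stands in for the temp-file round trip used in the actual test.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Local setup for illustration only; a spark-shell sqlContext works just as well.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("array-vs-struct"))
val sqlContext = new SQLContext(sc)

// Field "a" is an object in the first record but an array in the second; these are
// the same literals as the arrayAndStructRecords fixture added to TestJsonData below.
val records = sc.parallelize(Seq(
  """{"a": {"b": 1}}""",
  """{"a": []}"""))

// The requested schema declares "a" as a struct, exactly as in the new JsonSuite test.
val schema = StructType(
  StructField("a", StructType(
    StructField("b", StringType) :: Nil
  )) :: Nil)

// The new test below asserts only the row count: both records come back as rows.
val jsonDF = sqlContext.read.schema(schema).json(records)
assert(jsonDF.count() == 2)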
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter

 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -49,8 +49,31 @@ object JacksonParser {

   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)

-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)

-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)

@@ -264,7 +277,7 @@ object JacksonParser {
         Utils.tryWithResource(factory.createParser(record)) { parser =>
           parser.nextToken()

-          convertField(factory, parser, schema) match {
+          convertRootField(factory, parser, schema) match {
             case null => failedRecord(record)
             case row: InternalRow => row :: Nil
             case array: ArrayData =>
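The hunks above move the two SPARK-3308 cases out of convertField (which becomes private) and into the new convertRootField, which the record-parsing loop in the last hunk now calls. The effect is that the array-of-rows and object-wrapped-in-a-list conversions apply only at the top level of each record, not to nested fields. Below is a minimal, self-contained Scala sketch of that root-level dispatch written against Jackson directly, outside Spark; RootDispatchSketch, readRow and parseRoot are illustrative names, and a plain Map stands in for Spark's InternalRow.

import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}

object RootDispatchSketch {

  private val factory = new JsonFactory()

  // Stand-in for convertObject: reads one flat {"name": <integer>} object into a Map "row".
  private def readRow(parser: JsonParser): Map[String, Long] = {
    val row = Map.newBuilder[String, Long]
    while (parser.nextToken() != JsonToken.END_OBJECT) {
      val name = parser.getCurrentName   // FIELD_NAME token
      parser.nextToken()                 // advance to the value
      row += name -> parser.getLongValue
    }
    row.result()
  }

  // Mirrors the root-level dispatch: a top-level JSON array of objects yields one row per
  // element (the SPARK-3308 behaviour), a single top-level object yields exactly one row.
  // The patch's remaining case, an object found where an array is requested, is the mirror
  // image: the converted value is wrapped in a one-element list.
  def parseRoot(json: String): Seq[Map[String, Long]] = {
    val parser = factory.createParser(json)
    try {
      parser.nextToken() match {
        case JsonToken.START_ARRAY =>
          val rows = Seq.newBuilder[Map[String, Long]]
          while (parser.nextToken() == JsonToken.START_OBJECT) {
            rows += readRow(parser)
          }
          rows.result()
        case JsonToken.START_OBJECT =>
          readRow(parser) :: Nil
        case other =>
          sys.error(s"Unexpected root token: $other")
      }
    } finally {
      parser.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(parseRoot("""[{"b": 1}, {"b": 2}]"""))  // two rows
    println(parseRoot("""{"b": 3}"""))               // one row
  }
}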
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

     Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
       parser.nextToken()
-      JacksonParser.convertField(factory, parser, dataType)
+      JacksonParser.convertRootField(factory, parser, dataType)
     }
   }

@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }

+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()
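As a follow-up to the new array-and-struct test above, a brief interactive sketch of the resulting DataFrame; schema, path and sqlContext are reused from that test, only the row count comes from the test itself, and the printed schema is shown for orientation.

val jsonDF = sqlContext.read.schema(schema).json(path)

jsonDF.printSchema()
// root
//  |-- a: struct (nullable = true)
//  |    |-- b: string (nullable = true)

// The test asserts only that both records are read back as rows.
assert(jsonDF.count() == 2)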
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)

+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)

   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())