From 74fa944209491b9884dbfc8b71e56e36b45e28a4 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 10:14:18 +0900 Subject: [PATCH 01/23] Avoid record-per type dispatch in JSON when reading --- .../apache/spark/sql/DataFrameReader.scala | 21 +- .../datasources/json/InferSchema.scala | 2 +- .../datasources/json/JSONOptions.scala | 7 +- .../datasources/json/JacksonParser.scala | 467 +++++++++++------- .../datasources/json/JsonFileFormat.scala | 24 +- .../datasources/json/JsonSuite.scala | 24 +- 6 files changed, 319 insertions(+), 226 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e8c2885d7737c..c6a14e4c777c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -309,26 +309,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap) - val columnNameOfCorruptRecord = - parsedOptions.columnNameOfCorruptRecord - .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val conf = sparkSession.sessionState.conf + val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap, conf) val schema = userSpecifiedSchema.getOrElse { InferSchema.infer( jsonRDD, - columnNameOfCorruptRecord, parsedOptions) } + val parsed = jsonRDD.mapPartitions { iter => + val parser = new JacksonParser(schema, parsedOptions) + iter.flatMap { record => + parser.parse(record) + } + } Dataset.ofRows( sparkSession, - LogicalRDD( - schema.toAttributes, - JacksonParser.parse( - jsonRDD, - schema, - columnNameOfCorruptRecord, - parsedOptions))(sparkSession)) + LogicalRDD(schema.toAttributes, parsed)(sparkSession)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 579b036417d24..69d2014eee079 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -37,11 +37,11 @@ private[sql] object InferSchema { */ def infer( json: RDD[String], - columnNameOfCorruptRecords: String, configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") val shouldHandleCorruptRecord = configOptions.permissive + val columnNameOfCorruptRecords = configOptions.columnNameOfCorruptRecord val schemaData = if (configOptions.samplingRatio > 0.99) { json } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 66f1126fb9ae6..9ba48a32c268a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} +import org.apache.spark.sql.internal.SQLConf /** * Options for the JSON data 
source. @@ -28,7 +29,8 @@ import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: Map[String, String]) + @transient private val parameters: Map[String, String], + @transient private val sqlConf: SQLConf) extends Logging with Serializable { val samplingRatio = @@ -51,7 +53,8 @@ private[sql] class JSONOptions( parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") - val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") + val columnNameOfCorruptRecord = + parameters.getOrElse("columnNameOfCorruptRecord", sqlConf.columnNameOfCorruptRecord) // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 733fcbfea101e..9b75f4f2a0daf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.core._ import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ @@ -35,16 +34,47 @@ import org.apache.spark.util.Utils private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg) -object JacksonParser extends Logging { +private[sql] class JacksonParser(schema: StructType, options: JSONOptions) extends Logging { + import com.fasterxml.jackson.core.JsonToken._ - def parse( - input: RDD[String], - schema: StructType, - columnNameOfCorruptRecords: String, - configOptions: JSONOptions): RDD[InternalRow] = { + // A `ValueConverter` is responsible for converting a value from `JsonParser` + // to a value in a field for `InternalRow`. + private type ValueConverter = (JsonParser) => Any - input.mapPartitions { iter => - parseJson(iter, schema, columnNameOfCorruptRecords, configOptions) + // `ValueConverter`s for the root schema for all fields in the schema + private val rootConverter: ValueConverter = makeRootConverter(schema) + + private val factory = new JsonFactory() + + private def failedConversion( + parser: JsonParser, + dataType: DataType, + token: JsonToken): Any = token match { + case null | VALUE_NULL => null + case _ if parser.getTextLength < 1 => null + case _ => + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // parseJson method. 
+ throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") + } + + private def failedRecord(record: String): Seq[InternalRow] = { + // create a row even if no corrupt record column is present + if (options.failFast) { + throw new RuntimeException(s"Malformed line in FAILFAST mode: $record") + } + if (options.dropMalformed) { + logWarning(s"Dropping malformed line: $record") + Nil + } else { + val row = new GenericMutableRow(schema.length) + for (corruptIndex <- schema.getFieldIndex(options.columnNameOfCorruptRecord)) { + require(schema(corruptIndex).dataType == StringType) + row.update(corruptIndex, UTF8String.fromString(record)) + } + Seq(row) } } @@ -53,149 +83,232 @@ object JacksonParser extends Logging { * This is a wrapper for the method `convertField()` to handle a row wrapped * with an array. */ - def convertRootField( - factory: JsonFactory, - parser: JsonParser, - schema: DataType): Any = { - import com.fasterxml.jackson.core.JsonToken._ - (parser.getCurrentToken, schema) match { - case (START_ARRAY, st: StructType) => - // SPARK-3308: support reading top level JSON arrays and take every element - // in such an array as a row - convertArray(factory, parser, st) - - case (START_OBJECT, ArrayType(st, _)) => - // the business end of SPARK-3308: - // when an object is found but an array is requested just wrap it in a list - convertField(factory, parser, st) :: Nil + def makeRootConverter(dataType: DataType): ValueConverter = dataType match { + case st: StructType => + // SPARK-3308: support reading top level JSON arrays and take every element + // in such an array as a row + val elementConverter = makeConverter(st) + val fieldConverters = st.map(_.dataType).map(makeConverter) + (parser: JsonParser) => parser.getCurrentToken match { + case START_ARRAY => convertArray(parser, elementConverter) + case START_OBJECT => convertObject(parser, st, fieldConverters) + case token => failedConversion(parser, st, token) + } - case _ => - convertField(factory, parser, schema) - } + case ArrayType(st: StructType, _) => + // the business end of SPARK-3308: + // when an object is found but an array is requested just wrap it in a list + val elementConverter = makeConverter(st) + val fieldConverters = st.map(_.dataType).map(makeConverter) + (parser: JsonParser) => parser.getCurrentToken match { + case START_OBJECT => convertObject(parser, st, fieldConverters) + case START_ARRAY => convertArray(parser, elementConverter) + case token => failedConversion(parser, st, token) + } + + case _ => makeConverter(dataType) } - private def convertField( - factory: JsonFactory, - parser: JsonParser, - schema: DataType): Any = { - import com.fasterxml.jackson.core.JsonToken._ - (parser.getCurrentToken, schema) match { - case (null | VALUE_NULL, _) => - null - - case (FIELD_NAME, _) => - parser.nextToken() - convertField(factory, parser, schema) - - case (VALUE_STRING, StringType) => - UTF8String.fromString(parser.getText) - - case (VALUE_STRING, _) if parser.getTextLength < 1 => - // guard the non string type - null - - case (VALUE_STRING, BinaryType) => - parser.getBinaryValue - - case (VALUE_STRING, DateType) => - val stringValue = parser.getText - if (stringValue.contains("-")) { - // The format of this string will probably be "yyyy-mm-dd". - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) - } else { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. 
- stringValue.toInt + private def makeConverter(dataType: DataType): ValueConverter = dataType match { + case dt: BooleanType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_TRUE => true + case VALUE_FALSE => false + case token => failedConversion(parser, dt, token) } + } - case (VALUE_STRING, TimestampType) => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - DateTimeUtils.stringToTime(parser.getText).getTime * 1000L + case dt: ByteType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT => parser.getByteValue + case token => failedConversion(parser, dt, token) + } + } - case (VALUE_NUMBER_INT, TimestampType) => - parser.getLongValue * 1000000L + case dt: ShortType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT => parser.getShortValue + case token => failedConversion(parser, dt, token) + } + } - case (_, StringType) => - val writer = new ByteArrayOutputStream() - Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { - generator => generator.copyCurrentStructure(parser) + case dt: IntegerType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT => parser.getIntValue + case token => failedConversion(parser, dt, token) } - UTF8String.fromBytes(writer.toByteArray) - - case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) => - parser.getFloatValue - - case (VALUE_STRING, FloatType) => - // Special case handling for NaN and Infinity. - val value = parser.getText - val lowerCaseValue = value.toLowerCase() - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toFloat - } else { - throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") + } + + case dt: LongType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT => parser.getLongValue + case token => failedConversion(parser, dt, token) } + } + + case dt: FloatType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => + parser.getFloatValue + + case VALUE_STRING => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toFloat + } else { + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") + } - case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) => - parser.getDoubleValue - - case (VALUE_STRING, DoubleType) => - // Special case handling for NaN and Infinity. 
- val value = parser.getText - val lowerCaseValue = value.toLowerCase() - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toDouble - } else { - throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") + case token => failedConversion(parser, dt, token) } + } - case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) => - Decimal(parser.getDecimalValue, dt.precision, dt.scale) + case dt: DoubleType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => + parser.getDoubleValue + + case VALUE_STRING => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toDouble + } else { + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") + } - case (VALUE_NUMBER_INT, ByteType) => - parser.getByteValue + case token => failedConversion(parser, dt, token) + } + } - case (VALUE_NUMBER_INT, ShortType) => - parser.getShortValue + case StringType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_STRING => + UTF8String.fromString(parser.getText) - case (VALUE_NUMBER_INT, IntegerType) => - parser.getIntValue + case token if token != VALUE_NULL => + val writer = new ByteArrayOutputStream() + Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { + generator => generator.copyCurrentStructure(parser) + } + UTF8String.fromBytes(writer.toByteArray) - case (VALUE_NUMBER_INT, LongType) => - parser.getLongValue + case token => failedConversion(parser, dataType, token) + } + } - case (VALUE_TRUE, BooleanType) => - true + case dt: TimestampType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_STRING => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + DateTimeUtils.stringToTime(parser.getText).getTime * 1000L - case (VALUE_FALSE, BooleanType) => - false + case VALUE_NUMBER_INT => + parser.getLongValue * 1000000L - case (START_OBJECT, st: StructType) => - convertObject(factory, parser, st) + case token => failedConversion(parser, dt, token) + } + } + + case dt: DateType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_STRING => + val stringValue = parser.getText + if (stringValue.contains("-")) { + // The format of this string will probably be "yyyy-mm-dd". + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) + } else { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. 
+ stringValue.toInt + } + + case token => failedConversion(parser, dt, token) + } + } + + case dt: BinaryType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case VALUE_STRING => parser.getBinaryValue + case token => failedConversion(parser, dt, token) + } + } + + case dt: DecimalType => + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => + Decimal(parser.getDecimalValue, dt.precision, dt.scale) - case (START_ARRAY, ArrayType(st, _)) => - convertArray(factory, parser, st) + case token => failedConversion(parser, dt, token) + } + } - case (START_OBJECT, MapType(StringType, kt, _)) => - convertMap(factory, parser, kt) + case st: StructType => + val fieldConverters = st.map(_.dataType).map(makeConverter) + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case START_OBJECT => convertObject(parser, st, fieldConverters) + case token => failedConversion(parser, st, token) + } + } - case (_, udt: UserDefinedType[_]) => - convertField(factory, parser, udt.sqlType) + case at: ArrayType => + val elementConverter = makeConverter(at.elementType) + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case START_ARRAY => convertArray(parser, elementConverter) + case token => failedConversion(parser, at, token) + } + } - case (token, dataType) => - // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by - // parseJson method. - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") + case mt: MapType => + val valueConverter = makeConverter(mt.valueType) + (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + parser.getCurrentToken match { + case START_OBJECT => convertMap(parser, valueConverter) + case token => failedConversion(parser, mt, token) + } + } + + case t: UserDefinedType[_] => + makeConverter(t.sqlType) + + case dt: DataType => + (parser: JsonParser) => + failedConversion(parser, dt, parser.getCurrentToken) + } + + private def skipFieldNameTokenIfExists(parser: JsonParser)(f: => Any): Any = { + parser.getCurrentToken match { + case FIELD_NAME => + parser.nextToken + f + case _ => + f } } @@ -205,14 +318,14 @@ object JacksonParser extends Logging { * Fields in the json that are not defined in the requested schema will be dropped. 
*/ private def convertObject( - factory: JsonFactory, parser: JsonParser, - schema: StructType): InternalRow = { - val row = new GenericMutableRow(schema.length) + currentSchema: StructType, + fieldConverters: Seq[ValueConverter]): InternalRow = { + val row = new GenericMutableRow(currentSchema.length) while (nextUntil(parser, JsonToken.END_OBJECT)) { - schema.getFieldIndex(parser.getCurrentName) match { + currentSchema.getFieldIndex(parser.getCurrentName) match { case Some(index) => - row.update(index, convertField(factory, parser, schema(index).dataType)) + row.update(index, fieldConverters(index).apply(parser)) case None => parser.skipChildren() @@ -226,84 +339,54 @@ object JacksonParser extends Logging { * Parse an object as a Map, preserving all fields */ private def convertMap( - factory: JsonFactory, parser: JsonParser, - valueType: DataType): MapData = { + fieldConverter: ValueConverter): MapData = { val keys = ArrayBuffer.empty[UTF8String] val values = ArrayBuffer.empty[Any] while (nextUntil(parser, JsonToken.END_OBJECT)) { keys += UTF8String.fromString(parser.getCurrentName) - values += convertField(factory, parser, valueType) + values += fieldConverter.apply(parser) } + ArrayBasedMapData(keys.toArray, values.toArray) } private def convertArray( - factory: JsonFactory, parser: JsonParser, - elementType: DataType): ArrayData = { + fieldConverter: ValueConverter): ArrayData = { val values = ArrayBuffer.empty[Any] while (nextUntil(parser, JsonToken.END_ARRAY)) { - values += convertField(factory, parser, elementType) + values += fieldConverter.apply(parser) } new GenericArrayData(values.toArray) } - def parseJson( - input: Iterator[String], - schema: StructType, - columnNameOfCorruptRecords: String, - configOptions: JSONOptions): Iterator[InternalRow] = { - - def failedRecord(record: String): Seq[InternalRow] = { - // create a row even if no corrupt record column is present - if (configOptions.failFast) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: $record") - } - if (configOptions.dropMalformed) { - logWarning(s"Dropping malformed line: $record") - Nil - } else { - val row = new GenericMutableRow(schema.length) - for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { - require(schema(corruptIndex).dataType == StringType) - row.update(corruptIndex, UTF8String.fromString(record)) - } - Seq(row) - } - } - - val factory = new JsonFactory() - configOptions.setJacksonOptions(factory) - - input.flatMap { record => - if (record.trim.isEmpty) { - Nil - } else { - try { - Utils.tryWithResource(factory.createParser(record)) { parser => - parser.nextToken() - - convertRootField(factory, parser, schema) match { - case null => failedRecord(record) - case row: InternalRow => row :: Nil - case array: ArrayData => - if (array.numElements() == 0) { - Nil - } else { - array.toArray[InternalRow](schema) - } - case _ => - failedRecord(record) - } + def parse(input: String): Seq[InternalRow] = { + if (input.trim.isEmpty) { + Nil + } else { + try { + Utils.tryWithResource(factory.createParser(input)) { parser => + parser.nextToken() + rootConverter.apply(parser) match { + case null => failedRecord(input) + case row: InternalRow => row :: Nil + case array: ArrayData => + if (array.numElements() == 0) { + Nil + } else { + array.toArray[InternalRow](schema) + } + case _ => + failedRecord(input) } - } catch { - case _: JsonProcessingException => - failedRecord(record) - case _: SparkSQLJsonProcessingException => - failedRecord(record) } + } catch { + case _: 
JsonProcessingException => + failedRecord(input) + case _: SparkSQLJsonProcessingException => + failedRecord(input) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 86aef1f7d4411..ab3cc362ecb43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -49,10 +49,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { if (files.isEmpty) { None } else { - val parsedOptions: JSONOptions = new JSONOptions(options) - val columnNameOfCorruptRecord = - parsedOptions.columnNameOfCorruptRecord - .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf) val jsonFiles = files.filterNot { status => val name = status.getPath.getName name.startsWith("_") || name.startsWith(".") @@ -60,7 +58,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val jsonSchema = InferSchema.infer( createBaseRdd(sparkSession, jsonFiles), - columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) @@ -74,7 +71,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val parsedOptions: JSONOptions = new JSONOptions(options) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf) parsedOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -101,18 +99,16 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val parsedOptions: JSONOptions = new JSONOptions(options) - val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord - .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf) (file: PartitionedFile) => { val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString) + val parser = new JacksonParser(requiredSchema, parsedOptions) - JacksonParser.parseJson( - lines, - requiredSchema, - columnNameOfCorruptRecord, - parsedOptions) + lines.flatMap { record => + parser.parse(record) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6c72019702c3d..2835bf0c7e338 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -64,9 +64,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - Utils.tryWithResource(factory.createParser(writer.toString)) { parser => - parser.nextToken() - JacksonParser.convertRootField(factory, parser, dataType) + val dummyOption = + new JSONOptions(Map.empty[String, String], new SQLConf()) + val dummySchema = StructType(Seq.empty) + val parser = new JacksonParser(dummySchema, dummyOption) + + 
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => + jsonParser.nextToken() + val converter = parser.makeRootConverter(dataType) + converter.apply(jsonParser) } } @@ -263,6 +269,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { jsonDF.createOrReplaceTempView("jsonTable") + val aa = jsonDF.collect() + checkAnswer( sql("select * from jsonTable"), Row(new java.math.BigDecimal("92233720368547758070"), @@ -1336,7 +1344,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map())) + val options = Map("columnNameOfCorruptRecord" -> "") + val parsedOptions = + new JSONOptions(options, new SQLConf()) + val emptySchema = InferSchema.infer(empty, parsedOptions) assert(StructType(Seq()) === emptySchema) } @@ -1360,7 +1371,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-8093 Erase empty structs") { - val emptySchema = InferSchema.infer(emptyRecords, "", new JSONOptions(Map())) + val options = Map("columnNameOfCorruptRecord" -> "") + val parsedOptions = + new JSONOptions(options, new SQLConf()) + val emptySchema = InferSchema.infer(emptyRecords, parsedOptions) assert(StructType(Seq()) === emptySchema) } From 2d77f66f2c78bb139212011bfa1fa2efbf6b9d5b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 10:17:28 +0900 Subject: [PATCH 02/23] Remove unused imports and unused variable --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 2835bf0c7e338..e88bb767fae7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -21,10 +21,7 @@ import java.io.{File, StringWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ - import com.fasterxml.jackson.core.JsonFactory -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec @@ -269,8 +266,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { jsonDF.createOrReplaceTempView("jsonTable") - val aa = jsonDF.collect() - checkAnswer( sql("select * from jsonTable"), Row(new java.math.BigDecimal("92233720368547758070"), From 9dbcec74b2fe64e06c2d013da56d6c4a46fdabb7 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 13:18:58 +0900 Subject: [PATCH 03/23] Add missed JSON options --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 9b75f4f2a0daf..9ef3ed5bd00a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -45,6 +45,8 @@ 
private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten private val rootConverter: ValueConverter = makeRootConverter(schema) private val factory = new JsonFactory() + options.setJacksonOptions(factory) + private def failedConversion( parser: JsonParser, From cd44ac518645b17f2dc1397d09709f60fa20066d Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 14:50:19 +0900 Subject: [PATCH 04/23] Remove a newline --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 9ef3ed5bd00a3..60c8d4b4a6bda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -47,7 +47,6 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten private val factory = new JsonFactory() options.setJacksonOptions(factory) - private def failedConversion( parser: JsonParser, dataType: DataType, From c5cce08b615606185855b667f5ffe5c960e098b5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 15:01:38 +0900 Subject: [PATCH 05/23] Clean up pattern matching --- .../datasources/json/JacksonParser.scala | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 60c8d4b4a6bda..9c450adcbab1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -91,8 +91,8 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter) (parser: JsonParser) => parser.getCurrentToken match { - case START_ARRAY => convertArray(parser, elementConverter) case START_OBJECT => convertObject(parser, st, fieldConverters) + case START_ARRAY => convertArray(parser, elementConverter) case token => failedConversion(parser, st, token) } @@ -111,48 +111,48 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten } private def makeConverter(dataType: DataType): ValueConverter = dataType match { - case dt: BooleanType => + case BooleanType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_TRUE => true case VALUE_FALSE => false - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: ByteType => + case ByteType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getByteValue - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: ShortType => + case ShortType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getShortValue - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: 
IntegerType => + case IntegerType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getIntValue - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: LongType => + case LongType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getLongValue - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: FloatType => + case FloatType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => @@ -172,11 +172,11 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") } - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: DoubleType => + case DoubleType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => @@ -196,7 +196,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") } - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } @@ -217,7 +217,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten } } - case dt: TimestampType => + case TimestampType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_STRING => @@ -228,11 +228,11 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten case VALUE_NUMBER_INT => parser.getLongValue * 1000000L - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: DateType => + case DateType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_STRING => @@ -246,15 +246,15 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten stringValue.toInt } - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } - case dt: BinaryType => + case BinaryType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_STRING => parser.getBinaryValue - case token => failedConversion(parser, dt, token) + case token => failedConversion(parser, dataType, token) } } @@ -295,12 +295,12 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten } } - case t: UserDefinedType[_] => - makeConverter(t.sqlType) + case udt: UserDefinedType[_] => + makeConverter(udt.sqlType) - case dt: DataType => + case _ => (parser: JsonParser) => - failedConversion(parser, dt, parser.getCurrentToken) + failedConversion(parser, dataType, parser.getCurrentToken) } private def skipFieldNameTokenIfExists(parser: JsonParser)(f: => Any): Any = { From 5d30f2b60aff2820cdff5d2a1b641570c550e174 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 15:09:26 +0900 Subject: [PATCH 06/23] Add some more comments --- .../datasources/json/JacksonParser.scala | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 9c450adcbab1a..63fb61760070d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -51,8 +51,13 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten parser: JsonParser, dataType: DataType, token: JsonToken): Any = token match { - case null | VALUE_NULL => null - case _ if parser.getTextLength < 1 => null + case null | VALUE_NULL => + null + + case _ if parser.getTextLength < 1 => + // guard the non string type + null + case _ => // We cannot parse this token based on the given data type. So, we throw a // SparkSQLJsonProcessingException and this exception will be caught by @@ -80,9 +85,9 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten } /** - * Parse the current token (and related children) according to a desired schema - * This is a wrapper for the method `convertField()` to handle a row wrapped - * with an array. + * Create a converter which converts the JSON documents held by the `JsonParser` + * to a value according to a desired schema. This is a wrapper for the method + * `makeConverter()` to handle a row wrapped with an array. */ def makeRootConverter(dataType: DataType): ValueConverter = dataType match { case st: StructType => @@ -110,6 +115,10 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten case _ => makeConverter(dataType) } + /** + * Create a converter which converts the JSON documents held by the `JsonParser` + * to a value according to a desired schema. + */ private def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { @@ -315,7 +324,6 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten /** * Parse an object from the token stream into a new Row representing the schema. - * * Fields in the json that are not defined in the requested schema will be dropped. */ private def convertObject( @@ -352,6 +360,9 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten ArrayBasedMapData(keys.toArray, values.toArray) } + /** + * Parse an object as a Array + */ private def convertArray( parser: JsonParser, fieldConverter: ValueConverter): ArrayData = { @@ -363,6 +374,9 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten new GenericArrayData(values.toArray) } + /** + * Parse the string JSON input to the set of [[InternalRow]]s. 
+ */ def parse(input: String): Seq[InternalRow] = { if (input.trim.isEmpty) { Nil From 83a7ee9bb5aca13d3eb51132ff220b9498aaa412 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 8 Jul 2016 15:31:56 +0900 Subject: [PATCH 07/23] Clean up pattern matching more --- .../datasources/json/JacksonParser.scala | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 63fb61760070d..d14ab31753074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -49,8 +49,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten private def failedConversion( parser: JsonParser, - dataType: DataType, - token: JsonToken): Any = token match { + dataType: DataType): Any = parser.getCurrentToken match { case null | VALUE_NULL => null @@ -58,7 +57,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten // guard the non string type null - case _ => + case token => // We cannot parse this token based on the given data type. So, we throw a // SparkSQLJsonProcessingException and this exception will be caught by // parseJson method. @@ -98,7 +97,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) case START_ARRAY => convertArray(parser, elementConverter) - case token => failedConversion(parser, st, token) + case _ => failedConversion(parser, st) } case ArrayType(st: StructType, _) => @@ -109,7 +108,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) case START_ARRAY => convertArray(parser, elementConverter) - case token => failedConversion(parser, st, token) + case _ => failedConversion(parser, st) } case _ => makeConverter(dataType) @@ -125,7 +124,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten parser.getCurrentToken match { case VALUE_TRUE => true case VALUE_FALSE => false - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -133,7 +132,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getByteValue - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -141,7 +140,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getShortValue - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -149,7 +148,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getIntValue - case token => failedConversion(parser, dataType, token) + 
case _ => failedConversion(parser, dataType) } } @@ -157,7 +156,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getLongValue - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -181,7 +180,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") } - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -205,7 +204,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") } - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -222,7 +221,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten } UTF8String.fromBytes(writer.toByteArray) - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -237,7 +236,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten case VALUE_NUMBER_INT => parser.getLongValue * 1000000L - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -255,7 +254,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten stringValue.toInt } - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -263,7 +262,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case VALUE_STRING => parser.getBinaryValue - case token => failedConversion(parser, dataType, token) + case _ => failedConversion(parser, dataType) } } @@ -273,7 +272,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) - case token => failedConversion(parser, dt, token) + case _ => failedConversion(parser, dt) } } @@ -282,7 +281,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) - case token => failedConversion(parser, st, token) + case _ => failedConversion(parser, st) } } @@ -291,7 +290,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case START_ARRAY => convertArray(parser, elementConverter) - case token => failedConversion(parser, at, token) + case _ => failedConversion(parser, at) } } @@ -300,7 +299,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { parser.getCurrentToken match { case START_OBJECT => convertMap(parser, valueConverter) - case token => failedConversion(parser, mt, token) + case _ => failedConversion(parser, mt) } } @@ -309,7 +308,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten case _ => (parser: 
JsonParser) => - failedConversion(parser, dataType, parser.getCurrentToken) + failedConversion(parser, dataType) } private def skipFieldNameTokenIfExists(parser: JsonParser)(f: => Any): Any = { From 95c8d77bb3175a6174295fd2c8900bd1e64169fe Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jul 2016 09:52:20 +0900 Subject: [PATCH 08/23] Revert columnNameOfCorruptRecord option change --- .../org/apache/spark/sql/DataFrameReader.scala | 9 ++++++--- .../datasources/json/InferSchema.scala | 6 +++--- .../datasources/json/JSONOptions.scala | 7 ++----- .../datasources/json/JacksonParser.scala | 8 ++++++-- .../datasources/json/JsonFileFormat.scala | 14 +++++++++----- .../execution/datasources/json/JsonSuite.scala | 17 ++++++----------- 6 files changed, 32 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c6a14e4c777c0..948ed7877a115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -309,15 +309,18 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - val conf = sparkSession.sessionState.conf - val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap, conf) + val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap) + val columnNameOfCorruptRecord = + parsedOptions.columnNameOfCorruptRecord + .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val schema = userSpecifiedSchema.getOrElse { InferSchema.infer( jsonRDD, + columnNameOfCorruptRecord, parsedOptions) } val parsed = jsonRDD.mapPartitions { iter => - val parser = new JacksonParser(schema, parsedOptions) + val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions) iter.flatMap { record => parser.parse(record) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 69d2014eee079..91c58d059d287 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -37,11 +37,11 @@ private[sql] object InferSchema { */ def infer( json: RDD[String], + columnNameOfCorruptRecord: String, configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") val shouldHandleCorruptRecord = configOptions.permissive - val columnNameOfCorruptRecords = configOptions.columnNameOfCorruptRecord val schemaData = if (configOptions.samplingRatio > 0.99) { json } else { @@ -60,13 +60,13 @@ private[sql] object InferSchema { } } catch { case _: JsonParseException if shouldHandleCorruptRecord => - Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))) + Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType)))) case _: JsonParseException => None } } }.fold(StructType(Seq()))( - compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)) + compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord)) canonicalizeType(rootType) match { case Some(st: StructType) => st diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index 9ba48a32c268a..66f1126fb9ae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} -import org.apache.spark.sql.internal.SQLConf /** * Options for the JSON data source. @@ -29,8 +28,7 @@ import org.apache.spark.sql.internal.SQLConf * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: Map[String, String], - @transient private val sqlConf: SQLConf) + @transient private val parameters: Map[String, String]) extends Logging with Serializable { val samplingRatio = @@ -53,8 +51,7 @@ private[sql] class JSONOptions( parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") - val columnNameOfCorruptRecord = - parameters.getOrElse("columnNameOfCorruptRecord", sqlConf.columnNameOfCorruptRecord) + val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index d14ab31753074..3d3414e4c7175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -34,7 +34,11 @@ import org.apache.spark.util.Utils private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg) -private[sql] class JacksonParser(schema: StructType, options: JSONOptions) extends Logging { +private[sql] class JacksonParser( + schema: StructType, + columnNameOfCorruptRecord: String, + options: JSONOptions) extends Logging { + import com.fasterxml.jackson.core.JsonToken._ // A `ValueConverter` is responsible for converting a value from `JsonParser` @@ -75,7 +79,7 @@ private[sql] class JacksonParser(schema: StructType, options: JSONOptions) exten Nil } else { val row = new GenericMutableRow(schema.length) - for (corruptIndex <- schema.getFieldIndex(options.columnNameOfCorruptRecord)) { + for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) { require(schema(corruptIndex).dataType == StringType) row.update(corruptIndex, UTF8String.fromString(record)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index ab3cc362ecb43..353f4c864819b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -49,8 +49,10 @@ class JsonFileFormat extends TextBasedFileFormat 
with DataSourceRegister { if (files.isEmpty) { None } else { - val parsedOptions: JSONOptions = - new JSONOptions(options, sparkSession.sessionState.conf) + val parsedOptions: JSONOptions = new JSONOptions(options) + val columnNameOfCorruptRecord = + parsedOptions.columnNameOfCorruptRecord + .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val jsonFiles = files.filterNot { status => val name = status.getPath.getName name.startsWith("_") || name.startsWith(".") @@ -58,6 +60,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val jsonSchema = InferSchema.infer( createBaseRdd(sparkSession, jsonFiles), + columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) @@ -99,12 +102,13 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val parsedOptions: JSONOptions = - new JSONOptions(options, sparkSession.sessionState.conf) + val parsedOptions: JSONOptions = new JSONOptions(options) + val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord + .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) (file: PartitionedFile) => { val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString) - val parser = new JacksonParser(requiredSchema, parsedOptions) + val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions) lines.flatMap { record => parser.parse(record) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e88bb767fae7f..a7278b386af33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -61,10 +61,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - val dummyOption = - new JSONOptions(Map.empty[String, String], new SQLConf()) + val dummyOption = new JSONOptions(Map.empty[String, String]) val dummySchema = StructType(Seq.empty) - val parser = new JacksonParser(dummySchema, dummyOption) + val parser = new JacksonParser(dummySchema, "", dummyOption) Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => jsonParser.nextToken() @@ -1339,10 +1338,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val options = Map("columnNameOfCorruptRecord" -> "") - val parsedOptions = - new JSONOptions(options, new SQLConf()) - val emptySchema = InferSchema.infer(empty, parsedOptions) + val parsedOptions = new JSONOptions(Map.empty[String, String]) + val emptySchema = InferSchema.infer(empty, "", parsedOptions) assert(StructType(Seq()) === emptySchema) } @@ -1366,10 +1363,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-8093 Erase empty structs") { - val options = Map("columnNameOfCorruptRecord" -> "") - val parsedOptions = - new JSONOptions(options, new SQLConf()) - val emptySchema = InferSchema.infer(emptyRecords, parsedOptions) + val parsedOptions = new JSONOptions(Map.empty[String, String]) + val emptySchema = InferSchema.infer(emptyRecords, "", parsedOptions) assert(StructType(Seq()) === 
emptySchema) } From cfe6beda1a1db64aab5d2f84a68a5ee1e2bdd905 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 19 Jul 2016 10:42:11 +0900 Subject: [PATCH 09/23] Address comments --- .../datasources/json/JacksonParser.scala | 95 ++++++++++++------- .../datasources/json/JsonFileFormat.scala | 3 +- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 3d3414e4c7175..682e611edb36e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -54,19 +54,17 @@ private[sql] class JacksonParser( private def failedConversion( parser: JsonParser, dataType: DataType): Any = parser.getCurrentToken match { - case null | VALUE_NULL => - null - case _ if parser.getTextLength < 1 => - // guard the non string type + // If conversion is failed, this produces `null` rather than + // returning empty string. This will protect the mismatch of types. null case token => - // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by - // parseJson method. - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // parseJson method. + throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") } private def failedRecord(record: String): Seq[InternalRow] = { @@ -94,22 +92,34 @@ private[sql] class JacksonParser( */ def makeRootConverter(dataType: DataType): ValueConverter = dataType match { case st: StructType => - // SPARK-3308: support reading top level JSON arrays and take every element - // in such an array as a row val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter) (parser: JsonParser) => parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) + // SPARK-3308: support reading top level JSON arrays and take every element + // in such an array as a row + // + // For example, we support, the JSON data as below: + // + // [{"a":"str_a_1"}] + // [{"a":"str_a_2"}, {"b":"str_b_3"}] + // + // resulting in: + // + // List([str_a_1,null]) + // List([str_a_2,null], [null,str_b_3]) + // case START_ARRAY => convertArray(parser, elementConverter) case _ => failedConversion(parser, st) } case ArrayType(st: StructType, _) => - // the business end of SPARK-3308: - // when an object is found but an array is requested just wrap it in a list val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter) (parser: JsonParser) => parser.getCurrentToken match { + // the business end of SPARK-3308: + // when an object is found but an array is requested just wrap it in a list. + // This is being wrapped in `JacksonParser.parse`. 
case START_OBJECT => convertObject(parser, st, fieldConverters) case START_ARRAY => convertArray(parser, elementConverter) case _ => failedConversion(parser, st) @@ -124,7 +134,7 @@ private[sql] class JacksonParser( */ private def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_TRUE => true case VALUE_FALSE => false @@ -133,7 +143,7 @@ private[sql] class JacksonParser( } case ByteType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getByteValue case _ => failedConversion(parser, dataType) @@ -141,7 +151,7 @@ private[sql] class JacksonParser( } case ShortType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getShortValue case _ => failedConversion(parser, dataType) @@ -149,7 +159,7 @@ private[sql] class JacksonParser( } case IntegerType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getIntValue case _ => failedConversion(parser, dataType) @@ -157,7 +167,7 @@ private[sql] class JacksonParser( } case LongType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getLongValue case _ => failedConversion(parser, dataType) @@ -165,7 +175,7 @@ private[sql] class JacksonParser( } case FloatType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => parser.getFloatValue @@ -189,7 +199,7 @@ private[sql] class JacksonParser( } case DoubleType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => parser.getDoubleValue @@ -213,24 +223,22 @@ private[sql] class JacksonParser( } case StringType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_STRING => UTF8String.fromString(parser.getText) - case token if token != VALUE_NULL => + case _ => val writer = new ByteArrayOutputStream() Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { generator => generator.copyCurrentStructure(parser) } UTF8String.fromBytes(writer.toByteArray) - - case _ => failedConversion(parser, dataType) } } case TimestampType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_STRING => // This one will lose microseconds parts. 
@@ -245,7 +253,7 @@ private[sql] class JacksonParser( } case DateType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_STRING => val stringValue = parser.getText @@ -263,7 +271,7 @@ private[sql] class JacksonParser( } case BinaryType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case VALUE_STRING => parser.getBinaryValue case _ => failedConversion(parser, dataType) @@ -271,7 +279,7 @@ private[sql] class JacksonParser( } case dt: DecimalType => - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) @@ -282,7 +290,7 @@ private[sql] class JacksonParser( case st: StructType => val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) case _ => failedConversion(parser, st) @@ -291,7 +299,7 @@ private[sql] class JacksonParser( case at: ArrayType => val elementConverter = makeConverter(at.elementType) - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case START_ARRAY => convertArray(parser, elementConverter) case _ => failedConversion(parser, at) @@ -300,7 +308,7 @@ private[sql] class JacksonParser( case mt: MapType => val valueConverter = makeConverter(mt.valueType) - (parser: JsonParser) => skipFieldNameTokenIfExists(parser) { + (parser: JsonParser) => convertField(parser) { parser.getCurrentToken match { case START_OBJECT => convertMap(parser, valueConverter) case _ => failedConversion(parser, mt) @@ -315,11 +323,32 @@ private[sql] class JacksonParser( failedConversion(parser, dataType) } - private def skipFieldNameTokenIfExists(parser: JsonParser)(f: => Any): Any = { + /** + * This converts a field. If this is called after `START_OBJECT`, then, the next token can be + * `FIELD_NAME`. Since the names are kept in `JacksonParser.convertObject`, this `FIELD_NAME` + * token can be skipped as below. When this is called after `START_ARRAY`, the tokens become + * ones about values until `END_ARRAY`. In this case, we don't have to skip. + */ + private def convertField(parser: JsonParser)(f: => Any): Any = { parser.getCurrentToken match { case FIELD_NAME => parser.nextToken - f + convertValue(parser)(f) + + case _ => + convertValue(parser)(f) + } + } + + /** + * This converts a value. The given function `f` is responsible for converting the actual values + * but before trying to convert it, here we check if the current value should be converted into + * null or not. This should be checked for all the data types. 
+ */ + private def convertValue(parser: JsonParser)(f: => Any): Any = { + parser.getCurrentToken match { + case null | VALUE_NULL => null + case _ => f } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 353f4c864819b..384c337b3ee1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -74,8 +74,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val parsedOptions: JSONOptions = - new JSONOptions(options, sparkSession.sessionState.conf) + val parsedOptions: JSONOptions = new JSONOptions(options) parsedOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } From 47fefe881f82bc0b1fe75d61bb17354ffcc4a891 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 21 Jul 2016 15:01:58 +0900 Subject: [PATCH 10/23] Fix indentation and style nits --- .../scala/org/apache/spark/sql/DataFrameReader.scala | 9 ++------- .../sql/execution/datasources/json/JacksonParser.scala | 4 +--- .../sql/execution/datasources/json/JsonFileFormat.scala | 5 +---- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 948ed7877a115..b3bd803363658 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -314,16 +314,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val schema = userSpecifiedSchema.getOrElse { - InferSchema.infer( - jsonRDD, - columnNameOfCorruptRecord, - parsedOptions) + InferSchema.infer(jsonRDD, columnNameOfCorruptRecord, parsedOptions) } val parsed = jsonRDD.mapPartitions { iter => val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions) - iter.flatMap { record => - parser.parse(record) - } + iter.flatMap(parser.parse) } Dataset.ofRows( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 682e611edb36e..e5419ae13f193 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -348,9 +348,7 @@ private[sql] class JacksonParser( private def convertValue(parser: JsonParser)(f: => Any): Any = { parser.getCurrentToken match { case null | VALUE_NULL => null - - case _ => - f + case _ => f } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 384c337b3ee1e..584cd5b146eb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -108,10 +108,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { (file: PartitionedFile) => { val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString) val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions) - - lines.flatMap { record => - parser.parse(record) - } + lines.flatMap(parser.parse) } } From 4cf5d11b0a24bab7f1baf9400511b377e33d90c9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 21 Jul 2016 15:03:03 +0900 Subject: [PATCH 11/23] Fix indentation --- .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b3bd803363658..e23dacc7a1c05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -314,7 +314,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) val schema = userSpecifiedSchema.getOrElse { - InferSchema.infer(jsonRDD, columnNameOfCorruptRecord, parsedOptions) + InferSchema.infer( + jsonRDD, + columnNameOfCorruptRecord, + parsedOptions) } val parsed = jsonRDD.mapPartitions { iter => val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions) From 25c142385a27595699c00e9aeb22ab83b73bc47b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 10 Aug 2016 09:22:43 +0900 Subject: [PATCH 12/23] Remove private[sql] in JacksonParser --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index e5419ae13f193..4235a452ffcd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg) -private[sql] class JacksonParser( +class JacksonParser( schema: StructType, columnNameOfCorruptRecord: String, options: JSONOptions) extends Logging { From 309932bb8787d20dcebb750bf87f94cc54cc777e Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 10 Aug 2016 18:38:45 +0900 Subject: [PATCH 13/23] Address comments --- .../datasources/json/JacksonParser.scala | 59 ++++++++----------- .../datasources/json/JsonSuite.scala | 6 +- 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 4235a452ffcd7..1b9cada75a2f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -54,7 +54,7 @@ class JacksonParser( private def failedConversion( parser: JsonParser, 
dataType: DataType): Any = parser.getCurrentToken match { - case _ if parser.getTextLength < 1 => + case VALUE_STRING if parser.getTextLength < 1 => // If conversion is failed, this produces `null` rather than // returning empty string. This will protect the mismatch of types. null @@ -134,7 +134,7 @@ class JacksonParser( */ private def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_TRUE => true case VALUE_FALSE => false @@ -143,7 +143,7 @@ class JacksonParser( } case ByteType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getByteValue case _ => failedConversion(parser, dataType) @@ -151,7 +151,7 @@ class JacksonParser( } case ShortType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getShortValue case _ => failedConversion(parser, dataType) @@ -159,7 +159,7 @@ class JacksonParser( } case IntegerType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getIntValue case _ => failedConversion(parser, dataType) @@ -167,7 +167,7 @@ class JacksonParser( } case LongType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT => parser.getLongValue case _ => failedConversion(parser, dataType) @@ -175,7 +175,7 @@ class JacksonParser( } case FloatType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => parser.getFloatValue @@ -199,7 +199,7 @@ class JacksonParser( } case DoubleType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => parser.getDoubleValue @@ -223,7 +223,7 @@ class JacksonParser( } case StringType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_STRING => UTF8String.fromString(parser.getText) @@ -238,7 +238,7 @@ class JacksonParser( } case TimestampType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_STRING => // This one will lose microseconds parts. 
@@ -253,7 +253,7 @@ class JacksonParser( } case DateType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_STRING => val stringValue = parser.getText @@ -271,7 +271,7 @@ class JacksonParser( } case BinaryType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case VALUE_STRING => parser.getBinaryValue case _ => failedConversion(parser, dataType) @@ -279,7 +279,7 @@ class JacksonParser( } case dt: DecimalType => - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) @@ -290,7 +290,7 @@ class JacksonParser( case st: StructType => val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case START_OBJECT => convertObject(parser, st, fieldConverters) case _ => failedConversion(parser, st) @@ -299,7 +299,7 @@ class JacksonParser( case at: ArrayType => val elementConverter = makeConverter(at.elementType) - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case START_ARRAY => convertArray(parser, elementConverter) case _ => failedConversion(parser, at) @@ -308,7 +308,7 @@ class JacksonParser( case mt: MapType => val valueConverter = makeConverter(mt.valueType) - (parser: JsonParser) => convertField(parser) { + (parser: JsonParser) => handleNullAndFieldTokens(parser) { parser.getCurrentToken match { case START_OBJECT => convertMap(parser, valueConverter) case _ => failedConversion(parser, mt) @@ -324,29 +324,22 @@ class JacksonParser( } /** - * This converts a field. If this is called after `START_OBJECT`, then, the next token can be + * This handles nulls ahead before trying to check the tokens, and applies + * the conversion function. Also, it checks `FIELD_NAME` if exists and then skip. + * If this is called after `START_OBJECT`, then, the next token can be * `FIELD_NAME`. Since the names are kept in `JacksonParser.convertObject`, this `FIELD_NAME` * token can be skipped as below. When this is called after `START_ARRAY`, the tokens become * ones about values until `END_ARRAY`. In this case, we don't have to skip. */ - private def convertField(parser: JsonParser)(f: => Any): Any = { + private def handleNullAndFieldTokens (parser: JsonParser)(f: => Any): Any = { parser.getCurrentToken match { case FIELD_NAME => - parser.nextToken - convertValue(parser)(f) - - case _ => - convertValue(parser)(f) - } - } - - /** - * This converts a value. The given function `f` is responsible for converting the actual values - * but before trying to convert it, here we check if the current value should be converted into - * null or not. This should be checked for all the data types. - */ - private def convertValue(parser: JsonParser)(f: => Any): Any = { - parser.getCurrentToken match { + parser.nextToken match { + // Before trying to convert it, here we check if the current value should be converted + // into null or not. This should be checked for all the data types. 
+ case null | VALUE_NULL => null + case _ => f + } case null | VALUE_NULL => null case _ => f } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a7e89b3a64402..0b23986572bc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1338,8 +1338,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val parsedOptions = new JSONOptions(Map.empty[String, String]) - val emptySchema = InferSchema.infer(empty, "", parsedOptions) + val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map())) assert(StructType(Seq()) === emptySchema) } @@ -1363,8 +1362,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-8093 Erase empty structs") { - val parsedOptions = new JSONOptions(Map.empty[String, String]) - val emptySchema = InferSchema.infer(emptyRecords, "", parsedOptions) + val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map())) assert(StructType(Seq()) === emptySchema) } From 7077ee6254ea307afd2c4385c8c19a4fedac89a1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 10 Aug 2016 18:40:46 +0900 Subject: [PATCH 14/23] Fix the test --- .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0b23986572bc1..342fd3e82ee06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1362,7 +1362,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-8093 Erase empty structs") { - val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map())) + val emptySchema = InferSchema.infer(emptyRecords, "", new JSONOptions(Map())) assert(StructType(Seq()) === emptySchema) } From 726aa7fcb1a438d0a3885e77d1004bb782bbf5ec Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Wed, 10 Aug 2016 19:02:02 +0900 Subject: [PATCH 15/23] Fix comments --- .../sql/execution/datasources/json/JacksonParser.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 1b9cada75a2f6..803313ed317cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -56,7 +56,7 @@ class JacksonParser( dataType: DataType): Any = parser.getCurrentToken match { case VALUE_STRING if parser.getTextLength < 1 => // If conversion is failed, this produces `null` rather than - // returning empty string. This will protect the mismatch of types. + // rather than throw exception. This will protect the mismatch of types. 
null case token => @@ -368,7 +368,7 @@ class JacksonParser( } /** - * Parse an object as a Map, preserving all fields + * Parse an object as a Map, preserving all fields. */ private def convertMap( parser: JsonParser, @@ -384,7 +384,7 @@ class JacksonParser( } /** - * Parse an object as a Array + * Parse an object as a Array. */ private def convertArray( parser: JsonParser, From bceda7ba4f06c0b6fd99f11ef2662f9f3a154af0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 11:45:37 +0900 Subject: [PATCH 16/23] Address the suggestion and then fix some comments --- .../datasources/json/JacksonParser.scala | 297 ++++++++---------- 1 file changed, 128 insertions(+), 169 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 803313ed317cd..e35b9e037e6b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -51,22 +51,6 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) - private def failedConversion( - parser: JsonParser, - dataType: DataType): Any = parser.getCurrentToken match { - case VALUE_STRING if parser.getTextLength < 1 => - // If conversion is failed, this produces `null` rather than - // rather than throw exception. This will protect the mismatch of types. - null - - case token => - // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by - // parseJson method. - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") - } - private def failedRecord(record: String): Seq[InternalRow] = { // create a row even if no corrupt record column is present if (options.failFast) { @@ -94,7 +78,7 @@ class JacksonParser( case st: StructType => val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => parser.getCurrentToken match { + (parser: JsonParser) => parseJsonToken(parser, dataType) { case START_OBJECT => convertObject(parser, st, fieldConverters) // SPARK-3308: support reading top level JSON arrays and take every element // in such an array as a row @@ -110,19 +94,17 @@ class JacksonParser( // List([str_a_2,null], [null,str_b_3]) // case START_ARRAY => convertArray(parser, elementConverter) - case _ => failedConversion(parser, st) } case ArrayType(st: StructType, _) => val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => parser.getCurrentToken match { + (parser: JsonParser) => parseJsonToken(parser, dataType) { // the business end of SPARK-3308: // when an object is found but an array is requested just wrap it in a list. // This is being wrapped in `JacksonParser.parse`. 
case START_OBJECT => convertObject(parser, st, fieldConverters) case START_ARRAY => convertArray(parser, elementConverter) - case _ => failedConversion(parser, st) } case _ => makeConverter(dataType) @@ -134,185 +116,136 @@ class JacksonParser( */ private def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_TRUE => true - case VALUE_FALSE => false - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_TRUE => true + case VALUE_FALSE => false } case ByteType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT => parser.getByteValue - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT => parser.getByteValue } case ShortType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT => parser.getShortValue - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT => parser.getShortValue } case IntegerType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT => parser.getIntValue - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT => parser.getIntValue } case LongType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT => parser.getLongValue - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT => parser.getLongValue } case FloatType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => - parser.getFloatValue - - case VALUE_STRING => - // Special case handling for NaN and Infinity. - val value = parser.getText - val lowerCaseValue = value.toLowerCase - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toFloat - } else { - throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") - } - - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => + parser.getFloatValue + + case VALUE_STRING => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toFloat + } else { + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") + } } case DoubleType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => - parser.getDoubleValue - - case VALUE_STRING => - // Special case handling for NaN and Infinity. 
- val value = parser.getText - val lowerCaseValue = value.toLowerCase - if (lowerCaseValue.equals("nan") || - lowerCaseValue.equals("infinity") || - lowerCaseValue.equals("-infinity") || - lowerCaseValue.equals("inf") || - lowerCaseValue.equals("-inf")) { - value.toDouble - } else { - throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") - } - - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => + parser.getDoubleValue + + case VALUE_STRING => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toDouble + } else { + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") + } } case StringType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_STRING => - UTF8String.fromString(parser.getText) - - case _ => - val writer = new ByteArrayOutputStream() - Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { - generator => generator.copyCurrentStructure(parser) - } - UTF8String.fromBytes(writer.toByteArray) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + UTF8String.fromString(parser.getText) + + case _ => + val writer = new ByteArrayOutputStream() + Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { + generator => generator.copyCurrentStructure(parser) + } + UTF8String.fromBytes(writer.toByteArray) } case TimestampType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_STRING => - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - DateTimeUtils.stringToTime(parser.getText).getTime * 1000L - - case VALUE_NUMBER_INT => - parser.getLongValue * 1000000L - - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + // This one will lose microseconds parts. + // See https://issues.apache.org/jira/browse/SPARK-10681. + DateTimeUtils.stringToTime(parser.getText).getTime * 1000L + + case VALUE_NUMBER_INT => + parser.getLongValue * 1000000L } case DateType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_STRING => - val stringValue = parser.getText - if (stringValue.contains("-")) { - // The format of this string will probably be "yyyy-mm-dd". - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) - } else { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. - stringValue.toInt - } - - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => + val stringValue = parser.getText + if (stringValue.contains("-")) { + // The format of this string will probably be "yyyy-mm-dd". + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) + } else { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. 
+ stringValue.toInt + } } case BinaryType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case VALUE_STRING => parser.getBinaryValue - case _ => failedConversion(parser, dataType) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case VALUE_STRING => parser.getBinaryValue } case dt: DecimalType => - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => - Decimal(parser.getDecimalValue, dt.precision, dt.scale) - - case _ => failedConversion(parser, dt) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => + Decimal(parser.getDecimalValue, dt.precision, dt.scale) } case st: StructType => val fieldConverters = st.map(_.dataType).map(makeConverter) - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case START_OBJECT => convertObject(parser, st, fieldConverters) - case _ => failedConversion(parser, st) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case START_OBJECT => convertObject(parser, st, fieldConverters) } case at: ArrayType => val elementConverter = makeConverter(at.elementType) - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case START_ARRAY => convertArray(parser, elementConverter) - case _ => failedConversion(parser, at) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case START_ARRAY => convertArray(parser, elementConverter) } case mt: MapType => val valueConverter = makeConverter(mt.valueType) - (parser: JsonParser) => handleNullAndFieldTokens(parser) { - parser.getCurrentToken match { - case START_OBJECT => convertMap(parser, valueConverter) - case _ => failedConversion(parser, mt) - } + (parser: JsonParser) => parseJsonToken(parser, dataType) { + case START_OBJECT => convertMap(parser, valueConverter) } case udt: UserDefinedType[_] => @@ -320,28 +253,54 @@ class JacksonParser( case _ => (parser: JsonParser) => - failedConversion(parser, dataType) + parseJsonToken(parser, dataType) { + case token => + throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") + } } /** - * This handles nulls ahead before trying to check the tokens, and applies - * the conversion function. Also, it checks `FIELD_NAME` if exists and then skip. - * If this is called after `START_OBJECT`, then, the next token can be - * `FIELD_NAME`. Since the names are kept in `JacksonParser.convertObject`, this `FIELD_NAME` - * token can be skipped as below. When this is called after `START_ARRAY`, the tokens become - * ones about values until `END_ARRAY`. In this case, we don't have to skip. + * This handles nulls ahead before trying to check the tokens, and applies the conversion + * function and then checks failed the conversion afterward if it `f` fails to convert the value. + * + * In more details, it checks `FIELD_NAME` if exists and then skip. If this is called after + * `START_OBJECT`, then, the next token can be `FIELD_NAME`. Since the names are kept in + * `JacksonParser.convertObject`, this `FIELD_NAME` token can be skipped as below. When this + * is called after `START_ARRAY`, the tokens become ones about values until `END_ARRAY`. + * In this case, we don't have to skip. + * + * We check if the current token is null or not after that. 
Then, we apply `f` to convert + * the value and then we check failed conversion afterward if it `f` fails to convert the value. */ - private def handleNullAndFieldTokens (parser: JsonParser)(f: => Any): Any = { + private def parseJsonToken( + parser: JsonParser, + dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = { parser.getCurrentToken match { case FIELD_NAME => - parser.nextToken match { - // Before trying to convert it, here we check if the current value should be converted - // into null or not. This should be checked for all the data types. - case null | VALUE_NULL => null - case _ => f - } + parser.nextToken() + parseJsonToken(parser, dataType)(f) + case null | VALUE_NULL => null - case _ => f + + case other => f.orElse { + // We should specify the type of this `PartialFunction`. Otherwise this will + // throw a compilation error, "The argument types of an anonymous function + // must be fully known. (SLS 8.5)". + { + case VALUE_STRING if parser.getTextLength < 1 => + // If conversion is failed, this produces `null` rather than + // rather than throw exception. This will protect the mismatch of types. + null + + case token => + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // parseJson method. + throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") + }: PartialFunction[JsonToken, Any] + }.apply(other) } } From 055e14bb665c81979c8ee14b9b7619b2ea6b6d18 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 16:10:03 +0900 Subject: [PATCH 17/23] Address comments --- .../datasources/json/JacksonParser.scala | 61 +++++++++++-------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index e35b9e037e6b3..68c746b8b9c6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -51,6 +51,10 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) + /** + * This function deals with the cases it fails to parse. This function will be called + * when exceptions are caught during converting. This functions also deals with `mode` option. + */ private def failedRecord(record: String): Seq[InternalRow] = { // create a row even if no corrupt record column is present if (options.failFast) { @@ -69,6 +73,26 @@ class JacksonParser( } } + /** + * This function will be called afterward except the case for `StringType`. we + * throw an exception when it is failed unless the value is null. + */ + private def failedConversion( + parser: JsonParser, + dataType: DataType): PartialFunction[JsonToken, Any] = { + case VALUE_STRING if parser.getTextLength < 1 => + // If conversion is failed, this produces `null` rather than + // rather than throw exception. This will protect the mismatch of types. + null + + case token => + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // `parse` method. 
+ throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") + } + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. This is a wrapper for the method @@ -187,6 +211,7 @@ class JacksonParser( UTF8String.fromString(parser.getText) case _ => + // Note that it always tries to convert the data as string without the case of failure. val writer = new ByteArrayOutputStream() Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { generator => generator.copyCurrentStructure(parser) @@ -253,11 +278,10 @@ class JacksonParser( case _ => (parser: JsonParser) => - parseJsonToken(parser, dataType) { - case token => - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") - } + // Here, we pass empty `PartialFunction` so that this case can be + // handled as a failed conversion. It will throw an exception as + // long as the value is not null. + parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any]) } /** @@ -283,24 +307,7 @@ class JacksonParser( case null | VALUE_NULL => null - case other => f.orElse { - // We should specify the type of this `PartialFunction`. Otherwise this will - // throw a compilation error, "The argument types of an anonymous function - // must be fully known. (SLS 8.5)". - { - case VALUE_STRING if parser.getTextLength < 1 => - // If conversion is failed, this produces `null` rather than - // rather than throw exception. This will protect the mismatch of types. - null - - case token => - // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by - // parseJson method. - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") - }: PartialFunction[JsonToken, Any] - }.apply(other) + case other => f.applyOrElse(other, failedConversion(parser, dataType)) } } @@ -310,11 +317,11 @@ class JacksonParser( */ private def convertObject( parser: JsonParser, - currentSchema: StructType, + schema: StructType, fieldConverters: Seq[ValueConverter]): InternalRow = { - val row = new GenericMutableRow(currentSchema.length) + val row = new GenericMutableRow(schema.length) while (nextUntil(parser, JsonToken.END_OBJECT)) { - currentSchema.getFieldIndex(parser.getCurrentName) match { + schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => row.update(index, fieldConverters(index).apply(parser)) @@ -370,6 +377,8 @@ class JacksonParser( case null => failedRecord(input) case row: InternalRow => row :: Nil case array: ArrayData => + // Here, as we support reading top level JSON arrays and take every element + // in such an array as a row, this case is possible. 
if (array.numElements() == 0) { Nil } else { From 51452e90597026e36e72e3cf832a9c40e581c849 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 17:32:23 +0900 Subject: [PATCH 18/23] Fix documentation and commnets --- .../datasources/json/JacksonParser.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 68c746b8b9c6c..5a703374b737c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -81,8 +81,8 @@ class JacksonParser( parser: JsonParser, dataType: DataType): PartialFunction[JsonToken, Any] = { case VALUE_STRING if parser.getTextLength < 1 => - // If conversion is failed, this produces `null` rather than - // rather than throw exception. This will protect the mismatch of types. + // If conversion is failed, this produces `null` rather than throwing exception. + // This will protect the mismatch of types. null case token => @@ -286,22 +286,29 @@ class JacksonParser( /** * This handles nulls ahead before trying to check the tokens, and applies the conversion - * function and then checks failed the conversion afterward if it `f` fails to convert the value. + * function and then checks failed the conversion afterward if the `f` fails to convert + * the value. * * In more details, it checks `FIELD_NAME` if exists and then skip. If this is called after * `START_OBJECT`, then, the next token can be `FIELD_NAME`. Since the names are kept in * `JacksonParser.convertObject`, this `FIELD_NAME` token can be skipped as below. When this - * is called after `START_ARRAY`, the tokens become ones about values until `END_ARRAY`. - * In this case, we don't have to skip. + * is called after `START_ARRAY`, the tokens are consecutive tokens for values without + * `FIELD_NAME` until `END_ARRAY`. In this case, we don't have to skip. * - * We check if the current token is null or not after that. Then, we apply `f` to convert - * the value and then we check failed conversion afterward if it `f` fails to convert the value. + * For example, parsing ["a", "b", "c"] will produce the tokens as below: + * + * [START_ARRAY, VALUE_STRING, VALUE_STRING, VALUE_STRING, END_ARRAY] + * + * Then, we check if the current token is null or not. Then, we apply `f` to convert + * the value and then we check failed conversion afterward if the `f` fails to convert the value. */ private def parseJsonToken( parser: JsonParser, dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = { parser.getCurrentToken match { case FIELD_NAME => + // Here, probably we are parsing consecutive fields in JSON document between `START_OBJECT` + // and `END_OBJECT` tokens. 
parser.nextToken() parseJsonToken(parser, dataType)(f) From 6401e5331242bade986d813413f348cc42215971 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 17:33:37 +0900 Subject: [PATCH 19/23] Move faildConversion function --- .../datasources/json/JacksonParser.scala | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 5a703374b737c..b71881db16f90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -73,26 +73,6 @@ class JacksonParser( } } - /** - * This function will be called afterward except the case for `StringType`. we - * throw an exception when it is failed unless the value is null. - */ - private def failedConversion( - parser: JsonParser, - dataType: DataType): PartialFunction[JsonToken, Any] = { - case VALUE_STRING if parser.getTextLength < 1 => - // If conversion is failed, this produces `null` rather than throwing exception. - // This will protect the mismatch of types. - null - - case token => - // We cannot parse this token based on the given data type. So, we throw a - // SparkSQLJsonProcessingException and this exception will be caught by - // `parse` method. - throw new SparkSQLJsonProcessingException( - s"Failed to parse a value for data type $dataType (current token: $token).") - } - /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. This is a wrapper for the method @@ -318,6 +298,26 @@ class JacksonParser( } } + /** + * This function will be called afterward except the case for `StringType`. we + * throw an exception when it is failed unless the value is null. + */ + private def failedConversion( + parser: JsonParser, + dataType: DataType): PartialFunction[JsonToken, Any] = { + case VALUE_STRING if parser.getTextLength < 1 => + // If conversion is failed, this produces `null` rather than throwing exception. + // This will protect the mismatch of types. + null + + case token => + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // `parse` method. + throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") + } + /** * Parse an object from the token stream into a new Row representing the schema. * Fields in the json that are not defined in the requested schema will be dropped. 
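The patches from "Address the suggestion" through "Move faildConversion function" converge on a single dispatch pattern: each field gets a ValueConverter built once from the schema, every converter is a PartialFunction over JsonToken, and any token the function does not cover falls through to a shared failure handler after FIELD_NAME skipping and null short-circuiting. Below is a minimal, self-contained sketch of that pattern against plain jackson-core; TokenDispatchSketch, parseToken and the two converter vals are invented names for illustration, and the Spark-specific pieces (InternalRow, corrupt-record handling, the full set of data types) are deliberately left out.

import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}
import com.fasterxml.jackson.core.JsonToken._

object TokenDispatchSketch {
  // A converter turns the parser's current token into a field value.
  type ValueConverter = JsonParser => Any

  // Converters are built once per schema, not once per record,
  // which is the point of the whole series.
  private val longConverter: ValueConverter = parser =>
    parseToken(parser, "long") {
      case VALUE_NUMBER_INT => parser.getLongValue
    }

  private val stringConverter: ValueConverter = parser =>
    parseToken(parser, "string") {
      case VALUE_STRING => parser.getText
    }

  // Skip FIELD_NAME, map explicit nulls to null, and let anything the
  // partial function does not handle fall through to a failure.
  private def parseToken(parser: JsonParser, what: String)(
      f: PartialFunction[JsonToken, Any]): Any = parser.getCurrentToken match {
    case FIELD_NAME =>
      parser.nextToken()
      parseToken(parser, what)(f)
    case null | VALUE_NULL => null
    case other =>
      f.applyOrElse(other, (token: JsonToken) =>
        throw new RuntimeException(s"Failed to parse $what (current token: $token)."))
  }

  def main(args: Array[String]): Unit = {
    val converters = Map("a" -> longConverter, "b" -> stringConverter)
    val parser = new JsonFactory().createParser("""{"a": 1, "b": "str_b"}""")
    parser.nextToken() // START_OBJECT
    while (parser.nextToken() != END_OBJECT) {
      val name = parser.getCurrentName
      println(s"$name -> ${converters(name)(parser)}")
    }
    parser.close()
  }
}

Running it prints a -> 1 and b -> str_b; a record with a mismatched type (say {"a": "oops"}) fails inside the shared handler rather than in per-field ad-hoc checks, which is what the centralized failedConversion achieves in the real parser.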
From aaac7db58c0b303aefa8eda98b47d2734e29633f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 17:34:37 +0900 Subject: [PATCH 20/23] Fix indentation --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index b71881db16f90..7e160829a711f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -303,8 +303,8 @@ class JacksonParser( * throw an exception when it is failed unless the value is null. */ private def failedConversion( - parser: JsonParser, - dataType: DataType): PartialFunction[JsonToken, Any] = { + parser: JsonParser, + dataType: DataType): PartialFunction[JsonToken, Any] = { case VALUE_STRING if parser.getTextLength < 1 => // If conversion is failed, this produces `null` rather than throwing exception. // This will protect the mismatch of types. From 6b1761ff77ddd98abedcd41e968f49870f46b699 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 17:39:02 +0900 Subject: [PATCH 21/23] Fix comment more --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 7e160829a711f..e7ee04426ae14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -287,8 +287,8 @@ class JacksonParser( dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = { parser.getCurrentToken match { case FIELD_NAME => - // Here, probably we are parsing consecutive fields in JSON document between `START_OBJECT` - // and `END_OBJECT` tokens. + // Here, probably we are parsing consecutive pairs of a field name and a value + // in a JSON document between `START_OBJECT` and `END_OBJECT` tokens. parser.nextToken() parseJsonToken(parser, dataType)(f) From a8b6a04aa753773aaf7d5e582cc4657f69bce9f9 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 17:39:43 +0900 Subject: [PATCH 22/23] Fix the choice of word --- .../spark/sql/execution/datasources/json/JacksonParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index e7ee04426ae14..732c0fe9dce9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -288,7 +288,7 @@ class JacksonParser( parser.getCurrentToken match { case FIELD_NAME => // Here, probably we are parsing consecutive pairs of a field name and a value - // in a JSON document between `START_OBJECT` and `END_OBJECT` tokens. + // in a JSON object between `START_OBJECT` and `END_OBJECT` tokens. 
parser.nextToken() parseJsonToken(parser, dataType)(f) From 80b2d3a9d638eaeb87665a4db4c535954639b148 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 11 Aug 2016 19:16:27 +0900 Subject: [PATCH 23/23] Fix comments as suggested --- .../datasources/json/JacksonParser.scala | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 732c0fe9dce9c..4ae9376b5a504 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -265,30 +265,16 @@ class JacksonParser( } /** - * This handles nulls ahead before trying to check the tokens, and applies the conversion - * function and then checks failed the conversion afterward if the `f` fails to convert - * the value. - * - * In more details, it checks `FIELD_NAME` if exists and then skip. If this is called after - * `START_OBJECT`, then, the next token can be `FIELD_NAME`. Since the names are kept in - * `JacksonParser.convertObject`, this `FIELD_NAME` token can be skipped as below. When this - * is called after `START_ARRAY`, the tokens are consecutive tokens for values without - * `FIELD_NAME` until `END_ARRAY`. In this case, we don't have to skip. - * - * For example, parsing ["a", "b", "c"] will produce the tokens as below: - * - * [START_ARRAY, VALUE_STRING, VALUE_STRING, VALUE_STRING, END_ARRAY] - * - * Then, we check if the current token is null or not. Then, we apply `f` to convert - * the value and then we check failed conversion afterward if the `f` fails to convert the value. + * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying + * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the + * token, call `failedConversion` to handle the token. */ private def parseJsonToken( parser: JsonParser, dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = { parser.getCurrentToken match { case FIELD_NAME => - // Here, probably we are parsing consecutive pairs of a field name and a value - // in a JSON object between `START_OBJECT` and `END_OBJECT` tokens. + // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens parser.nextToken() parseJsonToken(parser, dataType)(f) @@ -299,8 +285,8 @@ class JacksonParser( } /** - * This function will be called afterward except the case for `StringType`. we - * throw an exception when it is failed unless the value is null. + * This function throws an exception for failed conversion, but returns null for empty string, + * to guard the non string types. */ private def failedConversion( parser: JsonParser,
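Taken as a whole, the series leaves the scan path creating one JacksonParser per partition and reusing it for every record, so per-type dispatch happens once per schema rather than once per value. The sketch below shows roughly how the resulting pieces fit together; it is illustrative only, not part of the patches. JSONOptions and InferSchema are package-private, so code like this would have to live inside the org.apache.spark.sql.execution.datasources.json package (a test, for instance), and the "_corrupt_record" literal stands in for the value normally read from the session's SQL conf.

package org.apache.spark.sql.execution.datasources.json

import org.apache.spark.sql.SparkSession

object JsonReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("json-read-sketch").getOrCreate()

    val jsonRDD = spark.sparkContext.parallelize(Seq(
      """{"a": 1, "b": "str_b_1"}""",
      """[{"a": 2}, {"b": "str_b_3"}]"""))   // a top-level array becomes one row per element

    val parsedOptions = new JSONOptions(Map.empty[String, String])
    val corruptColumn = "_corrupt_record"    // placeholder for conf.columnNameOfCorruptRecord
    val schema = InferSchema.infer(jsonRDD, corruptColumn, parsedOptions)

    val rows = jsonRDD.mapPartitions { iter =>
      // One parser per partition; its converters are built once from `schema`
      // and then reused for every record in the iterator.
      val parser = new JacksonParser(schema, corruptColumn, parsedOptions)
      iter.flatMap(parser.parse)
    }
    println(s"parsed ${rows.count()} rows from ${jsonRDD.count()} input lines")

    spark.stop()
  }
}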