From 667dcd503f6d9ea47151846cf2824642d735b462 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 20 Feb 2018 14:03:49 +0000
Subject: [PATCH] Return partial rows when some columns fail to parse.

---
 .../sql/catalyst/json/JacksonParser.scala     | 132 +++++++++++-------
 .../catalyst/util/BadRecordException.scala    |   7 +-
 .../datasources/FailureSafeParser.scala       |  19 ++-
 .../datasources/csv/UnivocityParser.scala     |   7 +-
 .../test/resources/test-data/partial1.json    |   2 +
 .../test/resources/test-data/partial2.json    |   2 +
 .../datasources/json/JsonSuite.scala          |  33 ++++-
 7 files changed, 145 insertions(+), 57 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/partial1.json
 create mode 100644 sql/core/src/test/resources/test-data/partial2.json

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bd144c9575c72..73ad7f6d45b59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -44,7 +44,7 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from `JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = JsonParser => AnyRef
+  private type ValueConverter = JsonParser => (AnyRef, Seq[Boolean])
 
   // `ValueConverter`s for the root schema for all fields in the schema
   private val rootConverter = makeRootConverter(schema)
@@ -55,13 +55,17 @@ class JacksonParser(
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
-   * `makeConverter()` to handle a row wrapped with an array.
+   * `makeConverter()` to handle a row wrapped with an array. The converter returns
+   * a tuple whose first element is the converted `InternalRow`s and whose second
+   * element is booleans indicating whether the corresponding JSON documents are bad records.
    */
-  private def makeRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
+  private def makeRootConverter(st: StructType): JsonParser => (Seq[InternalRow], Seq[Boolean]) = {
     val elementConverter = makeConverter(st)
     val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
     (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
-      case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil
+      case START_OBJECT =>
+        val (row, hasBadField) = convertObject(parser, st, fieldConverters)
+        (row :: Nil, hasBadField)
       // SPARK-3308: support reading top level JSON arrays and take every element
       // in such an array as a row
       //
       // List([str_a_2,null], [null,str_b_3])
       //
       case START_ARRAY =>
-        val array = convertArray(parser, elementConverter)
+        val (array, hasErrors) = convertArray(parser, elementConverter)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
if (array.numElements() == 0) { - Nil + (Nil, Nil) } else { - array.toArray[InternalRow](schema).toSeq + (array.toArray[InternalRow](schema).toSeq, hasErrors) } } } @@ -94,64 +98,66 @@ class JacksonParser( def makeConverter(dataType: DataType): ValueConverter = dataType match { case BooleanType => (parser: JsonParser) => parseJsonToken[java.lang.Boolean](parser, dataType) { - case VALUE_TRUE => true - case VALUE_FALSE => false + case VALUE_TRUE => (true, false :: Nil) + case VALUE_FALSE => (false, false :: Nil) } case ByteType => (parser: JsonParser) => parseJsonToken[java.lang.Byte](parser, dataType) { - case VALUE_NUMBER_INT => parser.getByteValue + case VALUE_NUMBER_INT => (parser.getByteValue, false :: Nil) } case ShortType => (parser: JsonParser) => parseJsonToken[java.lang.Short](parser, dataType) { - case VALUE_NUMBER_INT => parser.getShortValue + case VALUE_NUMBER_INT => (parser.getShortValue, false :: Nil) } case IntegerType => (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) { - case VALUE_NUMBER_INT => parser.getIntValue + case VALUE_NUMBER_INT => (parser.getIntValue, false :: Nil) } case LongType => (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { - case VALUE_NUMBER_INT => parser.getLongValue + case VALUE_NUMBER_INT => (parser.getLongValue, false :: Nil) } case FloatType => (parser: JsonParser) => parseJsonToken[java.lang.Float](parser, dataType) { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => - parser.getFloatValue + (parser.getFloatValue, false :: Nil) case VALUE_STRING => // Special case handling for NaN and Infinity. - parser.getText match { + val value = parser.getText match { case "NaN" => Float.NaN case "Infinity" => Float.PositiveInfinity case "-Infinity" => Float.NegativeInfinity case other => throw new RuntimeException(s"Cannot parse $other as FloatType.") } + (value, false :: Nil) } case DoubleType => (parser: JsonParser) => parseJsonToken[java.lang.Double](parser, dataType) { case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT => - parser.getDoubleValue + (parser.getDoubleValue, false :: Nil) case VALUE_STRING => // Special case handling for NaN and Infinity. - parser.getText match { + val value = parser.getText match { case "NaN" => Double.NaN case "Infinity" => Double.PositiveInfinity case "-Infinity" => Double.NegativeInfinity case other => throw new RuntimeException(s"Cannot parse $other as DoubleType.") } + (value, false :: Nil) } case StringType => (parser: JsonParser) => parseJsonToken[UTF8String](parser, dataType) { case VALUE_STRING => - UTF8String.fromString(parser.getText) + (UTF8String.fromString(parser.getText), false :: Nil) case _ => // Note that it always tries to convert the data as string without the case of failure. @@ -159,7 +165,7 @@ class JacksonParser( Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { generator => generator.copyCurrentStructure(parser) } - UTF8String.fromBytes(writer.toByteArray) + (UTF8String.fromBytes(writer.toByteArray), false :: Nil) } case TimestampType => @@ -168,7 +174,7 @@ class JacksonParser( val stringValue = parser.getText // This one will lose microseconds parts. // See https://issues.apache.org/jira/browse/SPARK-10681. 
- Long.box { + val value = Long.box { Try(options.timestampFormat.parse(stringValue).getTime * 1000L) .getOrElse { // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards @@ -176,9 +182,10 @@ class JacksonParser( DateTimeUtils.stringToTime(stringValue).getTime * 1000L } } + (value, false :: Nil) case VALUE_NUMBER_INT => - parser.getLongValue * 1000000L + (parser.getLongValue * 1000000L, false :: Nil) } case DateType => @@ -187,7 +194,7 @@ class JacksonParser( val stringValue = parser.getText // This one will lose microseconds parts. // See https://issues.apache.org/jira/browse/SPARK-10681.x - Int.box { + val value = Int.box { Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime)) .orElse { // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards @@ -200,29 +207,33 @@ class JacksonParser( stringValue.toInt } } + (value, false :: Nil) } case BinaryType => (parser: JsonParser) => parseJsonToken[Array[Byte]](parser, dataType) { - case VALUE_STRING => parser.getBinaryValue + case VALUE_STRING => (parser.getBinaryValue, false :: Nil) } case dt: DecimalType => (parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) => - Decimal(parser.getDecimalValue, dt.precision, dt.scale) + (Decimal(parser.getDecimalValue, dt.precision, dt.scale), false :: Nil) } case st: StructType => val fieldConverters = st.map(_.dataType).map(makeConverter).toArray (parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) { - case START_OBJECT => convertObject(parser, st, fieldConverters) + case START_OBJECT => + convertObject(parser, st, fieldConverters) } case at: ArrayType => val elementConverter = makeConverter(at.elementType) (parser: JsonParser) => parseJsonToken[ArrayData](parser, dataType) { - case START_ARRAY => convertArray(parser, elementConverter) + case START_ARRAY => + val (parsed, hasErrors) = convertArray(parser, elementConverter) + (parsed, hasErrors) } case mt: MapType => @@ -239,7 +250,8 @@ class JacksonParser( // Here, we pass empty `PartialFunction` so that this case can be // handled as a failed conversion. It will throw an exception as // long as the value is not null. - parseJsonToken[AnyRef](parser, dataType)(PartialFunction.empty[JsonToken, AnyRef]) + parseJsonToken[AnyRef](parser, + dataType)(PartialFunction.empty[JsonToken, (AnyRef, Seq[Boolean])]) } /** @@ -249,14 +261,14 @@ class JacksonParser( */ private def parseJsonToken[R >: Null]( parser: JsonParser, - dataType: DataType)(f: PartialFunction[JsonToken, R]): R = { + dataType: DataType)(f: PartialFunction[JsonToken, (R, Seq[Boolean])]): (R, Seq[Boolean]) = { parser.getCurrentToken match { case FIELD_NAME => // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens parser.nextToken() parseJsonToken[R](parser, dataType)(f) - case null | VALUE_NULL => null + case null | VALUE_NULL => (null, false :: Nil) case other => f.applyOrElse(other, failedConversion(parser, dataType)) } @@ -268,17 +280,23 @@ class JacksonParser( */ private def failedConversion[R >: Null]( parser: JsonParser, - dataType: DataType): PartialFunction[JsonToken, R] = { + dataType: DataType): PartialFunction[JsonToken, (R, Seq[Boolean])] = { case VALUE_STRING if parser.getTextLength < 1 => // If conversion is failed, this produces `null` rather than throwing exception. // This will protect the mismatch of types. - null + (null, false :: Nil) case token => - // We cannot parse this token based on the given data type. 
So, we throw a
-      // RuntimeException and this exception will be caught by `parse` method.
-      throw new RuntimeException(
-        s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
+      // Under `PermissiveMode`, if an exception occurs while parsing a field, we don't abort
+      // parsing of the whole record, because we can still return a partial result.
+      if (options.parseMode == PermissiveMode) {
+        (null, true :: Nil)
+      } else {
+        // We cannot parse this token based on the given data type. So, we throw a
+        // RuntimeException and this exception will be caught by `parse` method.
+        throw new RuntimeException("Failed to parse a value for data type " +
+          s"${dataType.catalogString} (current token: $token).")
+      }
   }
 
   /**
@@ -288,19 +306,22 @@ class JacksonParser(
   private def convertObject(
       parser: JsonParser,
       schema: StructType,
-      fieldConverters: Array[ValueConverter]): InternalRow = {
+      fieldConverters: Array[ValueConverter]): (InternalRow, Seq[Boolean]) = {
     val row = new GenericInternalRow(schema.length)
+    var hasBadField = false
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
+          val (parsed, hasError) = fieldConverters(index).apply(parser)
+          if (hasError.exists(identity)) hasBadField = true
+          row.update(index, parsed)
 
         case None =>
           parser.skipChildren()
       }
     }
 
-    row
+    (row, hasBadField :: Nil)
   }
 
   /**
@@ -308,15 +329,18 @@ class JacksonParser(
   private def convertMap(
       parser: JsonParser,
-      fieldConverter: ValueConverter): MapData = {
+      fieldConverter: ValueConverter): (MapData, Seq[Boolean]) = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
+    var hasError = false
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
+      val (parsed, hasErr) = fieldConverter.apply(parser)
+      values += parsed
+      if (hasErr.exists(identity)) hasError = true
     }
 
-    ArrayBasedMapData(keys.toArray, values.toArray)
+    (ArrayBasedMapData(keys.toArray, values.toArray), hasError :: Nil)
   }
 
   /**
@@ -324,13 +348,16 @@ class JacksonParser(
   private def convertArray(
       parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
+      fieldConverter: ValueConverter): (ArrayData, Seq[Boolean]) = {
     val values = ArrayBuffer.empty[Any]
+    val hasErrors = ArrayBuffer.empty[Boolean]
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
+      val (parsed, hasError) = fieldConverter.apply(parser)
+      values += parsed
+      hasErrors += hasError.exists(identity)
     }
 
-    new GenericArrayData(values.toArray)
+    (new GenericArrayData(values.toArray), hasErrors.toSeq)
   }
 
   /**
@@ -350,14 +377,25 @@ class JacksonParser(
       parser.nextToken() match {
         case null => Nil
         case _ => rootConverter.apply(parser) match {
-          case null => throw new RuntimeException("Root converter returned null")
-          case rows => rows
+          case (null, _) => throw new RuntimeException("Root converter returned null")
+          case (Nil, _) => Nil // A top-level array containing zero elements.
+          case (rows, hasErrors) =>
+            if (hasErrors.forall(identity) && hasErrors.length > 1) {
+              // For a top-level array, if all elements fail to parse, we only return one
+              // row with null fields.
+              throw BadRecordException(() => recordLiteral(record), () => None, () => None, null)
+            } else if (hasErrors.exists(identity)) {
+              throw BadRecordException(() => recordLiteral(record), () => Some(rows),
+                () => Some(hasErrors), null)
+            } else {
+              rows
+            }
         }
       }
     }
   } catch {
     case e @ (_: RuntimeException | _: JsonProcessingException) =>
-      throw BadRecordException(() => recordLiteral(record), () => None, e)
+      throw BadRecordException(() => recordLiteral(record), () => None, () => None, e)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 985f0dc1cd60e..b903590a551b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -23,11 +23,14 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
+ * @param partialResults a function that returns optional rows, which are the partial results of
  *   parsing this bad record.
+ * @param hasErrors a function that returns optional booleans, which indicate whether each of the
+ *   returned rows encountered an error during parsing.
  * @param cause the actual exception about why the record is bad and can't be parsed.
  */
 case class BadRecordException(
     record: () => UTF8String,
-    partialResult: () => Option[InternalRow],
+    partialResults: () => Option[Seq[InternalRow]],
+    hasErrors: () => Option[Seq[Boolean]],
     cause: Throwable) extends Exception(cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 43591a9ff524a..4fb13f898ac9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -39,7 +39,7 @@ class FailureSafeParser[IN](
   // schema doesn't contain a field for corrupted record, we just return the partial result or a
   // row with all fields null. If the given schema contains a field for corrupted record, we will
   // set the bad record to this field, and set other fields according to the partial result or null.
- private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { + private val toResultRow: (Option[InternalRow], UTF8String) => InternalRow = { if (corruptFieldIndex.isDefined) { (row, badRecord) => { var i = 0 @@ -48,7 +48,7 @@ class FailureSafeParser[IN]( resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull i += 1 } - resultRow(corruptFieldIndex.get) = badRecord() + resultRow(corruptFieldIndex.get) = badRecord resultRow } } else { @@ -58,11 +58,22 @@ class FailureSafeParser[IN]( def parse(input: IN): Iterator[InternalRow] = { try { - rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) + rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), null)) } catch { case e: BadRecordException => mode match { case PermissiveMode => - Iterator(toResultRow(e.partialResult(), e.record)) + val partialRows = e.partialResults().getOrElse(Seq(null)) + val hasErrors = e.hasErrors().getOrElse(Seq.fill(partialRows.length)(true)) + // We could lazily obtain later record, instead of current record which fails to parse. + // So we must get the record here. + val record = e.record() + partialRows.zip(hasErrors).toIterator.map { case (partialRow, hasError) => + if (hasError || partialRow == null) { + toResultRow(Option(partialRow), record) + } else { + toResultRow(Option(partialRow), null) + } + } case DropMalformedMode => Iterator.empty case FailFastMode => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 7d6d7e7eef926..ef824dd91af47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -196,9 +196,9 @@ class UnivocityParser( } else { tokens.take(schema.length) } - def getPartialResult(): Option[InternalRow] = { + def getPartialResult(): Option[Seq[InternalRow]] = { try { - Some(convert(checkedTokens)) + Some(convert(checkedTokens) :: Nil) } catch { case _: BadRecordException => None } @@ -206,6 +206,7 @@ class UnivocityParser( throw BadRecordException( () => getCurrentInput, () => getPartialResult(), + () => None, new RuntimeException("Malformed CSV record")) } else { try { @@ -218,7 +219,7 @@ class UnivocityParser( row } catch { case NonFatal(e) => - throw BadRecordException(() => getCurrentInput, () => None, e) + throw BadRecordException(() => getCurrentInput, () => None, () => None, e) } } } diff --git a/sql/core/src/test/resources/test-data/partial1.json b/sql/core/src/test/resources/test-data/partial1.json new file mode 100644 index 0000000000000..84be7e0ddfbc9 --- /dev/null +++ b/sql/core/src/test/resources/test-data/partial1.json @@ -0,0 +1,2 @@ +{"attr1":"val1","attr2":"[val2]","attr3":1} +{"attr1":"val3","attr2":["val4"],"attr3":2} diff --git a/sql/core/src/test/resources/test-data/partial2.json b/sql/core/src/test/resources/test-data/partial2.json new file mode 100644 index 0000000000000..57601d4ac54a9 --- /dev/null +++ b/sql/core/src/test/resources/test-data/partial2.json @@ -0,0 +1,2 @@ +[{"attr1":"val1","attr2":"[val2]","attr3":1}, {"attr1":"val3","attr2":["val4"],"attr3":2}] +[{"attr1":"val5","attr2":["val6"],"attr3":3}, {"attr1":"val7","attr2":["val8"],"attr3":4}] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 8c8d41ebf115a..bab4cf9651f64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -71,7 +71,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
       jsonParser.nextToken()
       val converter = parser.makeConverter(dataType)
-      converter.apply(jsonParser)
+      converter.apply(jsonParser)._1
     }
   }
 
@@ -2063,4 +2063,35 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       )
     }
   }
+
+  test("SPARK-23448: Json reader should return partial rows even if some fields can't be parsed") {
+    def testFile(fileName: String): String = {
+      Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+    }
+
+    val partialJsonFile1 = "test-data/partial1.json"
+    val partialJsonFile2 = "test-data/partial2.json"
+
+    val schema = StructType(
+      Seq(StructField("attr1", StringType, true),
+        StructField("attr2", ArrayType(StringType, true), true),
+        StructField("attr3", IntegerType, true),
+        StructField("_corrupt_record", StringType, true)))
+
+    val partial1Json = spark.read.schema(schema).json(testFile(partialJsonFile1))
+    val partial2Json = spark.read.schema(schema).json(testFile(partialJsonFile2))
+
+    checkAnswer(partial1Json,
+      Row("val1", null, 1, """{"attr1":"val1","attr2":"[val2]","attr3":1}""") ::
+      Row("val3", Seq("val4"), 2, null) :: Nil)
+
+    val corruptedRecord = """[{"attr1":"val1","attr2":"[val2]","attr3":1}, """ +
+      """{"attr1":"val3","attr2":["val4"],"attr3":2}]"""
+
+    checkAnswer(partial2Json,
+      Row("val1", null, 1, corruptedRecord) ::
+      Row("val3", Seq("val4"), 2, null) ::
+      Row("val5", Seq("val6"), 3, null) ::
+      Row("val7", Seq("val8"), 4, null) :: Nil)
+  }
 }
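Reviewer note, not part of the patch: a rough spark-shell sketch of the user-facing behavior this change aims for, assuming a local SparkSession and the partial1.json fixture added above; the output in the comments mirrors the expectations encoded in the new JsonSuite test rather than a verified run.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").appName("partial-rows").getOrCreate()

    // Same schema as the new test: attr2 is declared as an array, and a
    // _corrupt_record column is present to capture the raw text of bad records.
    val schema = StructType(Seq(
      StructField("attr1", StringType, true),
      StructField("attr2", ArrayType(StringType, true), true),
      StructField("attr3", IntegerType, true),
      StructField("_corrupt_record", StringType, true)))

    // PERMISSIVE is the default parse mode. With this patch, a record whose attr2 is a plain
    // string (not an array) should still yield attr1 and attr3, with the raw record kept in
    // _corrupt_record, instead of a row that is null in every data column.
    val df = spark.read.schema(schema).json("sql/core/src/test/resources/test-data/partial1.json")
    df.show(false)
    // Expected (illustrative):
    // +-----+------+-----+-------------------------------------------+
    // |attr1|attr2 |attr3|_corrupt_record                            |
    // +-----+------+-----+-------------------------------------------+
    // |val1 |null  |1    |{"attr1":"val1","attr2":"[val2]","attr3":1}|
    // |val3 |[val4]|2    |null                                       |
    // +-----+------+-----+-------------------------------------------+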