diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 609fe9bc903a6..ee21a1e2b7602 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -96,6 +96,8 @@ class JacksonParser(
           options.dateFormatInRead.isEmpty
       }
 
+  private val enablePartialResults = SQLConf.get.jsonEnablePartialResults
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -456,7 +458,7 @@ class JacksonParser(
             schema.existenceDefaultsBitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
-            case NonFatal(e) =>
+            case NonFatal(e) if isRoot || enablePartialResults =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
           }
@@ -489,10 +491,10 @@ class JacksonParser(
       try {
         values += fieldConverter.apply(parser)
       } catch {
-        case PartialResultException(row, cause) =>
+        case PartialResultException(row, cause) if enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(cause))
           values += row
-        case NonFatal(e) =>
+        case NonFatal(e) if enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(e))
           parser.skipChildren()
       }
@@ -525,7 +527,7 @@ class JacksonParser(
         if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
         values += v
       } catch {
-        case PartialResultException(row, cause) =>
+        case PartialResultException(row, cause) if enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(cause))
           values += row
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 84d78f365acbc..792090f7261cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3629,6 +3629,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val JSON_ENABLE_PARTIAL_RESULTS =
+    buildConf("spark.sql.json.enablePartialResults")
+      .internal()
+      .doc("When set to true, enables partial results for structs, maps, and arrays in JSON " +
+        "when one or more fields do not match the schema")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
     buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
       .internal()
@@ -4772,6 +4781,8 @@ class SQLConf extends Serializable with Logging {
 
   def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
 
+  def jsonEnablePartialResults: Boolean = getConf(JSON_ENABLE_PARTIAL_RESULTS)
+
   def jsonEnableDateTimeParsingFallback: Option[Boolean] =
     getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 99d5fc60cad7e..399665c0de696 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -920,8 +920,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
123456}}""").toDF("c0") checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null))) - val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") - checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null)))) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") + checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null)))) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0") + checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null)) + } + val df4 = Seq("""{"c2": [19]}""").toDF("c0") checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null)) } @@ -933,10 +942,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { // "c2" is expected to be an array of structs but it is a struct in the data. val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0") - checkAnswer( - df.select(from_json($"c0", ArrayType(st))), - Row(Array(Row("abc", null))) - ) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + checkAnswer( + df.select(from_json($"c0", ArrayType(st))), + Row(Array(Row("abc", null))) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + checkAnswer( + df.select(from_json($"c0", ArrayType(st))), + Row(null) + ) + } } test("SPARK-40646: return partial results for JSON maps") { @@ -946,10 +965,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { // Map "c2" has "k2" key that is a string, not an integer. val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0") - checkAnswer( - df.select(from_json($"c0", st)), - Row(Row(null, "abc")) - ) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + checkAnswer( + df.select(from_json($"c0", st)), + Row(Row(null, "abc")) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + checkAnswer( + df.select(from_json($"c0", st)), + Row(Row(null, null)) + ) + } } test("SPARK-40646: return partial results for JSON arrays") { @@ -990,10 +1019,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { // Value "a" cannot be parsed as an integer, // the error cascades to "c2", thus making its value null. 
val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0") - checkAnswer( - df.select(from_json($"c0", ArrayType(st))), - Row(Array(Row(null))) - ) + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { + checkAnswer( + df.select(from_json($"c0", ArrayType(st))), + Row(Array(Row(null))) + ) + } + + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { + checkAnswer( + df.select(from_json($"c0", ArrayType(st))), + Row(null) + ) + } } test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6cf5ec74ab0e9..0d2c98316e779 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -3393,14 +3393,25 @@ abstract class JsonSuite .repartition(1) .write.text(path.getAbsolutePath) - val df = spark.read - .schema("a struct>, b struct") - .json(path.getAbsolutePath) + for (enablePartialResults <- Seq(true, false)) { + withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { + val df = spark.read + .schema("a struct>, b struct") + .json(path.getAbsolutePath) - checkAnswer( - df, - Seq(Row(null, Row(1)), Row(Row(2, null), Row(2))) - ) + if (enablePartialResults) { + checkAnswer( + df, + Seq(Row(null, Row(1)), Row(Row(2, null), Row(2))) + ) + } else { + checkAnswer( + df, + Seq(Row(null, null), Row(Row(2, null), Row(2))) + ) + } + } + } } }