Commit

add config flag
sadikovi committed Dec 12, 2022
1 parent 6034e2c commit cf2d583
Showing 4 changed files with 88 additions and 25 deletions.
JacksonParser.scala
@@ -96,6 +96,8 @@ class JacksonParser(
           options.dateFormatInRead.isEmpty
       }
 
+  private val enablePartialResults = SQLConf.get.jsonEnablePartialResults
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -456,7 +458,7 @@ class JacksonParser(
           schema.existenceDefaultsBitmask(index) = false
         } catch {
           case e: SparkUpgradeException => throw e
-          case NonFatal(e) =>
+          case NonFatal(e) if isRoot || enablePartialResults =>
             badRecordException = badRecordException.orElse(Some(e))
             parser.skipChildren()
         }
@@ -489,10 +491,10 @@ class JacksonParser(
         try {
           values += fieldConverter.apply(parser)
         } catch {
-          case PartialResultException(row, cause) =>
+          case PartialResultException(row, cause) if enablePartialResults =>
             badRecordException = badRecordException.orElse(Some(cause))
             values += row
-          case NonFatal(e) =>
+          case NonFatal(e) if enablePartialResults =>
             badRecordException = badRecordException.orElse(Some(e))
             parser.skipChildren()
         }
@@ -525,7 +527,7 @@ class JacksonParser(
         if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
         values += v
       } catch {
-        case PartialResultException(row, cause) =>
+        case PartialResultException(row, cause) if enablePartialResults =>
           badRecordException = badRecordException.orElse(Some(cause))
           values += row
       }
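Taken together, these guards mean a conversion error inside a nested struct, map or array is only downgraded to a partial result (the fields that did convert are kept and the first error is remembered) when the new flag is on; otherwise the exception propagates and the whole value becomes null. A rough, self-contained sketch of that pattern, using simplified stand-in types rather than Spark's internal converter classes:

import scala.util.control.NonFatal

// Illustrative stand-in for PartialResultException: carries whatever was parsed plus the first error.
final case class PartialResult(values: Seq[Any], cause: Throwable) extends Exception(cause)

def convertFields(fieldConverters: Seq[() => Any], enablePartialResults: Boolean): Seq[Any] = {
  var badRecord: Option[Throwable] = None
  val values = fieldConverters.map { convert =>
    try convert()
    catch {
      case NonFatal(e) if enablePartialResults =>
        badRecord = badRecord.orElse(Some(e)) // remember only the first failure
        null                                  // keep a placeholder for the field that failed
    }
  }
  badRecord match {
    case Some(cause) => throw PartialResult(values, cause) // caller may still keep the partial row
    case None => values
  }
}

When the flag is off the catch clause does not apply, so the error simply rethrows and the enclosing converter (or PERMISSIVE-mode handling) nulls out the whole value. Note that the struct case above also keeps partial results when it is the root value (the isRoot carve-out), which is why the first assertions in the test suites below pass without touching the flag.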
SQLConf.scala
@@ -3629,6 +3629,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val JSON_ENABLE_PARTIAL_RESULTS =
+    buildConf("spark.sql.json.enablePartialResults")
+      .internal()
+      .doc("When set to true, enables partial results for structs, maps, and arrays in JSON " +
+        "when one or more fields do not match the schema")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
     buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
       .internal()
@@ -4772,6 +4781,8 @@ class SQLConf extends Serializable with Logging {
 
   def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
 
+  def jsonEnablePartialResults: Boolean = getConf(JSON_ENABLE_PARTIAL_RESULTS)
+
   def jsonEnableDateTimeParsingFallback: Option[Boolean] =
     getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK)
 
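The flag is defined as an internal conf, so it is mostly meant for tests and as an escape hatch, but it can be toggled like any other SQL configuration. A hedged usage sketch follows; the schema and JSON literal are invented for illustration rather than taken from the suites below, and a spark session with its implicits is assumed to be in scope, as in spark-shell:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// An array element whose field "b" is declared as an int but arrives as a string.
val elem = new StructType().add("a", LongType).add("b", IntegerType)
val df = Seq("""[{"a": 1, "b": "oops"}]""").toDF("json")

spark.conf.set("spark.sql.json.enablePartialResults", "true")
df.select(from_json($"json", ArrayType(elem))).show(false)   // array with a partially parsed struct: a = 1, b = null

spark.conf.set("spark.sql.json.enablePartialResults", "false")  // the default added in this commit
df.select(from_json($"json", ArrayType(elem))).show(false)   // null: the whole array is dropped

With the flag off the behavior matches previous releases: a schema mismatch inside a nested value nulls out the entire struct, map or array, which is exactly what the updated tests below pin down for both settings.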
JsonFunctionsSuite.scala
@@ -920,8 +920,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
     checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
-    val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
-    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
+      checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
+    }
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
+      checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    }
+
     val df4 = Seq("""{"c2": [19]}""").toDF("c0")
     checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
   }
@@ -933,10 +942,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
 
     // "c2" is expected to be an array of structs but it is a struct in the data.
     val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
-    checkAnswer(
-      df.select(from_json($"c0", ArrayType(st))),
-      Row(Array(Row("abc", null)))
-    )
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      checkAnswer(
+        df.select(from_json($"c0", ArrayType(st))),
+        Row(Array(Row("abc", null)))
+      )
+    }
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      checkAnswer(
+        df.select(from_json($"c0", ArrayType(st))),
+        Row(null)
+      )
+    }
   }
 
   test("SPARK-40646: return partial results for JSON maps") {
@@ -946,10 +965,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
 
     // Map "c2" has "k2" key that is a string, not an integer.
     val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
-    checkAnswer(
-      df.select(from_json($"c0", st)),
-      Row(Row(null, "abc"))
-    )
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      checkAnswer(
+        df.select(from_json($"c0", st)),
+        Row(Row(null, "abc"))
+      )
+    }
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      checkAnswer(
+        df.select(from_json($"c0", st)),
+        Row(Row(null, null))
+      )
+    }
   }
 
   test("SPARK-40646: return partial results for JSON arrays") {
@@ -990,10 +1019,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     // Value "a" cannot be parsed as an integer,
     // the error cascades to "c2", thus making its value null.
     val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
-    checkAnswer(
-      df.select(from_json($"c0", ArrayType(st))),
-      Row(Array(Row(null)))
-    )
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+      checkAnswer(
+        df.select(from_json($"c0", ArrayType(st))),
+        Row(Array(Row(null)))
+      )
+    }
+
+    withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+      checkAnswer(
+        df.select(from_json($"c0", ArrayType(st))),
+        Row(null)
+      )
+    }
   }
 
   test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
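The suites flip the flag through withSQLConf; outside of tests the same toggle can be exercised through the SQL interface as well. A small illustrative snippet, with the schema string and JSON literal invented for the example and a spark session assumed to be in scope:

// Same toggle via the SQL API; from_json also accepts a DDL-formatted schema string.
spark.sql("SET spark.sql.json.enablePartialResults=true")
spark.sql(
  """SELECT from_json('[{"a": 1, "b": "oops"}]', 'array<struct<a: bigint, b: int>>') AS parsed"""
).show(false)   // field b comes back null while the flag is on; the whole value is null once it is set back to false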
JsonSuite.scala
@@ -3393,14 +3393,25 @@ abstract class JsonSuite
         .repartition(1)
         .write.text(path.getAbsolutePath)
 
-      val df = spark.read
-        .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
-        .json(path.getAbsolutePath)
+      for (enablePartialResults <- Seq(true, false)) {
+        withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") {
+          val df = spark.read
+            .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
+            .json(path.getAbsolutePath)
+
-      checkAnswer(
-        df,
-        Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
-      )
+          if (enablePartialResults) {
+            checkAnswer(
+              df,
+              Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+            )
+          } else {
+            checkAnswer(
+              df,
+              Seq(Row(null, null), Row(Row(2, null), Row(2)))
+            )
+          }
+        }
+      }
       }
     }
   }
 
