
[SPARK-26310][SQL] Verify applicability of JSON options #23257

Closed
wants to merge 5 commits
Changes from 4 commits
@@ -569,7 +569,7 @@ case class JsonToStructs(

val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@transient lazy val parser = {
-    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
+    val parsedOptions = new JSONOptionsInRead(options, timeZoneId.get, nameOfCorruptRecord)
val mode = parsedOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
@@ -660,7 +660,7 @@ case class StructsToJson(

@transient
lazy val gen = new JacksonGenerator(
-    inputSchema, writer, new JSONOptions(options, timeZoneId.get))
+    inputSchema, writer, new JSONOptionsInWrite(options, timeZoneId.get))

@transient
lazy val inputSchema = child.dataType
@@ -764,7 +764,7 @@ case class SchemaOfJson(
override def nullable: Boolean = false

@transient
-  private lazy val jsonOptions = new JSONOptions(options, "UTC")
+  private lazy val jsonOptions = new JSONOptionsInRead(options, "UTC")

@transient
private lazy val jsonFactory = {
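Taken together, the three expression changes above mean that from_json and schema_of_json validate their options as read options, while to_json validates them as write options. A minimal sketch of the user-visible effect, assuming this patch is applied (the SparkSession setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("json-options").getOrCreate()
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

// "mode" is a read-side option, so from_json (now backed by JSONOptionsInRead) accepts it.
Seq("""{"a": 1}""").toDF("js")
  .select(from_json($"js", schema, Map("mode" -> "FAILFAST")))
  .collect()

// to_json is now backed by JSONOptionsInWrite, which lists "mode" as not applicable,
// so the same option here should fail verification.
Seq(1).toDF("a")
  .select(to_json(struct($"a"), Map("mode" -> "FAILFAST")))
  .collect()
// => IllegalArgumentException: The JSON options are not applicable in write : mode
```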
@@ -24,13 +24,14 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf

/**
* Options for parsing JSON data into Spark SQL rows.
*
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
-private[sql] class JSONOptions(
+private[sql] abstract class JSONOptions(
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
@@ -128,6 +129,17 @@ private[sql] class JSONOptions(
allowBackslashEscapingAnyCharacter)
factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
}

+  def notApplicableOptions: Set[String]
+  def checkOptions(where: String): Unit = {
+    val wrongOptions = notApplicableOptions.filter(parameters.contains(_))
+    if (!wrongOptions.isEmpty && SQLConf.get.verifyDataSourceOptions) {
+      // scalastyle:off throwerror

Member: I think we don't need this.

+      throw new IllegalArgumentException(

Member: We could also consider warning when verifyDataSourceOptions is disabled.

+        s"""The JSON options are not applicable $where : ${wrongOptions.mkString(", ")}.""")
+      // scalastyle:on throwerror
+    }
+  }
}
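The verification added here is a small template method: the abstract base class declares notApplicableOptions, and each concrete subclass calls the shared checkOptions from its constructor body, so a bad option fails as soon as the options object is built. A self-contained sketch of the same pattern (OptionsBase and ReadOptions are hypothetical names, not part of the patch):

```scala
// Hypothetical distillation of the constructor-time check used above.
abstract class OptionsBase(parameters: Map[String, String]) {
  // Each subclass declares the options that make no sense for its direction.
  def notApplicableOptions: Set[String]

  protected def checkOptions(where: String): Unit = {
    val wrong = notApplicableOptions.filter(parameters.contains)
    if (wrong.nonEmpty) {
      throw new IllegalArgumentException(
        s"The options are not applicable $where: ${wrong.mkString(", ")}")
    }
  }
}

class ReadOptions(parameters: Map[String, String]) extends OptionsBase(parameters) {
  override def notApplicableOptions: Set[String] = Set("compression", "pretty")
  checkOptions("in read") // runs as part of the constructor body
}

// new ReadOptions(Map("pretty" -> "true")) => IllegalArgumentException
```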

private[sql] class JSONOptionsInRead(
@@ -158,6 +170,11 @@ private[sql] class JSONOptionsInRead(

enc
}

+  override def notApplicableOptions: Set[String] = Set(
+    "compression",
+    "pretty")
+  checkOptions("in read")
}
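With "compression" and "pretty" declared not applicable in read, options that only affect output are no longer silently ignored on the read path. A sketch of the effect, assuming the patch (the input path is illustrative):

```scala
// Before this patch, "pretty" was silently ignored on read; now it should fail fast.
spark.read
  .option("pretty", "true")  // only meaningful when writing JSON
  .json("/tmp/people.json")  // illustrative path
// => IllegalArgumentException: The JSON options are not applicable in read : pretty
```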

private[sql] object JSONOptionsInRead {
@@ -172,3 +189,34 @@
Charset.forName("UTF-32")
)
}

+private[sql] class JSONOptionsInWrite(

Member: We also don't need private[sql], since we're already in a private package.

+    @transient override val parameters: CaseInsensitiveMap[String],
+    defaultTimeZoneId: String)
+  extends JSONOptions(parameters, defaultTimeZoneId, "") {
+
+  def this(
+      parameters: Map[String, String],
+      defaultTimeZoneId: String) = {
+    this(
+      CaseInsensitiveMap(parameters),
+      defaultTimeZoneId)
+  }
+
+  override def notApplicableOptions: Set[String] = Set(
+    "samplingRatio",
+    "primitivesAsString",
+    "prefersDecimal",
+    "allowComments",
+    "allowUnquotedFieldNames",
+    "allowSingleQuotes",
+    "allowNumericLeadingZeros",
+    "allowNonNumericNumbers",
+    "allowBackslashEscapingAnyCharacter",
+    "allowUnquotedControlChars",
+    "mode",
+    "columnNameOfCorruptRecord",
+    "dropFieldIfAllNull",
+    "multiLine")
+  checkOptions("in write")
+}
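The write-side list is longer because most JSON options control parsing or schema inference. Under this patch, passing any of them to a writer should raise instead of being dropped, mirroring the new test further below. A sketch (illustrative output path):

```scala
// A read-side option on the write path is now rejected (sketch, assuming the patch).
Seq("""{"a": "b"}""").toDS()
  .write
  .option("dropFieldIfAllNull", "true") // parsing option, previously ignored on write
  .json("/tmp/out")                     // illustrative path
// => IllegalArgumentException: The JSON options are not applicable in write : dropFieldIfAllNull
```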
@@ -1635,6 +1635,14 @@
"java.time.* packages are used for the same purpose.")
.booleanConf
.createWithDefault(false)

+  val VERIFY_DATASOURCE_OPTIONS = buildConf("spark.sql.verifyDataSourceOptions")

Member: Do we need a config for this? It seems more like a bug fix; even if it's a behavior change, we can be stricter about validation in Spark 3.

Member (author): I would remove the config, but @gatorsmile asked to add it.

Member: OK, I can't say I feel too strongly about it, but when would this be set to false, @gatorsmile? This is a case where the user is already setting useless options, so they can a) somehow discover this flag, or b) just remove the options; b) seems much more likely to happen. Is the worry about legacy code or libraries that can't easily be modified? Those may already not work on Spark 3.

+    .doc("Options passed to a datasource are checked for whether they can be applied in " +
+      "read or in write when this configuration property is set to true. For example, if " +
+      "an option can be applied only in read but is used in write, an exception is raised. " +
+      "To disable the verification, set it to false.")
+    .booleanConf
+    .createWithDefault(true)
}

/**
@@ -1820,6 +1828,8 @@ class SQLConf extends Serializable with Logging {

def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)

+  def verifyDataSourceOptions: Boolean = getConf(VERIFY_DATASOURCE_OPTIONS)

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
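Because the check is gated on this flag, code that depends on the old ignore-silently behavior can opt out without source changes. A sketch of toggling it at runtime:

```scala
// Disable the new verification for the current session (sketch).
spark.conf.set("spark.sql.verifyDataSourceOptions", "false")
// Equivalent SQL: SET spark.sql.verifyDataSourceOptions=false
```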
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
class JacksonGeneratorSuite extends SparkFunSuite {

val gmtId = DateTimeUtils.TimeZoneGMT.getID
-  val option = new JSONOptions(Map.empty, gmtId)
+  val option = new JSONOptionsInRead(Map.empty, gmtId)

test("initial with StructType and write out a row") {
val dataType = StructType(StructField("a", IntegerType) :: Nil)
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptionsInRead}
import org.apache.spark.sql.catalyst.util.FailureSafeParser
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -440,7 +440,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 2.2.0
*/
def json(jsonDataset: Dataset[String]): DataFrame = {
-    val parsedOptions = new JSONOptions(
+    val parsedOptions = new JSONOptionsInRead(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
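Since json(jsonDataset: Dataset[String]) now builds a JSONOptionsInRead, verification also covers in-memory JSON input, not just file-based reads. A sketch, assuming the patch:

```scala
// Verification applies to Dataset[String] input as well (sketch).
import spark.implicits._
val jsonDs = Seq("""{"name": "Ann"}""").toDS()
spark.read.option("compression", "gzip").json(jsonDs) // "compression" is write-only
// => IllegalArgumentException: The JSON options are not applicable in read : compression
```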
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
-import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptionsInWrite}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.plans._
@@ -3118,7 +3118,7 @@ class Dataset[T] private[sql](
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
val gen = new JacksonGenerator(rowSchema, writer,
-      new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
+      new JSONOptionsInWrite(Map.empty[String, String], sessionLocalTimeZone))

new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
@@ -67,10 +67,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
-    val parsedOptions = new JSONOptions(
+    val parsedOptions = new JSONOptionsInWrite(
      options,
-      sparkSession.sessionState.conf.sessionLocalTimeZone,
-      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
parsedOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.json

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

/**
@@ -135,4 +136,22 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
assert(df.first().getString(0) == "Cazen Lee")
assert(df.first().getString(1) == "$10")
}

test("verify options") {
withTempPath { dir =>
def invalidOptionUsage: Unit = {
val ds = Seq("""{"a": "b"}""").toDS()
ds.write.option("dropFieldIfAllNull", true).json(dir.getCanonicalPath)
}
val exception = intercept[IllegalArgumentException] {
invalidOptionUsage
}
assert(exception.getMessage.contains(
"The JSON options are not applicable in write : dropFieldIfAllNull"))

withSQLConf(SQLConf.VERIFY_DATASOURCE_OPTIONS.key -> "false") {
invalidOptionUsage
}
}
}
}
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}

-    val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
+    val dummyOption = new JSONOptionsInRead(options, SQLConf.get.sessionLocalTimeZone)
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)

@@ -1383,7 +1383,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
// This is really a test that it doesn't throw an exception
-    val options = new JSONOptions(Map.empty[String, String], "GMT")
+    val options = new JSONOptionsInRead(Map.empty[String, String], "GMT")
val emptySchema = new JsonInferSchema(options).infer(
empty.rdd,
CreateJacksonParser.string)
@@ -1410,7 +1410,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

test("SPARK-8093 Erase empty structs") {
-    val options = new JSONOptions(Map.empty[String, String], "GMT")
+    val options = new JSONOptionsInRead(Map.empty[String, String], "GMT")
val emptySchema = new JsonInferSchema(options).infer(
emptyRecords.rdd,
CreateJacksonParser.string)
@@ -2337,7 +2337,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val ds = spark.createDataset(Seq(("a", 1))).repartition(1)
ds.write
.option("encoding", encoding)
.option("multiline", false)
.json(path.getCanonicalPath)
val jsonFiles = path.listFiles().filter(_.getName.endsWith("json"))
jsonFiles.foreach { jsonFile =>