
[SPARK-24691][SQL] Dispatch the type support check in FileFormat implementation #21667

Closed
DataSource.scala
@@ -396,6 +396,7 @@ case class DataSource(
hs.partitionSchema.map(_.name),
"in the partition schema",
equality)
+        DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema)
case _ =>
SchemaUtils.checkColumnNameDuplication(
relation.schema.map(_.name),
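For illustration, a rough usage sketch of what this read-side hook does (the path and the exact error text below are assumptions, not something shown in this PR): asking the CSV reader for a type the format rejects now fails on the driver when the relation is resolved.

    // Hypothetical sketch; assumes a running SparkSession `spark` and an existing /tmp/some.csv.
    import org.apache.spark.sql.types._

    val requested = new StructType().add("a", ArrayType(IntegerType))
    spark.read.schema(requested).csv("/tmp/some.csv")
    // expected to fail with something like:
    //   org.apache.spark.sql.AnalysisException:
    //   CSV data source does not support array<int> data type.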
DataSourceUtils.scala
@@ -17,10 +17,7 @@

package org.apache.spark.sql.execution.datasources

- import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
- import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
- import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
- import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._


@@ -42,65 +39,14 @@ object DataSourceUtils {

/**
* Verify if the schema is supported in datasource. This verification should be done
-   * in a driver side, e.g., `prepareWrite`, `buildReader`, and `buildReaderWithPartitionValues`
-   * in `FileFormat`.
-   *
-   * Unsupported data types of csv, json, orc, and parquet are as follows;
-   *  csv -> R/W: Interval, Null, Array, Map, Struct
-   *  json -> W: Interval
-   *  orc -> W: Interval, Null
-   *  parquet -> R/W: Interval, Null
+   * in a driver side.
Contributor @cloud-fan commented on Jul 3, 2018:

FileFormat is internal, so this isn't about the public API; it's just a design choice.

Generally it's fine to have a central place for the business logic covering the different cases. Here, however, we can't access all FileFormat implementations: Hive ORC is in the Hive module. So the only choice is to dispatch the business logic into the implementations.

So +1 on the approach taken by this PR.

*/
private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
-    def throwUnsupportedException(dataType: DataType): Unit = {
-      throw new UnsupportedOperationException(
-        s"$format data source does not support ${dataType.simpleString} data type.")
+    schema.foreach { field =>
+      if (!format.supportDataType(field.dataType, isReadPath)) {
+        throw new AnalysisException(
+          s"$format data source does not support ${field.dataType.simpleString} data type.")
+      }
+    }

-    def verifyType(dataType: DataType): Unit = dataType match {
-      case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
-           StringType | BinaryType | DateType | TimestampType | _: DecimalType =>
-
-      // All the unsupported types for CSV
-      case _: NullType | _: CalendarIntervalType | _: StructType | _: ArrayType | _: MapType
-          if format.isInstanceOf[CSVFileFormat] =>
-        throwUnsupportedException(dataType)
-
-      case st: StructType => st.foreach { f => verifyType(f.dataType) }
-
-      case ArrayType(elementType, _) => verifyType(elementType)
-
-      case MapType(keyType, valueType, _) =>
-        verifyType(keyType)
-        verifyType(valueType)
-
-      case udt: UserDefinedType[_] => verifyType(udt.sqlType)
-
-      // Interval type not supported in all the write path
-      case _: CalendarIntervalType if !isReadPath =>
-        throwUnsupportedException(dataType)
-
-      // JSON and ORC don't support an Interval type, but we pass it in read pass
-      // for back-compatibility.
-      case _: CalendarIntervalType if format.isInstanceOf[JsonFileFormat] ||
-        format.isInstanceOf[OrcFileFormat] =>
-
-      // Interval type not supported in the other read path
-      case _: CalendarIntervalType =>
-        throwUnsupportedException(dataType)
-
-      // For JSON & ORC backward-compatibility
-      case _: NullType if format.isInstanceOf[JsonFileFormat] ||
-        (isReadPath && format.isInstanceOf[OrcFileFormat]) =>
-
-      // Null type not supported in the other path
-      case _: NullType =>
-        throwUnsupportedException(dataType)
-
-      // We keep this default case for safeguards
-      case _ => throwUnsupportedException(dataType)
-    }
-
-    schema.foreach(field => verifyType(field.dataType))
}
}
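The verifyReadSchema and verifyWriteSchema entry points that DataSource.scala and FileFormatWriter.scala call are not shown in this hunk. A minimal sketch of what they presumably look like, assuming they only fix the isReadPath flag and delegate to the same per-field check as the verifySchema helper above:

    package org.apache.spark.sql.execution.datasources

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.StructType

    // Sketch only: the names mirror the calls seen elsewhere in this diff.
    object DataSourceUtilsSketch {
      def verifyReadSchema(format: FileFormat, schema: StructType): Unit =
        verifySchema(format, schema, isReadPath = true)

      def verifyWriteSchema(format: FileFormat, schema: StructType): Unit =
        verifySchema(format, schema, isReadPath = false)

      // Same shape as the private verifySchema shown above: delegate the per-type
      // decision to the FileFormat implementation and fail fast on the driver.
      private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
        schema.foreach { field =>
          if (!format.supportDataType(field.dataType, isReadPath)) {
            throw new AnalysisException(
              s"$format data source does not support ${field.dataType.simpleString} data type.")
          }
        }
      }
    }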
FileFormat.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
- import org.apache.spark.sql.types.StructType
+ import org.apache.spark.sql.types.{DataType, StructType}


/**
@@ -57,7 +57,7 @@ trait FileFormat {
dataSchema: StructType): OutputWriterFactory

/**
-   * Returns whether this format support returning columnar batch or not.
+   * Returns whether this format supports returning columnar batch or not.
*
* TODO: we should just have different traits for the different formats.
*/
@@ -152,6 +152,11 @@ trait FileFormat {
}
}

+  /**
+   * Returns whether this format supports the given [[DataType]] in read/write path.
+   * By default all data types are supported.
+   */
+  def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
Contributor: Who does not override it?

Member (author): HiveFileFormat. Currently I don't know Hive well; if someone can override it for HiveFileFormat, please create a follow-up PR.

Contributor: Then why not remove this default implementation and add a HiveFileFormat#supportDataType that returns true?

Member (author): Then we would also need to update LibSVMFileFormat and several file formats in unit tests. I really prefer to have a default behavior here, as a FileFormat can still work without the new method.

Contributor: Makes sense.
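To illustrate that point, a hypothetical pre-existing format (the class name and stubs below are made up for this sketch) keeps compiling, and keeps the pre-PR behavior, because the inherited default returns true for every type, so no check is applied to it:

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
    import org.apache.spark.sql.types.StructType

    // No supportDataType override: the default of `true` applies, i.e. no schema check.
    class LegacyDummyFileFormat extends FileFormat {
      override def inferSchema(
          sparkSession: SparkSession,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType] = Some(new StructType())

      override def prepareWrite(
          sparkSession: SparkSession,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory =
        throw new UnsupportedOperationException("writes are not supported in this sketch")
    }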

Member: Technically this is an internal API, but in practice we should still be concerned about compatibility here where possible. I think we are already concerned about Avro, right? I still doubt whether it's a good idea to expose this in this trait.

Contributor: This is the only way to allow Avro to define its supported types. BTW, the default value of true here is good for compatibility: if a file source doesn't know about this API, it doesn't need to implement it and the behavior is unchanged, i.e. no check is applied.

Member: I meant that it's not free to remove or change the signature later once we've added it. Do we plan to refactor or remove FileFormat in the near future? If so, I'm okay with it.

Contributor @cloud-fan commented on Jul 6, 2018:

The entire FileFormat will be migrated to Data Source V2 in the future. FileFormat will still be there for backward compatibility, and I don't think we will update it frequently.
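To make the dispatch concrete, here is a hedged sketch, not taken from this PR, of how a format maintained outside sql/core (Avro, or Hive's ORC support) could declare its supported types just by overriding the new method; the driver-side verifyReadSchema/verifyWriteSchema calls pick the override up without sql/core ever referencing the class:

    package org.apache.spark.sql.avro  // assumed package, so sql-internal types like AtomicType resolve

    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.types._

    // Hypothetical mixin; the remaining FileFormat members stay abstract here.
    trait AvroTypeSupportSketch extends FileFormat {
      override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
        dataType match {
          case _: AtomicType => true
          case st: StructType => st.forall(f => supportDataType(f.dataType, isReadPath))
          case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
          case MapType(keyType, valueType, _) =>
            supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
          case _ => false // e.g. CalendarIntervalType and NullType would be rejected
        }
    }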

}

/**
FileFormatWriter.scala
@@ -96,9 +96,11 @@ object FileFormatWriter extends Logging {

val caseInsensitiveOptions = CaseInsensitiveMap(options)

+    val dataSchema = dataColumns.toStructType
+    DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema)
// Note: prepareWrite has side effect. It sets "job".
val outputWriterFactory =
-      fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataColumns.toStructType)
+      fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)

val description = new WriteJobDescription(
uuid = UUID.randomUUID().toString,
CSVFileFormat.scala
@@ -66,7 +66,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
-    DataSourceUtils.verifyWriteSchema(this, dataSchema)
val conf = job.getConfiguration
val csvOptions = new CSVOptions(
options,
@@ -98,7 +97,6 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    DataSourceUtils.verifyReadSchema(this, dataSchema)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -153,6 +151,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]

+  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case _ => false
+  }

}

private[csv] class CsvOutputWriter(
JsonFileFormat.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSON
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
- import org.apache.spark.sql.types.{StringType, StructType}
+ import org.apache.spark.sql.types._
Member: If we employ the blacklist approach, I think it'd be better not to fold these imports.

import org.apache.spark.util.SerializableConfiguration

class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
@@ -65,8 +65,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
-    DataSourceUtils.verifyWriteSchema(this, dataSchema)

val conf = job.getConfiguration
val parsedOptions = new JSONOptions(
options,
@@ -98,8 +96,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    DataSourceUtils.verifyReadSchema(this, dataSchema)

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -148,6 +144,23 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]

+  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
Member: Yeah, supported types are very specific to each datasource's implementation.

+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+
+    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+
+    case MapType(keyType, valueType, _) =>
+      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+
+    case _: NullType => true
Contributor: Why does JSON support the null type but CSV doesn't?

Member (author): Currently the null type is not handled in UnivocityParser.


+    case _ => false
+  }
}

private[json] class JsonOutputWriter(
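A rough usage illustration of the asymmetry discussed above (the paths and the exact error text are assumptions): a NullType column passes the JSON writer's check but is rejected by CSV.

    // Assumes a running SparkSession `spark`; hypothetical output paths.
    val df = spark.sql("SELECT 'a' AS s, null AS n")   // column `n` has NullType

    df.write.json("/tmp/json_with_null")   // accepted: JsonFileFormat returns true for NullType

    df.write.csv("/tmp/csv_with_null")
    // expected to fail with something like:
    //   org.apache.spark.sql.AnalysisException:
    //   CSV data source does not support null data type.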
OrcFileFormat.scala
@@ -89,8 +89,6 @@ class OrcFileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
-    DataSourceUtils.verifyWriteSchema(this, dataSchema)

val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)

val conf = job.getConfiguration
@@ -143,8 +141,6 @@ class OrcFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    DataSourceUtils.verifyReadSchema(this, dataSchema)

if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(dataSchema, filters).foreach { f =>
OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
@@ -228,4 +224,21 @@ class OrcFileFormat
}
}
}

+  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+
+    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+
+    case MapType(keyType, valueType, _) =>
+      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+
+    case _: NullType => isReadPath
+
+    case _ => false
+  }
}
ParquetFileFormat.scala
@@ -78,8 +78,6 @@ class ParquetFileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
-    DataSourceUtils.verifyWriteSchema(this, dataSchema)

val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)

val conf = ContextUtil.getConfiguration(job)
@@ -303,8 +301,6 @@ class ParquetFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    DataSourceUtils.verifyReadSchema(this, dataSchema)

hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -454,6 +450,21 @@
}
}
}

+  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+
+    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+
+    case MapType(keyType, valueType, _) =>
+      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+
+    case _ => false
+  }
}

object ParquetFileFormat extends Logging {
TextFileFormat.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
- import org.apache.spark.sql.types.{StringType, StructType}
+ import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
@@ -47,11 +47,6 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
throw new AnalysisException(
s"Text data source supports only a single column, and you have ${schema.size} columns.")
}
-    val tpe = schema(0).dataType
-    if (tpe != StringType) {
-      throw new AnalysisException(
-        s"Text data source supports only a string column, but you have ${tpe.simpleString}.")
-    }
}

override def isSplitable(
@@ -141,6 +136,9 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}
}

+  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
+    dataType == StringType
}

class TextOutputWriter(
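A final usage sketch (hypothetical path; the exact message is assumed from the shared check above): the string-only restriction formerly enforced by the removed verifySchema branch is now expressed through supportDataType, so a non-string column is rejected by the common driver-side check.

    // Assumes a running SparkSession `spark`.
    spark.range(3).write.text("/tmp/text_out")
    // expected to fail with something like:
    //   org.apache.spark.sql.AnalysisException:
    //   Text data source does not support bigint data type.

    spark.range(3).selectExpr("CAST(id AS STRING) AS value").write.text("/tmp/text_ok")   // fine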