
[SPARK-24691][SQL]Dispatch the type support check in FileFormat implementation #21667

Closed

Conversation

gengliangwang
Member

gengliangwang commented Jun 29, 2018

What changes were proposed in this pull request?

With #21389, the data source schema is validated on the driver side before launching read/write tasks.
However:

  1. Putting all the validations together in DataSourceUtils is tricky and hard to maintain. On a second pass after review, I found that the OrcFileFormat in the hive package is not matched, so its validation is wrong.
  2. DataSourceUtils.verifyWriteSchema and DataSourceUtils.verifyReadSchema are not supposed to be called in every file format; we can move them to a single upper-level entry point.

So, I propose adding a new method, validateDataType, in FileFormat. Each file format implementation can override the method to specify its supported and unsupported data types, as sketched below.
Although the focus should be on the data source V2 API, FileFormat will remain in use for some time, so adding this new method should be helpful.
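For illustration, a minimal sketch of the shape this proposal takes (the method was later renamed supportDataType and made to return Boolean, as the diffs below show; the trait and class names here are hypothetical stand-ins for FileFormat and a concrete format):

import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical stand-in for the FileFormat trait: by default every type is
// accepted, so existing external implementations keep working unchanged.
trait FormatWithTypeCheck {
  def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
}

// A concrete format overrides it to declare what it actually handles,
// e.g. a text-based source that only supports strings.
class TextLikeFormat extends FormatWithTypeCheck {
  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
    dataType == StringType
}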

How was this patch tested?

Unit test

@@ -156,28 +156,6 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
sql("select testType()").write.mode("overwrite").orc(orcDir)
}.getMessage
assert(msg.contains("ORC data source does not support calendarinterval data type."))

// read path
Member Author

In the read path, ORC should support CalendarIntervalType and NullType.

Member

Is there already a read-path test?

Member Author

@HyukjinKwon No, the unit test is about unsupported data types, and ORC supports all data types in the read path.

Member

I mean the tests were negative tests, so I was expecting that we'd have positive tests as well.

Contributor

Spark can never write out the interval type, so do we really need to support it on the read path?

Member

+1 for ^

@gengliangwang
Member Author

@maropu @gatorsmile

@SparkQA

SparkQA commented Jun 29, 2018

Test build #92459 has finished for PR 21667 at commit 7fdf603.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jun 29, 2018

retest this please

*
* By default all data types are supported except [[CalendarIntervalType]] in write path.
*/
def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
Member

Hm, wouldn't it be better to whitelist them rather than blacklist?

Member Author

A blacklist is easier.
With a whitelist, we would have to validate

BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
          StringType | BinaryType | DateType | TimestampType | DecimalType

Of course we could have a default function to process these, but if we add a new data source that doesn't support all of them, the implementation will be verbose.
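For reference, a sketch of the blacklist-style default described here (matching the doc comment in the diff above: everything is supported except the interval type on the write path; not necessarily the exact code in this commit):

def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
  // Interval values can never be written out, so reject them only on the write path.
  case CalendarIntervalType => isReadPath
  case _ => true
}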

Member

It might be easier to write, but it doesn't account for types we may add later. It would be better to be explicit about what we support.

Member

I wrote CSV's check with whitelisting a long time ago, per @hvanhovell's comment. I was (and still am) okay either way, but it might be good to leave a cc for him.

Member

I like the whitelist, too. As @HyukjinKwon said, if someone implements a new type, the blacklist would let it pass through...

Member Author

A whitelist for all file formats is a behavior change. There are external file sources like https://github.com/databricks/spark-avro, whose code we would probably have to update to keep compatible.

Currently, exceptions are thrown in buildReader / buildReaderWithPartitionValues / prepareWrite for unsupported types, so new types are handled there.

So overall I prefer the blacklist.

Member

I meant the cases in the match in each implementation within Spark, not the semantics of the API itself.

Member Author

@HyukjinKwon Sorry, I don't understand. Do you mean the default case should be "not supported"?

case _ => false

But then how would all the external formats keep working?

Member

OK. I meant leaving the default case as true:

def supportDataType(...): Boolean = dataType match {
  case _ => true
}

and whitelisting each type within each implementation, for example in CSVFileFormat.scala:

def supportDataType(...) ...
    case _: StringType | ... => true
    case _ => false
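
A filled-in version of that suggestion for CSVFileFormat might look like the following (a sketch; the type list mirrors the whitelist quoted earlier in this thread):

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
  case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
       StringType | BinaryType | DateType | TimestampType | _: DecimalType => true
  case _ => false
}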

Contributor

A blacklist is easier, but a whitelist is safer.

@SparkQA

SparkQA commented Jun 29, 2018

Test build #92465 has finished for PR 21667 at commit 7fdf603.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

@hvanhovell

@@ -306,6 +306,7 @@ case class FileSourceScanExec(
}

private lazy val inputRDD: RDD[InternalRow] = {
DataSourceUtils.verifyReadSchema(relation.fileFormat, relation.dataSchema)
Member

maropu commented Jul 2, 2018

In some formats this verification would be applied twice, right?
OK, I see you removed the verification in each format implementation ;)

Contributor

I'm not sure about this change. This is very late (just before execution); is there a better place for this check, e.g. during the analysis phase?

Contributor

cloud-fan commented Jul 3, 2018

BTW, why do we need to check the schema on the read path? For a user-specified schema?

Member Author

Yes, for a user-specified schema.
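
To illustrate why the read path needs the check: a user-specified schema can name a type the format cannot produce, and the driver-side verification rejects it before any task launches. A hedged example (hypothetical path, assuming a SparkSession named spark as in the shell; the error message follows the pattern shown elsewhere in this PR):

import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}

val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
// Expected to fail fast on the driver with something like
// "CSV data source does not support calendarinterval data type."
spark.read.schema(schema).format("csv").load("/tmp/example")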

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSON
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.types._
Member

If we employ the blacklist, I think it'd be better not to fold these imports.

*
* By default all data types are supported except [[CalendarIntervalType]] in write path.
*/
def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
Member

Also, do we really need this API? All it does is check the type and throw an exception.

throw new UnsupportedOperationException(
s"$format data source does not support ${dataType.simpleString} data type.")
}
dataType match {
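
The lines above come from the shared driver-side helper. A minimal sketch of that helper as discussed at this point in the thread, with the recursion into nested types living in the helper rather than in each format (the name and signature follow the DataSourceUtils wording used elsewhere in this PR, but treat them as illustrative):

def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
  def verify(dataType: DataType): Unit = {
    if (!format.supportDataType(dataType, isReadPath)) {
      throw new UnsupportedOperationException(
        s"$format data source does not support ${dataType.simpleString} data type.")
    }
    // Recurse so that implementations only need to match the leaf types.
    dataType match {
      case ArrayType(elementType, _) => verify(elementType)
      case MapType(keyType, valueType, _) =>
        verify(keyType)
        verify(valueType)
      case StructType(fields) => fields.foreach(f => verify(f.dataType))
      case udt: UserDefinedType[_] => verify(udt.sqlType)
      case _ =>
    }
  }
  schema.foreach(field => verify(field.dataType))
}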
Member

Wait... why do we do the recursion here? What if the top-level type is supported but a nested one is not? For example, the Arrow integration in Spark doesn't currently support nested timestamp conversion due to a localization issue.

Member Author

This is for general-purpose use, so that developers can skip matching arrays/maps/structs.
I don't know about the nested timestamp case, but we can override supportDataType to mark that case as unsupported, right?

Member

Yeah... then this code bit is not really general-purpose anymore. Developers have to check the code inside and see whether the nested types are automatically checked or not.

Member Author

I know. But even if developers don't read the internals and don't handle the arrays/maps/structs cases, the code should still work.

Member

In that case, this code bit becomes rather redundant. Spark's dev APIs are already difficult enough for me to understand :-) Personally, I don't like being too clever when it comes to API design.

Contributor

It's tricky to rely on two places to correctly determine the unsupported type. format.supportDataType should handle complex types itself, to make the code clearer and easier to maintain.

Member Author

I see. I will update it.
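
The update that lands later in this thread moves the complex-type handling into each format's own supportDataType, roughly along these lines (a sketch reconstructed from the diffs further down, shown for a JSON-like format):

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
  case _: AtomicType => true
  case st: StructType => st.forall(f => supportDataType(f.dataType, isReadPath))
  case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
  case MapType(keyType, valueType, _) =>
    supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
  case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
  case _: NullType => true  // JSON-specific; see the NullType discussion below
  case _ => false
}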

@HyukjinKwon
Member

HyukjinKwon commented Jul 2, 2018

Reading it again closely, I am actually not super happy about introducing an API change if its only purpose is to check the type and throw an exception, and apparently that is the case. I am not sure how useful it is, looking at the current change; it reduces the amount of code only because it blacklists. I would suggest making the API change separate from this PR.

/**
* Returns whether this format supports the given [[DataType]] in read/write path.
*
* By default all data types are supported except [[CalendarIntervalType]] in write path.
Member

FYI, CalendarIntervalType isn't completely public yet .. cc @cloud-fan.

Contributor

Yeah, it's not. By default, we can't write out the interval type; this check is in DataSource.planForWriting.

@gengliangwang
Member Author

I agree that making it an API is a bit much. But currently there are problems (bugs), as I listed in the PR description.
Maybe we can create a separate trait instead?

@HyukjinKwon
Member

The bug fixes all look okay; it's the API change I'm unsure about. Mind if I ask to proceed with the API change separately, if that's possible?

@gengliangwang
Member Author

Sure, I am actually OK with that if we can find a different approach that doesn't involve an API change.

@gengliangwang gengliangwang changed the title [SPARK-24691][SQL]Add new API supportDataType in FileFormat [SPARK-24691][SQL]Dispatch the type support check in FileFormat implementation Jul 3, 2018
* json -> W: Interval
* orc -> W: Interval, Null
* parquet -> R/W: Interval, Null
* in a driver side.
Contributor

cloud-fan commented Jul 3, 2018

FileFormat is internal, so this isn't about a public API; it's just a design choice.

Generally it's OK to have a central place for business logic covering different cases. However, here we can't access all FileFormat implementations; Hive ORC is in the Hive module. So the only choice is to dispatch the business logic into the implementations.

So +1 on the approach taken by this PR.

* If the [[DataType]] is not supported, an exception will be thrown.
* By default all data types are supported.
*/
def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = {}
Contributor

It's better to return a boolean here and let the caller side throw the exception, so that we can unify the error message.

Member Author

Yes, that is what I did in the first commit. But if the unsupported type is inside a struct/array, the error message is not as accurate as with the current approach.
I am OK with reverting to return Boolean, though.

@@ -148,6 +144,28 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]

override def validateDataType(dataType: DataType, isReadPath: Boolean): Unit = dataType match {
Contributor

how about

the base class
def validateDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
  case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
    StringType | BinaryType | DateType | TimestampType | _: DecimalType => true
  case _ => false
}

json
override def validateDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
  case st: StructType => st.forall { f => validateDataType(f.dataType, isReadPath) }
  case ArrayType...
  ...
  case other => super.validateDataType(other, isReadPath)
}


Member Author

Then the base class could break other existing file formats, right?

Contributor

What do you mean by break? If people use internal APIs (like avro does), they are responsible for updating their code when internal APIs change in new Spark releases.

@gengliangwang
Member Author

@HyukjinKwon @maropu I have updated the code. It now uses a whitelist.
@cloud-fan Thanks for the review and the +1.

@SparkQA

SparkQA commented Jul 3, 2018

Test build #92569 has finished for PR 21667 at commit 5c590eb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 4, 2018

Test build #92611 has finished for PR 21667 at commit 34134f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
StringType | BinaryType | DateType | TimestampType | _: DecimalType => true
Contributor

isn't it just case _: AtomicType => true?

@@ -141,6 +136,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}
}

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
Contributor

dataType == StringType
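
That is, the whole override for the text source could collapse to a single comparison (sketch):

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
  dataType == StringType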

* Returns whether this format supports the given [[DataType]] in read/write path.
* By default all data types are supported.
*/
def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
Contributor

Who does not override it?

Member Author

HiveFileFormat. Currently I don't know Hive well; if someone can override it for HiveFileFormat, please create a follow-up PR.

Contributor

Then why not remove this default implementation and add a HiveFileFormat#supportDataType that returns true?

Member Author

Then we would also need to update LibSVMFileFormat and several file formats in unit tests. I really prefer to have a default behavior here, so that a FileFormat can still work without implementing the new method.

Contributor

makes sense


case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)

case _: NullType => true
Contributor

Why does JSON support the null type but CSV doesn't?

Member Author

Currently the null type is not handled in UnivocityParser.

@cloud-fan
Contributor

My major concern is when to apply this check. Ideally this should happen during analysis.

@SparkQA

SparkQA commented Jul 5, 2018

Test build #92639 has finished for PR 21667 at commit 44cf265.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

@cloud-fan I understand your concern, but I can't find a better entry point. The call in FileFormatWriter is the only single entry point for every write action; otherwise we would have to add the check in multiple places. The same goes for the read path.
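
Concretely, the two entry points referred to here end up invoking the checks roughly as follows (the read-path call is visible in the FileSourceScanExec diff earlier in this thread; the write-path line is a sketch of the corresponding call in FileFormatWriter):

// Write path, inside FileFormatWriter.write (sketch):
DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema)

// Read path, inside FileSourceScanExec.inputRDD (from the diff above):
DataSourceUtils.verifyReadSchema(relation.fileFormat, relation.dataSchema)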

@gengliangwang
Member Author

retest this please.

@SparkQA

SparkQA commented Jul 6, 2018

Test build #92686 has finished for PR 21667 at commit 7266611.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

retest this please.

@SparkQA

SparkQA commented Jul 7, 2018

Test build #92711 has finished for PR 21667 at commit 7266611.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

retest this please.

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
case _: AtomicType => true

case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
Contributor

// Interval type
var schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()
Contributor

When the user-specified schema doesn't match the physical schema, the behavior is undefined. So I don't think this is about backward compatibility; +1 to forbid the interval type.

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92864 has finished for PR 21667 at commit 757b82a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92863 has finished for PR 21667 at commit 757b82a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 11, 2018

Test build #92867 has finished for PR 21667 at commit 9ed3a7d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92913 has finished for PR 21667 at commit 13de60e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 12, 2018

Test build #92930 has finished for PR 21667 at commit 13de60e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in e6c6f90 Jul 12, 2018
dongjoon-hyun pushed a commit that referenced this pull request Jan 27, 2019
…ld be consistent

## What changes were proposed in this pull request?

1. Remove parameter `isReadPath`. The supported types of read/write should be the same.

2. Disallow reading `NullType` for the ORC data source. In #21667 and #21389, it was assumed that ORC supports reading `NullType` but cannot write it. This doesn't make sense. I read the docs and did some tests: ORC doesn't support `NullType`.

## How was this patch tested?

Unit test

Closes #23639 from gengliangwang/supportDataType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019