The result is strange when casting string to date in ORC reading via spark (Schema Evolution) #1237

Closed
sinkinben opened this issue Aug 28, 2022 · 6 comments

sinkinben commented Aug 28, 2022

I created an ORC file with the following code.

val data = Seq(
    ("", "2022-01-32"),  // pay attention to this, null
    ("", "9808-02-30"),  // pay attention to this, 9808-02-29
    ("", "2022-06-31")   // pay attention to this, 2022-06-30
)
val cols = Seq("str", "date_str")
val df = spark.createDataFrame(data).toDF(cols: _*).repartition(1)
df.printSchema()
df.show(100)
df.write.mode("overwrite").orc("/tmp/orc/data.orc")

Please note that all three of these are invalid dates.
Then I read the file back with:

scala> var df = spark.read.schema("date_str date").orc("/tmp/orc/data.orc"); df.show()
+----------+
|  date_str|
+----------+
|      null|
|9808-02-29|
|2022-06-30|
+----------+

Why is 2022-01-32 converted to null, while 9808-02-30 is converted to 9808-02-29?

Intuitively, since they are all invalid dates, we should get 3 nulls.
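
For reference, the clamping pattern above (Feb 30 becoming Feb 29, June 31 becoming June 30, but Jan 32 rejected) happens to match what java.time's default SMART resolver does. This is only a hedged guess at where the behavior might come from, not a confirmed explanation of the ORC reader's internals; the snippet below just reproduces the same pattern in a plain Scala / spark-shell session.

import java.time.LocalDate
import java.time.format.{DateTimeFormatter, ResolverStyle}

// SMART (the default resolver style) accepts a day-of-month in 1..31 and clamps it
// to the last valid day of that month, but rejects values outside 1..31 entirely.
val fmt = DateTimeFormatter.ofPattern("uuuu-MM-dd").withResolverStyle(ResolverStyle.SMART)

def tryParse(s: String): Option[LocalDate] =
  try Some(LocalDate.parse(s, fmt)) catch { case _: Exception => None }

Seq("2022-01-32", "9808-02-30", "2022-06-31").foreach(s => println(s"$s -> ${tryParse(s)}"))
// 2022-01-32 -> None
// 9808-02-30 -> Some(9808-02-29)
// 2022-06-31 -> Some(2022-06-30)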

dongjoon-hyun commented Aug 29, 2022

Hi, @sinkinben. You are trying Schema Evolution (upcasting).

Both the Apache Spark and Apache ORC communities recommend using an explicit SQL CAST instead of depending on a data source's Schema Evolution. There are three reasons.

  • First of all, if you use explicit CAST syntax, you will get the expected result.
scala> sql("select cast('2022-01-32' as DATE)").show()
+------------------------+
|CAST(2022-01-32 AS DATE)|
+------------------------+
|                    null|
+------------------------+


scala> sql("select cast('9808-02-30' as DATE)").show()
+------------------------+
|CAST(9808-02-30 AS DATE)|
+------------------------+
|                    null|
+------------------------+


scala> sql("select cast('2022-06-31' as DATE)").show()
+------------------------+
|CAST(2022-06-31 AS DATE)|
+------------------------+
|                    null|
+------------------------+
  • Second, Spark provides many data sources such as CSV/Avro/Parquet/ORC, and their schema evolution capabilities are heterogeneous. In other words, you cannot expect a consistent result when you change the file-based data source format; you will get different results from other data sources like Parquet (a small comparison sketch follows at the end of this list). FYI, the Apache Spark community has test coverage for that feature parity issue and has been tracking it.

https://github.com/apache/spark/blob/146f187342140635b83bfe775b6c327755edfbe1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala#L40-L49

 * The reader schema is said to be evolved (or projected) when it changed after the data is
 * written by writers. The followings are supported in file-based data sources.
 * Note that partition columns are not maintained in files. Here, `column` means non-partition
 * column.
 *
 *   1. Add a column
 *   2. Hide a column
 *   3. Change a column position
 *   4. Change a column type (Upcast)
 *
 * Here, we consider safe changes without data loss. For example, data type changes should be
 * from small types to larger types like `int`-to-`long`, not vice versa.
 *
 * So far, file-based data sources have the following coverages.
 *
 *   | File Format  | Coverage     | Note                                                   |
 *   | ------------ | ------------ | ------------------------------------------------------ |
 *   | TEXT         | N/A          | Schema consists of a single string column.             |
 *   | CSV          | 1, 2, 4      |                                                        |
 *   | JSON         | 1, 2, 3, 4   |                                                        |
 *   | ORC          | 1, 2, 3, 4   | Native vectorized ORC reader has the widest coverage.  |
 *   | PARQUET      | 1, 2, 3      |                                                        |
 *   | AVRO         | 1, 2, 3      |                                                        |
  • Last but not least, Apache Spark has three ORC readers. For your use case, you can set spark.sql.orc.impl=hive to get the correct result if you really must depend on Apache ORC's Schema Evolution.
scala> sql("set spark.sql.orc.impl=hive")

scala> :paste
// Entering paste mode (ctrl-D to finish)

val data = Seq(
    ("", "2022-01-32"),  // pay attention to this, null
    ("", "9808-02-30"),  // pay attention to this, 9808-02-29
    ("", "2022-06-31"),  // pay attention to this, 2022-06-30
)
val cols = Seq("str", "date_str")
val df = spark.createDataFrame(data).toDF(cols:_*).repartition(1)
df.write.format("orc").mode("overwrite").save("/tmp/df")
spark.read.format("orc").schema("date_str date").load("/tmp/df").show(false)

// Exiting paste mode, now interpreting.

+--------+
|date_str|
+--------+
|null    |
|null    |
|null    |
+--------+
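
To make the second point concrete, here is a minimal sketch (paths are illustrative) that writes the same rows as both ORC and Parquet and reads each back with the evolved date schema. Per the coverage table above, Parquet does not cover type upcast (4), so the two reads are not expected to behave the same way.

// Illustrative comparison of schema evolution behavior across file formats.
val data = Seq(("", "2022-01-32"), ("", "9808-02-30"), ("", "2022-06-31"))
val df = spark.createDataFrame(data).toDF("str", "date_str").repartition(1)

df.write.mode("overwrite").orc("/tmp/evolution/orc")
df.write.mode("overwrite").parquet("/tmp/evolution/parquet")

// Make sure the default native ORC reader is in effect for this comparison;
// it accepts the string-to-date upcast, as shown above.
sql("set spark.sql.orc.impl=native")
spark.read.schema("date_str date").orc("/tmp/evolution/orc").show()

// Parquet does not cover upcast (4) in the table above, so this read may fail
// rather than return rows; Try keeps the shell session alive either way.
scala.util.Try(spark.read.schema("date_str date").parquet("/tmp/evolution/parquet").show())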

@dongjoon-hyun

Since there is a recommended way, I'll close this Q&A issue. We can still continue the discussion on this thread, @sinkinben.

dongjoon-hyun changed the title from "The result is strange when casting string to date in ORC reading via spark." to "The result is strange when casting string to date in ORC reading via spark (Schema Evolution)" on Aug 29, 2022
@sinkinben

Hi, @dongjoon-hyun, many thanks for your reply.

I have run more tests after setting the conf spark.sql.orc.impl.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val data = Seq(
    ("", "2002-01-01"),
    ("", "2022-08-29"),
    ("", "2022-08-31")
)
val cols = Seq("str", "date_str")
val df=spark.createDataFrame(data).toDF(cols:_*).repartition(1)
df.printSchema()
df.show(100)
df.write.mode("overwrite").orc("/tmp/orc/data.orc")

// Exiting paste mode, now interpreting.

root
 |-- str: string (nullable = true)
 |-- date_str: string (nullable = true)

+---+----------+
|str|  date_str|
+---+----------+
|   |2002-01-01|
|   |2022-08-29|
|   |2022-08-31|
+---+----------+

data: Seq[(String, String)] = List(("",2002-01-01), ("",2022-08-29), ("",2022-08-31))
cols: Seq[String] = List(str, date_str)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [str: string, date_str: string]

scala> sql("set spark.sql.orc.impl=hive")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> var df = spark.read.schema("date_str date").orc("/tmp/orc/data.orc"); df.show()
+--------+
|date_str|
+--------+
|    null|
|    null|
|    null|
+--------+

df: org.apache.spark.sql.DataFrame = [date_str: date]

These three cases are valid dates, but why are they converted to nulls?
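
A small diagnostic sketch that might help narrow this down (reusing the same path as above): first read the column back with its original string type to check that the values survived the write, then compare the two ORC reader implementations on the same evolved schema. This is only a suggestion for isolating where the nulls are introduced, not a fix.

// 1. Confirm the string values are intact in the file (isolates the write from the read).
spark.read.schema("date_str string").orc("/tmp/orc/data.orc").show()

// 2. Read with the default native ORC reader.
sql("set spark.sql.orc.impl=native")
spark.read.schema("date_str date").orc("/tmp/orc/data.orc").show()

// 3. Read with the hive ORC reader.
sql("set spark.sql.orc.impl=hive")
spark.read.schema("date_str date").orc("/tmp/orc/data.orc").show()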

@dongjoon-hyun

  1. First, may I ask why you cannot follow the community recommendation (1), @sinkinben? (A small sketch of that approach follows below.)
  2. Second, for your question: that's an independent issue, because spark.sql.orc.impl=hive does not go through Apache ORC code. It's Apache Spark code that predates the Apache ORC adoption. You can file an Apache Spark JIRA issue for further investigation in the Apache Spark community. I guess it will end up in Apache Hive's HiveDecimalWritable or related classes.
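
For reference, a minimal sketch of recommendation (1) applied to the original file: read the column with the string type it was written with, then cast explicitly. With the default (non-ANSI) cast behavior, the invalid dates become null, matching the SQL CAST examples above.

import org.apache.spark.sql.functions.col

// Read with the written schema, then apply an explicit CAST instead of relying
// on the reader's schema evolution.
spark.read.schema("date_str string")
  .orc("/tmp/orc/data.orc")
  .select(col("date_str").cast("date").as("date_str"))
  .show()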

@sinkinben

@dongjoon-hyun

  • I am working on the project: https://github.com/NVIDIA/spark-rapids
    • I am working on a feature to support reading ORC files as cuDF (CUDA DataFrames). cuDF is an in-memory GPU data format.
    • So I need to match the behavior of ORC reading on the CPU. Otherwise, spark-rapids users will find the results strange.
    • That is why I want to know why those results happened.
  • Thanks for your advice; I will have a look at ORC reading via Hive SQL.

@dongjoon-hyun

Thank you for sharing the background. I know spark-rapids. However, reading ORC files is different from attempting schema evolution like String-to-Decimal.

I am working on a feature to support reading ORC files as cuDF (CUDA DataFrames). cuDF is an in-memory GPU data format.

IMO, you don't need to follow any Schema Evolution behaviors. It has never been robust or fast, and it's generally not good in other data formats either, not only in ORC.
