[SPARK-20728][SQL] Make OrcFileFormat configurable between sql/hive and sql/core #19871
Conversation
@@ -363,6 +363,11 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")

+  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
How about spark.sql.orc.useNewVersion? Also let's make it an internal config and enable it by default.
Sure!
@@ -568,8 +570,13 @@ object DataSource extends Logging {
     "org.apache.spark.Logging")

   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
Instead of passing the SparkSession, I think we only need SQLConf.
Yep.
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
+    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
"org.apache.spark.sql.hive.orc.OrcFileFormat" should still point to the old implementation
I see.
test("should fail to load ORC without Hive Support") { | ||
val e = intercept[AnalysisException] { | ||
spark.read.format("orc").load() | ||
test("should fail to load ORC only if spark.sql.orc.enabled=false and without Hive Support") { |
I think this test is replaced by https://github.com/apache/spark/pull/19871/files#diff-5a2e7f03d14856c8769fd3ddea8742bdR2788
Ur, those tests cover different cases.
- In this test: true -> use the new OrcFileFormat, false -> throw an exception (the existing behavior).
- In that test: true -> use the new OrcFileFormat, false -> use the old OrcFileFormat (the existing behavior).
That test also checks the exception: https://github.com/apache/spark/pull/19871/files#diff-5a2e7f03d14856c8769fd3ddea8742bdR2790
Oh, I confused it with SQLQuerySuite.scala in hive. Sorry, I'll remove this.
@@ -363,6 +363,11 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")

+  val ORC_ENABLED = buildConf("spark.sql.orc.enabled")
+    .doc("When true, use OrcFileFormat in sql/core module instead of the one in sql/hive module.")
The description should include the major difference between these two ORC versions.
Yep. I'll elaborate more.
  def lookupDataSource(sparkSession: SparkSession, provider: String): Class[_] = {
    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
    if (Seq("orc", "org.apache.spark.sql.hive.orc.OrcFileFormat").contains(provider1.toLowerCase) &&
        sparkSession.conf.get(SQLConf.ORC_ENABLED)) {
Shouldn't we get the conf from sessionState?
It's done.
Test build #84409 has finished for PR 19871 at commit
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
+    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    if (Seq("orc").contains(provider1.toLowerCase) && conf.getConf(SQLConf.ORC_USE_NEW_VERSION)) {
"orc".equalsIgnoreCase(...)
Oh. Yep.
Thank you for review, @cloud-fan and @jiangxb1987.
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
+    var provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
Instead of using var, you can use a pattern match.
Also add the mappings for the new ORC format to backwardCompatibilityMap.
Thanks. Sure.
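Below is a self-contained Scala sketch of the "pattern match instead of var" idea. The object name, the map contents, and the useNativeOrc flag are simplified stand-ins for illustration, not Spark's actual DataSource code:

  // Simplified stand-in for DataSource.lookupDataSource: the map entry and the
  // useNativeOrc flag are illustrative, not the real backwardCompatibilityMap or SQLConf.
  object LookupSketch {
    private val backwardCompatibilityMap: Map[String, String] = Map(
      "org.apache.spark.sql.hive.orc.DefaultSource" -> "org.apache.spark.sql.hive.orc.OrcFileFormat")

    def resolveProvider(provider: String, useNativeOrc: Boolean): String = {
      // No mutable variable: the compatibility lookup feeds straight into a pattern match.
      backwardCompatibilityMap.getOrElse(provider, provider) match {
        case name if name.equalsIgnoreCase("orc") && useNativeOrc =>
          "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
        case name => name
      }
    }

    def main(args: Array[String]): Unit = {
      println(resolveProvider("orc", useNativeOrc = true))   // native ORC class name
      println(resolveProvider("orc", useNativeOrc = false))  // left as "orc" for further resolution
    }
  }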
Test build #84413 has finished for PR 19871 at commit
Test build #84412 has finished for PR 19871 at commit
Test build #84411 has finished for PR 19871 at commit
@@ -2153,4 +2153,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 }

+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
Move it to OrcQuerySuite
Yep.
  sessionCatalog.metastoreCatalog.convertToLogicalRelation(
    relation,
    options,
    classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], "orc")
indents.
Done.
Test build #84421 has finished for PR 19871 at commit
Could we enable ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") in OrcQuerySuite too?
@@ -85,7 +87,8 @@ case class DataSource(

 case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

-  lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
+  lazy val providingClass: Class[_] =
+    DataSource.lookupDataSource(sparkSession.sessionState.conf, className)
I'd put this conf as the last argument actually if you wouldn't mind ..
Sure!
  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
      "more stable and faster.")
Tiny nit: let's take out "more stable".
Thank you for the review, @HyukjinKwon.
Do you mean the Apache ORC library is more stable, but the new OrcFileFormat is not because it's newly introduced?
Actually, that's true from Spark's viewpoint, but the new OrcFileFormat contains more bug fixes and new features too. If you allow, I want to keep this. :)
@@ -568,8 +574,12 @@ object DataSource extends Logging {
     "org.apache.spark.Logging")

   /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+  def lookupDataSource(conf: SQLConf, provider: String): Class[_] = {
After more thinking, I don't think it's worth passing the whole SQLConf into this function; we just need to know whether SQLConf.ORC_USE_NEW_VERSION is enabled. WDYT @cloud-fan @gatorsmile?
So, are you suggesting lookupDataSource(provider, useNewOrc=true), @jiangxb1987?
@HyukjinKwon, for enabling the following test, I'm restructuring ORC tests now. I'll make a PR today for that.
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")

+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
spark.sql.orc.impl
No problem to change to it. But since the name was given by @cloud-fan before, ping @cloud-fan.
"Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " + | ||
"more stable and faster.") | ||
.internal() | ||
.booleanConf |
.checkValues(Set("hive", "native"))
.createWithDefault("native")
Yep.
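Putting the suggestions in this thread together, the config definition might end up looking roughly like the sketch below (inside object SQLConf; stringConf is assumed here because the allowed values are strings, and the exact doc wording may differ in the merged code):

  val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
    .doc("When native, use the native version of ORC support instead of the ORC library in " +
      "Hive 1.2.1. It is 'hive' by default prior to Spark 2.3.")
    .internal()
    .stringConf
    .checkValues(Set("hive", "native"))
    .createWithDefault("native")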
  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
      "more stable and faster.")
When native, use the native version of ORC support instead of the ORC library in Hive 1.2.1. It is hive by default prior to Spark 2.3.
Thanks!
@@ -537,6 +540,7 @@ object DataSource extends Logging {
     val csv = classOf[CSVFileFormat].getCanonicalName
     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+    val newOrc = classOf[OrcFileFormat].getCanonicalName
Please do not use a name like newXYZ. When a newer one is added, the name will be confusing.
How about nativeOrc?
Yep. It sounds better.
Test build #84436 has finished for PR 19871 at commit
Test build #84439 has finished for PR 19871 at commit
Could you review this again, @cloud-fan and @gatorsmile?
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name => name
If ORC_IMPLEMENTATION is hive, we leave the provider as it was, which may be orc. Then we will hit the Multiple sources found issue, won't we? Both the old and new ORC have the same short name orc.
I was looking at the exact same path. It seems not, because it's not registered to ServiceLoader (src/main/resources/org.apache.spark.sql.sources.DataSourceRegister). So, the short name for the newer ORC source would not be used here.
@cloud-fan, to avoid that issue, the new OrcFileFormat is intentionally not registered.
@HyukjinKwon's comment is correct.
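For context, lookupDataSource discovers short names through Java's ServiceLoader over the DataSourceRegister service file, so only registered sources can answer to "orc". Below is a hedged, illustrative sketch (not code from this PR) of inspecting that from a Spark driver; the shortName filtering mirrors what lookupDataSource does:

  import java.util.ServiceLoader
  import scala.collection.JavaConverters._
  import org.apache.spark.sql.sources.DataSourceRegister

  // List every registered data source whose short name answers to "orc".
  // If both ORC implementations were registered, lookupDataSource would report
  // "Multiple sources found" for that short name.
  val orcProviders = ServiceLoader.load(classOf[DataSourceRegister]).asScala
    .filter(_.shortName().equalsIgnoreCase("orc"))
    .map(_.getClass.getName)
  orcProviders.foreach(println)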
This sounds counter-intuitive; I think we should register the new ORC instead of the old one.
and also add comments here.
+1 for ^
I agree with both of you.
Just for explanation: the original design completely preserves the previous behavior. Without the SQLConf.ORC_IMPLEMENTATION option, Spark doesn't know about the new OrcFileFormat. So, without Hive support, creating a data source with "orc" will fail with an unknown data source error.
Anyway, I'm happy to update according to your advice. :)
So, there is no more "The ORC data source must be used with Hive support enabled". If the hive impl is requested in sql/core, it will show a more proper message.
sounds good
And for here, I added the following to prevent Multiple sources found. Last time, I missed this way. My bad.
+ case name if name.equalsIgnoreCase("orc") &&
+ conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
+ "org.apache.spark.sql.hive.orc.OrcFileFormat"
Test build #84461 has finished for PR 19871 at commit
Test build #84460 has finished for PR 19871 at commit
Retest this please.
LGTM
LGTM too
@@ -587,7 +601,8 @@ object DataSource extends Logging {
     if (provider1.toLowerCase(Locale.ROOT) == "orc" ||
         provider1.startsWith("org.apache.spark.sql.hive.orc")) {
       throw new AnalysisException(
-        "The ORC data source must be used with Hive support enabled")
+        "Hive-based ORC data source must be used with Hive support enabled. " +
+        "Please use native ORC data source instead")
I think we should make this more actionable, saying spark.sql.orc.impl should be set to native explicitly.
@HyukjinKwon, for this one, I made #19903.
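A hypothetical sketch of what a more actionable message could look like (the actual wording is settled in #19903, not here):

      throw new AnalysisException(
        "Hive built-in ORC data source must be used with Hive support enabled. " +
        "Please use the native ORC data source by setting 'spark.sql.orc.impl' to 'native'")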
test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { | ||
Seq( | ||
("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), | ||
("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) => |
nit: i => orcImpl
For this one, I will update #19882. I updated it locally and am running some tests.
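With the rename applied, the test might read roughly as below; the body inside the loop is an assumption sketched from the diff above, which cuts off before the assertion:

  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
    Seq(
      ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
      ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (orcImpl, format) =>
      withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImpl) {
        withTable("spark_20728") {
          sql("CREATE TABLE spark_20728(a INT) USING ORC")
          // Check which FileFormat class actually backs the relation.
          val fileFormat = sql("SELECT a FROM spark_20728").queryExecution.analyzed.collectFirst {
            case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
          }
          assert(fileFormat == Some(format))
        }
      }
    }
  }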
Test build #84464 has finished for PR 19871 at commit
Since this PR blocks the test-moving PR, I'm merging to master now. @dongjoon-hyun please address @HyukjinKwon's comments in your next PR. Thanks!
Thank you, @cloud-fan and @HyukjinKwon. I'll address that in the next PR.
@@ -553,6 +557,8 @@ object DataSource extends Logging {
     "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
     "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
     "org.apache.spark.sql.hive.orc" -> orc,
+    "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
+    "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
This map is for backward compatibility in case we move data sources around. I think this datasources.orc is newly added. Why do we need to add them here?
Ah, good catch! Sounds like we don't need a compatibility rule for the new ORC.
Like USING org.apache.spark.sql.hive.orc, we want to use USING org.apache.spark.sql.execution.datasources.orc, don't we?
When I received the advice, I thought it was for consistency.
This is for safety. We also do it for parquet
For parquet, this is for historical reasons, see #13311. Previously you could use parquet via org.apache.spark.sql.execution.datasources.parquet and org.apache.spark.sql.execution.datasources.parquet.DefaultSource, so it is also for backward compatibility. For this new ORC, it is not the same case.
I think we should rename the variable and/or fix the comments there when we touch the code around there, to prevent confusion next time though.
The org.apache.spark.sql.execution.* path is meant to be private too. But I think it's okay to leave it for practical use cases, with some comments.
These changes are pretty safe. In case we move the ORC source to another location, it will still refer to the right one.
@@ -587,7 +601,8 @@ object DataSource extends Logging {
     if (provider1.toLowerCase(Locale.ROOT) == "orc" ||
provider1 can't be "orc" anymore. It can only be classOf[OrcFileFormat].getCanonicalName or "org.apache.spark.sql.hive.orc.OrcFileFormat".
Yep. I'll remove the provider1.toLowerCase(Locale.ROOT) == "orc" || part.
What changes were proposed in this pull request?
This PR aims to provide a configuration to choose the default OrcFileFormat from the legacy sql/hive module or the new sql/core module. For example, this configuration will affect the following operations.
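As one illustrative way to exercise the switch (the path below is a placeholder, not from this PR):

  // Native (sql/core) implementation, the default after this PR:
  spark.conf.set("spark.sql.orc.impl", "native")
  spark.read.format("orc").load("/tmp/example_orc_path").show()

  // Fall back to the Hive 1.2.1-based implementation in sql/hive:
  spark.conf.set("spark.sql.orc.impl", "hive")
  spark.read.format("orc").load("/tmp/example_orc_path").show()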
How was this patch tested?
Pass the Jenkins with new test suites.