[SPARK-19092] [SQL] Save() API of DataFrameWriter should not scan all the saved files #16481
Conversation
// Check the cache hit: we use the metrics METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to verify this. While the lock takes effect,
// only one thread can really do the build, so the listing job count is 2 (the other
// one is the cache.load func). Also, METRIC_FILES_DISCOVERED is $partition_num * 2
This comment is not accurate. The extra counts are from the save API call in setupPartitionedHiveTable.
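For readers outside the test suite, a minimal sketch of the metrics-based cache-hit check being debated, assuming a `SparkSession` named `spark` and a `java.io.File` named `dir` holding an already-written partitioned table (the expected counts in the real suite are specific; here they are only printed):

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Reset the static counters so they reflect only the reads below.
HiveCatalogMetrics.reset()

// The first read triggers file listing; with a shared FileStatusCache the
// second read should be served from the cache.
spark.read.parquet(dir.getAbsolutePath).count()
spark.read.parquet(dir.getAbsolutePath).count()

// Whether a *parallel* listing job runs at all depends on the number of
// paths versus spark.sql.sources.parallelPartitionDiscovery.threshold.
println(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount())
println(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
```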
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
if (isForWriteOnly) {
  // Exit earlier and return null
  null
I do not know whether returning null is OK here. This is based on a similar early-exit solution used in getOrInferFileFormatSchema.
Maybe we can change it to return an option?
cc @ericl @cloud-fan
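A minimal sketch of the `Option`-based alternative being floated here; the method name and the `resolve` parameter are illustrative stand-ins for the real `DataSource` internals, not the actual API:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical Option-returning variant: callers that only want the write
// side effect get None instead of a null BaseRelation.
def writeOption(
    mode: SaveMode,
    data: DataFrame,
    isForWriteOnly: Boolean)(resolve: () => BaseRelation): Option[BaseRelation] = {
  // ... perform the actual write here ...
  if (isForWriteOnly) None else Some(resolve())
}
```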
I think it's time to think about why…
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
  .partitionBy("partCol1", "partCol2")
  .mode("overwrite")
  .parquet(dir.getAbsolutePath)
if (clearMetricsBeforeCreate) { |
Nice.
Test build #70946 has finished for PR 16481 at commit …
@cloud-fan I tried going in that direction, but I am afraid it might break external data sources that extend CreatableRelationProvider.
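For context, this is the shape of the interface that third-party sources implement (signature as in `org.apache.spark.sql.sources`); the provider class below is hypothetical, but it shows why changing the declared return type of `createRelation` to an `Option` would break such implementations at compile time:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

// A typical external implementation. If createRelation's return type
// changed to Option[BaseRelation], this override would no longer compile.
class MyProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // write `data` to the external store, then describe it as a relation
    ???
  }
}
```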
Test build #70956 has finished for PR 16481 at commit …
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
if (isForWriteOnly) {
  // Exit earlier and return null
I'd remove "and return null"
Sure
Test build #71026 has finished for PR 16481 at commit …
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
  throw new AnalysisException("Cannot save interval data type into external storage.")
}

providingClass.newInstance() match {
  case dataSource: CreatableRelationProvider =>
-   dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
+   Some(dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data))
It would be really weird if `CreatableRelationProvider.createRelation` could return a relation with a different schema from the written data. Is it safe to assume the schema won't change? cc @marmbrus @yhuai @liancheng
Maybe we can add a parameter here that lets the user choose true or false; the default would be to not refresh the schema.
 * Writes the given [[DataFrame]] out to this [[DataSource]].
 *
 * @param isForWriteOnly Whether to just write the data without returning a [[BaseRelation]].
 */
def write(
Let's create a new `write` method that returns `Unit`, and rename this `write` to `writeAndRead`, which should be removed eventually.
Sure. Will do it.
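A minimal sketch of the split being proposed; the trait and parameter lists are illustrative (the real methods live in `DataSource`), but they capture the intended contract:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical shape of the proposed split: save() would call the
// Unit-returning write and never resolve (or scan) the written files;
// only CTAS-style callers would use writeAndRead.
trait DataSourceWriteApi {
  /** Writes `data` out for its side effect only; no relation is returned. */
  def write(mode: SaveMode, data: DataFrame): Unit

  /** Legacy path: writes `data` and returns the resolved relation.
   *  Kept for CreateDataSourceTableAsSelect; to be removed eventually. */
  def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation
}
```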
  mode = mode,
  catalogTable = catalogTable,
  fileIndex = fileIndex)
sparkSession.sessionState.executePlan(plan).toRdd
To the reviewers: the code in `writeInFileFormat` is copied from the `case FileFormat` branch of the original `write` function.
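Roughly, that copied path builds an insert command and forces it to run eagerly. A comment-only sketch of its shape, since the command's full argument list is longer and version-dependent (only the tail matches the hunk quoted above):

```scala
// Sketch of the FileFormat write path that writeInFileFormat copies:
//
//   val plan = InsertIntoHadoopFsRelationCommand(
//     /* output path, partition columns, bucket spec, format, options, */
//     query = data.logicalPlan,
//     mode = mode,
//     catalogTable = catalogTable,
//     fileIndex = fileIndex)
//   sparkSession.sessionState.executePlan(plan).toRdd
//
// executePlan(plan).toRdd forces the command to run eagerly, which is what
// lets write() complete as a pure side effect with no relation to resolve.
```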
Test build #71279 has finished for PR 16481 at commit …
LGTM, merging to master! It conflicts with branch-2.1; can you send a new PR? Thanks!
I'll update JIRA once the service is back.
Sure, will do it.
… not scan all the saved files #16481

### What changes were proposed in this pull request?

#### This PR is to backport #16481 to Spark 2.1

---

`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) is performing an unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid this scan.

### How was this patch tested?

Added and modified the test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16588 from gatorsmile/backport-19092.
…the saved files

### What changes were proposed in this pull request?

`DataFrameWriter`'s [save() API](https://github.com/gatorsmile/spark/blob/5d38f09f47a767a342a0a8219c63efa2943b5d1f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L207) is performing an unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid this scan.

The related PR: apache#16090

### How was this patch tested?

Updated the existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#16481 from gatorsmile/saveFileScan.
## What changes were proposed in this pull request?

As discussed in #16481 and #18975 (comment): currently the `BaseRelation` returned by `dataSource.writeAndRead` is only used in `CreateDataSourceTableAsSelect`, and `planForWriting` and `writeAndRead` share some common code paths. In this patch I removed the `writeAndRead` function and added a `getRelation` function that is only used in `CreateDataSourceTableAsSelectCommand` when saving data to a non-existing table.

## How was this patch tested?

Existing UT.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #19941 from xuanyuanking/SPARK-22753.
What changes were proposed in this pull request?

`DataFrameWriter`'s save() API is performing an unnecessary full filesystem scan for the saved files. The save() API is the most basic/core API in `DataFrameWriter`. We should avoid this scan.

The related PR: #16090

How was this patch tested?

Updated the existing test cases.
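As a usage illustration (the path and session setup here are assumptions, not from the PR), this is the kind of call affected by the change; after the fix, save()/parquet() just writes and no longer lists the written files to rebuild a relation:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("save-demo").master("local[*]").getOrCreate()

// Writing is a pure side effect: no BaseRelation is needed afterwards,
// so no post-write file listing should happen.
spark.range(100)
  .selectExpr("id", "id % 10 as bucket")
  .write
  .partitionBy("bucket")
  .mode("overwrite")
  .parquet("/tmp/save-demo")

spark.stop()
```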