[SPARK-19058][SQL] fix partition related behaviors with DataFrameWriter.saveAsTable #16460
Conversation
Test build #70815 has finished for PR 16460 at commit
```diff
       options = options,
       query = data.logicalPlan,
       mode = mode,
-      catalogTable = catalogTable)
+      catalogTable = catalogTable,
+      fileIndex = fileIndex)
```
Should we be issuing refresh table instead of refreshing the index directly?
I was following the original behavior: https://github.com/apache/spark/pull/16460/files#diff-d99813bd5bbc18277e4090475e4944cfL240

Besides, it's hard to issue refresh table in `DataSourceAnalysis`: the table could be a temp view, and the `CatalogTable` in `LogicalRelation` could be empty. Then we lose the table identifier and cannot issue refresh table.
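For illustration, a minimal sketch of the situation described above, using simplified stand-in classes rather than the real Spark planner types: when the relation backs a temp view its `CatalogTable` can be absent, so there is no identifier to issue a table-level refresh against, and refreshing the `FileIndex` directly is the only option left.

```scala
// Simplified stand-ins for the structures discussed above; field names are
// illustrative, not the actual Spark internals.
case class TableIdentifier(table: String, database: Option[String] = None)
case class CatalogTable(identifier: TableIdentifier)
case class LogicalRelation(catalogTable: Option[CatalogTable])

// Hypothetical post-insert refresh step: a table-level refresh needs an
// identifier, which is missing when the relation comes from a temp view.
def refreshAfterInsert(
    rel: LogicalRelation,
    refreshTable: TableIdentifier => Unit,
    refreshFileIndex: () => Unit): Unit = {
  rel.catalogTable match {
    case Some(t) => refreshTable(t.identifier) // identifier known: "refresh table" is possible
    case None    => refreshFileIndex()         // temp view: only the FileIndex can be refreshed
  }
}
```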
```diff
-        createTable(tableIdent)
+        createTable(tableIdentWithDB)
+        // Refresh the cache of the table in the catalog.
+        catalog.refreshTable(tableIdentWithDB)
```
Is this already done by the insertion command?
I moved it from the insertion command to here, as we only need to refresh the table for overwrite. For append, we only need to refresh the `FileIndex`.
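As a rough sketch of that policy (the callbacks and the mode type below are hypothetical stand-ins for the real `SaveMode`, catalog, and `FileIndex` refresh calls):

```scala
// Illustrative only: which cache gets refreshed after a write, per the comment above.
sealed trait WriteMode
case object OverwriteMode extends WriteMode
case object AppendMode extends WriteMode

def refreshAfterWrite(
    mode: WriteMode,
    refreshCatalogTable: () => Unit, // stand-in for catalog.refreshTable(...)
    refreshFileIndex: () => Unit     // stand-in for fileIndex.refresh()
): Unit = mode match {
  case OverwriteMode => refreshCatalogTable() // overwrite: refresh the table's cached metadata
  case AppendMode    => refreshFileIndex()    // append: only the file listing changed
}
```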
```diff
@@ -74,12 +69,29 @@ case class InsertIntoHadoopFsRelationCommand(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
```
Also check `sparkSession.sessionState.conf.manageFilesourcePartitions`?
This is something I want to check with @ericl. What if users create a table with partition management, then turn it off, and read this table? If we treat this table as a normal table, the data in custom partition paths will be ignored.

I think we should respect the partition management flag as of when the table was created, not when the table is read.
Hm, in other parts of the code we assume that the feature is completely disabled when the flag is off. This is probably needed since there is no way to revert a table otherwise.
Do you mean we should completely ignore the partition information in the metastore when the flag is off, so that we should also ignore the data in custom partition paths?
yeah, I think we should revert to the 2.0 behavior, as if querying the table from 2.0.
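To make the conclusion of this thread concrete, here is a rough sketch of the condition with the session flag folded in; the parameters are simplified stand-ins for `sparkSession.sessionState.conf.manageFilesourcePartitions` and the `CatalogTable` fields shown in the diff above, so this is an assumption about shape, not the merged code.

```scala
// Minimal stand-in for the CatalogTable fields used below.
case class CatalogTable(
    partitionColumnNames: Seq[String],
    tracksPartitionsInCatalog: Boolean)

// Sketch: partitions are treated as catalog-tracked only when the feature flag is
// on AND the table metadata says so; with the flag off we fall back to the 2.0
// behavior and ignore the metastore's partition information.
def partitionsTrackedByCatalog(
    manageFilesourcePartitions: Boolean, // stand-in for the session conf flag
    catalogTable: Option[CatalogTable]): Boolean = {
  manageFilesourcePartitions &&
    catalogTable.exists(t => t.partitionColumnNames.nonEmpty && t.tracksPartitionsInCatalog)
}
```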
```scala
      fs: FileSystem,
      qualifiedOutputPath: Path,
      partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
    val table = catalogTable.get
```
Shall we pass `catalogTable` as a function param? `.get` looks a little bit risky.
yea good idea
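Following up on the suggestion, a sketch of how the helper could take the table explicitly instead of reaching for `catalogTable.get`; the types here are simplified stand-ins for the Hadoop/Spark classes named in the snippet, so treat it as shape only, not the merged implementation.

```scala
// Simplified stand-ins: a partition spec is just column -> value, and a partition
// carries its spec plus the location recorded in the catalog.
case class CatalogTablePartition(spec: Map[String, String], location: String)
case class CatalogTable(partitionColumnNames: Seq[String])

// The table is now a parameter, so there is no Option.get inside the helper.
def getCustomPartitionLocations(
    table: CatalogTable,
    qualifiedOutputPath: String,
    partitions: Seq[CatalogTablePartition]): Map[Map[String, String], String] = {
  partitions.flatMap { p =>
    // Location the partition would have by default under the table's output path.
    val defaultLocation = qualifiedOutputPath + "/" +
      table.partitionColumnNames.map(c => s"$c=${p.spec(c)}").mkString("/")
    // Keep only partitions whose catalog location differs from the default one.
    if (p.location != defaultLocation) Some(p.spec -> p.location) else None
  }.toMap
}
```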
```scala
        partitionColumns = columns,
        bucketSpec = bucketSpec,
        fileFormat = format,
        refreshFunction = _ => Unit, // No existing table needs to be refreshed.
```
Previously, in this case, we did not call `refreshPartitionsCallback`. After this PR, we always refresh it. Is my understanding right?

How did it work without this PR's changes? Does that mean we just rely on Hive to implicitly call `AlterTableAddPartitionCommand`/`createPartition` when the table does not already exist?
Previously, we did not refresh anything here, but we would repair the partitions in `CreateDataSourceTableAsSelectCommand`. After this PR, we only repair the partitions in `CreateDataSourceTableAsSelectCommand` when we are creating a new table.
But `datasource.write` is also called by `DataFrameWriter`'s `save()`. Thus, it is not covered by `CreateDataSourceTableAsSelectCommand`.
nvm. It does not store the metadata in the catalog.
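To summarize the thread, here is a hypothetical predicate (not the actual command code; parameter names are made up) capturing when the partition repair now happens: only when `CreateDataSourceTableAsSelectCommand` is creating a brand-new catalog-backed table. A path-based `save()` has no catalog entry to repair, and an append to an existing table refreshes its `FileIndex` instead.

```scala
// Illustrative predicate only; parameter names are invented for this sketch.
def shouldRepairPartitionsInCatalog(
    creatingNewTable: Boolean,      // CTAS creating a table that did not exist before
    tableStoredInCatalog: Boolean,  // false for DataFrameWriter.save() to a raw path
    partitionColumnNames: Seq[String]): Boolean = {
  creatingNewTable && tableStoredInCatalog && partitionColumnNames.nonEmpty
}
```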
Test build #70879 has finished for PR 16460 at commit
```scala
      catalogTable.get.tracksPartitionsInCatalog

    var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
    var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
```
It sounds like we do not need to use `var` for `initialMatchingPartitions` and `customPartitionLocations`.
yea it's true, but then the code may look ugly, e.g.

```scala
val (longVariableNameXXXX: LongTypeNameXXX, longVariableNameXXXX: LongTypeNameXXX) = {
  ...
}
```
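For comparison, a worksheet-style sketch of the two alternatives with made-up names and dummy values; the tuple-destructuring `val` avoids mutation but pushes both initializers into one long declaration, which is the readability cost mentioned above.

```scala
val partitionsTrackedByCatalog = true // dummy stand-in for the real condition

// Alternative 1: mutable locals, filled in only when partitions are tracked.
var initialMatchingPartitions: Seq[Map[String, String]] = Nil
var customPartitionLocations: Map[Map[String, String], String] = Map.empty
if (partitionsTrackedByCatalog) {
  initialMatchingPartitions = Seq(Map("p" -> "1"))
  customPartitionLocations = Map(Map("p" -> "1") -> "/custom/p=1")
}

// Alternative 2: one immutable declaration with tuple destructuring.
val (matchingPartitions, customLocations) =
  if (partitionsTrackedByCatalog) {
    (Seq(Map("p" -> "1")), Map(Map("p" -> "1") -> "/custom/p=1"))
  } else {
    (Nil, Map.empty[Map[String, String], String])
  }
```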
LGTM except one minor comment.

cc @ericl any more comments on this PR?

looks good

thanks for the review, merging to master!
What changes were proposed in this pull request?

When we append data to a partitioned table with `DataFrameWriter.saveAsTable`, there are 2 issues:

1. it doesn't work when the partition has a custom location.
2. it will recover all partitions.

This PR fixes them by moving the special partition handling code from `DataSourceAnalysis` to `InsertIntoHadoopFsRelationCommand`, so that the `DataFrameWriter.saveAsTable` code path can also benefit from it.

How was this patch tested?

newly added regression tests
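A small end-to-end illustration of the scenario this PR addresses; table names and paths are made up, and the `ALTER TABLE ... ADD PARTITION ... LOCATION` statement assumes a Hive-enabled session with filesource partition management on, so treat this as a sketch rather than the PR's actual regression test.

```scala
import org.apache.spark.sql.SparkSession

// Hive support is assumed so that partitions (including custom locations) are
// tracked in the catalog.
val spark = SparkSession.builder()
  .appName("saveAsTable-append-partitioned")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Create a partitioned table, then register a partition with a custom location.
Seq((1, "a"), (2, "b")).toDF("id", "part")
  .write.partitionBy("part").saveAsTable("t")
spark.sql("ALTER TABLE t ADD PARTITION (part = 'c') LOCATION '/tmp/custom_part_c'")

// The append below is the code path fixed by this PR: it should respect the
// custom partition location and should not trigger a full partition recovery.
Seq((3, "c")).toDF("id", "part")
  .write.mode("append").saveAsTable("t")

spark.sql("SHOW PARTITIONS t").show()
```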