[SPARK-28666] Support saveAsTable for V2 tables through Session Catalog #25402
Conversation
Test build #108893 has finished for PR 25402 at commit
Test build #108897 has finished for PR 25402 at commit
looks good
val session = df.sparkSession
val useV1Sources =
duplicated code with `save`, possible to have a function?
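A refactor along those lines might look like the following self-contained sketch. The object name `V1FallbackCheck`, the helper name, and the plain comma-separated list argument are illustrative stand-ins for Spark's actual SQLConf-backed lookup in `DataFrameWriter`, not the real patch:

```scala
// Illustrative sketch only: both `save` and `saveAsTable` repeat the same
// "should this source fall back to the V1 path?" lookup; a shared helper
// like this removes the duplication. The comma-separated `useV1SourceList`
// argument is a hypothetical stand-in for the SQLConf value.
object V1FallbackCheck {
  def shouldUseV1Source(source: String, useV1SourceList: String): Boolean =
    useV1SourceList.split(",")
      .map(_.trim.toLowerCase)
      .contains(source.trim.toLowerCase)
}
```

Both write paths would then call the one helper instead of repeating the lookup inline.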
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.sources.BaseRelation
nit: duplicated import?
Test build #108985 has finished for PR 25402 at commit
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
dammit IntelliJ :(
cc @cloud-fan @rdblue @jzhuge This is ready for review now.
Test build #108992 has finished for PR 25402 at commit
Test build #108996 has finished for PR 25402 at commit
Test build #108997 has finished for PR 25402 at commit
Test build #108998 has finished for PR 25402 at commit
@@ -172,7 +173,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
 /**
  * An implementation of catalog v2 [[Table]] to expose v1 table metadata.
  */
-case class CatalogTableAsV2(v1Table: CatalogTable) extends Table {
+case class CatalogTableAsV2(v1Table: CatalogTable) extends UnresolvedTable {
`CatalogTable` is defined in sql unfortunately.
It's defined in catalyst: `org.apache.spark.sql.catalyst.catalog.CatalogTable`, in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
Oh, I like that a lot more
verifyTable(t1, df)

// Check that appends are by name
df.select('data, 'id).write.format(v2Format).mode("append").saveAsTable(t1)
IIRC, in DS v1, `saveAsTable` fails if the table exists but the table provider is different from the one specified in `df.write.format`. Do we have this check in the v2 code path?
I'll add a test
Since the provider isn't necessarily exposed by the table API, I'm not sure if such a check is required/possible.
Test build #109056 has finished for PR 25402 at commit
Test build #109065 has finished for PR 25402 at commit
maybeCatalog.orElse(sessionCatalog)
  .flatMap(loadTable(_, ident))
  .map(DataSourceV2Relation.create)
  .getOrElse(u)
A +1 on this.
@@ -493,8 +494,12 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn

sparkSession.sql(s"CREATE TABLE table_name USING parquet AS SELECT id, data FROM source")

// use the catalog name to force loading with the v2 catalog
checkAnswer(sparkSession.sql(s"TABLE session.table_name"), sparkSession.table("source"))
checkAnswer(sparkSession.sql(s"TABLE default.table_name"), sparkSession.table("source"))
We can maintain this behavior, but I'd rather not, as the V2SessionCatalog can't properly handle views and such.
Test build #109081 has finished for PR 25402 at commit
Test build #109080 has finished for PR 25402 at commit
Test build #109084 has finished for PR 25402 at commit
val session = df.sparkSession
val provider = DataSource.lookupDataSource(source, session.sessionState.conf)
the `provider` here may not be the actual table provider, as `saveAsTable` can write to an existing table. Maybe we should always use v2 session catalog?
That works for me, since the V2 code path will fall back to the V1 code path if it sees an `UnresolvedTable`.
Hmm, actually that causes issues if the table doesn't exist. Maybe we should use the statements instead of the logical plans?
+1 on using statements.
let's do it in a followup.
Test build #109102 has finished for PR 25402 at commit
Test build #109114 has finished for PR 25402 at commit
thanks, merging to master! Please address #25402 (comment) in a followup.
Thanks @cloud-fan!
@brkyvz Scala source file
What changes were proposed in this pull request?
We add support for the V2SessionCatalog for saveAsTable, such that V2 tables can plug in and leverage existing DataFrameWriter.saveAsTable APIs to write and create tables through the session catalog.
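As a rough usage sketch of the behavior this enables (assuming a Spark build containing this patch, and a V2 source registered under the hypothetical short name `myV2Source`), the existing write API now routes through the session catalog:

```scala
// Hedged sketch: "myV2Source" is a hypothetical V2 TableProvider short
// name, not a source shipped with Spark; any registered V2 source would do.
// Running this requires a live Spark session with the patch applied.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")

// Creates the table through the V2 session catalog...
df.write.format("myV2Source").saveAsTable("t1")

// ...and subsequent appends resolve the existing V2 table by name.
df.write.format("myV2Source").mode("append").saveAsTable("t1")
```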
How was this patch tested?
Unit tests. A lot of tests broke under Hive when things were not working properly under `ResolveTables`; therefore I believe the current set of tests should be sufficient in testing the table resolution and read code paths.