Support SHALLOW CLONE for Delta Lake #1505

Closed · 5 commits

Conversation

@jackierwzhang (Contributor) commented on Dec 1, 2022

Description

This PR introduces support for SHALLOW CLONE for both Delta and Parquet tables on Delta Lake, specifically the following command:
CREATE [OR REPLACE] TABLE [IF NOT EXISTS] target SHALLOW CLONE source [VERSION AS OF version | TIMESTAMP AS OF timestamp] [TBLPROPERTIES clause] [LOCATION path]

This enables the following use cases:

  1. Create a target table whose Delta log points to the files of the source table. The source table can be either a Delta table or a Parquet table.

You may also specify a LOCATION path to create the target as an external table at that path, a TBLPROPERTIES clause with additional table properties to append, or a version to create the target table as a time-travelled version of the source table (if the source is a Delta table); a sketch combining these clauses follows the examples below.

E.g. CREATE TABLE target SHALLOW CLONE source

  2. Replace/restore a table by shallow-cloning itself. This requires the table to be empty by the time of cloning to avoid data duplication.

E.g. REPLACE TABLE source SHALLOW CLONE source VERSION AS OF 1
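
As a sketch of the two use cases above, assuming a SparkSession configured with the standard Delta SQL extension; the table names, path, and table property below are illustrative, not from this PR:

```
import org.apache.spark.sql.SparkSession

// Sketch only: standard Delta session setup; identifiers below are illustrative.
val spark = SparkSession.builder()
  .appName("shallow-clone-examples")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Use case 1: create a target table whose Delta log points at the source table's files.
spark.sql("CREATE TABLE target SHALLOW CLONE source")

// Optional clauses: time travel the source, append table properties, and
// create the target as an external table at a custom path.
spark.sql("""
  CREATE TABLE IF NOT EXISTS target_v3 SHALLOW CLONE source VERSION AS OF 3
  TBLPROPERTIES ('this.is.my.key' = 'true')
  LOCATION '/tmp/target_v3'
""")

// Use case 2: replace/restore a table by shallow-cloning an earlier version of itself.
spark.sql("REPLACE TABLE source SHALLOW CLONE source VERSION AS OF 1")
```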

How was this patch tested?

Unit tests.

Does this PR introduce any user-facing changes?

No.

core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 (review thread resolved)
with DeltaColumnMappingTestUtils
{
// scalastyle:off argcount
override protected def cloneTable(
Collaborator:

Is there a Scala/Python API planned in addition to the SQL API?

Contributor Author:

Planned, but not yet available. It can be a follow-up.

output = CloneTableCommand.output)

// Non-delta metastore table already exists at target
case LogicalRelation(_, _, existingCatalog @ Some(catalog), _) =>
Collaborator:

What do we do if a non-Delta table already exists in the catalog with the same name as the clone target table?

Contributor Author:

It would depend on the SaveMode, e.g. here
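
For illustration only (this is not the PR's actual code path), a minimal sketch of the standard Spark SaveMode semantics the reply alludes to when something already exists at the clone target; the helper name and messages are hypothetical:

```
import org.apache.spark.sql.SaveMode

// Hypothetical helper: how each SaveMode is conventionally treated when the
// target already exists; the real behavior lives in the linked clone code.
def onExistingTarget(mode: SaveMode): Unit = mode match {
  case SaveMode.ErrorIfExists =>
    // plain CREATE TABLE: fail fast
    throw new IllegalStateException("Target table already exists")
  case SaveMode.Ignore =>
    // CREATE TABLE IF NOT EXISTS: leave the existing table as-is
    ()
  case SaveMode.Overwrite =>
    // CREATE OR REPLACE TABLE: the existing definition gets replaced
    ()
  case SaveMode.Append =>
    throw new UnsupportedOperationException("Append is not a CLONE mode")
}
```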

protected def getOutputSeq(operationMetrics: Map[String, Long]): Seq[Row]


protected val fnfExceptionMessage: String = "Tried to clone a version of the table where " +
Collaborator:

Where is this used? Do we check for the existence of files in the source table before the clone?

Contributor Author:

Yeah, we do, but this error message is not useful anymore.

checkColumnMappingMode(newMetadata, metadataToUpdate)
// 3. Checks for column mapping mode conflicts with existing metadata if there's any
if (txn.readVersion >= 0) {
checkColumnMappingMode(txn.snapshot.metadata, metadataToUpdate)
Collaborator:

Why do we need to check the column mapping mode when we are replacing the target table? Also, is there a test for it, in case we do need the check?

Contributor Author:

Yeah, because we allow changing table properties during replace, this check guards against that.
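
To illustrate the scenario, a hypothetical REPLACE clone whose TBLPROPERTIES changes the column mapping mode of the existing target, reusing the `spark` session from the sketch in the description; the table names and property value are illustrative:

```
// Hypothetical statement: the check compares the column mapping mode implied by
// the new properties against the existing target snapshot's metadata.
spark.sql("""
  CREATE OR REPLACE TABLE target SHALLOW CLONE source
  TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")
```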

@jackierwzhang requested review from vkorukanti and removed the review request for scottsand-db on December 5, 2022 at 23:43
@vkorukanti (Collaborator) left a comment:

LGTM!

A test that checks the target table's history after the clone, to make sure all operation metrics are correct, seems to be missing. If it exists already, ignore this comment; if it is missing, it can be added in a separate PR.

@felipepessoto (Contributor) commented:

@jackierwzhang, @vkorukanti, it seems the config name doesn't follow the pattern used by recent features; should we change it?

clone.replaceEnabled -> clone.replace.enabled?

Example:

val DELTA_CONVERT_ICEBERG_ENABLED =
  buildConf("convert.iceberg.enabled")
    .internal()
    .doc("If enabled, Iceberg tables can be converted into a Delta table.")
    .booleanConf
    .createWithDefault(true)

val DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED =
  buildConf("convert.iceberg.partitionEvolution.enabled")
    .doc("If enabled, support conversion of iceberg tables experienced partition evolution.")
    .booleanConf
    .createWithDefault(false)
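
For illustration, a hypothetical sketch of the renamed config if it followed the pattern above; the key name, doc string, and default value here are only the suggestion, not existing code:

```
// Hypothetical rename of clone.replaceEnabled to match the *.enabled pattern;
// the default value and doc string are assumed, not taken from the current flag.
val DELTA_CLONE_REPLACE_ENABLED =
  buildConf("clone.replace.enabled")
    .internal()
    .doc("If enabled, a Delta table can be replaced by a SHALLOW CLONE of itself.")
    .booleanConf
    .createWithDefault(true)
```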

allisonport-db pushed a commit that referenced this pull request Dec 16, 2022
As a follow-up to the SHALLOW CLONE [support](#1505) for Delta Lake, it would be great if we could enable SHALLOW CLONE on an Iceberg table as well. This will be a CLONVERT (CLONE + CONVERT) operation, in which we create a Delta catalog table with files pointing to the original Iceberg table in one transaction.

1. It allows users to quickly experiment with Delta Lake without modifying the original Iceberg table's data.
2. It simplifies the user flow by combining a Delta catalog table creation with an Iceberg conversion.

Similar to SHALLOW CLONE, it will work as follows:

1. Clone an Iceberg catalog table (after the setup [here](https://iceberg.apache.org/docs/latest/getting-started/))

```
CREATE TABLE [IF NOT EXISTS] delta SHALLOW CLONE iceberg.db.table [TBLPROPERTIES clause] [LOCATION path]
```

2. Clone a path-based Iceberg table

```
CREATE TABLE [IF NOT EXISTS] delta SHALLOW CLONE iceberg.`/path/to/iceberg/table` [TBLPROPERTIES clause] [LOCATION path]
```

Closes #1522

New unit tests.

No.

GitOrigin-RevId: e01994e037cf44e06f4ef3d6f185f5925dd77e48
}

override def run(sparkSession: SparkSession): Seq[Row] = {
if (!targetPath.isAbsolute) {
Contributor:

@jackierwzhang, do you remember why we need to validate whether it is an absolute path?
