Skip to content

Commit

Permalink
HIVE-28274: Iceberg: Add support for 'If Not Exists' and 'or Replace'…
Browse files Browse the repository at this point in the history
… for Create Branch.
  • Loading branch information
ayushtkn committed May 22, 2024
1 parent f0f6e03 commit 9e7e252
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Ta
case CREATE_BRANCH:
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
if (createBranchSpec.isIfNotExists() &&
IcebergSnapshotRefExec.branchExists(icebergTable, createBranchSpec.getRefName())) {
return;
}
IcebergSnapshotRefExec.createBranch(icebergTable, createBranchSpec);
break;
case CREATE_TAG:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Optional;
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +44,7 @@ private IcebergSnapshotRefExec() {
*/
public static void createBranch(Table table, AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec) {
String branchName = createBranchSpec.getRefName();
boolean isReplace = createBranchSpec.isReplace() && branchExists(table, branchName);
Long snapshotId = null;
if (createBranchSpec.getSnapshotId() != null) {
snapshotId = createBranchSpec.getSnapshotId();
Expand All @@ -56,16 +59,35 @@ public static void createBranch(Table table, AlterTableSnapshotRefSpec.CreateSna
throw new IllegalArgumentException(String.format("Tag %s does not exist", tagName));
}
} else {
snapshotId = Optional.ofNullable(table.currentSnapshot()).map(snapshot -> snapshot.snapshotId()).orElse(null);
snapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId).orElse(null);
}
ManageSnapshots manageSnapshots = table.manageSnapshots();
Transaction trans = null;
ManageSnapshots manageSnapshots;
if (snapshotId != null) {
LOG.info("Creating a branch {} on an iceberg table {} with snapshotId {}", branchName, table.name(), snapshotId);
manageSnapshots.createBranch(branchName, snapshotId);
manageSnapshots = createBranch(table, isReplace, branchName, snapshotId);
} else {
LOG.info("Creating a branch {} on an empty iceberg table {}", branchName, table.name());
manageSnapshots.createBranch(branchName);
if (isReplace) {
LOG.info("Replacing a branch {} on an empty iceberg table {}", branchName, table.name());
trans = table.newTransaction();
trans.manageSnapshots().removeBranch(branchName).commit();
manageSnapshots = trans.manageSnapshots();
manageSnapshots.createBranch(branchName);
} else {
LOG.info("Creating a branch {} on an empty iceberg table {}", branchName, table.name());
manageSnapshots = table.manageSnapshots();
manageSnapshots.createBranch(branchName);
}
}
setCreateBranchOptionalParams(createBranchSpec, manageSnapshots, branchName);

manageSnapshots.commit();
if (trans != null) {
trans.commitTransaction();
}
}

private static void setCreateBranchOptionalParams(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec,
ManageSnapshots manageSnapshots, String branchName) {
if (createBranchSpec.getMaxRefAgeMs() != null) {
manageSnapshots.setMaxRefAgeMs(branchName, createBranchSpec.getMaxRefAgeMs());
}
Expand All @@ -75,8 +97,23 @@ public static void createBranch(Table table, AlterTableSnapshotRefSpec.CreateSna
if (createBranchSpec.getMaxSnapshotAgeMs() != null) {
manageSnapshots.setMaxSnapshotAgeMs(branchName, createBranchSpec.getMaxSnapshotAgeMs());
}
}

manageSnapshots.commit();
private static ManageSnapshots createBranch(Table table, boolean isReplace, String branchName, Long snapshotId) {
ManageSnapshots manageSnapshots = table.manageSnapshots();
if (isReplace) {
LOG.info("Replacing branch {} on an iceberg table {} with snapshotId {}", branchName, table.name(), snapshotId);
manageSnapshots.replaceBranch(branchName, snapshotId);
} else {
LOG.info("Creating a branch {} on an iceberg table {} with snapshotId {}", branchName, table.name(), snapshotId);
manageSnapshots.createBranch(branchName, snapshotId);
}
return manageSnapshots;
}

public static boolean branchExists(Table table, String branchName) {
SnapshotRef branchRef = table.refs().get(branchName);
return branchRef != null && branchRef.isBranch();
}

public static void dropBranch(Table table, AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropBranchSpec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ create table iceTbl (id int, name string) Stored by Iceberg;
explain alter table iceTbl create branch test_branch_0;
alter table iceTbl create branch test_branch_0;

-- insert into the branch
insert into default.iceTbl.branch_test_branch_0 values (0, 'ming');
select * from default.iceTbl.branch_test_branch_0;

-- create or replace with empty table
explain alter table iceTbl create or replace branch test_branch_0;
alter table iceTbl create or replace branch test_branch_0;
select * from default.iceTbl.branch_test_branch_0;

insert into iceTbl values(1, 'jack');

-- create s branch test_branch_1 with default values based on the current snapshotId
-- create a branch test_branch_1 with default values based on the current snapshotId
explain alter table iceTbl create branch test_branch_1;
alter table iceTbl create branch test_branch_1;
-- check the values, one value
Expand Down Expand Up @@ -41,6 +50,15 @@ alter table iceTbl create tag test_tag;
explain alter table iceTbl create branch test_branch_10 for tag as of test_tag;
alter table iceTbl create branch test_branch_10 for tag as of test_tag;

-- create a branch which already exists
explain alter table iceTbl create branch if not exists test_branch_10;
alter table iceTbl create branch if not exists test_branch_10;

-- create or replace
explain alter table iceTbl create or replace branch test_branch_1;
alter table iceTbl create or replace branch test_branch_1;
select * from default.iceTbl.branch_test_branch_1;

-- drop a branch
explain alter table iceTbl drop branch test_branch_3;
alter table iceTbl drop branch test_branch_3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,60 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_0}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_0, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_0
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: alter table iceTbl create branch test_branch_0
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
PREHOOK: query: insert into default.iceTbl.branch_test_branch_0 values (0, 'ming')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@icetbl
POSTHOOK: query: insert into default.iceTbl.branch_test_branch_0 values (0, 'ming')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@icetbl
PREHOOK: query: select * from default.iceTbl.branch_test_branch_0
PREHOOK: type: QUERY
PREHOOK: Input: default@icetbl
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.iceTbl.branch_test_branch_0
POSTHOOK: type: QUERY
POSTHOOK: Input: default@icetbl
POSTHOOK: Output: hdfs://### HDFS PATH ###
0 ming
PREHOOK: query: explain alter table iceTbl create or replace branch test_branch_0
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: explain alter table iceTbl create or replace branch test_branch_0
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_0, isReplace=true, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create or replace branch test_branch_0
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: alter table iceTbl create or replace branch test_branch_0
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
PREHOOK: query: select * from default.iceTbl.branch_test_branch_0
PREHOOK: type: QUERY
PREHOOK: Input: default@icetbl
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.iceTbl.branch_test_branch_0
POSTHOOK: type: QUERY
POSTHOOK: Input: default@icetbl
POSTHOOK: Output: hdfs://### HDFS PATH ###
PREHOOK: query: insert into iceTbl values(1, 'jack')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
Expand All @@ -48,7 +94,7 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_1}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_1, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -86,7 +132,7 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_2, maxRefAgeMs=432000000}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_2, maxRefAgeMs=432000000, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_2 retain 5 days
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -125,7 +171,7 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_3, minSnapshotsToKeep=5}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_3, minSnapshotsToKeep=5, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_3 with snapshot retention 5 snapshots
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -165,7 +211,7 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_4, minSnapshotsToKeep=5, maxSnapshotAgeMs=432000000}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_4, minSnapshotsToKeep=5, maxSnapshotAgeMs=432000000, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_4 with snapshot retention 5 snapshots 5 days
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -204,14 +250,68 @@ STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_10}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_10, isReplace=false, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create branch test_branch_10 for tag as of test_tag
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: alter table iceTbl create branch test_branch_10 for tag as of test_tag
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
PREHOOK: query: explain alter table iceTbl create branch if not exists test_branch_10
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: explain alter table iceTbl create branch if not exists test_branch_10
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_10, isReplace=false, ifNotExists=true}}

PREHOOK: query: alter table iceTbl create branch if not exists test_branch_10
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: alter table iceTbl create branch if not exists test_branch_10
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
PREHOOK: query: explain alter table iceTbl create or replace branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: explain alter table iceTbl create or replace branch test_branch_1
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.iceTbl
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{refName=test_branch_1, isReplace=true, ifNotExists=false}}

PREHOOK: query: alter table iceTbl create or replace branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@icetbl
POSTHOOK: query: alter table iceTbl create or replace branch test_branch_1
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@icetbl
PREHOOK: query: select * from default.iceTbl.branch_test_branch_1
PREHOOK: type: QUERY
PREHOOK: Input: default@icetbl
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.iceTbl.branch_test_branch_1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@icetbl
POSTHOOK: Output: hdfs://### HDFS PATH ###
1 jack
2 bob
3 tom
4 lisa
PREHOOK: query: explain alter table iceTbl drop branch test_branch_3
PREHOOK: type: ALTERTABLE_DROPBRANCH
PREHOOK: Input: default@icetbl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,10 @@ alterStatementSuffixDropBranch
alterStatementSuffixCreateBranch
@init { gParent.pushMsg("alter table create branch", state); }
@after { gParent.popMsg(state); }
: KW_CREATE KW_BRANCH branchName=identifier snapshotIdOfRef? refRetain? retentionOfSnapshots?
-> ^(TOK_ALTERTABLE_CREATE_BRANCH $branchName snapshotIdOfRef? refRetain? retentionOfSnapshots?)
: KW_CREATE KW_BRANCH ifNotExists? branchName=identifier snapshotIdOfRef? refRetain? retentionOfSnapshots?
-> ^(TOK_ALTERTABLE_CREATE_BRANCH $branchName ifNotExists? snapshotIdOfRef? refRetain? retentionOfSnapshots?)
| KW_CREATE KW_OR KW_REPLACE KW_BRANCH branchName=identifier snapshotIdOfRef? refRetain? retentionOfSnapshots?
-> ^(TOK_ALTERTABLE_CREATE_BRANCH $branchName KW_REPLACE snapshotIdOfRef? refRetain? retentionOfSnapshots?)
;

snapshotIdOfRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
Long maxRefAgeMs = null;
Integer minSnapshotsToKeep = null;
Long maxSnapshotAgeMs = null;
boolean isReplace = false;
boolean ifNotExists = false;
String asOfTag = null;
for (int i = 1; i < command.getChildCount(); i++) {
ASTNode childNode = (ASTNode) command.getChild(i);
Expand Down Expand Up @@ -93,14 +95,20 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
.toMillis(Long.parseLong(maxSnapshotAge));
}
break;
case HiveParser.KW_REPLACE:
isReplace = true;
break;
case HiveParser.TOK_IFNOTEXISTS:
ifNotExists = true;
break;
default:
throw new SemanticException("Unrecognized token in ALTER " + alterTableType.getName() + " statement");
}
}

AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createSnapshotRefSpec =
new AlterTableSnapshotRefSpec.CreateSnapshotRefSpec(refName, snapshotId, asOfTime,
maxRefAgeMs, minSnapshotsToKeep, maxSnapshotAgeMs, asOfTag);
maxRefAgeMs, minSnapshotsToKeep, maxSnapshotAgeMs, asOfTag, isReplace, ifNotExists);
AlterTableSnapshotRefSpec<AlterTableSnapshotRefSpec.CreateSnapshotRefSpec> alterTableSnapshotRefSpec
= new AlterTableSnapshotRefSpec(alterTableType, createSnapshotRefSpec);
AbstractAlterTableDesc alterTableDesc =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public static class CreateSnapshotRefSpec {
private Integer minSnapshotsToKeep;
private Long maxSnapshotAgeMs;
private String asOfTag;
private boolean isReplace;
private boolean ifNotExists;

public String getRefName() {
return refName;
Expand Down Expand Up @@ -85,21 +87,32 @@ public String getAsOfTag() {
return asOfTag;
}

public boolean isReplace() {
return isReplace;
}

public boolean isIfNotExists() {
return ifNotExists;
}

public CreateSnapshotRefSpec(String refName, Long snapShotId, Long asOfTime, Long maxRefAgeMs,
Integer minSnapshotsToKeep, Long maxSnapshotAgeMs, String asOfTag) {
Integer minSnapshotsToKeep, Long maxSnapshotAgeMs, String asOfTag, boolean isReplace, boolean ifNotExists) {
this.refName = refName;
this.snapshotId = snapShotId;
this.asOfTime = asOfTime;
this.maxRefAgeMs = maxRefAgeMs;
this.minSnapshotsToKeep = minSnapshotsToKeep;
this.maxSnapshotAgeMs = maxSnapshotAgeMs;
this.asOfTag = asOfTag;
this.isReplace = isReplace;
this.ifNotExists = ifNotExists;
}

public String toString() {
return MoreObjects.toStringHelper(this).add("refName", refName).add("snapshotId", snapshotId)
.add("asOfTime", asOfTime).add("maxRefAgeMs", maxRefAgeMs).add("minSnapshotsToKeep", minSnapshotsToKeep)
.add("maxSnapshotAgeMs", maxSnapshotAgeMs).omitNullValues().toString();
.add("maxSnapshotAgeMs", maxSnapshotAgeMs).add("isReplace", isReplace).add("ifNotExists", ifNotExists)
.omitNullValues().toString();
}
}
public static class DropSnapshotRefSpec {
Expand Down

0 comments on commit 9e7e252

Please sign in to comment.