@@ -976,7 +976,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
// filter any existing manifests
List<ManifestFile> filtered =
filterManager.filterManifests(
base.schema(), snapshot != null ? snapshot.dataManifests(ops.io()) : null);
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.dataManifests(ops.io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
@@ -989,7 +990,8 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
base.schema(), snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.deleteManifests(ops.io()) : null);

// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
15 changes: 6 additions & 9 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
@@ -149,6 +150,10 @@ protected void targetBranch(String branch) {
this.targetBranch = branch;
}

protected String targetBranch() {
return targetBranch;
}

protected ExecutorService workerPool() {
return this.workerPool;
}
@@ -202,15 +207,7 @@ protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
@Override
public Snapshot apply() {
refresh();
Snapshot parentSnapshot = base.currentSnapshot();
if (targetBranch != null) {
SnapshotRef branch = base.ref(targetBranch);
if (branch != null) {
parentSnapshot = base.snapshot(branch.snapshotId());
} else if (base.currentSnapshot() != null) {
parentSnapshot = base.currentSnapshot();
}
}
Snapshot parentSnapshot = SnapshotUtil.latestSnapshot(base, targetBranch);

long sequenceNumber = base.nextSequenceNumber();
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -85,11 +85,16 @@ public ThisT useSnapshot(long scanSnapshotId) {
}

public ThisT useRef(String name) {
if (SnapshotRef.MAIN_BRANCH.equals(name)) {
return newRefinedScan(table(), tableSchema(), context());
}

Preconditions.checkArgument(
snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
Snapshot snapshot = table().snapshot(name);
Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
TableScanContext newContext = context().useSnapshotId(snapshot.snapshotId());
return newRefinedScan(table(), SnapshotUtil.schemaFor(table(), name), newContext);
Reviewer comment: Good catch with the schema.
}

public ThisT asOfTime(long timestampMillis) {
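The net effect of the useRef change above is that scanning a branch now reports the schema of the branch's head snapshot rather than the table's current schema. A minimal caller-side sketch, assuming an already-loaded Table and a hypothetical branch named "audit":

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

class BranchScanExample {
  // Sketch only: `table` is an already-loaded Iceberg table and "audit" an existing branch.
  static Schema branchScanSchema(Table table) {
    // After this change, the scan's schema is the schema of the snapshot at the head of
    // "audit", which may be older than the table's current schema.
    return table.newScan().useRef("audit").schema();
  }
}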
65 changes: 60 additions & 5 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -397,6 +397,53 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli
return table.schema();
}

/**
* Return the schema of the snapshot at a given branch.
*
* <p>If branch does not exist, the table schema is returned because it will be the schema when
* the new branch is created.
*
* @param table a {@link Table}
* @param branch branch name of the table (nullable)
* @return schema of the specific snapshot at the given branch
*/

Author comment: The logic here for both finding the schema and finding the latest snapshot allows the branch to be non-existing. I decided to do it this way because the core library still allows auto-creation of branches, so it makes more sense for these util methods to support that case. We only block writing to a non-existing branch through a table identifier in the Spark module, but we will still support other cases, such as the WAP branch, that leverage the core feature.

Reviewer comment: I agree. We should block at the write, not in the helper methods.

Reviewer comment: Makes sense to me too.

public static Schema schemaFor(Table table, String branch) {
if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.schema();
}

Snapshot ref = table.snapshot(branch);
if (ref == null) {
return table.schema();
}

return schemaFor(table, ref.snapshotId());
}

/**
* Return the schema of the snapshot at a given branch.
*
* <p>If branch does not exist, the table schema is returned because it will be the schema when
* the new branch is created.
*
* @param metadata a {@link TableMetadata}
* @param branch branch name of the table (nullable)
* @return schema of the specific snapshot at the given branch
*/
public static Schema schemaFor(TableMetadata metadata, String branch) {
if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.schema();
}

SnapshotRef ref = metadata.ref(branch);
if (ref == null) {
return metadata.schema();
}

Snapshot snapshot = metadata.snapshot(ref.snapshotId());
return metadata.schemas().get(snapshot.schemaId());
}

/**
* Fetch the snapshot at the head of the given branch in the given table.
*
@@ -405,11 +452,11 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli
* code path to ensure backwards compatibility.
*
* @param table a {@link Table}
* @param branch branch name of the table
* @param branch branch name of the table (nullable)
* @return the latest snapshot for the given branch
*/
public static Snapshot latestSnapshot(Table table, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
return table.currentSnapshot();
}

@@ -423,15 +470,23 @@ public static Snapshot latestSnapshot(Table table, String branch) {
* TableMetadata#ref(String)}} for the main branch so that existing code still goes through the
* old code path to ensure backwards compatibility.
*
* <p>If branch does not exist, the table's latest snapshot is returned because it will be the
* snapshot the new branch points to when it is created.
*
* @param metadata a {@link TableMetadata}
* @param branch branch name of the table metadata
* @param branch branch name of the table metadata (nullable)
* @return the latest snapshot for the given branch
*/
public static Snapshot latestSnapshot(TableMetadata metadata, String branch) {
if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.currentSnapshot();
}

SnapshotRef ref = metadata.ref(branch);
if (ref == null) {
return metadata.currentSnapshot();
}

return metadata.snapshot(metadata.ref(branch).snapshotId());
return metadata.snapshot(ref.snapshotId());
}
}
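To make the fallback behavior discussed in the thread above concrete, here is a small usage sketch of the two new helpers; the table handle and the branch name "wap-123" are illustrative assumptions, not code from this PR:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

class BranchHelpersExample {
  static void describeBranch(Table table) {
    // If "wap-123" does not exist yet (for example, it will be auto-created on commit),
    // both helpers fall back to the table's current schema and current snapshot.
    Schema schema = SnapshotUtil.schemaFor(table, "wap-123");
    Snapshot head = SnapshotUtil.latestSnapshot(table, "wap-123");

    System.out.println("branch schema: " + schema.asStruct());
    System.out.println("branch head: " + (head == null ? "none" : head.snapshotId()));
  }
}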
SparkRowLevelOperationsTestBase.java
@@ -43,6 +43,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -70,24 +71,27 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
protected final String fileFormat;
protected final boolean vectorized;
protected final String distributionMode;
protected final String branch;

public SparkRowLevelOperationsTestBase(
String catalogName,
String implementation,
Map<String, String> config,
String fileFormat,
boolean vectorized,
String distributionMode) {
String distributionMode,
String branch) {
super(catalogName, implementation, config);
this.fileFormat = fileFormat;
this.vectorized = vectorized;
this.distributionMode = distributionMode;
this.branch = branch;
}

@Parameters(
name =
"catalogName = {0}, implementation = {1}, config = {2},"
+ " format = {3}, vectorized = {4}, distributionMode = {5}")
+ " format = {3}, vectorized = {4}, distributionMode = {5}, branch = {6}")
public static Object[][] parameters() {
return new Object[][] {
{
@@ -98,7 +102,8 @@ public static Object[][] parameters() {
"default-namespace", "default"),
"orc",
true,
WRITE_DISTRIBUTION_MODE_NONE
WRITE_DISTRIBUTION_MODE_NONE,
SnapshotRef.MAIN_BRANCH
},
{
"testhive",
@@ -108,15 +113,17 @@ public static Object[][] parameters() {
"default-namespace", "default"),
"parquet",
true,
WRITE_DISTRIBUTION_MODE_NONE
WRITE_DISTRIBUTION_MODE_NONE,
null,
},
{
"testhadoop",
SparkCatalog.class.getName(),
ImmutableMap.of("type", "hadoop"),
"parquet",
RANDOM.nextBoolean(),
WRITE_DISTRIBUTION_MODE_HASH
WRITE_DISTRIBUTION_MODE_HASH,
null
},
{
"spark_catalog",
@@ -131,7 +138,8 @@ public static Object[][] parameters() {
),
"avro",
false,
WRITE_DISTRIBUTION_MODE_RANGE
WRITE_DISTRIBUTION_MODE_RANGE,
"test"
}
};
}
@@ -181,6 +189,7 @@ protected void createAndInitTable(String schema, String partitioning, String jso
try {
Dataset<Row> ds = toDS(schema, jsonData);
ds.coalesce(1).writeTo(tableName).append();
createBranchIfNeeded();
} catch (NoSuchTableException e) {
throw new RuntimeException("Failed to write data", e);
}
@@ -315,4 +324,20 @@ protected DataFile writeDataFile(Table table, List<GenericRecord> records) {
throw new UncheckedIOException(e);
}
}

@Override
protected String commitTarget() {
return branch == null ? tableName : String.format("%s.branch_%s", tableName, branch);
}

@Override
protected String selectTarget() {
return branch == null ? tableName : String.format("%s VERSION AS OF '%s'", tableName, branch);
}

protected void createBranchIfNeeded() {
if (branch != null && !branch.equals(SnapshotRef.MAIN_BRANCH)) {
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branch);
}
}
}
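For reference, a sketch of how a test in a subclass would exercise a branch through these targets; the DDL, rows, and assertions are illustrative only and rely on the sql, row, and assertEquals helpers inherited from the Spark test base:

// Sketch of a subclass test method, not code from this PR.
@Test
public void testDeleteOnBranch() {
  sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName);
  sql("INSERT INTO %s VALUES (1, 'hr'), (2, 'it')", tableName);
  createBranchIfNeeded(); // no-op for main/null, otherwise forks the branch off the current snapshot

  // commitTarget() resolves to `<table>.branch_<name>` when a branch is configured,
  // so the delete commits to the branch instead of main.
  sql("DELETE FROM %s WHERE id = 1", commitTarget());

  // selectTarget() reads the branch head back via `VERSION AS OF '<branch>'`.
  assertEquals(
      "Only the untouched row should remain on the branch",
      ImmutableList.of(row(2, "it")),
      sql("SELECT * FROM %s ORDER BY id", selectTarget()));
}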
TestCopyOnWriteDelete.java
@@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
@@ -42,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
@@ -58,8 +60,9 @@ public TestCopyOnWriteDelete(
Map<String, String> config,
String fileFormat,
Boolean vectorized,
String distributionMode) {
super(catalogName, implementation, config, fileFormat, vectorized, distributionMode);
String distributionMode,
String branch) {
super(catalogName, implementation, config, fileFormat, vectorized, distributionMode, branch);
}

@Override
@@ -82,6 +85,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
tableName, DELETE_ISOLATION_LEVEL, "snapshot");

sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
createBranchIfNeeded();

Table table = Spark3Util.loadIcebergTable(spark, tableName);

@@ -101,7 +105,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
sleep(10);
}

sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", tableName);
sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", commitTarget());

barrier.incrementAndGet();
}
@@ -111,7 +115,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
Future<?> appendFuture =
executorService.submit(
() -> {
GenericRecord record = GenericRecord.create(table.schema());
GenericRecord record = GenericRecord.create(SnapshotUtil.schemaFor(table, branch));
record.set(0, 1); // id
record.set(1, "hr"); // dep

@@ -126,7 +130,12 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception

for (int numAppends = 0; numAppends < 5; numAppends++) {
DataFile dataFile = writeDataFile(table, ImmutableList.of(record));
table.newFastAppend().appendFile(dataFile).commit();
AppendFiles appendFiles = table.newFastAppend().appendFile(dataFile);
if (branch != null) {
appendFiles.toBranch(branch);
}

appendFiles.commit();
sleep(10);
}

@@ -153,7 +162,8 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
createAndInitPartitionedTable();

append(new Employee(1, "hr"), new Employee(3, "hr"));
append(tableName, new Employee(1, "hr"), new Employee(3, "hr"));
createBranchIfNeeded();
append(new Employee(1, "hardware"), new Employee(2, "hardware"));

Map<String, String> sqlConf =
@@ -163,17 +173,17 @@ public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableEx
SparkSQLProperties.PRESERVE_DATA_GROUPING,
"true");

withSQLConf(sqlConf, () -> sql("DELETE FROM %s WHERE id = 2", tableName));
withSQLConf(sqlConf, () -> sql("DELETE FROM %s WHERE id = 2", commitTarget()));

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots()));

Snapshot currentSnapshot = table.currentSnapshot();
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
validateCopyOnWrite(currentSnapshot, "1", "1", "1");

assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "hardware"), row(1, "hr"), row(3, "hr")),
sql("SELECT * FROM %s ORDER BY id, dep", tableName));
sql("SELECT * FROM %s ORDER BY id, dep", selectTarget()));
}
}