Skip to content

Commit

Permalink
[#3264] feat(spark-connector): Support Iceberg time travel in SQL que…
Browse files Browse the repository at this point in the history
…ries
  • Loading branch information
caican00 committed May 4, 2024
1 parent 4d334aa commit 1503864
Showing 1 changed file with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -251,6 +253,57 @@ void testIcebergTableRowLevelOperations() {
testIcebergMergeIntoUpdateOperation();
}

@Test
void testIcebergAsOfQuery() {
String tableName =
String.format("%s.%s.test_iceberg_as_of_query", getCatalogName(), getDefaultDatabase());
dropTableIfExists(tableName);
createSimpleTable(tableName);

checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));

sql(String.format("INSERT INTO %s VALUES (1, '1', 1)", tableName));
List<Row> snapshots =
getSparkSession().sql("SELECT snapshot_id FROM %s.snapshots").collectAsList();
Assertions.assertEquals(1, snapshots.size());
long snapshotId = snapshots.get(0).getLong(0);
List<Row> timestamp =
getSparkSession().sql("SELECT committed_at FROM %s.snapshots").collectAsList();
Assertions.assertEquals(1, timestamp.size());
Timestamp timestampAt = timestamp.get(0).getTimestamp(0);
waitUntilAfter(timestampAt.getTime());
Timestamp firstSnapshotTimestamp =
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
sql(String.format("INSERT INTO %s VALUES (2, '2', 2)", tableName));

List<String> tableData = getQueryData(getSelectAllSqlWithOrder(tableName));
Assertions.assertEquals(2, tableData.size());
Assertions.assertEquals("1,1,1;2,2,2", String.join(";", tableData));

tableData =
getQueryData(
String.format(
"SELECT * FROM %s TIMESTAMP AS OF '%s'", tableName, firstSnapshotTimestamp));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));
tableData =
getQueryData(
String.format(
"SELECT * FROM %s FOR SYSTEM_TIME AS OF '%s'", tableName, firstSnapshotTimestamp));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));

tableData =
getQueryData(String.format("SELECT * FROM %s VERSION AS OF %d", tableName, snapshotId));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));
tableData =
getQueryData(
String.format("SELECT * FROM %s FOR SYSTEM_VERSION AS OF %d", tableName, snapshotId));
Assertions.assertEquals(1, tableData.size());
Assertions.assertEquals("1,1,1", tableData.get(0));
}

private void testMetadataColumns() {
String tableName = "test_metadata_columns";
dropTableIfExists(tableName);
Expand Down Expand Up @@ -555,4 +608,11 @@ private void createIcebergTableWithTabProperties(
tableName, partitionedClause, tblPropertiesStr);
sql(createSql);
}

private void waitUntilAfter(Long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
current = System.currentTimeMillis();
}
}
}

0 comments on commit 1503864

Please sign in to comment.