Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,76 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for table with branches using SQL. */
public class BranchSqlITCase extends AbstractTestBase {

@TempDir java.nio.file.Path tempDir;
public class BranchSqlITCase extends CatalogITCaseBase {

@Test
public void testAlterTable() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql(
"CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + tempDir + "' )");
tEnv.executeSql("USE CATALOG mycat");
tEnv.executeSql(
"CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+ "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

tEnv.executeSql(
"INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
.await();
tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 'DOG'), (2, 30, 'horse')")
.await();

tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
tEnv.executeSql(
"INSERT INTO `t$branch_test` VALUES "
+ "(1, 10, 'cherry', 100), (2, 20, 'bird', 200), (2, 40, 'wolf', 400)")
.await();

assertThat(collectResult(tEnv, "SELECT * FROM t"))
public void testAlterBranchTable() throws Exception {

sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

sql(
"INSERT INTO T VALUES"
+ " (1, 10, 'apple'),"
+ " (1, 20, 'banana'),"
+ " (2, 10, 'cat'),"
+ " (2, 20, 'dog')");

sql("CALL sys.create_branch('default.T', 'test', 1)");

FileStoreTable branchTable = paimonTable("T$branch_test");
assertThat(branchTable.schema().fields().size()).isEqualTo(3);

sql(
"INSERT INTO T VALUES"
+ " (1, 10, 'APPLE'),"
+ " (2, 20, 'DOG'),"
+ " (2, 30, 'horse')");

// Add v2 column for branch table.
sql("ALTER TABLE `T$branch_test` ADD (v2 INT)");

branchTable = paimonTable("T$branch_test");
assertThat(branchTable.schema().fields().size()).isEqualTo(4);

sql(
"INSERT INTO `T$branch_test` VALUES "
+ "(1, 10, 'cherry', 100)"
+ ", (2, 20, 'bird', 200)"
+ ", (2, 40, 'wolf', 400)");

assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder(
"+I[1, 10, APPLE]",
"+I[1, 20, banana]",
"+I[2, 30, horse]",
"+I[2, 10, cat]",
"+I[2, 20, DOG]");
assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))

assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder(
"+I[1, 10, cherry, 100]",
"+I[1, 20, banana, null]",
Expand All @@ -75,7 +96,224 @@ public void testAlterTable() throws Exception {
"+I[2, 40, wolf, 400]");
}

private List<String> collectResult(TableEnvironment tEnv, String sql) throws Exception {
/**
 * Creates two tags on consecutive snapshots, creates one branch from each tag, and checks that
 * each branch reports its source tag and returns the rows written up to that tag.
 */
@Test
public void testCreateBranchFromTag() throws Exception {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // First write; referenced below as snapshot 1.
    sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana')");
    // Second write; referenced below as snapshot 2.
    sql("INSERT INTO T VALUES (2, 10, 'cat'), (2, 20, 'dog')");

    sql("CALL sys.create_tag('default.T', 'tag1', 1)");
    sql("CALL sys.create_tag('default.T', 'tag2', 2)");

    sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
    sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");

    // Branch created from tag1: the tag is visible and only the first write is readable.
    FileStoreTable fromTag1 = paimonTable("T$branch_test");
    assertThat(fromTag1.tagManager().tagExists("tag1")).isTrue();

    List<String> rowsFromTag1 = collectResult("SELECT * FROM T$branch_test");
    assertThat(rowsFromTag1).containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, banana]");

    // Branch created from tag2: the tag is visible and both writes are readable.
    FileStoreTable fromTag2 = paimonTable("T$branch_test2");
    assertThat(fromTag2.tagManager().tagExists("tag2")).isTrue();

    List<String> rowsFromTag2 = collectResult("SELECT * FROM T$branch_test2");
    assertThat(rowsFromTag2)
            .containsExactlyInAnyOrder(
                    "+I[1, 10, apple]",
                    "+I[1, 20, banana]",
                    "+I[2, 10, cat]",
                    "+I[2, 20, dog]");
}

/**
 * Creates one branch per snapshot id and checks both the recorded source snapshot of every
 * branch and that the source snapshot exists inside the corresponding branch table.
 */
@Test
public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // First write; referenced below as snapshot 1.
    sql("INSERT INTO T VALUES(1, 10, 'apple')");
    // Second write; referenced below as snapshot 2.
    sql("INSERT INTO T VALUES(1, 20, 'dog')");

    sql("CALL sys.create_branch('default.T', 'test', 1)");
    sql("CALL sys.create_branch('default.T', 'test2', 2)");

    FileStoreTable mainTable = paimonTable("T");

    // The main table's branch manager lists both branches with their source snapshot ids.
    assertThat(
                    mainTable.branchManager().branches().stream()
                            .map(TableBranch::getCreatedFromSnapshot))
            .containsExactlyInAnyOrder(1L, 2L);

    FileStoreTable branchOnFirst = paimonTable("T$branch_test");
    assertThat(branchOnFirst.snapshotManager().snapshotExists(1)).isTrue();

    FileStoreTable branchOnSecond = paimonTable("T$branch_test2");
    assertThat(branchOnSecond.snapshotManager().snapshotExists(2)).isTrue();
}

/**
 * Creates a branch without a source snapshot or tag, writes one row into it, and checks that
 * the branch query returns only that row.
 */
@Test
public void testCreateEmptyBranch() throws Exception {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // Two separate writes to the main table.
    sql("INSERT INTO T VALUES(1, 10, 'apple')");
    sql("INSERT INTO T VALUES(1, 20, 'dog')");

    List<String> mainRows = collectResult("SELECT * FROM T");
    assertThat(mainRows).containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, dog]");

    // Create an empty branch: no snapshot id or tag name is passed to the procedure.
    sql("CALL sys.create_branch('default.T', 'empty_branch')");

    sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')");

    // Only the row written to the branch is expected; main-table rows must not appear.
    List<String> branchRows = collectResult("SELECT * FROM T$branch_empty_branch");
    assertThat(branchRows).containsExactlyInAnyOrder("+I[3, 30, banana]");
}

/**
 * Creates two branches, deletes one through {@code sys.delete_branch}, and checks that the
 * branch manager of the main table lists only the remaining branch afterwards.
 */
@Test
public void testDeleteBranchTable() throws Exception {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // First write; referenced below as snapshot 1.
    sql("INSERT INTO T VALUES(1, 10, 'apple')");
    // Second write; referenced below as snapshot 2.
    sql("INSERT INTO T VALUES(1, 20, 'dog')");

    sql("CALL sys.create_branch('default.T', 'test', 1)");
    sql("CALL sys.create_branch('default.T', 'test2', 2)");

    FileStoreTable mainTable = paimonTable("T");

    // Before the delete: both branches are listed, with their source snapshots and names.
    assertThat(
                    mainTable.branchManager().branches().stream()
                            .map(TableBranch::getCreatedFromSnapshot))
            .containsExactlyInAnyOrder(1L, 2L);
    assertThat(mainTable.branchManager().branches().stream().map(TableBranch::getBranchName))
            .containsExactlyInAnyOrder("test", "test2");

    sql("CALL sys.delete_branch('default.T', 'test')");

    // After the delete: only 'test2' is still listed.
    assertThat(mainTable.branchManager().branches().stream().map(TableBranch::getBranchName))
            .containsExactlyInAnyOrder("test2");
}

/**
 * Writes three times to the main table, creates one branch per snapshot id, and checks that
 * the branch manager reports the source snapshot id of each branch.
 */
@Test
public void testBranchManagerGetBranchSnapshotsList()
        throws Catalog.TableNotExistException, IOException {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // Three separate writes with keys 10, 20 and 30.
    for (int key = 10; key <= 30; key += 10) {
        sql("INSERT INTO T VALUES (1, " + key + ", 'hxh')");
    }

    FileStoreTable mainTable = paimonTable("T");
    checkSnapshots(mainTable.snapshotManager(), 1, 3);

    // Branch 'test<i>' is created from snapshot i, for i = 1..3.
    for (int snapshotId = 1; snapshotId <= 3; snapshotId++) {
        sql(
                "CALL sys.create_branch('default.T', 'test"
                        + snapshotId
                        + "', "
                        + snapshotId
                        + ")");
    }

    assertThat(
                    mainTable.branchManager().branches().stream()
                            .map(TableBranch::getCreatedFromSnapshot))
            .containsExactlyInAnyOrder(1L, 2L, 3L);
}

/**
 * Creates a branch from snapshot 1, writes one row to it, runs {@code sys.fast_forward}, and
 * checks that the main table then returns the branch's rows and reports snapshots 1 to 2.
 */
@Test
public void testBranchFastForward() throws Exception {
    sql(
            "CREATE TABLE T ( pt INT, k INT, v STRING,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED )"
                    + " PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

    // Obtained before any write and reused for the checks after the fast-forward.
    FileStoreTable mainTable = paimonTable("T");
    SnapshotManager mainSnapshots = mainTable.snapshotManager();

    sql("INSERT INTO T VALUES (1, 10, 'hunter')");
    sql("INSERT INTO T VALUES (1, 20, 'hunter')");
    sql("INSERT INTO T VALUES (1, 30, 'hunter')");

    checkSnapshots(mainSnapshots, 1, 3);

    List<String> rowsBeforeForward = collectResult("SELECT * FROM T");
    assertThat(rowsBeforeForward)
            .containsExactlyInAnyOrder(
                    "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]");

    sql("CALL sys.create_branch('default.T', 'test', 1)");

    sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')");

    checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 2);

    // Rows expected from the branch, and from the main table once it is fast-forwarded.
    String[] branchRows = {"+I[1, 10, hunter]", "+I[2, 10, hunterX]"};

    assertThat(collectResult("SELECT * FROM T$branch_test"))
            .containsExactlyInAnyOrder(branchRows);

    sql("CALL sys.fast_forward('default.T', 'test')");

    // The main table now serves the content of branch `test`.
    assertThat(collectResult("SELECT * FROM T")).containsExactlyInAnyOrder(branchRows);

    checkSnapshots(mainSnapshots, 1, 2);
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
while (it.hasNext()) {
Expand All @@ -84,4 +322,10 @@ private List<String> collectResult(TableEnvironment tEnv, String sql) throws Exc
}
return result;
}

/**
 * Asserts that the given snapshot manager reports {@code latest - earliest + 1} snapshots and
 * that its earliest and latest snapshot ids equal the given bounds.
 *
 * @param sm snapshot manager to inspect
 * @param earliest expected earliest snapshot id
 * @param latest expected latest snapshot id
 * @throws IOException declared because the snapshot manager calls used here may throw it
 */
private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
    int expectedCount = latest - earliest + 1;
    assertThat(sm.snapshotCount()).isEqualTo(expectedCount);
    assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
    assertThat(sm.latestSnapshotId()).isEqualTo(latest);
}
}