diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index 3438f445ec2a..a6c26e889c7e 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -32,6 +32,8 @@
/** A Catalog API for table create, drop, and load operations. */
public interface Catalog {
+ String SYSTEM_DATABASE_NAME = "sys";
+
/**
* Return the name for this catalog.
*
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811d0..a7692c55d86b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -45,6 +45,7 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
@@ -52,6 +53,7 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
@@ -69,6 +71,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.procedure.ProcedureUtil;
import org.apache.iceberg.flink.util.FlinkAlterTableUtil;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
@@ -869,4 +872,14 @@ public CatalogColumnStatistics getPartitionColumnStatistics(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
return CatalogColumnStatistics.UNKNOWN;
}
+
+ @Override
+ public Procedure getProcedure(ObjectPath procedurePath)
+ throws ProcedureNotExistException, CatalogException {
+ if (!Catalog.SYSTEM_DATABASE_NAME.equals(procedurePath.getDatabaseName())) {
+ throw new ProcedureNotExistException(icebergCatalog.name(), procedurePath);
+ }
+
+ return ProcedureUtil.getProcedure(procedurePath.getObjectName(), icebergCatalog);
+ }
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateBranchProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateBranchProcedure.java
new file mode 100644
index 000000000000..a140add851b8
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateBranchProcedure.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/**
+ * Create branch procedure for given tag. Usage:
+ *
+ *
snapshots = table.snapshots();
+ Snapshot target = null;
+ for (Snapshot snapshot : snapshots) {
+ if (snapshot.timestampMillis() > timestamp) {
+ if (target == null || target.timestampMillis() >= snapshot.timestampMillis()) {
+ target = snapshot;
+ }
+ }
+ }
+
+ return target;
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateTagProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateTagProcedure.java
new file mode 100644
index 000000000000..bbbc51700293
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/CreateTagProcedure.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/**
+ * Create tag procedure. Usage:
+ *
+ * <pre>
+ * CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
+ * </pre>
+ */
+public class CreateTagProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "create_tag";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "snapshot_id", type = @DataTypeHint("BIGINT"), isOptional = true),
+ @ArgumentHint(name = "time_retained", type = @DataTypeHint("STRING"), isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String tagName,
+ @Nullable Long snapshotId,
+ @Nullable String timeRetained)
+ throws NoSuchTableException {
+ Table table = table(tableId);
+ table.refresh();
+ createTag(table, tagName, snapshotId, toDuration(timeRetained));
+ return new String[] {"Success"};
+ }
+
+ void createTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
+ Long requiredSnapshotId = snapshotId;
+ if (requiredSnapshotId == null) {
+ requiredSnapshotId = table.currentSnapshot().snapshotId();
+ }
+
+ if (timeRetained == null) {
+ table.manageSnapshots().createTag(tagName, requiredSnapshotId).commit();
+ } else {
+ table
+ .manageSnapshots()
+ .createTag(tagName, requiredSnapshotId)
+ .setMaxRefAgeMs(tagName, timeRetained.toMillis())
+ .commit();
+ }
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteBranchProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteBranchProcedure.java
new file mode 100644
index 000000000000..545b769e0cff
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteBranchProcedure.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/**
+ * Delete branch procedure. Usage:
+ *
+ * <pre>
+ * CALL sys.delete_branch('tableId', 'branchName')
+ * </pre>
+ */
+public class DeleteBranchProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "delete_branch";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "branch", type = @DataTypeHint("STRING"))
+ })
+ public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+ throws NoSuchTableException {
+ Table table = table(tableId);
+ table.refresh();
+
+ deleteBranch(table, branchName);
+
+ return new String[] {"Success"};
+ }
+
+ private void deleteBranch(Table table, String branchName) {
+ table.manageSnapshots().removeBranch(branchName).commit();
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteTagProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteTagProcedure.java
new file mode 100644
index 000000000000..404a1c6e69b6
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/DeleteTagProcedure.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/**
+ * Delete tag procedure. Usage:
+ *
+ * <pre>
+ * CALL sys.delete_tag('tableId', 'tagName')
+ * </pre>
+ */
+public class DeleteTagProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "delete_tag";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"))
+ })
+ public String[] call(ProcedureContext procedureContext, String tableId, String tagNameStr)
+ throws NoSuchTableException {
+ Table table = table(tableId);
+ table.refresh();
+ deleteTag(table, tagNameStr);
+ return new String[] {"Success"};
+ }
+
+ private void deleteTag(Table table, String tagNameStr) {
+ table.manageSnapshots().removeTag(tagNameStr).commit();
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ExpireSnapshotsProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ExpireSnapshotsProcedure.java
new file mode 100644
index 000000000000..8ff0d4f8de8f
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ExpireSnapshotsProcedure.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import java.util.TimeZone;
+import javax.annotation.Nullable;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/**
+ * Procedure for expiring snapshots. Usage:
+ *
+ * <pre>
+ * CALL sys.expire_snapshots('tableId', retainLast, 'olderThan')
+ * </pre>
+ */
+public class ExpireSnapshotsProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "expire_snapshots";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "retain_last", type = @DataTypeHint("INTEGER"), isOptional = true),
+ @ArgumentHint(
+ name = "older_than",
+ type = @DataTypeHint(value = "STRING"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, Integer retainLast, String olderThanStr)
+ throws NoSuchTableException {
+ Table table = table(tableId);
+ table.refresh();
+ expireSnapshots(table, retainLast, toMillis(olderThanStr));
+ return new String[] {"Success"};
+ }
+
+ private void expireSnapshots(Table table, Integer retainLast, Long olderThan) {
+ ExpireSnapshots expireSnapshots = table.expireSnapshots();
+ if (retainLast != null) {
+ expireSnapshots = expireSnapshots.retainLast(retainLast);
+ }
+ if (olderThan != null) {
+ expireSnapshots = expireSnapshots.expireOlderThan(olderThan);
+ }
+ expireSnapshots.commit();
+ }
+
+ @Nullable
+ private Long toMillis(String olderThanStr) {
+ if (olderThanStr == null || olderThanStr.isEmpty()) {
+ return null;
+ }
+
+ return DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
+ .getMillisecond();
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureBase.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureBase.java
new file mode 100644
index 000000000000..23292d474608
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.util.TimeUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+/** Base implementation for flink's {@link Procedure}. */
+public abstract class ProcedureBase implements Procedure, ProcedureService {
+
+ private Catalog catalog;
+
+ public ProcedureBase withCatalog(Catalog workingCatalog) {
+ this.catalog = workingCatalog;
+ return this;
+ }
+
+ @Override
+ public Procedure create(Catalog workingCatalog) {
+ try {
+ return this.getClass().getDeclaredConstructor().newInstance().withCatalog(workingCatalog);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected Table table(String tableId) throws NoSuchTableException {
+ return catalog.loadTable(TableIdentifier.parse(tableId));
+ }
+
+ protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) {
+ StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ ReadableConfig conf = env.getConfiguration();
+ return execute(jobClient, conf.get(TableConfigOptions.TABLE_DML_SYNC));
+ }
+
+ protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
+ throws Exception {
+ ReadableConfig conf = env.getConfiguration();
+ String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
+ return execute(env.executeAsync(name), conf.get(TableConfigOptions.TABLE_DML_SYNC));
+ }
+
+ private String[] execute(JobClient jobClient, boolean dmlSync) {
+ String jobId = jobClient.getJobID().toString();
+ if (dmlSync) {
+ try {
+ jobClient.getJobExecutionResult().get();
+ } catch (Exception e) {
+ throw new TableException(String.format("Failed to wait job '%s' finish", jobId), e);
+ }
+ return new String[] {"Success"};
+ } else {
+ return new String[] {"JobID=" + jobId};
+ }
+ }
+
+ @Nullable
+ protected static Duration toDuration(@Nullable String text) {
+ if (text == null) {
+ return null;
+ }
+
+ return TimeUtils.parseDuration(text);
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureService.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureService.java
new file mode 100644
index 000000000000..569e9765c3a8
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.iceberg.catalog.Catalog;
+
+/**
+ * Service Provider Interface (SPI) for discovering and creating {@link Procedure} implementations.
+ *
+ * <p>Each implementation acts as a provider for a specific procedure type and must be registered
+ * in:
+ *
+ * <pre>
+ * resources/META-INF/services/org.apache.iceberg.flink.procedure.ProcedureService
+ * </pre>
+ *
+ * <p>The file must contain the fully qualified class name of the ProcedureService implementation.
+ */
+public interface ProcedureService {
+ /**
+ * Returns a procedure name which is used to create a given procedure.
+ *
+ *
+ * <p>For example: sql "CALL sys.create_tag(tableId, tagName)" will provide "create_tag" name
+ * which will be used by {@link ProcedureUtil}'s registry to map this to {@link
+ * CreateTagProcedure} class.
+ */
+ String procedureName();
+
+ /**
+ * Creates a specific procedure within specific catalog. This method used in {@link ProcedureBase}
+ * to instantiate a necessary procedure via reflection mechanism.
+ */
+ Procedure create(Catalog catalog);
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureUtil.java
new file mode 100644
index 000000000000..28a640471c1a
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/ProcedureUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Util class to provide specific implementation for the given procedure tag. Implementations of the
+ * {@link ProcedureService} interface are loaded through Java's {@link ServiceLoader}.
+ */
+public class ProcedureUtil {
+
+ private ProcedureUtil() {
+ throw new UnsupportedOperationException("Cannot instantiate utility class");
+ }
+
+ private static final Map<String, ProcedureService> REGISTRY = Maps.newHashMap();
+
+ static {
+ ServiceLoader<ProcedureService> services = ServiceLoader.load(ProcedureService.class);
+ for (ProcedureService service : services) {
+ REGISTRY.put(service.procedureName(), service);
+ }
+ }
+
+ public static Procedure getProcedure(String procedureName, Catalog catalog) {
+ ProcedureService procedureService = REGISTRY.get(procedureName);
+ if (procedureService == null) {
+ throw new IllegalArgumentException("Unknown procedure: " + procedureName);
+ }
+ return procedureService.create(catalog);
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToProcedure.java
new file mode 100644
index 000000000000..17849e73a790
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToProcedure.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+/**
+ * Rollback procedure. Usage:
+ *
+ * <pre>
+ * -- rollback to a snapshot
+ * CALL sys.rollback_to(`table` => 'tableId', snapshot_id => snapshotId)
+ *
+ * -- rollback to a tag
+ * CALL sys.rollback_to(`table` => 'tableId', tag => 'tagName')
+ * </pre>
+ */
+public class RollbackToProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "rollback_to";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(name = "snapshot_id", type = @DataTypeHint("BIGINT"), isOptional = true)
+ })
+ public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id BIGINT>") Row[] call(
+ ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
+ throws Exception {
+ Table table = table(tableId);
+ table.refresh();
+
+ Long requiredSnapshotId = snapshotId;
+
+ if (requiredSnapshotId != null && tagName != null) {
+ checkIfCompatible(table, tagName, requiredSnapshotId);
+ }
+
+ Long previousSnapshotId = table.currentSnapshot().snapshotId();
+ if (requiredSnapshotId == null) {
+ if (tagName == null) {
+ throw new IllegalArgumentException(
+ "No arguments to rollback to. Please specify a tag or a snapshot id");
+ }
+ Snapshot snapshot = table.snapshot(tagName);
+ requiredSnapshotId = snapshot.snapshotId();
+ }
+ rollbackTo(table, requiredSnapshotId);
+ return new Row[] {Row.of(previousSnapshotId, requiredSnapshotId)};
+ }
+
+ private void checkIfCompatible(Table table, String tagName, Long snapshotId) {
+ Snapshot tagSnapshot = table.snapshot(tagName);
+ if (snapshotId != tagSnapshot.snapshotId()) {
+ throw new IllegalArgumentException(
+ "Snapshot with provided snapshot id is not the same snapshot provided tag refers to. Please specify a tag or a snapshot id, or be sure both refer to the same snapshot");
+ }
+ }
+
+ private void rollbackTo(Table table, Long snapshotId) {
+ table.manageSnapshots().rollbackTo(snapshotId).commit();
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToTimestampProcedure.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToTimestampProcedure.java
new file mode 100644
index 000000000000..588396fe5918
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/procedure/RollbackToTimestampProcedure.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+/**
+ * Rollback to timestamp procedure. Usage:
+ *
+ * <pre>
+ * -- rollback to the snapshot which earlier or equal than timestamp.
+ * CALL sys.rollback_to_timestamp(`table` => 'tableId', timestamp => timestamp)
+ * </pre>
+ */
+public class RollbackToTimestampProcedure extends ProcedureBase {
+ public static final String PROCEDURE_NAME = "rollback_to_timestamp";
+
+ @ProcedureHint(
+ arguments = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT"))
+ })
+ public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id BIGINT>") Row[] call(
+ ProcedureContext procedureContext, String tableId, Long timestamp) throws Exception {
+ Table table = table(tableId);
+ table.refresh();
+
+ Long previousSnapshotId = table.currentSnapshot().snapshotId();
+ Long snapshotId = rollbackToTimestamp(table, timestamp);
+
+ return new Row[] {Row.of(previousSnapshotId, snapshotId)};
+ }
+
+ private Long rollbackToTimestamp(Table table, Long timestamp) {
+ Snapshot snapshot = getLastSnapshotEarlierOrEqualThanTimestamp(table, timestamp);
+
+ if (snapshot == null) {
+ throw new IllegalArgumentException(
+ "Could not find any snapshot whose commit-time earlier than " + timestamp);
+ }
+
+ long snapshotId = snapshot.snapshotId();
+
+ table.manageSnapshots().rollbackTo(snapshotId).commit();
+
+ return snapshotId;
+ }
+
+ private Snapshot getLastSnapshotEarlierOrEqualThanTimestamp(Table table, Long timestamp) {
+ Iterable<Snapshot> snapshots = table.snapshots();
+ Snapshot target = null;
+ for (Snapshot snapshot : snapshots) {
+ if (snapshot.timestampMillis() <= timestamp) {
+ if (target == null || target.timestampMillis() <= snapshot.timestampMillis()) {
+ target = snapshot;
+ }
+ }
+ }
+
+ return target;
+ }
+
+ @Override
+ public String procedureName() {
+ return PROCEDURE_NAME;
+ }
+}
diff --git a/flink/v2.1/flink/src/main/resources/META-INF/services/org.apache.iceberg.flink.procedure.ProcedureService b/flink/v2.1/flink/src/main/resources/META-INF/services/org.apache.iceberg.flink.procedure.ProcedureService
new file mode 100644
index 000000000000..cffd51795ac3
--- /dev/null
+++ b/flink/v2.1/flink/src/main/resources/META-INF/services/org.apache.iceberg.flink.procedure.ProcedureService
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.flink.procedure.CreateTagProcedure
+org.apache.iceberg.flink.procedure.DeleteTagProcedure
+org.apache.iceberg.flink.procedure.CreateTagFromTimestampProcedure
+org.apache.iceberg.flink.procedure.RollbackToProcedure
+org.apache.iceberg.flink.procedure.RollbackToTimestampProcedure
+org.apache.iceberg.flink.procedure.CreateBranchProcedure
+org.apache.iceberg.flink.procedure.DeleteBranchProcedure
+org.apache.iceberg.flink.procedure.ExpireSnapshotsProcedure
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/ProcedureTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/ProcedureTestBase.java
new file mode 100644
index 000000000000..a805315157bf
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/ProcedureTestBase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.CatalogTestBase;
+import org.apache.iceberg.flink.FlinkCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+/** Base class for {@link org.apache.flink.table.procedures.Procedure} tests. */
+public class ProcedureTestBase extends CatalogTestBase {
+ private Snapshot firstSnapshot;
+ private Catalog currentCatalog;
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+
+ this.currentCatalog = ((FlinkCatalog) getTableEnv().getCatalog(catalogName).get()).catalog();
+ }
+
+ @AfterEach
+ public void cleanNamespaces() {
+ sql("DROP TABLE IF EXISTS %s.T", flinkDatabase);
+ dropDatabase(flinkDatabase, true);
+ super.clean();
+ }
+
+ protected long getFirstSnapshotId(String tableName) {
+ if (firstSnapshot == null) {
+ firstSnapshot = getFirstSnapshot(tableName);
+ }
+
+ return firstSnapshot.snapshotId();
+ }
+
+ protected long getLastSnapshotId(String tableName) {
+ Table table = getFreshTable(tableName);
+ Snapshot lastSnapshot = table.currentSnapshot();
+
+ return lastSnapshot.snapshotId();
+ }
+
+ private Snapshot getFirstSnapshot(String tableName) {
+ Table table = getFreshTable(tableName);
+ Snapshot snapshot = table.currentSnapshot();
+ while (snapshot.parentId() != null) {
+ snapshot = table.snapshot(snapshot.parentId());
+ }
+
+ return snapshot;
+ }
+
+ protected long getFirstSnapshotTimestamp(String tableName) {
+ if (firstSnapshot == null) {
+ firstSnapshot = getFirstSnapshot(tableName);
+ }
+
+ return firstSnapshot.timestampMillis();
+ }
+
+ protected long getLastSnapshotTimestamp(String tableName) {
+ Table table = getFreshTable(tableName);
+ Snapshot lastSnapshot = table.currentSnapshot();
+
+ return lastSnapshot.timestampMillis();
+ }
+
+ protected List<Long> getSnapshotTimestampsSorted(String tableName) {
+ List<Snapshot> snapshots = getSnapshotsSortedByTimestamps(tableName);
+ List<Long> timestamps = Lists.newArrayList();
+ for (Snapshot snapshot : snapshots) {
+ timestamps.add(snapshot.timestampMillis());
+ }
+ return timestamps;
+ }
+
+ protected List<Long> getSnapshotIdsSortedByTimestamps(String tableName) {
+ List<Snapshot> snapshots = getSnapshotsSortedByTimestamps(tableName);
+ List<Long> snapshotIds = Lists.newArrayList();
+ for (Snapshot snapshot : snapshots) {
+ snapshotIds.add(snapshot.snapshotId());
+ }
+ return snapshotIds;
+ }
+
+ private Table getFreshTable(String tableName) {
+ Table table = currentCatalog.loadTable(TableIdentifier.parse(tableName));
+ table.refresh();
+ return table;
+ }
+
+ private List<Snapshot> getSnapshotsSortedByTimestamps(String tableName) {
+ Table table = getFreshTable(tableName);
+ List<Snapshot> snapshots = Lists.newArrayList();
+ table.snapshots().forEach(snapshots::add);
+ snapshots.sort(Comparator.comparingLong(Snapshot::timestampMillis));
+ return snapshots;
+ }
+
+ protected Object toDateTime(long millis) {
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault())
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteBranchProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteBranchProcedure.java
new file mode 100644
index 000000000000..754226cd5a49
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteBranchProcedure.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link CreateBranchProcedure} and {@link DeleteBranchProcedure}. */
+public class TestCreateAndDeleteBranchProcedure extends ProcedureTestBase {
+ @TestTemplate
+ public void testCreateAndDeleteBranchFromCurrentSnapshot() {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ sql("insert into T values('k', '2024-01-01')");
+ sql("insert into T values('k2', '2024-01-02')");
+ sql("insert into T values('k3', '2024-01-03')");
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.create_branch('%s', 'branch1')", tableName);
+
+ sql("insert into T /*+ OPTIONS('branch'='branch1') */ values('k4', '2024-01-04')");
+
+ assertThat(
+ sql("select * from T /*+ OPTIONS('branch'='branch1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k4, 2024-01-04]", "+I[k3, 2024-01-03]", "+I[k2, 2024-01-02]", "+I[k, 2024-01-01]");
+
+ sql("CALL sys.delete_branch('%s', 'branch1')", tableName);
+
+ assertThat(sql("select * from T ").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k3, 2024-01-03]", "+I[k2, 2024-01-02]", "+I[k, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testDeleteNonExistentBranch() {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+
+ String tableName = getFullQualifiedTableName("T");
+
+ assertThatThrownBy(() -> sql("CALL sys.delete_branch('%s', 'branch1')", tableName))
+ .isInstanceOf(TableException.class)
+ .hasMessage("The call method caused an error: Branch does not exist: branch1.");
+ }
+
+ @TestTemplate
+ public void testCreateEmptyBranch() {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.create_branch('%s', 'branch1')", tableName);
+
+ sql("insert into T /*+ OPTIONS('branch'='branch1') */ values('k1', '2024-01-01')");
+
+ assertThat(
+ sql("select * from T /*+ OPTIONS('branch'='branch1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+
+ assertThat(sql("select * from T ").stream().map(Row::toString)).isEmpty();
+ }
+
+ @TestTemplate
+ public void testCreateAndDeleteBranchFromTag() {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ sql("insert into T values('k', '2024-01-01')");
+ sql("insert into T values('k2', '2024-01-02')");
+ sql("insert into T values('k3', '2024-01-03')");
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.create_tag('%s', 'tag1')", tableName);
+
+ sql("CALL sys.create_branch('%s', 'branch1', 'tag1')", tableName);
+
+ sql("insert into T /*+ OPTIONS('branch'='branch1') */ values('k4', '2024-01-04')");
+
+ assertThat(
+ sql("select * from T /*+ OPTIONS('branch'='branch1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k4, 2024-01-04]", "+I[k3, 2024-01-03]", "+I[k2, 2024-01-02]", "+I[k, 2024-01-01]");
+
+ sql("CALL sys.delete_branch('%s', 'branch1')", tableName);
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k3, 2024-01-03]", "+I[k2, 2024-01-02]", "+I[k, 2024-01-01]");
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteTagProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteTagProcedure.java
new file mode 100644
index 000000000000..36e01aa80384
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateAndDeleteTagProcedure.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link CreateTagProcedure} and {@link DeleteTagProcedure}. */
+public class TestCreateAndDeleteTagProcedure extends ProcedureTestBase {
+ @TestTemplate
+ public void testCreateAndDeleteTags() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.create_tag('%s', 'tag1')", tableName);
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k4, 2024-01-01]", "+I[k3, 2024-01-01]", "+I[k2, 2024-01-01]", "+I[k1, 2024-01-01]");
+
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ sql("CALL sys.create_tag('%s', 'tag2', %s)", tableName, firstSnapshotId);
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+
+ sql("CALL sys.delete_tag('%s', 'tag2')", tableName);
+
+ assertThatThrownBy(
+ () -> sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Failed to collect table result");
+ }
+
+ @TestTemplate
+ public void testCreateAndDeleteTagsWithNamedParameters() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.create_tag(`table` => '%s', `tag` => 'tag1')", tableName);
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k4, 2024-01-01]", "+I[k3, 2024-01-01]", "+I[k2, 2024-01-01]", "+I[k1, 2024-01-01]");
+
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ sql(
+ "CALL sys.create_tag(`table` => '%s', `tag` => 'tag2', `snapshot_id` => %s)",
+ tableName, firstSnapshotId);
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+
+ sql("CALL sys.delete_tag(`table` => '%s', `tag` => 'tag2')", tableName);
+
+ assertThatThrownBy(
+ () -> sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Failed to collect table result");
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateTagFromTimestampProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateTagFromTimestampProcedure.java
new file mode 100644
index 000000000000..abbc0b13cda8
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestCreateTagFromTimestampProcedure.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link CreateTagFromTimestampProcedure}. */
+public class TestCreateTagFromTimestampProcedure extends ProcedureTestBase {
+ @TestTemplate
+ public void testCreateTagFromTimestampProcedure() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 3; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ List<Long> timestamps = getSnapshotTimestampsSorted(tableName);
+ List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp('%s', 'tag1', %s)",
+ tableName, timestamps.get(0) - 1))
+ .containsExactly(Row.of("tag1", snapshotIds.get(0), timestamps.get(0)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp('%s', 'tag2', %s)",
+ tableName, timestamps.get(1) - 1))
+ .containsExactly(Row.of("tag2", snapshotIds.get(1), timestamps.get(1)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]");
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp('%s', 'tag3', %s)",
+ tableName, timestamps.get(2) - 1))
+ .containsExactly(Row.of("tag3", snapshotIds.get(2), timestamps.get(2)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag3') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]", "+I[k3, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testCreateTagFromTimestampProcedureWithNamedParameters() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 3; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ List<Long> timestamps = getSnapshotTimestampsSorted(tableName);
+ List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp(`table` => '%s', `tag` => 'tag1', `timestamp` => %s)",
+ tableName, timestamps.get(0) - 1))
+ .containsExactly(Row.of("tag1", snapshotIds.get(0), timestamps.get(0)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag1') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp(`table` => '%s', `tag` => 'tag2', `timestamp` => %s)",
+ tableName, timestamps.get(1) - 1))
+ .containsExactly(Row.of("tag2", snapshotIds.get(1), timestamps.get(1)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag2') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]");
+
+ assertThat(
+ sql(
+ "CALL sys.create_tag_from_timestamp(`table` => '%s', `tag` => 'tag3', `timestamp` => %s)",
+ tableName, timestamps.get(2) - 1))
+ .containsExactly(Row.of("tag3", snapshotIds.get(2), timestamps.get(2)));
+
+ assertThat(sql("select * from T /*+ OPTIONS('tag'='tag3') */").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]", "+I[k3, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testNoSnapshotLaterThanTimestamp() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 3; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long lastTimestamp = getLastSnapshotTimestamp(tableName);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL sys.create_tag_from_timestamp(`table` => '%s', `tag` => 'tag1', `timestamp` => %s)",
+ tableName, lastTimestamp))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "The call method caused an error: Could not find any snapshot whose commit-time later than %s.",
+ lastTimestamp);
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestExpireSnapshotsProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestExpireSnapshotsProcedure.java
new file mode 100644
index 000000000000..9ce32f8b3eab
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestExpireSnapshotsProcedure.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link ExpireSnapshotsProcedure}. */
+public class TestExpireSnapshotsProcedure extends ProcedureTestBase {
+ @TestTemplate
+ public void testDefaultOptions() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ sql("CALL sys.expire_snapshots(`table` => '%s')", tableName);
+
+ // Default options are retain_last = 1, older_than = now - 5 days
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]", "+I[k3, 2024-01-01]", "+I[k4, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testOneSnapshotStays() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotId = getLastSnapshotId(tableName);
+ long latestSnapshotTime = getLastSnapshotTimestamp(tableName);
+
+ sql(
+ "CALL sys.expire_snapshots(`table` => '%s', `older_than` => '%s')",
+ tableName, toDateTime(latestSnapshotTime + 10_000));
+
+ List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+ // By default, at least one snapshot stays
+ assertThat(snapshotIds).containsExactly(latestSnapshotId);
+ }
+
+ @TestTemplate
+ public void testTwoSnapshotsStay() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(1000L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ List<Long> snapshotTimestamps = getSnapshotTimestampsSorted(tableName);
+
+ assertThat(snapshotTimestamps).hasSize(4);
+
+ sql(
+ "CALL sys.expire_snapshots(`table` => '%s', `older_than` => '%s')",
+ tableName, toDateTime(snapshotTimestamps.get(2)));
+
+ List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+ // Only last two snapshots should stay
+ assertThat(snapshotIds).hasSize(2);
+ }
+
+ @TestTemplate
+ public void testRetainLastAllStay() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotTime = getLastSnapshotTimestamp(tableName);
+
+ sql(
+ "CALL sys.expire_snapshots(`table` => '%s', `older_than` => '%s', `retain_last` => %s)",
+ tableName, toDateTime(latestSnapshotTime + 10_000), 4);
+
+ List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+ // All 4 snapshots stay
+ assertThat(snapshotIds).hasSize(4);
+ }
+
+ @TestTemplate
+ public void testRetainLastNoneStays() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotTime = getLastSnapshotTimestamp(tableName);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL sys.expire_snapshots(`table` => '%s', `older_than` => '%s', `retain_last` => %s)",
+ tableName, toDateTime(latestSnapshotTime + 10_000), 0))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "The call method caused an error: Number of snapshots to retain must be at least 1, cannot be: 0.");
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToProcedure.java
new file mode 100644
index 000000000000..9570b8873c51
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToProcedure.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link RollbackToProcedure}. */
+public class TestRollbackToProcedure extends ProcedureTestBase {
+ @TestTemplate
+ public void testRollbackToSnapshot() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotId = getLastSnapshotId(tableName);
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ assertThat(
+ sql(
+ "CALL sys.rollback_to(`table` => '%s', `snapshot_id` => %s)",
+ tableName, firstSnapshotId))
+ .containsExactly(Row.of(latestSnapshotId, firstSnapshotId));
+
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testRollbackToTag() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotId = getLastSnapshotId(tableName);
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ sql("CALL sys.create_tag('%s', 'tag1', %s)", tableName, firstSnapshotId);
+
+ assertThat(sql("CALL sys.rollback_to(`table` => '%s', `tag` => 'tag1')", tableName))
+ .containsExactly(Row.of(latestSnapshotId, firstSnapshotId));
+
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testRollbackToTagAndSnapshot() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotId = getLastSnapshotId(tableName);
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ sql("CALL sys.create_tag('%s', 'tag1', %s)", tableName, firstSnapshotId);
+
+ assertThat(
+ sql(
+ "CALL sys.rollback_to(`table` => '%s', `tag` => 'tag1', `snapshot_id` => %s)",
+ tableName, firstSnapshotId))
+ .containsExactly(Row.of(latestSnapshotId, firstSnapshotId));
+
+ assertThat(sql("select * from T").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+ }
+
+ @TestTemplate
+ public void testRollbackToTagAndSnapshotIncompatible() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ long latestSnapshotId = getLastSnapshotId(tableName);
+ long firstSnapshotId = getFirstSnapshotId(tableName);
+
+ sql("CALL sys.create_tag('%s', 'tag1', %s)", tableName, firstSnapshotId);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL sys.rollback_to(`table` => '%s', `tag` => 'tag1', `snapshot_id` => %s)",
+ tableName, latestSnapshotId))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "The call method caused an error: Snapshot with provided snapshot id is not the same snapshot provided tag refers to. Please specify a tag or a snapshot id, or be sure both refer to the same snapshot.");
+ }
+
+ @TestTemplate
+ public void testRollbackToEmptyArgs() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt)");
+ for (int i = 1; i <= 4; i++) {
+ sql("insert into T values('k" + i + "', '2024-01-01')");
+ Thread.sleep(100L);
+ }
+
+ String tableName = getFullQualifiedTableName("T");
+
+ assertThatThrownBy(() -> sql("CALL sys.rollback_to(`table` => '%s')", tableName))
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "The call method caused an error: No arguments to rollback to. Please specify a tag or a snapshot id.");
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToTimestampProcedure.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToTimestampProcedure.java
new file mode 100644
index 000000000000..9239c78311cf
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/procedure/TestRollbackToTimestampProcedure.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.procedure;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.TestTemplate;
+
+/** Unit tests for {@link RollbackToTimestampProcedure}. */
+public class TestRollbackToTimestampProcedure extends ProcedureTestBase {
+
+  /**
+   * Rolls the table back twice via commit timestamps, verifying both the (previous, current)
+   * snapshot-id row reported by the procedure and the table contents after each rollback.
+   */
+  @TestTemplate
+  public void testRollbackToTimestampProcedure() throws Exception {
+    sql(
+        "CREATE TABLE T ("
+            + " k STRING,"
+            + " dt STRING,"
+            + " PRIMARY KEY (k, dt) NOT ENFORCED"
+            + ") PARTITIONED BY (dt)");
+    for (int i = 1; i <= 3; i++) {
+      sql("insert into T values('k" + i + "', '2024-01-01')");
+      // Small pause so consecutive snapshots get distinct commit timestamps.
+      Thread.sleep(100L);
+    }
+
+    String tableName = getFullQualifiedTableName("T");
+
+    // Parameterized element types: with a raw List, get(i) returns Object and
+    // the arithmetic below ("timestamps.get(1) + 1") would not even compile.
+    List<Long> timestamps = getSnapshotTimestampsSorted(tableName);
+    List<Long> snapshotIds = getSnapshotIdsSortedByTimestamps(tableName);
+
+    // Roll back to just after the second snapshot's commit time: the third
+    // snapshot is discarded and the second becomes current.
+    assertThat(sql("CALL sys.rollback_to_timestamp('%s', %s)", tableName, timestamps.get(1) + 1))
+        .containsExactly(Row.of(snapshotIds.get(2), snapshotIds.get(1)));
+
+    assertThat(sql("select * from T").stream().map(Row::toString))
+        .containsExactlyInAnyOrder("+I[k1, 2024-01-01]", "+I[k2, 2024-01-01]");
+
+    // Roll back again, to just after the first snapshot's commit time.
+    assertThat(sql("CALL sys.rollback_to_timestamp('%s', %s)", tableName, timestamps.get(0) + 1))
+        .containsExactly(Row.of(snapshotIds.get(1), snapshotIds.get(0)));
+
+    assertThat(sql("select * from T").stream().map(Row::toString))
+        .containsExactlyInAnyOrder("+I[k1, 2024-01-01]");
+  }
+
+  /**
+   * A timestamp earlier than every snapshot's commit time has no snapshot to roll back to, so the
+   * procedure must fail with a descriptive error.
+   */
+  @TestTemplate
+  public void testNoSnapshotToRollbackTo() throws Exception {
+    sql(
+        "CREATE TABLE T ("
+            + " k STRING,"
+            + " dt STRING,"
+            + " PRIMARY KEY (k, dt) NOT ENFORCED"
+            + ") PARTITIONED BY (dt)");
+    for (int i = 1; i <= 3; i++) {
+      sql("insert into T values('k" + i + "', '2024-01-01')");
+      // Small pause so consecutive snapshots get distinct commit timestamps.
+      Thread.sleep(100L);
+    }
+
+    String tableName = getFullQualifiedTableName("T");
+
+    long firstTimestamp = getFirstSnapshotTimestamp(tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL sys.rollback_to_timestamp('%s', %s)", tableName, firstTimestamp - 1))
+        .isInstanceOf(TableException.class)
+        .hasMessage(
+            "The call method caused an error: Could not find any snapshot whose commit-time earlier than %s.",
+            (firstTimestamp - 1));
+  }
+}