From 65ceead9de150655046299b1500fb08bc61edce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 18 Mar 2026 20:53:55 +0800 Subject: [PATCH 1/2] [flink] Split btree index build and execute --- .../paimon/flink/btree/BTreeIndexTopoBuilder.java | 12 +++++++----- .../flink/procedure/CreateGlobalIndexProcedure.java | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 0a8cda3baf36..32d0ed433385 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -72,7 +72,7 @@ /** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */ public class BTreeIndexTopoBuilder { - public static void buildIndex( + public static boolean buildIndex( StreamExecutionEnvironment env, Supplier indexBuilderSupplier, FileStoreTable table, @@ -90,12 +90,12 @@ public static void buildIndex( List splits = splitByContiguousRowRange(indexBuilder.scan()); if (splits.isEmpty()) { - return; + return false; } Map>> partitionRangeSplits = groupSplitsByRange(splits); if (partitionRangeSplits.isEmpty()) { - return; + return false; } // 2. Select necessary columns (index field + ROW_ID) @@ -160,10 +160,10 @@ public static void buildIndex( commit(table, allCommitMessages); } - env.execute("Create btree global index for table: " + table.name()); + return true; } - public static void buildIndex( + public static void buildIndexAndExecute( StreamExecutionEnvironment env, FileStoreTable table, String indexColumn, @@ -177,6 +177,8 @@ public static void buildIndex( Collections.singletonList(indexColumn), partitionPredicate, userOptions); + + env.execute("Create btree global index for table: " + table.name()); } protected static DataStream executeForPartitionRange( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java index 56675c18e4ce..664b69be4f9f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java @@ -119,7 +119,7 @@ public String[] call( // Build global index based on index type indexType = indexType.toLowerCase().trim(); if ("btree".equals(indexType)) { - BTreeIndexTopoBuilder.buildIndex( + BTreeIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, indexColumn, From 3dd6fcc7af10a3b53490bca01c757e6138f4861e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 18 Mar 2026 20:59:58 +0800 Subject: [PATCH 2/2] Fix minus --- .../apache/paimon/flink/btree/BTreeIndexTopoBuilder.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 32d0ed433385..c98f7452f7bd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -170,15 +170,15 @@ public static void buildIndexAndExecute( PartitionPredicate partitionPredicate, Options userOptions) throws Exception { - buildIndex( + if (buildIndex( env, () -> new BTreeGlobalIndexBuilder(table), table, Collections.singletonList(indexColumn), partitionPredicate, - userOptions); - - env.execute("Create btree global index for table: " + table.name()); + userOptions)) { + env.execute("Create btree global index for table: " + table.name()); + } } protected static DataStream executeForPartitionRange(