From e70de746c18780db684e6af11c4b5d371b6674c3 Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Thu, 31 Jul 2025 20:10:02 +0800 Subject: [PATCH 1/5] solve --- .../lance/LakeEnabledTableCreateITCase.java | 26 +++++++++++++++++-- .../coordinator/CoordinatorService.java | 15 ++++++++--- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java index e0c16e30dd..8154641f63 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter; import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.Schema; @@ -30,7 +31,6 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import com.alibaba.fluss.types.DataTypes; - import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -49,8 +49,11 @@ import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** ITCase for create lake enabled table with lance as lake storage. */ +/** + * ITCase for create lake enabled table with lance as lake storage. + */ class LakeEnabledTableCreateITCase { @RegisterExtension @@ -201,4 +204,23 @@ void testLogTable() throws Exception { logC18, logC19)); assertThat(expectedSchema).isEqualTo(LanceDatasetAdapter.getSchema(config).get()); } + + @Test + void testPrimaryKeyTable() throws Exception { + TableDescriptor pkTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("pk_c1", DataTypes.INT()) + .column("pk_c2", DataTypes.STRING()) + .primaryKey("pk_c1") + .build()) + .distributedBy(BUCKET_NUM) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + TablePath pkTablePath = TablePath.of(DATABASE, "pk_table"); + assertThatThrownBy(() -> admin.createTable(pkTablePath, pkTable, false).get()) + .cause() + .isInstanceOf(InvalidTableException.class); + } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index b1d588fe16..7ab93ff15a 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -104,7 +104,6 @@ import com.alibaba.fluss.utils.concurrent.FutureUtils; import javax.annotation.Nullable; - import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; @@ -129,7 +128,9 @@ import static com.alibaba.fluss.utils.Preconditions.checkNotNull; import static com.alibaba.fluss.utils.Preconditions.checkState; -/** An RPC Gateway service for coordinator server. */ +/** + * An RPC Gateway service for coordinator server. + */ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { private final int defaultBucketNumber; @@ -302,13 +303,21 @@ private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) { if (newDescriptor.getPartitionKeys().size() > 1 && "true".equals(properties.get(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key())) && !properties.containsKey( - ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key())) { + ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key())) { Map newProperties = new HashMap<>(newDescriptor.getProperties()); // disable precreate partitions for multi-level partitions. newProperties.put(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key(), "0"); newDescriptor = newDescriptor.withProperties(newProperties); } + // currently, we don't support primary key table for lance + if (dataLakeFormat != null && dataLakeFormat.equals(DataLakeFormat.LANCE)) { + if (newDescriptor.hasPrimaryKey()) { + throw new InvalidTableException( + "Currently, we don't support tiering a primary key table to Lance"); + } + } + // override the datalake format if the table hasn't set it and the cluster configured if (dataLakeFormat != null && !properties.containsKey(ConfigOptions.TABLE_DATALAKE_FORMAT.key())) { From c2179f2b631f8ac0b78de97cc02b679dbe2cfad2 Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Fri, 1 Aug 2025 14:20:33 +0800 Subject: [PATCH 2/5] fix format --- .../fluss/lake/lance/LakeEnabledTableCreateITCase.java | 5 ++--- .../fluss/server/coordinator/CoordinatorService.java | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java index 8154641f63..ff39a81efb 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java @@ -31,6 +31,7 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.testutils.FlussClusterExtension; import com.alibaba.fluss.types.DataTypes; + import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -51,9 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** - * ITCase for create lake enabled table with lance as lake storage. - */ +/** ITCase for create lake enabled table with lance as lake storage. */ class LakeEnabledTableCreateITCase { @RegisterExtension diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 7ab93ff15a..2c8fd17ce5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -104,6 +104,7 @@ import com.alibaba.fluss.utils.concurrent.FutureUtils; import javax.annotation.Nullable; + import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; @@ -128,9 +129,7 @@ import static com.alibaba.fluss.utils.Preconditions.checkNotNull; import static com.alibaba.fluss.utils.Preconditions.checkState; -/** - * An RPC Gateway service for coordinator server. - */ +/** An RPC Gateway service for coordinator server. */ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { private final int defaultBucketNumber; @@ -303,7 +302,7 @@ private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) { if (newDescriptor.getPartitionKeys().size() > 1 && "true".equals(properties.get(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key())) && !properties.containsKey( - ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key())) { + ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key())) { Map newProperties = new HashMap<>(newDescriptor.getProperties()); // disable precreate partitions for multi-level partitions. newProperties.put(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.key(), "0"); From 7de3dc4ab01486ec3d7afe04d0eeae1f3ce07479 Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Mon, 11 Aug 2025 11:45:33 +0800 Subject: [PATCH 3/5] address comments --- .../coordinator/CoordinatorService.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 2c8fd17ce5..ed9a56cc21 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -261,6 +261,17 @@ public CompletableFuture createTable(CreateTableRequest req tableAssignment = generateAssignment(bucketCount, replicaFactor, servers); } + // TODO: should support tiering a primary key table to lance in the future + // currently, we don't support primary key table for lance + if (isDataLakeEnabled(tableDescriptor) + && dataLakeFormat != null + && dataLakeFormat.equals(DataLakeFormat.LANCE)) { + if (tableDescriptor.hasPrimaryKey()) { + throw new InvalidTableException( + "Currently, we don't support tiering a primary key table to Lance"); + } + } + // TODO: should tolerate if the lake exist but matches our schema. This ensures eventually // consistent by idempotently creating the table multiple times. See #846 // before create table in fluss, we may create in lake @@ -309,14 +320,6 @@ private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) { newDescriptor = newDescriptor.withProperties(newProperties); } - // currently, we don't support primary key table for lance - if (dataLakeFormat != null && dataLakeFormat.equals(DataLakeFormat.LANCE)) { - if (newDescriptor.hasPrimaryKey()) { - throw new InvalidTableException( - "Currently, we don't support tiering a primary key table to Lance"); - } - } - // override the datalake format if the table hasn't set it and the cluster configured if (dataLakeFormat != null && !properties.containsKey(ConfigOptions.TABLE_DATALAKE_FORMAT.key())) { From 5eac3023329bb4d92ecc3eed39d5518192b53bca Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Tue, 12 Aug 2025 20:45:59 +0800 Subject: [PATCH 4/5] adress comments --- .../alibaba/fluss/lake/lance/LanceLakeCatalog.java | 7 +++++++ .../lake/lance/LakeEnabledTableCreateITCase.java | 3 ++- .../fluss/server/coordinator/CoordinatorService.java | 11 ----------- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java index daaf26eb2a..827c9582d7 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java @@ -18,6 +18,7 @@ package com.alibaba.fluss.lake.lance; import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.lake.lakestorage.LakeCatalog; import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils; import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter; @@ -58,6 +59,12 @@ public LanceLakeCatalog(Configuration config) { @Override public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) { + // currently, we don't support primary key table for lance + if (tableDescriptor.hasPrimaryKey()) { + throw new InvalidTableException( + "Currently, we don't support tiering a primary key table to Lance"); + } + LanceConfig config = LanceConfig.from( options.toMap(), tablePath.getDatabaseName(), tablePath.getTableName()); diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java index ff39a81efb..1b071e8674 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java @@ -220,6 +220,7 @@ void testPrimaryKeyTable() throws Exception { TablePath pkTablePath = TablePath.of(DATABASE, "pk_table"); assertThatThrownBy(() -> admin.createTable(pkTablePath, pkTable, false).get()) .cause() - .isInstanceOf(InvalidTableException.class); + .isInstanceOf(InvalidTableException.class) + .hasMessage("Currently, we don't support tiering a primary key table to Lance"); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index ed9a56cc21..b1d588fe16 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -261,17 +261,6 @@ public CompletableFuture createTable(CreateTableRequest req tableAssignment = generateAssignment(bucketCount, replicaFactor, servers); } - // TODO: should support tiering a primary key table to lance in the future - // currently, we don't support primary key table for lance - if (isDataLakeEnabled(tableDescriptor) - && dataLakeFormat != null - && dataLakeFormat.equals(DataLakeFormat.LANCE)) { - if (tableDescriptor.hasPrimaryKey()) { - throw new InvalidTableException( - "Currently, we don't support tiering a primary key table to Lance"); - } - } - // TODO: should tolerate if the lake exist but matches our schema. This ensures eventually // consistent by idempotently creating the table multiple times. See #846 // before create table in fluss, we may create in lake From a3f54ebee97edbf051613377596c7c014c39379f Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Tue, 12 Aug 2025 22:22:31 +0800 Subject: [PATCH 5/5] rerun CI