From aae9556f2a965ca80de59964362b01991c47ee71 Mon Sep 17 00:00:00 2001 From: Liebing Date: Tue, 17 Mar 2026 16:20:25 +0800 Subject: [PATCH] [client] Support specifying rack when adding/removing server tags in Admin API --- .../org/apache/fluss/client/admin/Admin.java | 95 +++++++++++++++++++ .../client/admin/RackAwareClusterITCase.java | 42 ++++++++ .../AddServerTagByRackProcedure.java | 65 +++++++++++++ .../flink/procedure/ProcedureManager.java | 3 + .../RemoveServerTagByRackProcedure.java | 56 +++++++++++ .../flink/procedure/FlinkProcedureITCase.java | 64 +++++++++++++ website/docs/engine-flink/procedures.md | 80 ++++++++++++++++ 7 files changed, 405 insertions(+) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AddServerTagByRackProcedure.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RemoveServerTagByRackProcedure.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index d8fe576608..4ae65df4c4 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; import org.apache.fluss.cluster.rebalance.GoalType; import org.apache.fluss.cluster.rebalance.RebalanceProgress; import org.apache.fluss.cluster.rebalance.ServerTag; @@ -72,10 +73,16 @@ import javax.annotation.Nullable; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static 
org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; /** * The administrative client for Fluss, which supports managing and inspecting tables, servers, @@ -633,6 +640,94 @@ ListOffsetsResult listOffsets( */ CompletableFuture removeServerTag(List tabletServers, ServerTag serverTag); + /** + * Add server tag to all tabletServers belonging to any of the specified racks. + * + *

If no tabletServer is found for any of the given racks, the operation completes + * successfully without making any changes. + * + *

If any tabletServer fails to add the tag, none of the tags will take effect. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link ServerTagAlreadyExistException} If any one of the matched tabletServers + * already has a server tag different from the one being added. +
+ * + * @param racks the list of rack identifiers; all tabletServers whose {@link ServerNode#rack()} + * is contained in this list will be targeted. Must not be null or empty. + * @param serverTag the server tag to be added. + */ + default CompletableFuture addServerTagByRack(List racks, ServerTag serverTag) { + checkNotNull(racks, "racks must not be null"); + checkArgument(!racks.isEmpty(), "racks must not be empty"); + Set rackSet = new HashSet<>(racks); + return getServerNodes() + .thenCompose( + nodes -> { + List serverIds = + nodes.stream() + .filter( + n -> + n.serverType() + == ServerType + .TABLET_SERVER + && rackSet.contains(n.rack())) + .map(ServerNode::id) + .collect(Collectors.toList()); + if (serverIds.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return addServerTag(serverIds, serverTag); + }); + } + + /** + * Remove server tag from all tabletServers belonging to any of the specified racks. + * + *

If no tabletServer is found for any of the given racks, the operation completes + * successfully without making any changes. + * + *

If any tabletServer fails to remove the tag, none of the tags will be removed. + * + *

Attempting to remove the tag from a matched tabletServer that does not currently have it results in a {@link ServerTagNotExistException}. + + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link ServerTagNotExistException} If the server tag does not exist for any one of the + * matched tabletServers. + *
+ * + * @param racks the list of rack identifiers; all tabletServers whose {@link ServerNode#rack()} + * is contained in this list will be targeted. Must not be null or empty. + * @param serverTag the server tag to be removed. + */ + default CompletableFuture removeServerTagByRack(List racks, ServerTag serverTag) { + checkNotNull(racks, "racks must not be null"); + checkArgument(!racks.isEmpty(), "racks must not be empty"); + Set rackSet = new HashSet<>(racks); + return getServerNodes() + .thenCompose( + nodes -> { + List serverIds = + nodes.stream() + .filter( + n -> + n.serverType() + == ServerType + .TABLET_SERVER + && rackSet.contains(n.rack())) + .map(ServerNode::id) + .collect(Collectors.toList()); + if (serverIds.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return removeServerTag(serverIds, serverTag); + }); + } + /** * Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's * bucket load. diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java index edf57fb32a..831cce3957 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.DatabaseDescriptor; @@ -26,6 +27,7 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; import 
org.apache.fluss.server.zk.data.TableAssignment; @@ -34,6 +36,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import static org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_SCHEMA; @@ -107,4 +111,42 @@ void testCreateTableWithInsufficientRack() throws Exception { admin.dropTable(tablePath, false).get(); admin.dropDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), false, false).get(); } + + @Test + void testAddAndRemoveServerTagByRack() throws Exception { + // Cluster racks: server-0 -> rack-0, server-1 -> rack-1, + // server-2 -> rack-2, server-3 -> rack-0 + ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + + // 1. addServerTagByRack with single rack (rack-0: server-0, server-3). + admin.addServerTagByRack(Collections.singletonList("rack-0"), ServerTag.TEMPORARY_OFFLINE) + .get(); + assertThat(zkClient.getServerTags()).isPresent(); + assertThat(zkClient.getServerTags().get().getServerTags()) + .containsEntry(0, ServerTag.TEMPORARY_OFFLINE) + .containsEntry(3, ServerTag.TEMPORARY_OFFLINE) + .doesNotContainKey(1) + .doesNotContainKey(2); + + // 2. removeServerTagByRack with single rack (rack-0). + admin.removeServerTagByRack( + Collections.singletonList("rack-0"), ServerTag.TEMPORARY_OFFLINE) + .get(); + assertThat(zkClient.getServerTags()).isNotPresent(); + + // 3. addServerTagByRack with multiple racks (rack-0, rack-1: server-0, server-1, + // server-3). 
+ admin.addServerTagByRack(Arrays.asList("rack-0", "rack-1"), ServerTag.PERMANENT_OFFLINE) + .get(); + assertThat(zkClient.getServerTags().get().getServerTags()) + .containsEntry(0, ServerTag.PERMANENT_OFFLINE) + .containsEntry(1, ServerTag.PERMANENT_OFFLINE) + .containsEntry(3, ServerTag.PERMANENT_OFFLINE) + .doesNotContainKey(2); + + // cleanup + admin.removeServerTagByRack(Arrays.asList("rack-0", "rack-1"), ServerTag.PERMANENT_OFFLINE) + .get(); + assertThat(zkClient.getServerTags()).isNotPresent(); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AddServerTagByRackProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AddServerTagByRackProcedure.java new file mode 100644 index 0000000000..b4cfaafca6 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AddServerTagByRackProcedure.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.cluster.rebalance.ServerTag; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetServerTag; + +/** + * Procedure to add server tag by rack. + * + *

Usage: + * + *


+ *  CALL sys.add_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE')
+ *  CALL sys.add_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE')
+ * 
+ */ +public class AddServerTagByRackProcedure extends ProcedureBase { + + @ProcedureHint( + argument = { + @ArgumentHint(name = "racks", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING")) + }) + public String[] call(ProcedureContext context, String racks, String serverTag) + throws Exception { + List rackList = validateAndGetRacks(racks); + ServerTag tag = validateAndGetServerTag(serverTag); + admin.addServerTagByRack(rackList, tag).get(); + return new String[] {"success"}; + } + + static List validateAndGetRacks(String racks) { + if (racks == null || racks.trim().isEmpty()) { + throw new IllegalArgumentException( + "racks cannot be null or empty. You can specify one rack as 'rack-0' or " + + "multiple racks as 'rack-0,rack-1' (split by ',')"); + } + return Arrays.asList(racks.trim().split(",")); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java index fc2632e856..404b37a96a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java @@ -75,6 +75,9 @@ private enum ProcedureEnum { RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class), ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class), REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class), + ADD_SERVER_TAG_BY_RACK("sys.add_server_tag_by_rack", AddServerTagByRackProcedure.class), + REMOVE_SERVER_TAG_BY_RACK( + "sys.remove_server_tag_by_rack", RemoveServerTagByRackProcedure.class), REBALANCE("sys.rebalance", RebalanceProcedure.class), CANCEL_REBALANCE("sys.cancel_rebalance", CancelRebalanceProcedure.class), LIST_REBALANCE_PROGRESS("sys.list_rebalance", 
ListRebalanceProcessProcedure.class), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RemoveServerTagByRackProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RemoveServerTagByRackProcedure.java new file mode 100644 index 0000000000..6137412a56 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RemoveServerTagByRackProcedure.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.cluster.rebalance.ServerTag; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.List; + +import static org.apache.fluss.flink.procedure.AddServerTagByRackProcedure.validateAndGetRacks; +import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetServerTag; + +/** + * Procedure to remove server tag by rack. + * + *

Usage: + * + *


+ *  CALL sys.remove_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE')
+ *  CALL sys.remove_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE')
+ * 
+ */ +public class RemoveServerTagByRackProcedure extends ProcedureBase { + + @ProcedureHint( + argument = { + @ArgumentHint(name = "racks", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING")) + }) + public String[] call(ProcedureContext context, String racks, String serverTag) + throws Exception { + List rackList = validateAndGetRacks(racks); + ServerTag tag = validateAndGetServerTag(serverTag); + admin.removeServerTagByRack(rackList, tag).get(); + return new String[] {"success"}; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index 8b2817e32f..5d39f6a2c2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -75,6 +75,7 @@ public abstract class FlinkProcedureITCase { .setCoordinatorServerListeners("FLUSS://localhost:0, CLIENT://localhost:0") .setTabletServerListeners("FLUSS://localhost:0, CLIENT://localhost:0") .setClusterConf(initConfig()) + .setRacks(new String[] {"rack-0", "rack-1", "rack-2", "rack-0"}) .build(); static final String CATALOG_NAME = "testcatalog"; @@ -141,6 +142,8 @@ void testShowProcedures() throws Exception { "+I[sys.reset_cluster_configs]", "+I[sys.add_server_tag]", "+I[sys.remove_server_tag]", + "+I[sys.add_server_tag_by_rack]", + "+I[sys.remove_server_tag_by_rack]", "+I[sys.rebalance]", "+I[sys.cancel_rebalance]", "+I[sys.list_rebalance]", @@ -628,6 +631,67 @@ void testAddAndRemoveServerTag(boolean upperCase) throws Exception { } } + @Test + void testAddAndRemoveServerTagByRack() throws Exception { + // Cluster racks: server-0 -> rack-0, server-1 -> rack-1, + // server-2 -> rack-2, server-3 -> rack-0 + ZooKeeperClient zkClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + + // 1. addServerTagByRack with single rack (rack-0: server-0, server-3). + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.add_server_tag_by_rack('rack-0', 'TEMPORARY_OFFLINE')", + CATALOG_NAME)) + .collect()) { + assertCallResult(resultIterator, new String[] {"+I[success]"}); + } + assertThat(zkClient.getServerTags()).isPresent(); + assertThat(zkClient.getServerTags().get().getServerTags()) + .containsEntry(0, ServerTag.TEMPORARY_OFFLINE) + .containsEntry(3, ServerTag.TEMPORARY_OFFLINE) + .doesNotContainKey(1) + .doesNotContainKey(2); + + // 2. removeServerTagByRack with single rack (rack-0). + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.remove_server_tag_by_rack('rack-0', 'TEMPORARY_OFFLINE')", + CATALOG_NAME)) + .collect()) { + assertCallResult(resultIterator, new String[] {"+I[success]"}); + } + assertThat(zkClient.getServerTags()).isNotPresent(); + + // 3. addServerTagByRack with multiple racks (rack-0, rack-1: server-0, server-1, + // server-3). 
+ try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.add_server_tag_by_rack('rack-0,rack-1', 'PERMANENT_OFFLINE')", + CATALOG_NAME)) + .collect()) { + assertCallResult(resultIterator, new String[] {"+I[success]"}); + } + assertThat(zkClient.getServerTags().get().getServerTags()) + .containsEntry(0, ServerTag.PERMANENT_OFFLINE) + .containsEntry(1, ServerTag.PERMANENT_OFFLINE) + .containsEntry(3, ServerTag.PERMANENT_OFFLINE) + .doesNotContainKey(2); + + // cleanup + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.remove_server_tag_by_rack('rack-0,rack-1', 'PERMANENT_OFFLINE')", + CATALOG_NAME)) + .collect()) { + assertCallResult(resultIterator, new String[] {"+I[success]"}); + } + assertThat(zkClient.getServerTags()).isNotPresent(); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testRebalance(boolean upperCase) throws Exception { diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md index b3035e8361..8be67cc82a 100644 --- a/website/docs/engine-flink/procedures.md +++ b/website/docs/engine-flink/procedures.md @@ -357,6 +357,86 @@ CALL sys.remove_server_tag('0', 'PERMANENT_OFFLINE'); CALL sys.remove_server_tag('1,2,3', 'TEMPORARY_OFFLINE'); ``` +### add_server_tag_by_rack + +Add server tag to all TabletServers belonging to the specified racks. This is a convenience procedure that resolves rack names to TabletServer IDs automatically. For example, if `rack-0` contains `tabletServer-0` and `tabletServer-3`, calling this procedure with `'rack-0'` will add the tag to both servers. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.add_server_tag_by_rack( + racks => 'STRING', + serverTag => 'STRING' +) +``` + +**Parameters:** + +- `racks` (required): The rack identifiers to target. Can be a single rack (e.g., `'rack-0'`) or multiple racks separated by commas (e.g., `'rack-0,rack-1'`). 
All TabletServers belonging to any of the specified racks will be tagged. +- `serverTag` (required): The tag to add to the matched TabletServers. Valid values are: + - `'PERMANENT_OFFLINE'`: Indicates the TabletServer is permanently offline and will be decommissioned. All buckets on this server will be migrated during the next rebalance. + - `'TEMPORARY_OFFLINE'`: Indicates the TabletServer is temporarily offline (e.g., for upgrading). Buckets may be temporarily migrated but can return after the server comes back online. + +**Returns:** An array with a single element `'success'` if the operation completes successfully. + +**Important Notes:** + +- If no TabletServer is found for any of the given racks, the operation completes successfully without making any changes. +- If any TabletServer already has a different tag, the operation will fail and none of the tags will take effect. + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Add PERMANENT_OFFLINE tag to all TabletServers in rack-0 +CALL sys.add_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE'); + +-- Add TEMPORARY_OFFLINE tag to all TabletServers in rack-0 and rack-1 +CALL sys.add_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE'); +``` + +### remove_server_tag_by_rack + +Remove server tag from all TabletServers belonging to the specified racks. This is a convenience procedure that resolves rack names to TabletServer IDs automatically. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.remove_server_tag_by_rack( + racks => 'STRING', + serverTag => 'STRING' +) +``` + +**Parameters:** + +- `racks` (required): The rack identifiers to target. Can be a single rack (e.g., `'rack-0'`) or multiple racks separated by commas (e.g., `'rack-0,rack-1'`). All TabletServers belonging to any of the specified racks will have the tag removed. +- `serverTag` (required): The tag to remove from the matched TabletServers. 
Valid values are: + - `'PERMANENT_OFFLINE'`: Remove the permanent offline tag from the TabletServer. + - `'TEMPORARY_OFFLINE'`: Remove the temporary offline tag from the TabletServer. + +**Returns:** An array with a single element `'success'` if the operation completes successfully. + +**Important Notes:** + +- If no TabletServer is found for any of the given racks, the operation completes successfully without making any changes. +- If any TabletServer has a different tag than the one specified, the operation will fail. + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Remove PERMANENT_OFFLINE tag from all TabletServers in rack-0 +CALL sys.remove_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE'); + +-- Remove TEMPORARY_OFFLINE tag from all TabletServers in rack-0 and rack-1 +CALL sys.remove_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE'); +``` + ### rebalance Trigger a rebalance operation to redistribute buckets across TabletServers in the cluster. This procedure helps balance workload based on specified goals, such as distributing replicas or leaders evenly across the cluster.