Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
import org.apache.fluss.cluster.rebalance.ServerTag;
Expand Down Expand Up @@ -72,10 +73,16 @@
import javax.annotation.Nullable;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* The administrative client for Fluss, which supports managing and inspecting tables, servers,
Expand Down Expand Up @@ -633,6 +640,94 @@ ListOffsetsResult listOffsets(
*/
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Add server tag to all tabletServers belonging to any of the specified racks.
*
* <p>If no tabletServer is found for any of the given racks, the operation completes
* successfully without making any changes.
*
* <p>If any tabletServer fails to add the tag, none of the tags will take effect.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
* the matched tabletServers, and the server tag is different from the existing one.
* </ul>
*
* @param racks the list of rack identifiers; all tabletServers whose {@link ServerNode#rack()}
* is contained in this list will be targeted. Must not be null or empty.
* @param serverTag the server tag to be added.
*/
default CompletableFuture<Void> addServerTagByRack(List<String> racks, ServerTag serverTag) {
checkNotNull(racks, "racks must not be null");
checkArgument(!racks.isEmpty(), "racks must not be empty");
Set<String> rackSet = new HashSet<>(racks);
return getServerNodes()
.thenCompose(
nodes -> {
List<Integer> serverIds =
nodes.stream()
.filter(
n ->
n.serverType()
== ServerType
.TABLET_SERVER
&& rackSet.contains(n.rack()))
.map(ServerNode::id)
.collect(Collectors.toList());
if (serverIds.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return addServerTag(serverIds, serverTag);
});
}

/**
* Remove server tag from all tabletServers belonging to any of the specified racks.
*
* <p>If no tabletServer is found for any of the given racks, the operation completes
* successfully without making any changes.
*
* <p>If any tabletServer fails to remove the tag, none of the tags will be removed.
*
* <p>No exception will be thrown if a matched server already has no server tag.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerTagNotExistException} If the server tag does not exist for any one of the
* matched tabletServers.
* </ul>
*
* @param racks the list of rack identifiers; all tabletServers whose {@link ServerNode#rack()}
* is contained in this list will be targeted. Must not be null or empty.
* @param serverTag the server tag to be removed.
*/
default CompletableFuture<Void> removeServerTagByRack(List<String> racks, ServerTag serverTag) {
checkNotNull(racks, "racks must not be null");
checkArgument(!racks.isEmpty(), "racks must not be empty");
Set<String> rackSet = new HashSet<>(racks);
return getServerNodes()
.thenCompose(
nodes -> {
List<Integer> serverIds =
nodes.stream()
.filter(
n ->
n.serverType()
== ServerType
.TABLET_SERVER
&& rackSet.contains(n.rack()))
.map(ServerNode::id)
.collect(Collectors.toList());
if (serverIds.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return removeServerTag(serverIds, serverTag);
});
}

/**
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
* bucket load.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;

Expand All @@ -34,6 +36,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

import static org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_SCHEMA;
Expand Down Expand Up @@ -107,4 +111,42 @@ void testCreateTableWithInsufficientRack() throws Exception {
admin.dropTable(tablePath, false).get();
admin.dropDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), false, false).get();
}

@Test
void testAddAndRemoveServerTagByRack() throws Exception {
// Cluster racks: server-0 -> rack-0, server-1 -> rack-1,
// server-2 -> rack-2, server-3 -> rack-0
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();

// 1. addServerTagByRack with single rack (rack-0: server-0, server-3).
admin.addServerTagByRack(Collections.singletonList("rack-0"), ServerTag.TEMPORARY_OFFLINE)
.get();
assertThat(zkClient.getServerTags()).isPresent();
assertThat(zkClient.getServerTags().get().getServerTags())
.containsEntry(0, ServerTag.TEMPORARY_OFFLINE)
.containsEntry(3, ServerTag.TEMPORARY_OFFLINE)
.doesNotContainKey(1)
.doesNotContainKey(2);

// 2. removeServerTagByRack with single rack (rack-0).
admin.removeServerTagByRack(
Collections.singletonList("rack-0"), ServerTag.TEMPORARY_OFFLINE)
.get();
assertThat(zkClient.getServerTags()).isNotPresent();

// 3. addServerTagByRack with multiple racks (rack-0, rack-1: server-0, server-1,
// server-3).
admin.addServerTagByRack(Arrays.asList("rack-0", "rack-1"), ServerTag.PERMANENT_OFFLINE)
.get();
assertThat(zkClient.getServerTags().get().getServerTags())
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
.containsEntry(1, ServerTag.PERMANENT_OFFLINE)
.containsEntry(3, ServerTag.PERMANENT_OFFLINE)
.doesNotContainKey(2);

// cleanup
admin.removeServerTagByRack(Arrays.asList("rack-0", "rack-1"), ServerTag.PERMANENT_OFFLINE)
.get();
assertThat(zkClient.getServerTags()).isNotPresent();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.procedure;

import org.apache.fluss.cluster.rebalance.ServerTag;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Arrays;
import java.util.List;

import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetServerTag;

/**
* Procedure to add server tag by rack.
*
* <p>Usage:
*
* <pre><code>
* CALL sys.add_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE')
* CALL sys.add_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE')
* </code></pre>
*/
public class AddServerTagByRackProcedure extends ProcedureBase {

@ProcedureHint(
argument = {
@ArgumentHint(name = "racks", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING"))
})
public String[] call(ProcedureContext context, String racks, String serverTag)
throws Exception {
List<String> rackList = validateAndGetRacks(racks);
ServerTag tag = validateAndGetServerTag(serverTag);
admin.addServerTagByRack(rackList, tag).get();
return new String[] {"success"};
}

static List<String> validateAndGetRacks(String racks) {
if (racks == null || racks.trim().isEmpty()) {
throw new IllegalArgumentException(
"racks cannot be null or empty. You can specify one rack as 'rack-0' or "
+ "multiple racks as 'rack-0,rack-1' (split by ',')");
}
return Arrays.asList(racks.trim().split(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ private enum ProcedureEnum {
RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class),
ADD_SERVER_TAG_BY_RACK("sys.add_server_tag_by_rack", AddServerTagByRackProcedure.class),
REMOVE_SERVER_TAG_BY_RACK(
"sys.remove_server_tag_by_rack", RemoveServerTagByRackProcedure.class),
REBALANCE("sys.rebalance", RebalanceProcedure.class),
CANCEL_REBALANCE("sys.cancel_rebalance", CancelRebalanceProcedure.class),
LIST_REBALANCE_PROGRESS("sys.list_rebalance", ListRebalanceProcessProcedure.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.procedure;

import org.apache.fluss.cluster.rebalance.ServerTag;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.List;

import static org.apache.fluss.flink.procedure.AddServerTagByRackProcedure.validateAndGetRacks;
import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetServerTag;

/**
* Procedure to remove server tag by rack.
*
* <p>Usage:
*
* <pre><code>
* CALL sys.remove_server_tag_by_rack('rack-0', 'PERMANENT_OFFLINE')
* CALL sys.remove_server_tag_by_rack('rack-0,rack-1', 'TEMPORARY_OFFLINE')
* </code></pre>
*/
public class RemoveServerTagByRackProcedure extends ProcedureBase {

@ProcedureHint(
argument = {
@ArgumentHint(name = "racks", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING"))
})
public String[] call(ProcedureContext context, String racks, String serverTag)
throws Exception {
List<String> rackList = validateAndGetRacks(racks);
ServerTag tag = validateAndGetServerTag(serverTag);
admin.removeServerTagByRack(rackList, tag).get();
return new String[] {"success"};
}
}
Loading