Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -51,7 +49,6 @@

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.LinkedListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
Expand Down Expand Up @@ -81,15 +78,15 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
private LoadBalancer internalBalancer;

/**
* Define the config key of fallback groups
* Enabled only if this property is set
* Set this key to {@code true} to allow region fallback.
* Regions fall back to the default rsgroup first, and then to any group if the default rsgroup
* has no online servers.
* Keep the balancer switch on as well: the balancer is relied on to correct misplaced
* regions.
*/
public static final String FALLBACK_GROUPS_KEY = "hbase.rsgroup.fallback.groups";
public static final String FALLBACK_GROUP_ENABLE_KEY = "hbase.rsgroup.fallback.enable";

private boolean fallbackEnabled = false;
private Set<String> fallbackGroups;

/**
* Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}.
Expand Down Expand Up @@ -180,22 +177,14 @@ public List<RegionPlan> balanceCluster(
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
List<RegionInfo> regions, List<ServerName> servers) throws IOException {
Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
generateGroupMaps(regions, servers, regionMap, serverMap);
for (String groupKey : regionMap.keySet()) {
if (regionMap.get(groupKey).size() > 0) {
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
.roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
if (result != null) {
if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
.addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
} else {
assignments.putAll(result);
}
}
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(regions, servers);
for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
.roundRobinAssignment(pair.getFirst(), pair.getSecond());
if (result != null) {
result.forEach((server, regionInfos) ->
assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(regionInfos));
}
}
return assignments;
Expand All @@ -206,36 +195,16 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server
List<ServerName> servers) throws HBaseIOException {
try {
Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
for (RegionInfo region : regions.keySet()) {
String groupName =
RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
.orElse(defaultInfo).getName();
groupToRegion.put(groupName, region);
}
for (String group : groupToRegion.keySet()) {
Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
List<RegionInfo> regionList = groupToRegion.get(group);
RSGroupInfo info = rsGroupInfoManager.getRSGroup(group);
List<ServerName> candidateList = filterOfflineServers(info, servers);
if (fallbackEnabled && candidateList.isEmpty()) {
candidateList = getFallBackCandidates(servers);
}
for (RegionInfo region : regionList) {
currentAssignmentMap.put(region, regions.get(region));
}
if (candidateList.size() > 0) {
assignments
.putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No available servers for group {} to assign regions: {}", group,
RegionInfo.getShortNameToLog(regionList));
}
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
.addAll(regionList);
}
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(Lists.newArrayList(regions.keySet()), servers);
for (Pair<List<RegionInfo>, List<ServerName>> pair : pairs) {
List<RegionInfo> regionList = pair.getFirst();
Map<RegionInfo, ServerName> currentAssignmentMap = Maps.newTreeMap();
regionList.forEach(r -> currentAssignmentMap.put(r, regions.get(r)));
Map<ServerName, List<RegionInfo>> pairResult =
this.internalBalancer.retainAssignment(currentAssignmentMap, pair.getSecond());
pairResult.forEach((server, rs) ->
assignments.computeIfAbsent(server, s -> Lists.newArrayList()).addAll(rs));
}
return assignments;
} catch (IOException e) {
Expand All @@ -246,17 +215,17 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server
/**
 * Picks a server for a single region, restricted to the servers selected for that region's
 * group, and delegates the actual choice to the internal balancer.
 * NOTE(review): this span comes from a rendered diff, so two alternative ways of computing
 * {@code filteredServers} appear below (one via generateGroupMaps, one via
 * generateGroupAssignments).
 *
 * @param region the region to place
 * @param servers candidate servers supplied by the caller
 * @return the result of the internal balancer's randomAssignment over the filtered servers
 * @throws IOException declared by the signature; presumably raised by the group lookup - confirm
 */
@Override
public ServerName randomAssignment(RegionInfo region,
List<ServerName> servers) throws IOException {
ListMultimap<String,RegionInfo> regionMap = LinkedListMultimap.create();
ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
List<Pair<List<RegionInfo>, List<ServerName>>> pairs =
generateGroupAssignments(Lists.newArrayList(region), servers);
// A single region was passed in; the server list of the first pair is used.
List<ServerName> filteredServers = pairs.iterator().next().getSecond();
return this.internalBalancer.randomAssignment(region, filteredServers);
}

private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
throws HBaseIOException {
private List<Pair<List<RegionInfo>, List<ServerName>>> generateGroupAssignments(
List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
try {
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
for (RegionInfo region : regions) {
String groupName =
Expand All @@ -267,15 +236,29 @@ private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> server
for (String groupKey : regionMap.keySet()) {
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey);
serverMap.putAll(groupKey, filterOfflineServers(info, servers));
if (fallbackEnabled && serverMap.get(groupKey).isEmpty()) {
serverMap.putAll(groupKey, getFallBackCandidates(servers));
}
}

List<Pair<List<RegionInfo>, List<ServerName>>> result = Lists.newArrayList();
List<RegionInfo> fallbackRegions = Lists.newArrayList();
for (String groupKey : regionMap.keySet()) {
if (serverMap.get(groupKey).isEmpty()) {
serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
fallbackRegions.addAll(regionMap.get(groupKey));
} else {
result.add(Pair.newPair(regionMap.get(groupKey), serverMap.get(groupKey)));
}
}
if (!fallbackRegions.isEmpty()) {
List<ServerName> candidates = null;
if (fallbackEnabled) {
candidates = getFallBackCandidates(servers);
}
candidates = (candidates == null || candidates.isEmpty()) ?
Lists.newArrayList(BOGUS_SERVER_NAME) : candidates;
result.add(Pair.newPair(fallbackRegions, candidates));
}
return result;
} catch(IOException e) {
throw new HBaseIOException("Failed to generate group maps", e);
throw new HBaseIOException("Failed to generate group assignments", e);
}
}

Expand Down Expand Up @@ -390,11 +373,7 @@ public void initialize() throws IOException {
}
internalBalancer.initialize();
// init fallback groups
Collection<String> groups = config.getTrimmedStringCollection(FALLBACK_GROUPS_KEY);
if (groups != null && !groups.isEmpty()) {
this.fallbackEnabled = true;
this.fallbackGroups = new HashSet<>(groups);
}
this.fallbackEnabled = config.getBoolean(FALLBACK_GROUP_ENABLE_KEY, false);
}

public boolean isOnline() {
Expand Down Expand Up @@ -485,15 +464,13 @@ public List<RegionPlan> balanceTable(TableName tableName,
}

/**
 * Builds the list of servers that regions may fall back to.
 * NOTE(review): this span comes from a rendered diff, so two variants are interleaved below.
 * One loops over {@code fallbackGroups} and collects the filterOfflineServers result of each
 * group; the other looks up only the default rsgroup and returns the full {@code servers}
 * list when that lookup fails or yields nothing.
 *
 * @param servers servers passed to filterOfflineServers; in the default-rsgroup variant also
 *   the last-resort return value
 * @return the fallback candidate servers
 */
private List<ServerName> getFallBackCandidates(List<ServerName> servers) {
List<ServerName> serverNames = new ArrayList<>();
for (String fallbackGroup : fallbackGroups) {
try {
RSGroupInfo info = rsGroupInfoManager.getRSGroup(fallbackGroup);
serverNames.addAll(filterOfflineServers(info, servers));
} catch (IOException e) {
LOG.error("Get group info for {} failed", fallbackGroup, e);
}
List<ServerName> serverNames = null;
try {
RSGroupInfo info = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
serverNames = filterOfflineServers(info, servers);
} catch (IOException e) {
// The failure is logged only; serverNames stays null and is handled at the return below.
LOG.error("Failed to get default rsgroup info to fallback", e);
}
return serverNames;
return serverNames == null || serverNames.isEmpty() ? servers : serverNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ protected final void removeGroup(String groupName) throws IOException {
}
}
ADMIN.setRSGroup(tables, RSGroupInfo.DEFAULT_GROUP);
for (NamespaceDescriptor nd : ADMIN.listNamespaceDescriptors()) {
if (groupName.equals(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP))) {
nd.removeConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
ADMIN.modifyNamespace(nd);
}
}
RSGroupInfo groupInfo = ADMIN.getRSGroup(groupName);
ADMIN.moveServersToRSGroup(groupInfo.getServers(), RSGroupInfo.DEFAULT_GROUP);
ADMIN.removeRSGroup(groupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
Expand All @@ -32,6 +34,7 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RSGroupTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -56,8 +59,8 @@ public class TestRSGroupsFallback extends TestRSGroupsBase {

/**
 * Sets the fallback option on the shared test configuration, calls setUpTestBeforeClass(),
 * and then turns the balancer switch on.
 * NOTE(review): this span comes from a rendered diff, so both the FALLBACK_GROUPS_KEY and the
 * FALLBACK_GROUP_ENABLE_KEY variants of the configuration lines appear below.
 */
@BeforeClass
public static void setUp() throws Exception {
Configuration configuration = TEST_UTIL.getConfiguration();
configuration.set(RSGroupBasedLoadBalancer.FALLBACK_GROUPS_KEY, FALLBACK_GROUP);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(RSGroupBasedLoadBalancer.FALLBACK_GROUP_ENABLE_KEY, true);
setUpTestBeforeClass();
MASTER.balanceSwitch(true);
}
Expand All @@ -78,51 +81,57 @@ public void afterMethod() throws Exception {
}

/**
 * Exercises region fallback for a table bound to its own rsgroup: the group's servers are
 * crashed and the test asserts which group the table's regions land in, then servers are
 * added back and the balancer is run to check that the regions return.
 * NOTE(review): this span comes from a rendered diff, so lines of testGroupFallback and
 * testFallback are interleaved below and the steps of both variants appear side by side.
 */
@Test
public void testGroupFallback() throws Exception {
public void testFallback() throws Exception {
// add fallback group
addGroup(FALLBACK_GROUP, 1);
// add test group
String groupName = getGroupName(name.getMethodName());
addGroup(groupName, 1);
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
.setRegionServerGroup(groupName)
.build();
ADMIN.createTable(desc);
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build())
.setRegionServerGroup(groupName)
.build();
ADMIN.createTable(desc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
// server of test group crash
for (Address server : ADMIN.getRSGroup(groupName).getServers()) {
AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true);
}
Threads.sleep(1000);
TEST_UTIL.waitUntilNoRegionsInTransition(10000);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);

// regions move to fallback group
assertRegionsInGroup(FALLBACK_GROUP);
// server of test group crash, regions move to default group
crashRsInGroup(groupName);
assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);

// move a new server from default group
Address address = ADMIN.getRSGroup(RSGroupInfo.DEFAULT_GROUP).getServers().first();
ADMIN.moveServersToRSGroup(Collections.singleton(address), groupName);
// server of default group crash, regions move to any other group
crashRsInGroup(RSGroupInfo.DEFAULT_GROUP);
assertRegionsInGroup(tableName, FALLBACK_GROUP);

// correct misplaced regions
// add a new server to default group, regions move to default group
TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
MASTER.balance();
assertRegionsInGroup(tableName, RSGroupInfo.DEFAULT_GROUP);

TEST_UTIL.waitUntilNoRegionsInTransition(10000);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);

// regions move back
assertRegionsInGroup(groupName);
// add a new server to test group, regions move back
JVMClusterUtil.RegionServerThread t =
TEST_UTIL.getMiniHBaseCluster().startRegionServerAndWait(60000);
ADMIN.moveServersToRSGroup(
Collections.singleton(t.getRegionServer().getServerName().getAddress()), groupName);
MASTER.balance();
assertRegionsInGroup(tableName, groupName);

TEST_UTIL.deleteTable(tableName);
}

/**
 * Asserts that every region of the table is currently assigned to a server whose address
 * belongs to the given rsgroup.
 * NOTE(review): this span comes from a rendered diff, so two signatures are interleaved below:
 * one takes only the group and reads the {@code tableName} field, the other takes the table
 * explicitly and first waits until all of its regions are assigned.
 *
 * @param group name of the rsgroup the regions are expected to be hosted in
 * @throws IOException declared by the signature; presumably from the rsgroup lookup - confirm
 */
private void assertRegionsInGroup(String group) throws IOException {
RSGroupInfo fallbackGroup = ADMIN.getRSGroup(group);
MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName).forEach(region -> {
private void assertRegionsInGroup(TableName table, String group) throws IOException {
TEST_UTIL.waitUntilAllRegionsAssigned(table);
RSGroupInfo rsGroup = ADMIN.getRSGroup(group);
MASTER.getAssignmentManager().getRegionStates().getRegionsOfTable(table).forEach(region -> {
// Look up the address of the server the master currently records for this region.
Address regionOnServer = MASTER.getAssignmentManager().getRegionStates()
.getRegionAssignments().get(region).getAddress();
assertTrue(fallbackGroup.getServers().contains(regionOnServer));
assertTrue(rsGroup.getServers().contains(regionOnServer));
});
}

/**
 * Calls {@code AssignmentTestingUtil.crashRs} for every server address registered in the given
 * rsgroup, pauses briefly, and then waits until no regions are in transition.
 *
 * @param groupName name of the rsgroup whose servers are crashed
 * @throws Exception if the group lookup, a crash call, or the wait fails
 */
private void crashRsInGroup(String groupName) throws Exception {
  RSGroupInfo targetGroup = ADMIN.getRSGroup(groupName);
  for (Address memberAddress : targetGroup.getServers()) {
    ServerName victim = getServerName(memberAddress);
    AssignmentTestingUtil.crashRs(TEST_UTIL, victim, true);
  }
  // Short fixed pause before polling for regions in transition.
  Threads.sleep(1000);
  TEST_UTIL.waitUntilNoRegionsInTransition(60000);
}
}