HBASE-22819 Automatically migrate the rs group config for table after H…
Apache9 committed Aug 16, 2019
1 parent 07fe41d commit 9626e18
Showing 4 changed files with 229 additions and 15 deletions.
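
In short: a table's region server group now lives on the table descriptor itself instead of in the RSGroupInfo persisted to ZooKeeper and the hbase:rsgroup table, and existing mappings are migrated automatically when the master starts. A minimal sketch of the new per-table storage, using the same TableDescriptorBuilder#setRegionServerGroup API this patch relies on (the admin handle and the "my_table"/"my_group" names are illustrative):

  // Sketch: read the rs group from the table descriptor and, if absent, set it there.
  TableDescriptor td = admin.getDescriptor(TableName.valueOf("my_table"));
  if (!td.getRegionServerGroup().isPresent()) {
    admin.modifyTable(TableDescriptorBuilder.newBuilder(td)
        .setRegionServerGroup("my_group").build());
  }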
org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java
@@ -19,10 +19,8 @@
package org.apache.hadoop.hbase.rsgroup;

import java.util.Collection;
-import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
@@ -104,7 +102,7 @@ public boolean containsServer(Address hostPort) {
/**
* Get list of servers.
*/
-public Set<Address> getServers() {
+public SortedSet<Address> getServers() {
return servers;
}
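
Note the tightened return type: SortedSet rather than Set, so callers get a deterministic, address-ordered view of the group's servers. A small sketch of what the stronger type buys (the group variable is illustrative):

  // Sketch: SortedSet exposes ordered access such as first(), which a plain Set does not.
  SortedSet<Address> servers = group.getServers();
  Address lowest = servers.isEmpty() ? null : servers.first();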

org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -67,11 +67,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
*/
List<RSGroupInfo> listRSGroups() throws IOException;

-/**
- * Refresh/reload the group information from the persistent store
- */
-void refresh() throws IOException;
-
/**
* Whether the manager is able to fully return group metadata
* @return whether the manager is in online mode
org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -32,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -164,7 +166,7 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException


private synchronized void init() throws IOException {
-refresh();
+refresh(false);
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
@@ -356,9 +358,112 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
return RSGroupInfoList;
}

-@Override
-public void refresh() throws IOException {
-  refresh(false);
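
/**
 * Poll until at least one of the given pending procedures has finished, so the caller can cap
 * how many modify table procedures the migration keeps in flight at once.
 */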
private void waitUntilSomeProcsDone(Set<Long> pendingProcIds) {
int size = pendingProcIds.size();
while (!masterServices.isStopped()) {
for (Iterator<Long> iter = pendingProcIds.iterator(); iter.hasNext();) {
long procId = iter.next();
if (masterServices.getMasterProcedureExecutor().isFinished(procId)) {
iter.remove();
}
}
if (pendingProcIds.size() < size) {
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

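// Modify table procedures cannot be submitted until the master is initialized, so the
// migration thread waits here first (and gives up if the master is stopping).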
private void waitUntilMasterStarted() {
while (!masterServices.isInitialized() && !masterServices.isStopped()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

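// Core migration loop: for each non-default group, stamp the group name onto every member
// table's descriptor via a modify table procedure; tables that fail are written back as the
// group's remaining table set so that a later run can retry them.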
private void migrate(List<RSGroupInfo> groupList, int maxConcurrency) {
LOG.info("Start migrating table rs group config");
waitUntilMasterStarted();
Set<Long> pendingProcIds = new HashSet<>();
for (RSGroupInfo groupInfo : groupList) {
if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
continue;
}
SortedSet<TableName> failedTables = new TreeSet<>();
for (TableName tableName : groupInfo.getTables()) {
LOG.info("Migrating {} in group {}", tableName, groupInfo.getName());
TableDescriptor oldTd;
try {
oldTd = masterServices.getTableDescriptors().get(tableName);
} catch (IOException e) {
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
failedTables.add(tableName);
continue;
}
if (oldTd == null) {
continue;
}
if (oldTd.getRegionServerGroup().isPresent()) {
// either we have already migrated it, or the user has set the rs group with the new
// code; skip.
LOG.debug("Skip migrating {} since it is already in group {}", tableName,
oldTd.getRegionServerGroup().get());
continue;
}
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
.setRegionServerGroup(groupInfo.getName()).build();
try {
pendingProcIds.add(
masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
} catch (IOException e) {
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
failedTables.add(tableName);
continue;
}
if (pendingProcIds.size() >= maxConcurrency) {
waitUntilSomeProcsDone(pendingProcIds);
}
}
LOG.info("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
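// Copy-on-write update of the in-memory group map: keep only the tables that failed to
// migrate and persist the shrunken group, dropping the successfully migrated tables.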
synchronized (RSGroupInfoManagerImpl.this) {
RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
if (currentInfo != null) {
RSGroupInfo newInfo =
new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
newGroupMap.put(groupInfo.getName(), newInfo);
try {
flushConfig(newGroupMap);
} catch (IOException e) {
LOG.warn("Failed to persist rs group", e);
}
}
}
}
LOG.info("Done migrating table rs group info");
}

// Migrate the table rs group info from RSGroupInfo into the table descriptor.
// Notice that we do not want to block initialization, so the migration is done in the
// background; while it is in progress the rs group info may be incomplete, which can cause
// regions to be misplaced.
private void migrate(List<RSGroupInfo> groupList) {
final int maxConcurrency = 8;
Thread migrateThread = new Thread("Migrate-RSGroup") {

@Override
public void run() {
migrate(groupList, maxConcurrency);
}
};
migrateThread.setDaemon(true);
migrateThread.start();
}

/**
@@ -389,6 +494,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
}
resetRSGroupMap(newGroupMap);
updateCacheOfRSGroups(rsGroupMap.keySet());
+migrate(groupList);
}

private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
@@ -403,9 +509,9 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
}

// populate puts
-for (RSGroupInfo RSGroupInfo : groupMap.values()) {
-  RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
-  Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
+for (RSGroupInfo rsGroupInfo : groupMap.values()) {
+  RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(rsGroupInfo);
+  Put p = new Put(Bytes.toBytes(rsGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
}
org/apache/hadoop/hbase/rsgroup/TestMigrateRSGroupInfo.java
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;

import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_FAMILY_BYTES;
import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_QUALIFIER_BYTES;
import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Testcase for HBASE-22819
*/
@Category({ MediumTests.class })
public class TestMigrateRSGroupInfo extends TestRSGroupsBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateRSGroupInfo.class);

private static String TABLE_NAME_PREFIX = "Table_";

private static int NUM_TABLES = 10;

private static byte[] FAMILY = Bytes.toBytes("family");

@BeforeClass
public static void setUp() throws Exception {
setUpTestBeforeClass();
for (int i = 0; i < NUM_TABLES; i++) {
TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_PREFIX + i), FAMILY);
}
}

@AfterClass
public static void tearDown() throws Exception {
tearDownAfterClass();
}

@Test
public void testMigrate() throws IOException, InterruptedException {
String groupName = name.getMethodName();
addGroup(groupName, TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size() - 1);
RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName);
assertTrue(rsGroupInfo.getTables().isEmpty());
for (int i = 0; i < NUM_TABLES; i++) {
rsGroupInfo.addTable(TableName.valueOf(TABLE_NAME_PREFIX + i));
}
try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) {
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(rsGroupInfo);
Put p = new Put(Bytes.toBytes(rsGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
table.put(p);
}
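// The direct put above recreates the pre-migration layout, with the table list still stored
// in the hbase:rsgroup table; restarting the master below should trigger the migration.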
TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
TEST_UTIL.getMiniHBaseCluster().startMaster();
TEST_UTIL.waitFor(60000, () -> {
for (int i = 0; i < NUM_TABLES; i++) {
TableDescriptor td;
try {
td = TEST_UTIL.getAdmin().getDescriptor(TableName.valueOf(TABLE_NAME_PREFIX + i));
} catch (IOException e) {
return false;
}
if (!rsGroupInfo.getName().equals(td.getRegionServerGroup().orElse(null))) {
return false;
}
}
return true;
});
// make sure that we persist the result to hbase, i.e., that all the tables have been removed
// from the rs group.
TEST_UTIL.waitFor(30000, () -> {
try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) {
Result result = table.get(new Get(Bytes.toBytes(rsGroupInfo.getName())));
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
RSGroupInfo gi = ProtobufUtil.toGroupInfo(proto);
return gi.getTables().isEmpty();
}
});
}
}
