Skip to content
Permalink
Browse files
HBASE-26245 Store region server list in master local region (#4136)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
  • Loading branch information
Apache9 committed Mar 31, 2022
1 parent 828ea91 commit bb1bbddf748416411938f57f1b846ceba1c99528
Showing 16 changed files with 366 additions and 44 deletions.
@@ -425,6 +425,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
// the master local storage to store procedure data, meta region locations, etc.
private MasterRegion masterRegion;

private RegionServerList rsListStorage;

// handle table states
private TableStateManager tableStateManager;

@@ -927,15 +929,18 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
// The below two managers must be created before loading procedures, as they will be used during
// loading.
this.serverManager = createServerManager(this);
// initialize master local region
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);

this.serverManager = createServerManager(this, rsListStorage);
this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
this.splitWALManager = new SplitWALManager(this);
}

// initialize master local region
masterRegion = MasterRegionFactory.create(this);


tryMigrateMetaLocationsFromZooKeeper();

@@ -964,7 +969,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
walManager.getSplittingServersFromWALDir());
// This manager must be accessed AFTER hbase:meta is confirmed on line..
this.tableStateManager = new TableStateManager(this);

@@ -1387,11 +1393,12 @@ private void initMobCleaner() {
* </p>
*/
@InterfaceAudience.Private
protected ServerManager createServerManager(final MasterServices master) throws IOException {
protected ServerManager createServerManager(MasterServices master,
RegionServerList storage) throws IOException {
// We put this out here in a method so can do a Mockito.spy and stub it out
// w/ a mocked up ServerManager.
setupClusterConnection();
return new ServerManager(master);
return new ServerManager(master, storage);
}

private void waitForRegionServers(final MonitoredTask status)
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link MasterRegion} based {@link RegionServerList}.
* <p/>
* This is useful when we want to restart a cluster with only the data on file system, as when
* restarting, we need to get the previous live region servers for scheduling SCP. Before we have
* this class, we need to scan the WAL directory on WAL file system to find out the previous live
* region servers, which means we can not restart a cluster without the previous WAL file system,
* even if we have flushed all the data.
* <p/>
* Please see HBASE-26245 for more details.
*/
@InterfaceAudience.Private
public class MasterRegionServerList implements RegionServerList {

private static final Logger LOG = LoggerFactory.getLogger(MasterRegionServerList.class);

private final MasterRegion region;

private final Abortable abortable;

public MasterRegionServerList(MasterRegion region, Abortable abortable) {
this.region = region;
this.abortable = abortable;
}

@Override
public void started(ServerName sn) {
Put put =
new Put(Bytes.toBytes(sn.getServerName())).addColumn(MasterRegionFactory.REGION_SERVER_FAMILY,
HConstants.STATE_QUALIFIER, Bytes.toBytes(ServerState.ONLINE.name()));
try {
region.update(r -> r.put(put));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to record region server {} as started, aborting...", sn,
e);
abortable.abort("Failed to record region server as started");
throw new UncheckedIOException(e);
}
}

@Override
public void expired(ServerName sn) {
Delete delete = new Delete(Bytes.toBytes(sn.getServerName()))
.addFamily(MasterRegionFactory.REGION_SERVER_FAMILY);
try {
region.update(r -> r.delete(delete));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to record region server {} as expired, aborting...", sn,
e);
abortable.abort("Failed to record region server as expired");
throw new UncheckedIOException(e);
}
}

@Override
public Set<ServerName> getAll() throws IOException {
Set<ServerName> rsList = new HashSet<>();
try (ResultScanner scanner =
region.getScanner(new Scan().addFamily(MasterRegionFactory.REGION_SERVER_FAMILY))) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
rsList.add(ServerName.valueOf(Bytes.toString(result.getRow())));
}
}
return rsList;
}

}
@@ -158,8 +158,6 @@ private boolean checkFileSystem() {

/**
* Get Servernames which are currently splitting; paths have a '-splitting' suffix.
* @return ServerName
* @throws IOException IOException
*/
public Set<ServerName> getSplittingServersFromWALDir() throws IOException {
return getServerNamesFromWALDirPath(
@@ -169,8 +167,6 @@ public Set<ServerName> getSplittingServersFromWALDir() throws IOException {
/**
* Get Servernames that COULD BE 'alive'; excludes those that have a '-splitting' suffix as these
* are already being split -- they cannot be 'alive'.
* @return ServerName
* @throws IOException IOException
*/
public Set<ServerName> getLiveServersFromWALDir() throws IOException {
return getServerNamesFromWALDirPath(
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* For storing the region server list.
* <p/>
* Mainly be used when restarting master, to load the previous active region server list.
*/
@InterfaceAudience.Private
public interface RegionServerList {

/**
* Called when a region server join the cluster.
*/
void started(ServerName sn);

/**
* Called when a region server is dead.
*/
void expired(ServerName sn);

/**
* Get all live region servers.
*/
Set<ServerName> getAll() throws IOException;
}
@@ -115,22 +115,22 @@ private RegionServerInfo getServerInfo(ServerName serverName)
* {@link ServerManager#findDeadServersAndProcess(Set, Set)}, we call it here under the lock
* protection to prevent concurrency issues with server expiration operation.
* @param deadServersFromPE the region servers which already have SCP associated.
* @param liveServersFromWALDir the live region servers from wal directory.
* @param liveServersBeforeRestart the live region servers we recorded before master restarts.
* @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
*/
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersBeforeRestart,
Set<ServerName> splittingServersFromWALDir) throws KeeperException, IOException {
LOG.info(
"Upgrading RegionServerTracker to active master mode; {} have existing" +
"ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.",
deadServersFromPE.size(), liveServersFromWALDir.size(), splittingServersFromWALDir.size());
deadServersFromPE.size(), liveServersBeforeRestart.size(), splittingServersFromWALDir.size());
// deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
// splittingServersFromWALDir are being actively split -- the directory in the FS ends in
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
// create ServerNode for all possible live servers from wal directory
liveServersFromWALDir
liveServersBeforeRestart
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
ServerManager serverManager = server.getServerManager();
synchronized (this) {
@@ -142,7 +142,7 @@ public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServe
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersBeforeRestart);
active = true;
}
}
@@ -167,6 +167,7 @@ public class ServerManager {
private final ArrayList<ServerName> drainingServers = new ArrayList<>();

private final MasterServices master;
private final RegionServerList storage;

private final DeadServer deadservers = new DeadServer();

@@ -179,13 +180,14 @@ public class ServerManager {
/**
* Constructor.
*/
public ServerManager(final MasterServices master) {
public ServerManager(final MasterServices master, RegionServerList storage) {
this.master = master;
this.storage = storage;
Configuration c = master.getConfiguration();
maxSkew = c.getLong(MAX_CLOCK_SKEW_MS, 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
persistFlushedSequenceId =
c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
}

/**
@@ -211,7 +213,6 @@ public boolean unregisterListener(final ServerListener listener) {
* @param version the version of the new regionserver, could contain strings like "SNAPSHOT"
* @param ia the InetAddress from which request is received
* @return The ServerName we know this server as.
* @throws IOException
*/
ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
String version, InetAddress ia) throws IOException {
@@ -232,13 +233,12 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
}
storage.started(sn);
return sn;
}

/**
* Updates last flushed sequence Ids for the regions on server sn
* @param sn
* @param hsl
*/
private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
@@ -611,6 +611,7 @@ synchronized long expireServer(final ServerName serverName, boolean force) {
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
storage.expired(serverName);
// Tell our listeners that a server was removed
if (!this.listeners.isEmpty()) {
this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
@@ -87,12 +87,16 @@ public final class MasterRegionFactory {

public static final byte[] PROC_FAMILY = Bytes.toBytes("proc");

public static final byte[] REGION_SERVER_FAMILY = Bytes.toBytes("rs");

private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
.setMaxVersions(HConstants.DEFAULT_HBASE_META_VERSIONS).setInMemory(true)
.setBlocksize(HConstants.DEFAULT_HBASE_META_BLOCK_SIZE).setBloomFilterType(BloomType.ROWCOL)
.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY)).build();
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(PROC_FAMILY))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(REGION_SERVER_FAMILY))
.build();

private static TableDescriptor withTrackerConfigs(Configuration conf) {
String trackerImpl = conf.get(TRACKER_IMPL, conf.get(StoreFileTrackerFactory.TRACKER_IMPL,
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;

public class DummyRegionServerListStorage implements RegionServerList {

@Override
public void started(ServerName sn) {
}

@Override
public void expired(ServerName sn) {
}

@Override
public Set<ServerName> getAll() throws IOException {
return Collections.emptySet();
}

}
@@ -48,7 +48,8 @@ public class TestClockSkewDetection {
@Test
public void testClockSkewDetection() throws Exception {
final Configuration conf = HBaseConfiguration.create();
ServerManager sm = new ServerManager(new MockNoopMasterServices(conf));
ServerManager sm =
new ServerManager(new MockNoopMasterServices(conf), new DummyRegionServerListStorage());

LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost();

0 comments on commit bb1bbdd

Please sign in to comment.