HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)
* HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable when hbase:meta assigned to RS

Fill in gap left by HBASE-11183 'Timeline Consistent region replicas - Phase 2 design'.
HBASE-11183 left off implementing 'async WAL Replication' on the hbase:meta
Table; hbase:meta Table could only do Phase 1 Region Replicas reading
the primary Regions' hfiles. Here we add 'async WAL Replication' to
hbase:meta so Replicas can be more current with the primary's changes.

Adds a 'special' ReplicationSource that reads hbase:meta WAL files and replicates
all edits to the configured in-cluster endpoint (defaults to
RegionReplicaReplicationEndpoint.class; set hbase.region.replica.catalog.replication to
target a different endpoint implementation).

Set hbase.region.replica.replication.catalog.enabled to enable async WAL
Replication for hbase:meta region replicas. It's off by default.
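For reference, enabling the feature could look like the following hbase-site.xml fragment. This is a sketch assembled from the property names given in this commit message; the endpoint property is optional and shown here only to illustrate overriding the default.

```xml
<!-- Enable async WAL replication for hbase:meta region replicas (off by default). -->
<property>
  <name>hbase.region.replica.replication.catalog.enabled</name>
  <value>true</value>
</property>
<!-- Optional: target a different in-cluster endpoint implementation. -->
<property>
  <name>hbase.region.replica.catalog.replication</name>
  <value>org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint</value>
</property>
```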

The CatalogReplicationSource for async WAL Replication of hbase:meta does
NOT need to keep a WAL offset or a queue of WALs-to-replicate in the
replication queue store, as is done in other ReplicationSource implementations;
the CatalogReplicationSource is for Region Replicas only. General
Replication does not replicate hbase:meta. hbase:meta Region Replicas reset
on crash of the primary replica, so there is no need to 'recover'
replication that was running on the crashed server.

Because it is so different in operation, the CatalogReplicationSource is bolted
on to the side of the ReplicationSourceManager. It is lazily
instantiated to match the lazy instantiation of the hbase:meta
WALProvider, created and started on the open of the first Region of the
hbase:meta table. Thereafter it stays up until the process dies, even if
all hbase:meta Regions have moved off the server, in case an hbase:meta
Region is moved back. (Doing the latter simplifies the implementation.)

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Read configuration to see if we need to wait before marking a Region read-enabled
  (if so, replicas only flip to read-enabled after confirming a
  flush of the primary, so they are for sure a replica of a known point).

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
 If configured, on open of hbase:meta, ask the ReplicationSourceManager
 to add a ReplicationSource (if it hasn't already).

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
 Edit log message.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
 If configured, on close of hbase:meta, update ReplicationSourceManager
 that a source Region has closed.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
 javadoc and make constructor private.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 Add logPositionAndCleanOldLogs w/ default of the old behavior so
 CatalogReplicationSource can bypass updating store with WAL position,
 etc.
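The bypass described above can be sketched as the standard default-method pattern: the interface carries the old position-persisting behavior as a default, and the catalog source overrides it with a no-op. A minimal illustration with hypothetical, simplified names (the real method takes a WALEntryBatch and returns void):

```java
// Sketch of the default-method bypass; names and signatures simplified.
interface ReplicationSourceSketch {
  // Default = the old behavior: persist the WAL position to the
  // replication queue store. Returns true when a position was persisted.
  default boolean logPositionAndCleanOldLogs(long position) {
    return true; // stand-in for writing to the replication queue store
  }
}

class CatalogReplicationSourceSketch implements ReplicationSourceSketch {
  @Override
  public boolean logPositionAndCleanOldLogs(long position) {
    // No-op: catalog region replicas keep no durable offset.
    return false;
  }
}
```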

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 Add creation and start of a CatalogReplicationSource.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 Go via ReplicationSource when calling logPositionAndCleanOldLogs so the new ReplicationSource subclass can intercept.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
 Javadoc.

hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 Add utility for reading configurations for hbase:meta region replicas.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
 Javadoc.

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
 Use define.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
 Special version of ReplicationSource for Region Replicas on hbase:meta.
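 This source selects WALs by file name: the constructor (shown in the diff below) passes the predicate `p -> AbstractFSWALProvider.isMetaFile(p)`, which matches the ".meta" suffix the meta WAL provider appends to its WAL file names. A standalone sketch of that check, assuming the suffix convention:

```java
import java.util.function.Predicate;

// Sketch of the WAL-path filter handed to the CatalogReplicationSource
// constructor. Assumption: mirrors AbstractFSWALProvider.isMetaFile,
// which matches WAL file names ending in ".meta".
class MetaWalFilter {
  static final String META_WAL_SUFFIX = ".meta";
  static final Predicate<String> IS_META_WAL =
      name -> name != null && name.endsWith(META_WAL_SUFFIX);
}
```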

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
 Needs a special peer too (peers are baked into replication, though we don't use 'peering' here).

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
 Tests.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.com>
saintstack committed Nov 10, 2020
1 parent b0fdf1a commit 9db4e77
Showing 23 changed files with 797 additions and 83 deletions.
@@ -2486,9 +2486,9 @@ private void triggerFlushInPrimaryRegion(final HRegion region) {
if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
return;
}
if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
!ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
region.conf)) {
TableName tn = region.getTableDescriptor().getTableName();
if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
!ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
region.setReadsEnabled(true);
return;
}
@@ -20,8 +20,11 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@@ -31,6 +34,7 @@
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,8 +133,15 @@ public void process() throws IOException {
}
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
// opening can not be interrupted by a close request any more.
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
rs, null);
Configuration conf = rs.getConfiguration();
TableName tn = htd.getTableName();
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
// Add the hbase:meta replication source on replica zero/default.
rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource();
}
}
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
} catch (IOException e) {
cleanUpAndReportFailure(e);
return;
@@ -100,9 +100,9 @@ void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();

if (LOG.isDebugEnabled()) {
LOG.debug("RPC'ing to primary region replica " +
ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
region.getRegionInfo() + " to trigger FLUSH");
LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
" from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
}
while (!region.isClosing() && !region.isClosed()
&& !server.isAborted() && !server.isStopped()) {
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -30,6 +31,7 @@
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,12 +115,20 @@ public void process() throws IOException {
if (region.close(abort) == null) {
// XXX: Is this still possible? The old comment says about split, but now split is done at
// master side, so...
LOG.warn("Can't close {} already closed during close()", regionName);
LOG.warn("Can't close {}, already closed during close()", regionName);
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
return;
}

rs.removeRegion(region, destination);
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
region.getTableDescriptor().getTableName())) {
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
// If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
// See assign region handler where we add the replication source on open.
rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource();
}
}
if (!rs.reportRegionStateTransition(
new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
-1, region.getRegionInfo()))) {
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Collections;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;

/**
* ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
* all WALEdits from these WALs. This ReplicationSource is NOT created via
* {@link ReplicationSourceFactory}.
*/
@InterfaceAudience.Private
class CatalogReplicationSource extends ReplicationSource {
CatalogReplicationSource() {
// Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
// filtered out in the 'super' class default implementation).
super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
}

@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
// Noop. This implementation does not persist state to backing storage nor does it keep its
// WALs in a general map up in ReplicationSourceManager so just skip calling through to the
// default implementation.
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;

/**
* The 'peer' used internally by Catalog Region Replicas Replication Source.
* The Replication system has 'peer' baked into its core so though we do not need 'peering', we
* need a 'peer' and its configuration else the replication system breaks at a few locales.
* Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
*/
@InterfaceAudience.Private
class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
/**
* @param clusterKey Usually the UUID from zk passed in by caller as a String.
*/
CatalogReplicationSourcePeer(Configuration configuration, String clusterKey, String peerId) {
super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
ReplicationPeerConfig.newBuilder().
setClusterKey(clusterKey).
setReplicationEndpointImpl(
configuration.get("hbase.region.replica.catalog.replication",
RegionReplicaReplicationEndpoint.class.getName())).
setBandwidth(0). // '0' means no bandwidth.
setSerial(false).
build(),
true, SyncReplicationState.NONE, SyncReplicationState.NONE);
}
}
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Noop queue storage -- does nothing.
*/
@InterfaceAudience.Private
class NoopReplicationQueueStorage implements ReplicationQueueStorage {
NoopReplicationQueueStorage() {}

@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {}

@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {}

@Override
public void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException { }

@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {}

@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
return 0;
}

@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {}

@Override
public void removeLastSequenceIds(String peerId) throws ReplicationException {}

@Override
public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException {}

@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
return 0;
}

@Override
public List<String> getWALsInQueue(ServerName serverName, String queueId)
throws ReplicationException {
return Collections.EMPTY_LIST;
}

@Override
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
return Collections.EMPTY_LIST;
}

@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
return null;
}

@Override
public void removeReplicatorIfQueueIsEmpty(ServerName serverName)
throws ReplicationException {}

@Override
public List<ServerName> getListOfReplicators() throws ReplicationException {
return Collections.EMPTY_LIST;
}

@Override
public Set<String> getAllWALs() throws ReplicationException {
return Collections.EMPTY_SET;
}

@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {}

@Override
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {}

@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {}

@Override
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {}

@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
return Collections.EMPTY_LIST;
}

@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
return Collections.EMPTY_LIST;
}

@Override
public Set<String> getAllHFileRefs() throws ReplicationException {
return Collections.EMPTY_SET;
}

@Override
public String getRsNode(ServerName serverName) {
return null;
}
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -368,7 +368,7 @@ public boolean replicate(ReplicateContext replicateContext) {
ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
return true;
} else {
LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes());
LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
if (!retryCounter.shouldRetry()) {
return false;
}
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@@ -116,14 +115,14 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
WALProvider walProvider = walFactory.getWALProvider();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
mapping, globalMetricsSource);
this.syncReplicationPeerInfoProvider =
new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
PeerActionListener peerActionListener = PeerActionListener.DUMMY;
// Get the user-space WAL provider
WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): null;
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
