From ccbd2e8ad0241f9e2ba3f33f9fbca037088f4ef0 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Mon, 13 Jul 2020 17:35:32 +0800 Subject: [PATCH] HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource --- .../org/apache/hadoop/hbase/HConstants.java | 2 + .../replication/ReplicationListener.java | 2 +- .../ReplicationSourceController.java | 47 ++++++ .../RecoveredReplicationSource.java | 18 +-- .../regionserver/ReplicationSource.java | 32 ++-- .../ReplicationSourceInterface.java | 25 +++- .../ReplicationSourceManager.java | 141 +++++++++--------- .../ReplicationSourceWALReader.java | 13 +- .../replication/ReplicationSourceDummy.java | 21 ++- .../TestReplicationSourceManager.java | 10 +- .../regionserver/TestWALEntryStream.java | 15 +- 11 files changed, 193 insertions(+), 133 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f6f00c552546..d3800b6397c6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -979,6 +979,8 @@ public enum OperationStatusCode { /* * cluster replication constants. 
*/ + public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled"; + public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false; public static final String REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service"; public static final String diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java index f040bf999336..6ecbb4670927 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java @@ -33,5 +33,5 @@ public interface ReplicationListener { * A region server has been removed from the local cluster * @param regionServer the removed region server */ - public void regionServerRemoved(String regionServer); + void regionServerRemoved(String regionServer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java new file mode 100644 index 000000000000..bf4c36520f84 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.yetus.audience.InterfaceAudience; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Used to control all replication sources inside one RegionServer or ReplicationServer. + * Used by {@code ReplicationSource} or {@link RecoveredReplicationSource}. + */ +@InterfaceAudience.Private +public interface ReplicationSourceController { + + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer or ReplicationServer. + */ + long getTotalBufferLimit(); + + AtomicLong getTotalBufferUsed(); + + MetricsReplicationGlobalSourceSource getGlobalMetrics(); + + /** + * Call this when the recovered replication source replicated all WALs. 
+ */ + void finishRecoveredSource(RecoveredReplicationSource src); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 62685eea6213..e0b626c02fa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); - private Path walDir; - private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); - this.walDir = walDir; + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, + WALFileLengthProvider 
walFileLengthProvider, MetricsSource metrics) throws IOException { + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, + peerClusterZnode, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -149,7 +147,7 @@ private Path getReplSyncUpPath(Path path) throws IOException { void tryFinish() { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); - manager.finishRecoveredSource(this); + controller.finishRecoveredSource(this); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fd9fb311b2ba..65420cddf33a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -99,8 +100,9 @@ public class ReplicationSource implements ReplicationSourceInterface { protected Configuration conf; protected ReplicationQueueInfo replicationQueueInfo; - // The manager of all sources to which we ping back our progress - ReplicationSourceManager manager; + protected Path walDir; + + protected ReplicationSourceController controller; // Should we stop everything? 
protected Server server; // How long should we sleep for each retry @@ -177,23 +179,14 @@ public class ReplicationSource implements ReplicationSourceInterface { this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries); } - /** - * Instantiation method used by region servers - * @param conf configuration to use - * @param fs file system to use - * @param manager replication manager to ping to - * @param server the server for this region server - * @param queueId the id of our replication queue - * @param clusterId unique UUID for the cluster - * @param metrics metrics for replication source - */ @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); + this.walDir = walDir; this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); decorateConf(); @@ -204,7 +197,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; - this.manager = manager; + this.controller = overallController; this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; @@ -217,6 +210,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour currentBandwidth = 
getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.walFileLengthProvider = walFileLengthProvider; + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -734,9 +728,9 @@ public void postShipEdits(List entries, int batchSize) { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize); + long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize); // Record the new buffer usage - this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 321edc2bf08b..f3bf8a41ff90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; @@ -44,14 +45,22 @@ public interface ReplicationSourceInterface { /** * Initializer for the source * - * @param conf the configuration to use - * @param fs the file system to use - * @param server the server for this region server - */ - void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager 
manager, - ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + * @param conf configuration to use + * @param fs file system to use + * @param walDir the directory where the WAL is located + * @param overallController the overall controller of all replication sources + * @param queueStorage the replication queue storage + * @param replicationPeer the replication peer + * @param server the server which start and run this replication source + * @param queueId the id of our replication queue + * @param clusterId unique UUID for the cluster + * @param walFileLengthProvider used to get the WAL length + * @param metrics metrics for this replication source + */ + void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 32126978e1ee..de9e21f99ae7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import 
org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Bytes; @@ -92,7 +93,7 @@ * */ @InterfaceAudience.Private -public class ReplicationSourceManager implements ReplicationListener { +public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); // all the sources that read this RS's logs and every peer only has one replication source private final ConcurrentMap sources; @@ -126,18 +127,18 @@ public class ReplicationSourceManager implements ReplicationListener { private AtomicLong totalBufferUsed = new AtomicLong(); - // How long should we sleep for each retry when deleting remote wal files for sync replication - // peer. - private final long sleepForRetries; - // Maximum number of retries before taking bold actions when deleting remote wal files for sync - // replication peer. - private final int maxRetriesMultiplier; // Total buffer size on this RegionServer for holding batched edits to be shipped. private final long totalBufferLimit; private final MetricsReplicationGlobalSourceSource globalMetrics; private final Map sourceMetrics = new HashMap<>(); + /** + * When replication offload is enabled, replication sources are not started here and WALs are + * only written to replication queue storage. The sources are started by ReplicationServer. 
+ */ + private final boolean replicationOffload; + /** * Creates a replication manager and sets the watch on all the other registered region servers * @param queueStorage the interface for manipulating replication queues @@ -186,12 +187,11 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, this.latestPaths = new HashMap<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); - this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); - this.maxRetriesMultiplier = - this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); this.globalMetrics = globalMetrics; + this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, + HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT); } /** @@ -212,6 +212,47 @@ Future init() throws IOException { return this.executor.submit(this::adoptAbandonedQueues); } + @VisibleForTesting + @Override + public AtomicLong getTotalBufferUsed() { + return totalBufferUsed; + } + + @Override + public long getTotalBufferLimit() { + return totalBufferLimit; + } + + @Override + public void finishRecoveredSource(RecoveredReplicationSource src) { + synchronized (oldsources) { + if (!removeRecoveredSource(src)) { + return; + } + } + LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), + src.getStats()); + } + + @Override + public MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } + + /** + * Clear the metrics and related replication queue of the specified old source + * @param src source to clear + */ + private boolean removeRecoveredSource(ReplicationSourceInterface src) { + if (!this.oldsources.remove(src)) { + return false; + } + LOG.info("Done with the 
recovered queue {}", src.getQueueId()); + // Delete queue from storage and memory + deleteQueue(src.getQueueId()); + return true; + } + private void adoptAbandonedQueues() { List currentReplicators = null; try { @@ -331,8 +372,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer * @param peerId the id of the replication peer * @return the source that was created */ - @VisibleForTesting - ReplicationSourceInterface addSource(String peerId) throws IOException { + void addSource(String peerId) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); ReplicationSourceInterface src = createSource(peerId, peer); // synchronized on latestPaths to avoid missing the new log @@ -354,8 +394,9 @@ ReplicationSourceInterface addSource(String peerId) throws IOException { if (peerConfig.isSyncReplication()) { syncReplicationPeerMappingManager.add(peer.getId(), peerConfig); } - src.startup(); - return src; + if (!replicationOffload) { + src.startup(); + } } /** @@ -373,7 +414,11 @@ ReplicationSourceInterface addSource(String peerId) throws IOException { *

* @param peerId the id of the sync replication peer */ - public void drainSources(String peerId) throws IOException, ReplicationException { + void drainSources(String peerId) throws IOException, ReplicationException { + if (replicationOffload) { + throw new ReplicationException( + "Should not use sync replication when replication offload is enabled"); + } String terminateMessage = "Sync replication peer " + peerId + " is transiting to STANDBY. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); @@ -430,7 +475,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer */ - public void refreshSources(String peerId) throws ReplicationException, IOException { + void refreshSources(String peerId) throws ReplicationException, IOException { String terminateMessage = "Peer " + peerId + " state or config changed. 
Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); @@ -447,7 +492,9 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } LOG.info("Startup replication source for " + src.getPeerId()); - src.startup(); + if (!replicationOffload) { + src.startup(); + } List toStartup = new ArrayList<>(); // synchronized on oldsources to avoid race with NodeFailoverWorker @@ -470,41 +517,18 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti toStartup.add(recoveredReplicationSource); } } - for (ReplicationSourceInterface replicationSource : toStartup) { - replicationSource.startup(); - } - } - - /** - * Clear the metrics and related replication queue of the specified old source - * @param src source to clear - */ - private boolean removeRecoveredSource(ReplicationSourceInterface src) { - if (!this.oldsources.remove(src)) { - return false; - } - LOG.info("Done with the recovered queue {}", src.getQueueId()); - // Delete queue from storage and memory - deleteQueue(src.getQueueId()); - return true; - } - - void finishRecoveredSource(ReplicationSourceInterface src) { - synchronized (oldsources) { - if (!removeRecoveredSource(src)) { - return; + if (!replicationOffload) { + for (ReplicationSourceInterface replicationSource : toStartup) { + replicationSource.startup(); } } - LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), - src.getStats()); } /** * Clear the metrics and related replication queue of the specified old source * @param src source to clear */ - void removeSource(ReplicationSourceInterface src) { - LOG.info("Done with the queue " + src.getQueueId()); + private void removeSource(ReplicationSourceInterface src) { this.sources.remove(src.getPeerId()); // Delete queue from storage and memory deleteQueue(src.getQueueId()); @@ -548,8 +572,7 @@ private 
void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) thro } } - // public because of we call it in TestReplicationEmptyWALRecovery - @VisibleForTesting + @InterfaceAudience.Private public void preLogRoll(Path newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); @@ -567,9 +590,8 @@ public void preLogRoll(Path newLog) throws IOException { } } - // public because of we call it in TestReplicationEmptyWALRecovery - @VisibleForTesting - public void postLogRoll(Path newLog) throws IOException { + @InterfaceAudience.Private + public void postLogRoll(Path newLog) { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -739,7 +761,9 @@ public void run() { LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); src.enqueueLog(new Path(oldLogDir, wal)); } - src.startup(); + if (!replicationOffload) { + src.startup(); + } } } catch (IOException e) { // TODO manage it @@ -849,19 +873,6 @@ Set getLastestPath() { } } - @VisibleForTesting - public AtomicLong getTotalBufferUsed() { - return totalBufferUsed; - } - - /** - * Returns the maximum size in bytes of edits held in memory which are pending replication - * across all sources inside this RegionServer. 
- */ - public long getTotalBufferLimit() { - return totalBufferLimit; - } - /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -967,10 +978,6 @@ int activeFailoverTaskCount() { return executor.getActiveCount(); } - MetricsReplicationGlobalSourceSource getGlobalMetrics() { - return this.globalMetrics; - } - @InterfaceAudience.Private Server getServer() { return this.server; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 22cbd97d33af..7b7d0d830f97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -267,10 +267,11 @@ public Path getCurrentPath() { //returns false if we've already exceeded the global quota private boolean checkQuota() { // try not to go over total quota - if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) { + if (source.controller.getTotalBufferUsed().get() > source.controller + .getTotalBufferLimit()) { LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", - this.source.getPeerId(), source.manager.getTotalBufferUsed().get(), - source.manager.getTotalBufferLimit()); + this.source.getPeerId(), source.controller.getTotalBufferUsed().get(), + source.controller.getTotalBufferLimit()); Threads.sleep(sleepForRetries); return false; } @@ -399,10 +400,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size); + long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size); // 
Record the new buffer usage - source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); - return newBufferUsed >= source.manager.getTotalBufferLimit(); + source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= source.controller.getTotalBufferLimit(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index b75a7ed3ab88..66059c722cb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -40,21 +39,21 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationPeer replicationPeer; - private String peerClusterId; + private String queueId; private Path currentPath; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { - this.peerClusterId = peerClusterId; + public void init(Configuration 
conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + this.queueId = queueId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; - this.replicationPeer = rp; + this.replicationPeer = replicationPeer; } @Override @@ -96,14 +95,14 @@ public void terminate(String reason, Exception e, boolean clearMetrics) { @Override public String getQueueId() { - return peerClusterId; + return queueId; } @Override public String getPeerId() { - String[] parts = peerClusterId.split("-", 2); + String[] parts = queueId.split("-", 2); return parts.length != 1 ? - parts[0] : peerClusterId; + parts[0] : queueId; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 4b685ce42039..e1b33d974fc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; @@ -816,10 +817,11 @@ private int isLogZnodesMapPopulated() { static class FailInitializeDummyReplicationSource extends 
ReplicationSourceDummy { - @Override public void init(Configuration conf, FileSystem fs, Path walDir, - ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, - Server server, String peerClusterId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + @Override + public void init(Configuration conf, FileSystem fs, Path walDir, + ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, + ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, + WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{ throw new IOException("Failing deliberately"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 9410604f5d7c..bafabb0e5dfe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.replication.ReplicationSourceController; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -375,19 +376,19 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - source.manager = mockReplicationSourceManager(); + 
source.controller = mockReplicationSourceController(); return source; } - private ReplicationSourceManager mockReplicationSourceManager() { - ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); + private ReplicationSourceController mockReplicationSourceController() { + ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class); MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(MetricsReplicationGlobalSourceSource.class); - when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()) + when(controller.getGlobalMetrics()).thenReturn(globalMetrics); + when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(controller.getTotalBufferLimit()) .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); - return mockSourceManager; + return controller; } private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {