From 259d1bdefaee599f0eb214fa1af511c967a9b51a Mon Sep 17 00:00:00 2001
From: haxiaolin
Date: Wed, 24 May 2023 12:09:12 +0800
Subject: [PATCH] HBASE-27881 The sleep time in checkQuota of replication WAL reader should be controlled independently

---
 .../ReplicationSourceManager.java             |  2 +-
 .../ReplicationSourceWALReader.java           | 35 ++++++++++++++-----
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d54cda92d901..87f129ebed79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1195,7 +1195,7 @@ private long addTotalBufferUsed(long size) {
   boolean checkBufferQuota(String peerId) {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferLimit) {
-      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+      LOG.debug("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
         peerId, totalBufferUsed.get(), totalBufferLimit);
       return false;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bd5b7736f3b9..d8d7ac8c1aac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -69,6 +70,9 @@ class ReplicationSourceWALReader extends Thread {
   // position in the WAL to start reading at
   private long currentPosition;
   private final long sleepForRetries;
+  private final long sleepForQuotaCheck;
+
+  private final long logQuotaThrottleInterval;
   private final int maxRetriesMultiplier;
 
   // Indicates whether this particular worker is running
@@ -102,6 +106,10 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     // 1 second
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
+    // 300ms
+    this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300);
+    this.logQuotaThrottleInterval =
+      this.conf.getLong("replication.source.logintervalforquotathrottle.ms", 3000);
     // 5 minutes @ 1 sec per
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
@@ -140,9 +148,7 @@ public void run() {
             Threads.sleep(sleepForRetries);
             continue;
           }
-          if (!checkBufferQuota()) {
-            continue;
-          }
+          blockUntilFreeBufferQuota();
           Path currentPath = entryStream.getCurrentPath();
           WALEntryStream.HasNext hasNext = entryStream.hasNext();
           if (hasNext == WALEntryStream.HasNext.NO) {
@@ -267,13 +273,24 @@ public Path getCurrentPath() {
   }
 
-  // returns false if we've already exceeded the global quota
-  private boolean checkBufferQuota() {
+  /**
+   * Waits until the source manager's buffer quota check passes or the reader is no longer
+   * running. Re-checks every sleepForQuotaCheck ms and logs a warning at most once per
+   * logQuotaThrottleInterval ms while blocked.
+   */
+  private void blockUntilFreeBufferQuota() {
     // try not to go over total quota
-    if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
-      Threads.sleep(sleepForRetries);
-      return false;
+    long lastLogTime = EnvironmentEdgeManager.currentTime();
+    while (
+      !this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()
+    ) {
+      if (EnvironmentEdgeManager.currentTime() - lastLogTime >= logQuotaThrottleInterval) {
+        LOG.warn(
+          "peer={}, source reader check buffer quota failed, current wal is {}, will sleep {}ms for next retry",
+          this.source.getPeerId(), this.getCurrentPath(), sleepForQuotaCheck);
+        lastLogTime = EnvironmentEdgeManager.currentTime();
+      }
+      Threads.sleep(sleepForQuotaCheck);
     }
-    return true;
   }
 
   private WALEntryBatch createBatch(WALEntryStream entryStream) {