From a380e0e6f7952c82716b58ea5075fad71028c1e7 Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Wed, 24 May 2023 12:09:12 +0800 Subject: [PATCH] HBASE-27881 The sleep time in checkQuota of replication WAL reader should be controlled independently --- .../ReplicationSourceWALReader.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index bd5b7736f3b9..3adbbc55aeaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -69,6 +69,7 @@ class ReplicationSourceWALReader extends Thread { // position in the WAL to start reading at private long currentPosition; private final long sleepForRetries; + private final long sleepForQuotaCheck; private final int maxRetriesMultiplier; // Indicates whether this particular worker is running @@ -102,6 +103,8 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, int batchCount = conf.getInt("replication.source.nb.batches", 1); // 1 second this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); + // 300ms + this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300); // 5 minutes @ 1 sec per this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); @@ -140,9 +143,7 @@ public void run() { Threads.sleep(sleepForRetries); continue; } - if (!checkBufferQuota()) { - continue; - } + checkBufferQuota(); Path currentPath = entryStream.getCurrentPath(); WALEntryStream.HasNext hasNext = entryStream.hasNext(); if (hasNext == WALEntryStream.HasNext.NO) { @@ -266,14 +267,14 @@ public Path getCurrentPath() { return logQueue.getQueue(walGroupId).peek(); } - // returns false if we've already exceeded the global quota - private boolean checkBufferQuota() { + // sleeping when we've already exceeded the global quota + private void checkBufferQuota() { // try not to go over total quota - if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { - Threads.sleep(sleepForRetries); - return false; + while (!this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()) { + LOG.warn("PeerId={}, sleep {}ms for source reader, current WAL is {}", + this.source.getPeerId(), sleepForQuotaCheck, this.getCurrentPath()); + Threads.sleep(sleepForQuotaCheck); } - return true; } private WALEntryBatch createBatch(WALEntryStream entryStream) {