Skip to content

Commit

Permalink
HBASE-27881 The sleep time in checkQuota of replication WAL reader sh…
Browse files Browse the repository at this point in the history
…ould be controlled independently
  • Loading branch information
sunhelly committed Jun 7, 2023
1 parent 22526a6 commit 8728906
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ private long addTotalBufferUsed(long size) {
boolean checkBufferQuota(String peerId) {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferLimit) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
LOG.debug("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
peerId, totalBufferUsed.get(), totalBufferLimit);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand Down Expand Up @@ -69,6 +70,9 @@ class ReplicationSourceWALReader extends Thread {
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final long sleepForQuotaCheck;

private final long logQuotaThrottleInterval;
private final int maxRetriesMultiplier;

// Indicates whether this particular worker is running
Expand Down Expand Up @@ -102,6 +106,10 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
int batchCount = conf.getInt("replication.source.nb.batches", 1);
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 300ms
this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300);
this.logQuotaThrottleInterval =
this.conf.getLong("replication.source.logintervalforquotathrottle.ms", 3000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
Expand Down Expand Up @@ -140,9 +148,7 @@ public void run() {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkBufferQuota()) {
continue;
}
blockUntilFreeBufferQuota();
Path currentPath = entryStream.getCurrentPath();
WALEntryStream.HasNext hasNext = entryStream.hasNext();
if (hasNext == WALEntryStream.HasNext.NO) {
Expand Down Expand Up @@ -266,14 +272,19 @@ public Path getCurrentPath() {
return logQueue.getQueue(walGroupId).peek();
}

// returns false if we've already exceeded the global quota
private boolean checkBufferQuota() {
// try not to go over total quota
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
return false;
// blocking until buffer quota free
private void blockUntilFreeBufferQuota() {
long start = EnvironmentEdgeManager.currentTime();
while (
!this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning()
) {
if (EnvironmentEdgeManager.currentTime() - start >= logQuotaThrottleInterval) {
LOG.warn("peer={}, source reader is blocking until buffer quota free, current wal is {}",
this.source.getPeerId(), this.getCurrentPath());
start = EnvironmentEdgeManager.currentTime();
}
Threads.sleep(sleepForQuotaCheck);
}
return true;
}

private WALEntryBatch createBatch(WALEntryStream entryStream) {
Expand Down

0 comments on commit 8728906

Please sign in to comment.