Skip to content

Commit

Permalink
HBASE-27785 Encapsulate and centralize totalBufferUsed in Replication… (
Browse files Browse the repository at this point in the history
#5168)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Apr 21, 2023
1 parent f5ee958 commit 269586c
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();

private AtomicLong totalBufferUsed;

public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
Expand Down Expand Up @@ -220,7 +218,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;

this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
Expand Down Expand Up @@ -779,14 +776,12 @@ public MetricsSource getSourceMetrics() {

@Override
// offsets totalBufferUsed by deducting shipped batchSize.
public void postShipEdits(List<Entry> entries, int batchSize) {
public void postShipEdits(List<Entry> entries, long batchSize) {
if (throttler.isEnabled()) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
this.manager.releaseBufferQuota(batchSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ default boolean isSyncReplication() {
* @param entries pushed
* @param batchSize entries size pushed
*/
void postShipEdits(List<Entry> entries, int batchSize);
void postShipEdits(List<Entry> entries, long batchSize);

/**
* The queue of WALs only belong to one region server. This will return the server name which all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -984,8 +985,8 @@ Set<Path> getLastestPath() {
}
}

public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
public long getTotalBufferUsed() {
return totalBufferUsed.get();
}

/**
Expand Down Expand Up @@ -1035,7 +1036,7 @@ public String getStats() {
StringBuilder stats = new StringBuilder();
// Print stats that apply across all Replication Sources
stats.append("Global stats: ");
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
.append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
Expand Down Expand Up @@ -1070,4 +1071,80 @@ MetricsReplicationGlobalSourceSource getGlobalMetrics() {
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}

/**
* Acquire the buffer quota for {@link Entry} which is added to {@link WALEntryBatch}.
* @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer
* quota.
* @return true if we should clear buffer and push all
*/
boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
return this.acquireBufferQuota(entrySize);
}

/**
* To release the buffer quota of {@link WALEntryBatch} which acquired by
* {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
* @return the released buffer quota size.
*/
long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
this.releaseBufferQuota(usedBufferSize);
}
return usedBufferSize;
}

/**
* Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
* {@link ReplicationSourceManager#totalBufferLimit}.
* @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
* {@link ReplicationSourceManager#totalBufferLimit},we should stop increase buffer and
* ship all.
*/
boolean acquireBufferQuota(long size) {
if (size < 0) {
throw new IllegalArgumentException("size should not less than 0");
}
long newBufferUsed = addTotalBufferUsed(size);
return newBufferUsed >= totalBufferLimit;
}

/**
* To release the buffer quota which acquired by
* {@link ReplicationSourceManager#acquireBufferQuota}.
*/
void releaseBufferQuota(long size) {
if (size < 0) {
throw new IllegalArgumentException("size should not less than 0");
}
addTotalBufferUsed(-size);
}

private long addTotalBufferUsed(long size) {
if (size == 0) {
return totalBufferUsed.get();
}
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed;
}

/**
* Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
* {@link ReplicationSourceManager#totalBufferLimit} for peer.
* @return true if {@link ReplicationSourceManager#totalBufferUsed} not more than
* {@link ReplicationSourceManager#totalBufferLimit}.
*/
boolean checkBufferQuota(String peerId) {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferLimit) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
peerId, totalBufferUsed.get(), totalBufferLimit);
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -150,18 +149,6 @@ private void noMoreData() {
protected void postFinish() {
}

/**
* get batchEntry size excludes bulk load file sizes. Uses ReplicationSourceWALReader's static
* method.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for (Entry entry : entryBatch.getWalEntries()) {
totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}

/**
* Do the shipping logic
*/
Expand All @@ -173,7 +160,6 @@ private void shipEdits(WALEntryBatch entryBatch) {
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
source.getSourceMetrics()
.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
while (isActive()) {
Expand Down Expand Up @@ -217,7 +203,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
// this sizeExcludeBulkLoad has to use same calculation that when calling
// acquireBufferQuota() in ReplicationSourceWALReader because they maintain
// same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
source.postShipEdits(entries, entryBatch.getUsedBufferSize());
// FIXME check relationship between wal group and overall
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
entryBatch.getNbHFiles());
Expand Down Expand Up @@ -366,20 +352,17 @@ void clearWALEntryBatch() {
return;
}
}
LongAccumulator totalToDecrement = new LongAccumulator((a, b) -> a + b, 0);
entryReader.entryBatchQueue.forEach(w -> {
entryReader.entryBatchQueue.remove(w);
w.getWalEntries().forEach(e -> {
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
});
});
long totalReleasedBytes = 0;
while (true) {
WALEntryBatch batch = entryReader.entryBatchQueue.poll();
if (batch == null) {
break;
}
totalReleasedBytes += source.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
totalReleasedBytes);
}
long newBufferUsed =
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -35,7 +34,6 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,9 +73,6 @@ class ReplicationSourceWALReader extends Thread {

// Indicates whether this particular worker is running
private boolean isReaderRunning = true;

private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;

/**
Expand Down Expand Up @@ -105,8 +100,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
Expand Down Expand Up @@ -147,7 +140,7 @@ public void run() {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
if (!checkBufferQuota()) {
continue;
}
Path currentPath = entryStream.getCurrentPath();
Expand Down Expand Up @@ -188,7 +181,7 @@ public void run() {
// batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
this.releaseBufferQuota(batch);
this.getSourceManager().releaseWALEntryBatchBufferQuota(batch);
}
}
}
Expand Down Expand Up @@ -218,10 +211,9 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);
boolean totalBufferTooLarge = this.getSourceManager().acquireWALEntryBufferQuota(batch, entry);

// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
Expand Down Expand Up @@ -275,11 +267,9 @@ public Path getCurrentPath() {
}

// returns false if we've already exceeded the global quota
private boolean checkQuota() {
private boolean checkBufferQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
return false;
}
Expand Down Expand Up @@ -319,13 +309,7 @@ public WALEntryBatch poll(long timeout) throws InterruptedException {

private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}

public static long getEntrySizeExcludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + key.estimatedSerializedSizeOf();
return WALEntryBatch.getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}

private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
Expand Down Expand Up @@ -435,30 +419,6 @@ private void updateReplicationMarkerEdit(Entry entry, long offset) {
edit.setCells(newCells);
}

/**
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
walEntryBatch.incrementUsedBufferSize(size);
return newBufferUsed >= totalBufferQuota;
}

/**
* To release the buffer quota of {@link WALEntryBatch} which acquired by
* {@link ReplicationSourceWALReader#acquireBufferQuota}
*/
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}

/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
Expand All @@ -470,4 +430,8 @@ public boolean isReaderRunning() {
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}

private ReplicationSourceManager getSourceManager() {
return this.source.getSourceManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public long getNextSleepInterval(final int size) {
* Add current size to the current cycle's total push size
* @param size is the current size added to the current cycle's total push size
*/
public void addPushSize(final int size) {
public void addPushSize(final long size) {
if (this.enabled) {
this.cyclePushSize += size;
}
Expand Down
Loading

0 comments on commit 269586c

Please sign in to comment.