HBASE-27785 Encapsulate and centralize totalBufferUsed in Replication… #5168

Merged: 7 commits, Apr 21, 2023
Changes from 3 commits
@@ -137,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();

private AtomicLong totalBufferUsed;

public static final String WAIT_ON_ENDPOINT_SECONDS =
"hbase.replication.wait.on.endpoint.seconds";
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -220,7 +218,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;

this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);
@@ -779,14 +776,12 @@ public MetricsSource getSourceMetrics() {

@Override
// offsets totalBufferUsed by deducting shipped batchSize.
public void postShipEdits(List<Entry> entries, int batchSize) {
public void postShipEdits(List<Entry> entries, long batchSize) {
if (throttler.isEnabled()) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
this.manager.addTotalBufferUsed(-batchSize);
}

@Override
@@ -164,7 +164,7 @@ default boolean isSyncReplication() {
* @param entries pushed
* @param batchSize entries size pushed
*/
void postShipEdits(List<Entry> entries, int batchSize);
void postShipEdits(List<Entry> entries, long batchSize);

/**
* The queue of WALs only belong to one region server. This will return the server name which all
@@ -984,8 +984,8 @@ Set<Path> getLastestPath() {
}
}

public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
public long getTotalBufferUsed() {
return totalBufferUsed.get();
}

/**
@@ -1035,7 +1035,7 @@ public String getStats() {
StringBuilder stats = new StringBuilder();
// Print stats that apply across all Replication Sources
stats.append("Global stats: ");
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
.append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
@@ -1070,4 +1070,21 @@ MetricsReplicationGlobalSourceSource getGlobalMetrics() {
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}

boolean addTotalBufferUsed(long size) {
Contributor: Please add a comment explaining the meaning of the return value.

Contributor: After checking the later code, I think we'd better introduce two methods here: an acquire method that returns a boolean, and a release method that returns nothing. Both could call a private method where we change totalBufferUsed and record the new usage in metrics. This will be much clearer.

Contributor Author: @Apache9, thanks for the suggestion, I will make it clearer. (A sketch of the suggested acquire/release shape follows this diff.)

long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed >= totalBufferLimit;
}

boolean checkBufferQuota(String peerId) {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferLimit) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
peerId, totalBufferUsed.get(), totalBufferLimit);
return false;
}
return true;
}
}
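To make the suggestion in the review thread above concrete, here is a minimal, hypothetical sketch of the acquire/release split: two public methods that both delegate to a single private helper which updates the counter and records the metric. The class name and the BufferUsageMetrics stand-in are illustrative assumptions, not the code that was ultimately merged; only the totalBufferUsed counter, the totalBufferLimit, and setWALReaderEditsBufferBytes come from the diff above.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the reviewer's acquire/release suggestion; names are stand-ins.
public class BufferQuotaSketch {

  // Stand-in for MetricsReplicationGlobalSourceSource.
  interface BufferUsageMetrics {
    void setWALReaderEditsBufferBytes(long bytes);
  }

  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;
  private final BufferUsageMetrics globalMetrics;

  BufferQuotaSketch(long totalBufferLimit, BufferUsageMetrics globalMetrics) {
    this.totalBufferLimit = totalBufferLimit;
    this.globalMetrics = globalMetrics;
  }

  /**
   * Reserve buffer space for entries about to be read.
   * @return true if the new usage is at or over the limit, i.e. the caller should ship
   *         what it already has before reading more.
   */
  boolean acquireBufferQuota(long size) {
    return addTotalBufferUsed(size) >= totalBufferLimit;
  }

  /** Give buffer space back once a batch has been shipped or dropped; nothing to return. */
  void releaseBufferQuota(long size) {
    addTotalBufferUsed(-size);
  }

  // The single place where the counter changes and the new usage is recorded in metrics.
  private long addTotalBufferUsed(long delta) {
    long newBufferUsed = totalBufferUsed.addAndGet(delta);
    globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }
}

Keeping the boolean "over quota" signal on the acquire path only means callers that merely return memory never have to interpret a return value, which is the clarity the reviewer is asking for.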
@@ -150,18 +150,6 @@ private void noMoreData() {
protected void postFinish() {
}

/**
* get batchEntry size excludes bulk load file sizes. Uses ReplicationSourceWALReader's static
* method.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for (Entry entry : entryBatch.getWalEntries()) {
totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}

/**
* Do the shipping logic
*/
@@ -173,7 +161,6 @@ private void shipEdits(WALEntryBatch entryBatch) {
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
source.getSourceMetrics()
.setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
while (isActive()) {
@@ -217,7 +204,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
// this sizeExcludeBulkLoad has to use same calculation that when calling
// acquireBufferQuota() in ReplicationSourceWALReader because they maintain
// same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
source.postShipEdits(entries, entryBatch.getUsedBufferSize());
// FIXME check relationship between wal group and overall
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
entryBatch.getNbHFiles());
@@ -378,8 +365,6 @@ void clearWALEntryBatch() {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
}
long newBufferUsed =
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
source.getSourceManager().addTotalBufferUsed(-totalToDecrement.longValue());
}
}
@@ -23,7 +23,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,9 +74,6 @@ class ReplicationSourceWALReader extends Thread {

// Indicates whether this particular worker is running
private boolean isReaderRunning = true;

private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;

/**
@@ -105,8 +101,6 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
@@ -147,7 +141,7 @@ public void run() {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
if (!checkBufferQuota()) {
continue;
}
Path currentPath = entryStream.getCurrentPath();
@@ -275,11 +269,9 @@ public Path getCurrentPath() {
}

// returns false if we've already exceeded the global quota
private boolean checkQuota() {
private boolean checkBufferQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) {
Threads.sleep(sleepForRetries);
return false;
}
@@ -440,11 +432,9 @@ private void updateReplicationMarkerEdit(Entry entry, long offset) {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
walEntryBatch.incrementUsedBufferSize(size);
return newBufferUsed >= totalBufferQuota;
return this.getSourceManager().addTotalBufferUsed(size);

}

/**
@@ -454,8 +444,7 @@ private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
this.getSourceManager().addTotalBufferUsed(-usedBufferSize);
}
}

@@ -470,4 +459,8 @@ public boolean isReaderRunning() {
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}

private ReplicationSourceManager getSourceManager() {
return this.source.getSourceManager();
}
}
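Taken together, the reader and shipper changes above centralize one accounting cycle: the WAL reader asks the manager via checkBufferQuota before reading, charges each buffered entry through addTotalBufferUsed, and the shipper hands the batch's used size back via postShipEdits. Below is a condensed, hypothetical walk-through of that cycle under simplified assumptions (one reader, a fixed list of entry sizes, a stand-in Manager class); only the addTotalBufferUsed and checkBufferQuota contracts are taken from the diff.

import java.util.concurrent.atomic.AtomicLong;

// Condensed, hypothetical walk-through of the read/ship buffer-accounting cycle.
public class BufferCycleSketch {

  // Minimal stand-in for the manager-side accounting shown in the diff above.
  static class Manager {
    private final AtomicLong totalBufferUsed = new AtomicLong();
    private final long totalBufferLimit;

    Manager(long totalBufferLimit) {
      this.totalBufferLimit = totalBufferLimit;
    }

    // Same contract as ReplicationSourceManager.addTotalBufferUsed in the diff:
    // returns true once usage reaches or exceeds the limit.
    boolean addTotalBufferUsed(long size) {
      return totalBufferUsed.addAndGet(size) >= totalBufferLimit;
    }

    // Same contract as ReplicationSourceManager.checkBufferQuota in the diff.
    boolean checkBufferQuota() {
      return totalBufferUsed.get() <= totalBufferLimit;
    }

    long getTotalBufferUsed() {
      return totalBufferUsed.get();
    }
  }

  public static void main(String[] args) {
    Manager manager = new Manager(256);      // deliberately tiny limit for illustration
    long[] entrySizes = {100, 100, 100};     // pretend WAL entry sizes

    // Reader side: charge each buffered entry; stop once the acquire call reports "over limit".
    long usedBufferSize = 0;                 // what WALEntryBatch.getUsedBufferSize() would track
    for (long size : entrySizes) {
      usedBufferSize += size;
      if (manager.addTotalBufferUsed(size)) {
        break;                               // push the batch collected so far
      }
    }

    // Shipper side: postShipEdits hands the batch's used size back to the manager.
    manager.addTotalBufferUsed(-usedBufferSize);

    System.out.println("buffer used after shipping: " + manager.getTotalBufferUsed() + "B");
    System.out.println("quota ok again: " + manager.checkBufferQuota());
  }
}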
@@ -100,7 +100,7 @@ public long getNextSleepInterval(final int size) {
* Add current size to the current cycle's total push size
* @param size is the current size added to the current cycle's total push size
*/
public void addPushSize(final int size) {
public void addPushSize(final long size) {
if (this.enabled) {
this.cyclePushSize += size;
}
@@ -151,7 +151,7 @@ public void tryThrottle(int batchSize) throws InterruptedException {
}

@Override
public void postShipEdits(List<Entry> entries, int batchSize) {
public void postShipEdits(List<Entry> entries, long batchSize) {
}

@Override
@@ -294,18 +294,21 @@ public void testWALKeySerialization() throws Exception {
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
}

private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
when(mockSourceManager.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf)
throws IOException {
ReplicationSourceManager mockSourceManager = new ReplicationSourceManager(null, null, conf,
null, null, null, null, null, null, null, createMockGlobalMetrics());
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
return source;
}

private MetricsReplicationGlobalSourceSource createMockGlobalMetrics() {
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
final AtomicLong bufferUsedCounter = new AtomicLong(0);
@@ -315,12 +318,11 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
}).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
when(globalMetrics.getWALReaderEditsBufferBytes())
.then(invocationOnMock -> bufferUsedCounter.get());

when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
return globalMetrics;
}

private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf)
throws IOException {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
@@ -330,7 +332,7 @@ private ReplicationSourceWALReader createReader(boolean recovered, Configuration
}

private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
Configuration conf) {
Configuration conf) throws IOException {
ReplicationSource source = mockReplicationSource(false, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
@@ -667,12 +669,7 @@ public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception
appendEntries(writer1, 3);
localLogQueue.enqueueLog(log1, fakeWalGroupId);

ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
// Make it look like the source is from recovered source.
when(mockSourceManager.getOldSources())
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
@@ -784,10 +781,8 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
// make sure the size of the wal file is 0.
assertEquals(0, fs.getFileStatus(archivePath).getLen());

ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));

Configuration localConf = new Configuration(CONF);
localConf.setInt("replication.source.maxretriesmultiplier", 1);
@@ -838,7 +833,7 @@ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() thr
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
assertEquals(sum, source.getSourceManager().getTotalBufferUsed());
assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
assertNull(reader.poll(10));
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -137,10 +136,9 @@ public void testQuota() throws IOException {
Thread watcher = new Thread(() -> {
Replication replication = (Replication) utility1.getMiniHBaseCluster().getRegionServer(0)
.getReplicationSourceService();
AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed();
testQuotaPass = true;
while (!Thread.interrupted()) {
long size = bufferUsed.get();
long size = replication.getReplicationManager().getTotalBufferUsed();
if (size > 0) {
testQuotaNonZero = true;
}