Skip to content

Commit

Permalink
HBASE-24382 Flush partial stores of region filtered by seqId when arc… (
Browse files Browse the repository at this point in the history
#1737)

* HBASE-24382 Flush partial stores of region filtered by seqId when archive wal due to too many wals

* fix checkstyle and javadoc issue

* fix javadoc issues

* move the geting of stores to HRegion, since it should not be part of FlushPolicy, and comment fix

* fix checkstyle issue

* add some comment

* remove the forceFlushAllStores since we can use families to determine how to select stores to flush

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: stack <stack@duboce.net>
  • Loading branch information
bsglz committed Jun 26, 2020
1 parent 84e246f commit c046120
Show file tree
Hide file tree
Showing 20 changed files with 251 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -39,8 +40,8 @@
* roller logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
* local region.
* one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush
* for the master local region.
*/
@InterfaceAudience.Private
public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
Expand Down Expand Up @@ -79,7 +80,7 @@ protected void afterRoll(WAL wal) {
}

@Override
protected void scheduleFlush(String encodedRegionName) {
protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,4 @@ protected void configureForRegion(HRegion region) {
* @return the stores need to be flushed.
*/
public abstract Collection<HStore> selectStoresToFlush();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.hbase.regionserver;

import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -30,22 +32,28 @@ public interface FlushRequester {
* Tell the listener the cache needs to be flushed.
*
* @param region the Region requesting the cache flush
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
* rolling.
* @return true if our region is added into the queue, false otherwise
*/
boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker);

/**
* Tell the listener the cache needs to be flushed.
*
* @param region the Region requesting the cache flush
* @param families stores of region to flush, if null then use flush policy
* @return true if our region is added into the queue, false otherwise
*/
boolean requestFlush(HRegion region, List<byte[]> families,
FlushLifeCycleTracker tracker);

/**
* Tell the listener the cache needs to be flushed after a delay
*
* @param region the Region requesting the cache flush
* @param delay after how much time should the flush happen
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
* rolling.
* @return true if our region is added into the queue, false otherwise
*/
boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
boolean requestDelayedFlush(HRegion region, long delay);

/**
* Register a FlushRequestListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2353,16 +2353,16 @@ public boolean compact(CompactionContext compaction, HStore store,
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
* @param force whether we want to force a flush of all stores
* @param flushAllStores whether we want to force a flush of all stores
* @return FlushResult indicating whether the flush was successful or not and if
* the region needs compacting
*
* @throws IOException general io exceptions
* because a snapshot was not properly persisted.
*/
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs
public FlushResult flush(boolean force) throws IOException {
return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
public FlushResult flush(boolean flushAllStores) throws IOException {
return flushcache(flushAllStores, false, FlushLifeCycleTracker.DUMMY);
}

public interface FlushResult {
Expand All @@ -2385,6 +2385,16 @@ enum Result {
boolean isCompactionNeeded();
}

public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
List families = null;
if (flushAllStores) {
families = new ArrayList();
families.addAll(this.getTableDescriptor().getColumnFamilyNames());
}
return this.flushcache(families, writeFlushRequestWalMarker, tracker);
}

/**
* Flush the cache.
*
Expand All @@ -2398,7 +2408,7 @@ enum Result {
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
* @param forceFlushAllStores whether we want to flush all stores
* @param families stores of region to flush.
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @param tracker used to track the life cycle of this flush
* @return whether the flush is success and whether the region needs compacting
Expand All @@ -2408,8 +2418,8 @@ enum Result {
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
public FlushResultImpl flushcache(List<byte[]> families,
boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
Expand Down Expand Up @@ -2456,8 +2466,15 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus
}

try {
Collection<HStore> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
// The reason that we do not always use flushPolicy is, when the flush is
// caused by logRoller, we should select stores which must be flushed
// rather than could be flushed.
Collection<HStore> specificStoresToFlush = null;
if (families != null) {
specificStoresToFlush = getSpecificStores(families);
} else {
specificStoresToFlush = flushPolicy.selectStoresToFlush();
}
FlushResultImpl fs =
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);

Expand Down Expand Up @@ -2487,6 +2504,19 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus
}
}

/**
* get stores which matches the specified families
*
* @return the stores need to be flushed.
*/
private Collection<HStore> getSpecificStores(List<byte[]> families) {
Collection<HStore> specificStoresToFlush = new ArrayList<>();
for (byte[] family : families) {
specificStoresToFlush.add(stores.get(family));
}
return specificStoresToFlush;
}

/**
* Should the store be flushed because it is old enough.
* <p>
Expand Down Expand Up @@ -8962,7 +8992,7 @@ private void requestFlush0(FlushLifeCycleTracker tracker) {
}
if (shouldFlush) {
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
this.rsServices.getFlushRequester().requestFlush(this, tracker);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ protected void chore() {
//Throttle the flushes by putting a delay. If we don't throttle, and there
//is a balanced write-load on the regions in a table, we might end up
//overwhelming the filesystem with too many flushes at once.
if (requester.requestDelayedFlush(r, randomDelay, false)) {
if (requester.requestDelayedFlush(r, randomDelay)) {
LOG.info("{} requesting flush of {} because {} after random delay {} ms",
getName(), r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(),
randomDelay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -45,7 +47,7 @@ public LogRoller(RegionServerServices services) {
super("LogRoller", services.getConfiguration(), services);
}

protected void scheduleFlush(String encodedRegionName) {
protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
RegionServerServices services = this.abortable;
HRegion r = (HRegion) services.getRegion(encodedRegionName);
if (r == null) {
Expand All @@ -58,8 +60,8 @@ protected void scheduleFlush(String encodedRegionName) {
encodedRegionName, r);
return;
}
// force flushing all stores to clean old logs
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
// flush specified stores to clean old logs
requester.requestFlush(r, families, FlushLifeCycleTracker.DUMMY);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private boolean flushOneForGlobalPressure(FlushType flushType) {
server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
", Region memstore size=" +
TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);

if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
Expand Down Expand Up @@ -458,13 +458,18 @@ private FlushType isAboveLowWaterMark() {
}

@Override
public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
FlushLifeCycleTracker tracker) {
public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
return this.requestFlush(r, null, tracker);
}

@Override
public boolean requestFlush(HRegion r, List<byte[]> families,
FlushLifeCycleTracker tracker) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
r.incrementFlushesQueuedCount();
Expand All @@ -477,12 +482,12 @@ public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
}

@Override
public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
public boolean requestDelayedFlush(HRegion r, long delay) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
FlushRegionEntry fqe =
new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
Expand Down Expand Up @@ -581,7 +586,7 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
return true;
}
}
return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
return flushRegion(region, false, fqe.families, fqe.getTracker());
}

/**
Expand All @@ -591,13 +596,13 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
* @param forceFlushAllStores whether we want to flush all store.
* @param families stores of region to flush.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
FlushLifeCycleTracker tracker) {
private boolean flushRegion(HRegion region, boolean emergencyFlush,
List<byte[]> families, FlushLifeCycleTracker tracker) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
Expand All @@ -612,7 +617,7 @@ private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forc
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
FlushResult flushResult = region.flushcache(families, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
Expand Down Expand Up @@ -845,15 +850,16 @@ static class FlushRegionEntry implements FlushQueueEntry {
private long whenToExpire;
private int requeueCount = 0;

private final boolean forceFlushAllStores;
private final List<byte[]> families;

private final FlushLifeCycleTracker tracker;

FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
FlushRegionEntry(final HRegion r, List<byte[]> families,
FlushLifeCycleTracker tracker) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
this.forceFlushAllStores = forceFlushAllStores;
this.families = families;
this.tracker = tracker;
}

Expand All @@ -873,13 +879,6 @@ public int getRequeueCount() {
return this.requeueCount;
}

/**
* @return whether we need to flush all stores.
*/
public boolean isForceFlushAllStores() {
return forceFlushAllStores;
}

public FlushLifeCycleTracker getTracker() {
return tracker;
}
Expand Down
Loading

0 comments on commit c046120

Please sign in to comment.