HBASE-18815 We need to pass something like CompactionRequest in CP to give user some information about the compaction

CompactionRequest was removed from CP in HBASE-18453; this change reintroduces
it to CP as a read-only interface called CompactionRequest.
The CompactionRequest class is renamed to CompactionRequestImpl.

Additionally, this change removes selectionTimeInNanos from CompactionRequest and
uses selectionTime as a replacement. This means that CompactionRequest#toString
and the compare logic are modified as well.

Signed-off-by: Michael Stack <stack@apache.org>
petersomogyi authored and saintstack committed Oct 3, 2017
1 parent 5026539 commit 0af61dc
Showing 43 changed files with 378 additions and 258 deletions.
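
Before the per-file hunks below, here is a minimal sketch of how a coprocessor might consume the reintroduced read-only CompactionRequest parameter. This example is not part of the commit; the class name, the println output, and the registration details are illustrative only, and it assumes the read-only interface exposes the accessors used by the call sites in this diff (getFiles(), isMajor()). The hook signature matches the new RegionObserver.preCompact shown below.

import java.io.IOException;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

// Illustrative observer: logs what is being compacted, then leaves the compaction untouched.
public class CompactionInfoObserver implements RegionObserver {

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    // The read-only request now tells the coprocessor which files are involved and why.
    System.out.println("Compacting " + request.getFiles().size() + " file(s), major="
        + request.isMajor());
    // Returning the scanner unchanged keeps the default compaction behaviour.
    return scanner;
  }
}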
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.zookeeper.KeeperException;
@@ -228,7 +229,8 @@ public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvi
   @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
+      long readPoint) throws IOException {
     ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.util.Pair;
@@ -187,9 +188,11 @@ default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store st
    * @param store the store where compaction is being requested
    * @param candidates the store files currently available for compaction
    * @param tracker tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
    */
   default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {}
+      List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {}
 
   /**
    * Called after the {@link StoreFile}s to compact have been selected from the available
@@ -198,9 +201,11 @@ default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c
    * @param store the store being compacted
    * @param selected the store files selected to compact
    * @param tracker tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
    */
   default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker) {}
+      ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) {}
 
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into a new
@@ -221,11 +226,13 @@ default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment>
    * @param scanner the scanner over existing data used in the store file rewriting
    * @param scanType type of Scan
    * @param tracker tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
    * @return the scanner to use during compaction. Should not be {@code null} unless the
    *         implementation is writing new store files on its own.
    */
   default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+      CompactionRequest request)
       throws IOException {
     return scanner;
   }
@@ -245,13 +252,15 @@ default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment>
    *          files
    * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
    * @param tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
    * @param readPoint the readpoint to create scanner
    * @return the scanner to use during compaction. {@code null} if the default implementation is to
    *         be used.
    */
   default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
+      long readPoint) throws IOException {
     return s;
   }
 
@@ -261,9 +270,11 @@ default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorE
    * @param store the store being compacted
    * @param resultFile the new store file written out during compaction
    * @param tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
    */
   default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}
+      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
+      throws IOException {}
 
   /**
    * Called before the region is reported as closed to the master.
@@ -320,7 +331,7 @@ default void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
    *          coprocessors
    * @param c the environment provided by the region server
    * @param get the Get request
-   * @param exists
+   * @param exists the result returned by the region server
    * @return the value to return to the client if bypassing default processing
    */
   default boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
@@ -799,8 +810,8 @@ default RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironmen
    * <p>
    * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
    * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
-   * InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes
-   * or compactions, resp.
+   * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners
+   * created for flushes or compactions, resp.
    * <p>
   * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
@@ -32,7 +32,6 @@
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -41,17 +40,17 @@
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
-import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Compact passed set of files in the mob-enabled column family.
@@ -66,7 +65,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
 
     @Override
-    public ScanType getScanType(CompactionRequest request) {
+    public ScanType getScanType(CompactionRequestImpl request) {
       // retain the delete markers until they are expired.
       return ScanType.COMPACT_RETAIN_DELETES;
     }
@@ -105,7 +104,7 @@ public DefaultMobStoreCompactor(Configuration conf, HStore store) {
   }
 
   @Override
-  public List<Path> compact(CompactionRequest request, ThroughputController throughputController,
+  public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
       User user) throws IOException {
     return compact(request, scannerFactory, writerFactory, throughputController, user);
   }
@@ -43,7 +43,7 @@
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -393,7 +393,7 @@ public int getRegionSplitLimit() {
   private static final Comparator<Runnable> COMPARATOR =
       new Comparator<Runnable>() {
 
-    private int compare(CompactionRequest r1, CompactionRequest r2) {
+    private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
      if (r1 == r2) {
        return 0; //they are the same request
      }
@@ -402,7 +402,7 @@ private int compare(CompactionRequest r1, CompactionRequest r2) {
      if (cmp != 0) {
        return cmp;
      }
-     cmp = Long.compare(r1.getSelectionNanoTime(), r2.getSelectionNanoTime());
+     cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
      if (cmp != 0) {
        return cmp;
      }
@@ -23,9 +23,9 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
@@ -81,7 +81,7 @@ public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction
   }
 
   @Override
-  public void forceSelect(CompactionRequest request) {
+  public void forceSelect(CompactionRequestImpl request) {
     if (!(request instanceof DateTieredCompactionRequest)) {
       throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
         + request.getClass().getCanonicalName());
@@ -82,7 +82,7 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1349,9 +1349,9 @@ public List<HStoreFile> compact(CompactionContext compaction,
       ThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     List<HStoreFile> sfs = null;
-    CompactionRequest cr = compaction.getRequest();
+    CompactionRequestImpl cr = compaction.getRequest();
     try {
-      // Do all sanity checking in here if we have a valid CompactionRequest
+      // Do all sanity checking in here if we have a valid CompactionRequestImpl
       // because we need to clean up after it on the way out in a finally
       // block below
       long compactionStartTime = EnvironmentEdgeManager.currentTime();
@@ -1387,7 +1387,7 @@ public List<HStoreFile> compact(CompactionContext compaction,
         return sfs;
       }
       // Do the steps necessary to complete the compaction.
-      sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
+      sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
       writeCompactionWalRecord(filesToCompact, sfs);
       replaceStoreFiles(filesToCompact, sfs);
       if (cr.isMajor()) {
@@ -1417,14 +1417,14 @@ public List<HStoreFile> compact(CompactionContext compaction,
     }
   }
 
-  private List<HStoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
+  private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
       User user) throws IOException {
     List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
       HStoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
+        getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
       }
       assert sf != null;
       sfs.add(sf);
@@ -1483,7 +1483,7 @@ void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreF
    * @param compactionStartTime Start time.
    */
   private void logCompactionEndMessage(
-      CompactionRequest cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
+      CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
     StringBuilder message = new StringBuilder(
       "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
         + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
@@ -1625,7 +1625,7 @@ public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOExcepti
       // Move the compaction into place.
       HStoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompact(this, sf, null, null);
+        this.getCoprocessorHost().postCompact(this, sf, null, null, null);
       }
       replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
       completeCompaction(filesToCompact);
@@ -1674,19 +1674,20 @@ public Optional<CompactionContext> requestCompaction(int priority,
     removeUnneededFiles();
 
     final CompactionContext compaction = storeEngine.createCompaction();
-    CompactionRequest request = null;
+    CompactionRequestImpl request = null;
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
         // First, see if coprocessor would want to override selection.
         if (this.getCoprocessorHost() != null) {
           final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
           boolean override = false;
+          //TODO: is it correct way to get CompactionRequest?
           override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
-            tracker, user);
+            tracker, null, user);
           if (override) {
             // Coprocessor is overriding normal file selection.
-            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
+            compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
           }
         }
 
@@ -1712,7 +1713,8 @@ public Optional<CompactionContext> requestCompaction(int priority,
       }
       if (this.getCoprocessorHost() != null) {
         this.getCoprocessorHost().postCompactSelection(
-          this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
+          this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
+          compaction.getRequest(), user);
       }
       // Finally, we have the resulting files list. Check if we have any files at all.
       request = compaction.getRequest();
@@ -1790,7 +1792,7 @@ public void cancelRequestedCompaction(CompactionContext compaction) {
     finishCompactionRequest(compaction.getRequest());
   }
 
-  private void finishCompactionRequest(CompactionRequest cr) {
+  private void finishCompactionRequest(CompactionRequestImpl cr) {
     this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
     if (cr.isOffPeak()) {
       offPeakCompactionTracker.set(false);
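
Related to the selectionTime change described in the commit message, here is a second illustrative sketch, not part of the commit: a hook that reports how long a compaction took from selection to completion. It assumes getSelectionTime() on the read-only CompactionRequest returns the selection timestamp on the same millisecond clock as EnvironmentEdgeManager.currentTime(), which is what replacing selectionTimeInNanos with selectionTime suggests; the class name and output are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

// Hypothetical observer: reports selection-to-completion latency for each finished compaction.
public class CompactionTimingObserver implements RegionObserver {

  @Override
  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
      throws IOException {
    // Assumed: getSelectionTime() is the millisecond timestamp taken when the request was selected.
    long elapsedMs = EnvironmentEdgeManager.currentTime() - request.getSelectionTime();
    System.out.println("Compaction wrote " + resultFile.getPath() + " " + elapsedMs
        + " ms after selection");
  }
}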
