HBASE-18826 Use HStore instead of Store in our own code base and remove unnecessary methods in Store interface
Apache9 committed Sep 28, 2017
1 parent 0cf15fa commit 7f4c3b3
Showing 62 changed files with 762 additions and 869 deletions.
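
The recurring pattern in these hunks: internal call sites that previously went through the Store interface now work with HStore (and HRegion) directly, a few accessors are renamed (getSizeOfMemStore -> getMemStoreSize, getSizeToFlush -> getFlushableSize, isMajorCompaction -> shouldPerformMajorCompaction), and getMaxSequenceId()/getMaxMemstoreTS() now return optional values. Below is a minimal, hypothetical sketch of calling code after the change; the class and method names of the sketch itself are invented, and only the HRegion/HStore calls that appear in the hunks below are assumed to exist.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;

public class StoreToHStoreSketch {
  // Iterates stores as HStore and uses the renamed / optional-returning accessors.
  static void logStoreState(HRegion region) throws IOException {
    for (HStore store : region.getStores()) {
      long memstoreDataSize = store.getMemStoreSize().getDataSize();   // was getSizeOfMemStore()
      long flushableDataSize = store.getFlushableSize().getDataSize(); // was getSizeToFlush()
      long maxSeqId = store.getMaxSequenceId().orElse(0L);             // now returns an optional value
      boolean wantsMajor = store.shouldPerformMajorCompaction();       // was isMajorCompaction()
      System.out.println(store.getColumnFamilyName() + ": memstore=" + memstoreDataSize
          + ", flushable=" + flushableDataSize + ", maxSeqId=" + maxSeqId
          + ", wantsMajorCompaction=" + wantsMajor);
    }
  }
}
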
@@ -583,6 +583,11 @@ public ColumnFamilyDescriptorBuilder setValue(final byte[] key, final byte[] val
return this;
}

public ColumnFamilyDescriptorBuilder setValue(final String key, final String value) {
desc.setValue(key, value);
return this;
}

/**
* A ModifyableFamilyDescriptor contains information about a column family such as the
* number of versions, compression settings, etc.
@@ -203,7 +203,7 @@ protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) {
if (data == null) {
return null;
}
ScanInfo oldSI = store.getScanInfo();
ScanInfo oldSI = ((HStore) store).getScanInfo();
if (oldSI.getTtl() == Long.MAX_VALUE) {
return null;
}
@@ -15,9 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -161,12 +162,12 @@ private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
}
do {
Optional<CompactionContext> compaction =
store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
if (!compaction.isPresent()) {
break;
}
List<HStoreFile> storeFiles =
store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
store.compact(compaction.get(), NoLimitThroughputController.INSTANCE, null);
if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
for (HStoreFile storeFile: storeFiles) {
@@ -85,7 +85,7 @@
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -1258,7 +1258,7 @@ public void testExcludeAllFromMinorCompaction() throws Exception {
public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
}
}
@@ -1277,7 +1277,7 @@ public Boolean call() throws Exception {
public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
for (HStore store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
}
}
@@ -105,7 +105,7 @@ protected boolean shouldSplit() {
return false;
}

for (Store store: region.getStores()) {
for (HStore store: region.getStores()) {
if (!store.canSplit()) {
return false;
}
@@ -18,6 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -35,7 +38,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
@@ -45,12 +47,14 @@
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;

/**
* Compact region on request and then run split if appropriate
@@ -195,7 +199,7 @@ public String dumpQueue() {

public synchronized boolean requestSplit(final Region r) {
// don't split regions that are blocking
if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= PRIORITY_USER) {
byte[] midKey = ((HRegion)r).checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
@@ -298,13 +302,13 @@ private void requestCompactionInternal(HRegion region, HStore store, String why,
}

public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
requestCompactionInternal(region, why, Store.NO_PRIORITY, false,
requestCompactionInternal(region, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, null);
}

public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException {
requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false,
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, null);
}

@@ -24,11 +24,8 @@
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;

@@ -95,12 +92,11 @@ public void chore() {
if (LOG.isTraceEnabled()) {
LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo());
}
for (Store store : region.getStores()) {
for (HStore store : ((HRegion) region).getStores()) {
try {
if (useExecutor && regionServerServices != null) {
CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
(Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
(HStore) store);
(Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER, store);
regionServerServices.getExecutorService().submit(handler);
} else {
// call synchronously if the RegionServerServices are not
@@ -72,7 +72,7 @@ protected boolean shouldSplit() {
boolean force = region.shouldForceSplit();
boolean foundABigStore = false;

for (Store store : region.getStores()) {
for (HStore store : region.getStores()) {
// If any of the stores are unable to split (eg they contain reference files)
// then don't split
if ((!store.canSplit())) {
@@ -78,11 +78,11 @@ protected long getFlushSizeLowerBound(HRegion region) {
}

protected boolean shouldFlush(HStore store) {
if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
if (store.getMemStoreSize().getDataSize() > this.flushSizeLowerBound) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
store.getSizeOfMemStore().getDataSize() + " > lower bound="
store.getMemStoreSize().getDataSize() + " > lower bound="
+ this.flushSizeLowerBound);
}
return true;
@@ -1011,13 +1011,13 @@ public HStore call() throws IOException {
hasSloppyStores = true;
}

long storeMaxSequenceId = store.getMaxSequenceId();
long storeMaxSequenceId = store.getMaxSequenceId().orElse(0L);
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
storeMaxSequenceId);
if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
maxSeqId = storeMaxSequenceId;
}
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
long maxStoreMemstoreTS = store.getMaxMemstoreTS().orElse(0L);
if (maxStoreMemstoreTS > maxMemstoreTS) {
maxMemstoreTS = maxStoreMemstoreTS;
}
@@ -1645,7 +1645,7 @@ private Map<byte[], List<HStoreFile>> doClose(boolean abort, MonitoredTask statu

// close each store in parallel
for (HStore store : stores.values()) {
MemstoreSize flushableSize = store.getSizeToFlush();
MemstoreSize flushableSize = store.getFlushableSize();
if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
if (getRegionServerServices() != null) {
getRegionServerServices().abort("Assertion failed while closing store "
@@ -1717,7 +1717,7 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {
}

private long getMemstoreHeapSize() {
return stores.values().stream().mapToLong(s -> s.getSizeOfMemStore().getHeapSize()).sum();
return stores.values().stream().mapToLong(s -> s.getMemStoreSize().getHeapSize()).sum();
}

@Override
@@ -2320,7 +2320,7 @@ boolean shouldFlush(final StringBuffer whyFlush) {
}
//since we didn't flush in the recent past, flush now if certain conditions
//are met. Return true on first such memstore hit.
for (Store s : stores.values()) {
for (HStore s : stores.values()) {
if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
// we have an old enough edit in the memstore, flush
whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
@@ -2481,7 +2481,7 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
}

for (HStore s : storesToFlush) {
MemstoreSize flushableSize = s.getSizeToFlush();
MemstoreSize flushableSize = s.getFlushableSize();
totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL
@@ -2529,7 +2529,7 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
for (HStore store: storesToFlush) {
perCfExtras.append("; ").append(store.getColumnFamilyName());
perCfExtras.append("=")
.append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
.append(StringUtils.byteDesc(store.getFlushableSize().getDataSize()));
}
}
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
@@ -4836,7 +4836,7 @@ private MemstoreSize dropMemstoreContentsForSeqId(long seqId, HStore store) thro

private MemstoreSize doDropStoreMemstoreContentsForSeqId(HStore s, long currentSeqId)
throws IOException {
MemstoreSize flushableSize = s.getSizeToFlush();
MemstoreSize flushableSize = s.getFlushableSize();
this.decrMemstoreSize(flushableSize);
StoreFlushContext ctx = s.createFlushContext(currentSeqId);
ctx.prepare();
@@ -4933,7 +4933,7 @@ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOExce
continue;
}

long storeSeqId = store.getMaxSequenceId();
long storeSeqId = store.getMaxSequenceId().orElse(0L);
List<String> storeFiles = storeDescriptor.getStoreFileList();
try {
store.refreshStoreFiles(storeFiles); // replace the files with the new ones
@@ -4943,7 +4943,7 @@ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOExce
+ " doesn't exist any more. Skip loading the file(s)", ex);
continue;
}
if (store.getMaxSequenceId() != storeSeqId) {
if (store.getMaxSequenceId().orElse(0L) != storeSeqId) {
// Record latest flush time if we picked up new files
lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
}
@@ -4954,7 +4954,7 @@ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOExce
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
null : this.prepareFlushResult.storeFlushCtxs.get(family);
if (ctx != null) {
MemstoreSize snapshotSize = store.getSizeToFlush();
MemstoreSize snapshotSize = store.getFlushableSize();
ctx.abort();
this.decrMemstoreSize(snapshotSize);
this.prepareFlushResult.storeFlushCtxs.remove(family);
@@ -5085,7 +5085,7 @@ private void dropPrepareFlushIfPossible() {
if (store == null) {
continue;
}
if (store.getSizeOfSnapshot().getDataSize() > 0) {
if (store.getSnapshotSize().getDataSize() > 0) {
canDrop = false;
break;
}
@@ -5129,12 +5129,12 @@ protected boolean refreshStoreFiles(boolean force) throws IOException {
for (HStore store : stores.values()) {
// TODO: some stores might see new data from flush, while others do not which
// MIGHT break atomic edits across column families.
long maxSeqIdBefore = store.getMaxSequenceId();
long maxSeqIdBefore = store.getMaxSequenceId().orElse(0L);

// refresh the store files. This is similar to observing a region open wal marker.
store.refreshStoreFiles();

long storeSeqId = store.getMaxSequenceId();
long storeSeqId = store.getMaxSequenceId().orElse(0L);
if (storeSeqId < smallestSeqIdInStores) {
smallestSeqIdInStores = storeSeqId;
}
@@ -5148,7 +5148,7 @@ protected boolean refreshStoreFiles(boolean force) throws IOException {
null : this.prepareFlushResult.storeFlushCtxs.get(
store.getColumnFamilyDescriptor().getName());
if (ctx != null) {
MemstoreSize snapshotSize = store.getSizeToFlush();
MemstoreSize snapshotSize = store.getFlushableSize();
ctx.abort();
this.decrMemstoreSize(snapshotSize);
this.prepareFlushResult.storeFlushCtxs.remove(
@@ -5169,7 +5169,7 @@ protected boolean refreshStoreFiles(boolean force) throws IOException {
// advance the mvcc read point so that the new flushed files are visible.
// either greater than flush seq number or they were already picked up via flush.
for (HStore s : stores.values()) {
mvcc.advanceTo(s.getMaxMemstoreTS());
mvcc.advanceTo(s.getMaxMemstoreTS().orElse(0L));
}


@@ -8074,7 +8074,7 @@ void throwException(String title, String regionName) {
for (HStore s : stores.values()) {
buf.append(s.getColumnFamilyDescriptor().getNameAsString());
buf.append(" size: ");
buf.append(s.getSizeOfMemStore().getDataSize());
buf.append(s.getMemStoreSize().getDataSize());
buf.append(" ");
}
buf.append("end-of-stores");
@@ -1787,7 +1787,7 @@ protected void chore() {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestSystemCompaction(hr, s,
getName() + " requests compaction");
} else if (s.isMajorCompaction()) {
} else if (s.shouldPerformMajorCompaction()) {
s.triggerMajorCompaction();
if (majorCompactPriority == DEFAULT_PRIORITY ||
majorCompactPriority > hr.getCompactPriority()) {