Skip to content

Commit

Permalink
PHOENIX-4839 IndexHalfStoreFileReaderGenerator throws NullPointerExce…
Browse files Browse the repository at this point in the history
…ption(Aman Poonia)
  • Loading branch information
ankitsinghal committed Aug 21, 2018
1 parent f60d11e commit 3b9a108
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,11 @@ public byte[] getSplitRow() {
public boolean isTop() {
return top;
}

@Override
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt,
long scannerOrder, boolean canOptimizeForNonNullColumn) {
return new LocalIndexStoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), true,
getHFileReader().hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -153,7 +148,9 @@ public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironmen
try {
conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap(
PhoenixConnection.class);
PTable dataTable = IndexUtil.getPDataTable(conn, ctx.getEnvironment().getRegion().getTableDesc());
PTable dataTable =
IndexUtil.getPDataTable(conn, ctx.getEnvironment().getRegion()
.getTableDesc());
List<PTable> indexes = dataTable.getIndexes();
Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers =
new HashMap<ImmutableBytesWritable, IndexMaintainer>();
Expand Down Expand Up @@ -187,19 +184,13 @@ public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironmen
return reader;
}

@SuppressWarnings("deprecation")
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
public InternalScanner preCompactScannerOpen(
org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c, Store store,
java.util.List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request) throws IOException {

if (!IndexUtil.isLocalIndexStore(store)) { return s; }
Scan scan = null;
if (s!=null) {
scan = ((StoreScanner)s).scan;
} else {
scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
}
if (!store.hasReferences()) {
InternalScanner repairScanner = null;
if (request.isMajor() && (!RepairUtil.isLocalIndexStoreFilesConsistent(c.getEnvironment(), store))) {
Expand All @@ -220,23 +211,7 @@ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEn
return s;
}
}
List<StoreFileScanner> newScanners = new ArrayList<StoreFileScanner>(scanners.size());
boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
for(KeyValueScanner scanner: scanners) {
Reader reader = ((StoreFileScanner) scanner).getReader();
if (reader instanceof IndexHalfStoreFileReader) {
newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner(
scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader()
.hasMVCCInfo(), store.getSmallestReadPoint(), scanner.getScannerOrder(), false));
} else {
newScanners.add(((StoreFileScanner) scanner));
}
}
if (s!=null) {
s.close();
}
return new StoreScanner(store, store.getScanInfo(), scan, newScanners,
scanType, store.getSmallestReadPoint(), earliestPutTs);
return s;
}

private byte[][] getViewConstants(PTable dataTable) {
Expand Down Expand Up @@ -310,93 +285,5 @@ private InternalScanner getRepairScanner(RegionCoprocessorEnvironment env, Store
}
}

@Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
final KeyValueScanner s) throws IOException {
if (store.getFamily().getNameAsString()
.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)
&& store.hasReferences()) {
final long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel
());
if (s!=null) {
s.close();
}
if (!scan.isReversed()) {
return new StoreScanner(store, store.getScanInfo(), scan,
targetCols, readPt) {

@Override
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
if (store.hasReferences()) {
return getLocalIndexScanners(c, store, scan, readPt);
} else {
return super.getScannersNoCompaction();
}
}
};
} else {
return new ReversedStoreScanner(store, store.getScanInfo(), scan,
targetCols, readPt) {
@Override
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
if (store.hasReferences()) {
return getLocalIndexScanners(c, store, scan, readPt);
} else {
return super.getScannersNoCompaction();
}
}
};
}
}
return s;
}

private List<KeyValueScanner> getLocalIndexScanners(final
ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final Scan scan, final long readPt) throws IOException {

boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
Collection<StoreFile> storeFiles = store.getStorefiles();
List<StoreFile> nonReferenceStoreFiles = new ArrayList<>(store.getStorefiles().size());
List<StoreFile> referenceStoreFiles = new ArrayList<>(store.getStorefiles().size
());
final List<KeyValueScanner> keyValueScanners = new ArrayList<>(store
.getStorefiles().size() + 1);
byte[] startKey = c.getEnvironment().getRegionInfo().getStartKey();
byte[] endKey = c.getEnvironment().getRegionInfo().getEndKey();
// If the region start key is not the prefix of the scan start row then we can return empty
// scanners. This is possible during merge where one of the child region scan should not return any
// results as we go through merged region.
int prefixLength =
scan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? (startKey.length == 0 ? endKey.length
: startKey.length) : (scan.getStartRow().length - scan.getAttribute(SCAN_START_ROW_SUFFIX).length);
if (Bytes.compareTo(scan.getStartRow(), 0, prefixLength, (startKey.length == 0 ? new byte[endKey.length] : startKey), 0,
startKey.length == 0 ? endKey.length : startKey.length) != 0) {
return keyValueScanners;
}
for (StoreFile storeFile : storeFiles) {
if (storeFile.isReference()) {
referenceStoreFiles.add(storeFile);
} else {
nonReferenceStoreFiles.add(storeFile);
}
}
final List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt);
keyValueScanners.addAll(scanners);
for (StoreFile sf : referenceStoreFiles) {
long scannerOrder = 0;
if (sf.getReader() instanceof IndexHalfStoreFileReader) {
keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader()
.getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
.getReader().getHFileReader().hasMVCCInfo(), readPt, scannerOrder++, false));
} else {
keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader()
.getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
.getReader().getHFileReader().hasMVCCInfo(), readPt, scannerOrder++, false));
}
}
keyValueScanners.addAll(((HStore) store).memstore.getScanners(readPt));
return keyValueScanners;
}

}

0 comments on commit 3b9a108

Please sign in to comment.