Commit 8ab7cc1

PHOENIX-4318 Fix IndexHalfStoreFileReader and related classes(Rajeshbabu)

chrajeshbabu committed Nov 17, 2017
1 parent 5bdd3b2 commit 8ab7cc1
Showing 7 changed files with 94 additions and 171 deletions.
IndexHalfStoreFileReader.java

@@ -19,13 +19,15 @@
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.Reference;

@@ -47,7 +49,7 @@
  * This file is not splitable. Calls to {@link #midkey()} return null.
  */
 
-public class IndexHalfStoreFileReader extends StoreFile.Reader {
+public class IndexHalfStoreFileReader extends StoreFileReader {
     private final boolean top;
     // This is the key we split around. Its the first possible entry on a row:
     // i.e. empty column and a timestamp of LATEST_TIMESTAMP.

@@ -56,7 +58,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
     private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers;
    private final byte[][] viewConstants;
    private final int offset;
-    private final HRegionInfo regionInfo;
+    private final RegionInfo regionInfo;
    private final byte[] regionStartKeyInHFile;
 
    /**

@@ -78,9 +80,10 @@ public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheCo
            final FSDataInputStreamWrapper in, long size, final Reference r,
            final Configuration conf,
            final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
-            final byte[][] viewConstants, final HRegionInfo regionInfo,
-            byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
-        super(fs, p, in, size, cacheConf, conf);
+            final byte[][] viewConstants, final RegionInfo regionInfo,
+            byte[] regionStartKeyInHFile, byte[] splitKey, boolean primaryReplicaStoreFile) throws IOException {
+        super(fs, p, in, size, cacheConf, primaryReplicaStoreFile, new AtomicInteger(0), false,
+                conf);
        this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
        // Is it top or bottom half?
        this.top = Reference.isTopFileRegion(r.getFileRegion());

@@ -104,7 +107,7 @@ public Map<ImmutableBytesWritable, IndexMaintainer> getIndexMaintainers() {
        return indexMaintainers;
    }
 
-    public HRegionInfo getRegionInfo() {
+    public RegionInfo getRegionInfo() {
        return regionInfo;
    }
 

@@ -123,4 +126,12 @@ public byte[] getSplitRow() {
    public boolean isTop() {
        return top;
    }
+
+    @Override
+    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
+            boolean isCompaction, long readPt, long scannerOrder,
+            boolean canOptimizeForNonNullColumn) {
+        return new LocalIndexStoreFileScanner(this, cacheBlocks, pread, isCompaction, readPt,
+                scannerOrder, canOptimizeForNonNullColumn);
+    }
 }
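
A quick illustration of the new factory method above: once an IndexHalfStoreFileReader has been opened for one half of a split local-index file, the overridden getStoreFileScanner is what hands back the Phoenix-specific scanner. A minimal sketch of a call site, assuming an already-constructed reader and an MVCC read point named readPoint (both hypothetical names, not part of this patch):

    StoreFileScanner scanner = reader.getStoreFileScanner(
            true,       // cacheBlocks: serve from the block cache when possible
            false,      // pread: streaming rather than positional reads
            false,      // isCompaction: this is a user scan, not a compaction
            readPoint,  // MVCC read point visible to this scan
            0L,         // scannerOrder
            false);     // canOptimizeForNonNullColumn
    // scanner is a LocalIndexStoreFileScanner, which rewrites keys around the
    // split row as shown in LocalIndexStoreFileScanner below.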

Large diffs are not rendered by default; the diff for one of the seven changed files is omitted here.

LocalIndexSplitter.java

@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 
-public class LocalIndexSplitter extends BaseRegionObserver {
+public class LocalIndexSplitter implements RegionObserver {
 
 }
LocalIndexStoreFileScanner.java

@@ -19,13 +19,14 @@
 import java.io.IOException;
 import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;

@@ -36,10 +37,13 @@ public class LocalIndexStoreFileScanner extends StoreFileScanner{
 
    private IndexHalfStoreFileReader reader;
    private boolean changeBottomKeys;
-    public LocalIndexStoreFileScanner(Reader reader, HFileScanner hfs, boolean useMVCC,
-            boolean hasMVCC, long readPt) {
-        super(reader, hfs, useMVCC, hasMVCC, readPt);
-        this.reader = ((IndexHalfStoreFileReader)super.getReader());
+    @SuppressWarnings("deprecation")
+    public LocalIndexStoreFileScanner(IndexHalfStoreFileReader reader, boolean cacheBlocks, boolean pread,
+            boolean isCompaction, long readPt, long scannerOrder,
+            boolean canOptimizeForNonNullColumn) {
+        super(reader, reader.getScanner(cacheBlocks, pread, isCompaction), true, reader
+                .getHFileReader().hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn);
+        this.reader = reader;
        this.changeBottomKeys =
                this.reader.getRegionInfo().getStartKey().length == 0
                        && this.reader.getSplitRow().length != this.reader.getOffset();

@@ -114,13 +118,14 @@ public boolean reseek(Cell key) throws IOException {
    public boolean seekToPreviousRow(Cell key) throws IOException {
        KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(key);
        if (reader.isTop()) {
-            byte[] fk = reader.getFirstKey();
+            Optional<Cell> firstKey = reader.getFirstKey();
            // This will be null when the file is empty in which we can not seekBefore to
            // any key
-            if (fk == null) {
+            if (firstKey.isPresent()) {
                return false;
            }
-            if (getComparator().compare(kv.getRowArray(), kv.getKeyOffset(), kv.getKeyLength(), fk, 0, fk.length) <= 0) {
+            byte[] fk = PhoenixKeyValueUtil.maybeCopyCell(firstKey.get()).getKey();
+            if (getComparator().compare(kv, firstKey.get()) <= 0) {
                return super.seekToPreviousRow(key);
            }
            KeyValue replacedKey = getKeyPresentInHFiles(kv.getRowArray());

@@ -132,7 +137,8 @@ public boolean seekToPreviousRow(Cell key) throws IOException {
        } else {
            // The equals sign isn't strictly necessary just here to be consistent with
            // seekTo
-            if (getComparator().compare(kv.getRowArray(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+            KeyValue splitKeyValue = KeyValueUtil.createKeyValueFromKey(reader.getSplitkey());
+            if (getComparator().compare(kv, splitKeyValue) >= 0) {
                boolean seekToPreviousRow = super.seekToPreviousRow(kv);
                while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
                    seekToPreviousRow = super.seekToPreviousRow(super.peek());

@@ -221,8 +227,9 @@ private KeyValue getKeyPresentInHFiles(byte[] key) {
    public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException{
        KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(cell);
        KeyValue keyToSeek = kv;
+        KeyValue splitKeyValue = KeyValueUtil.createKeyValueFromKey(reader.getSplitkey());
        if (reader.isTop()) {
-            if(getComparator().compare(kv.getRowArray(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) < 0){
+            if(getComparator().compare(kv, splitKeyValue) < 0){
                if(!isSeek && realSeekDone()) {
                    return true;
                }

@@ -231,7 +238,7 @@ public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException{
            keyToSeek = getKeyPresentInHFiles(kv.getRowArray());
            return seekOrReseekToProperKey(isSeek, keyToSeek);
        } else {
-            if (getComparator().compare(kv.getRowArray(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+            if (getComparator().compare(kv, splitKeyValue) >= 0) {
                close();
                return false;
            }
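
The recurring change in this scanner is the comparator call: instead of comparing raw byte ranges, the split key is first wrapped into a KeyValue with KeyValueUtil.createKeyValueFromKey and then compared cell-to-cell. A minimal standalone sketch of that pattern (the helper class and the explicit CellComparator parameter are illustrative, not part of the patch):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;

    final class SplitKeyCompare {
        private SplitKeyCompare() {}

        // True when the cell sorts at or after the region split key, i.e. it falls in
        // the top half of a split store file.
        static boolean onOrAfterSplit(CellComparator comparator, Cell cell, byte[] splitKey) {
            KeyValue splitKeyValue = KeyValueUtil.createKeyValueFromKey(splitKey);
            return comparator.compare(cell, splitKeyValue) >= 0;
        }
    }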
ScannerContextUtil.java

@@ -19,19 +19,20 @@
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 
-import java.util.List;
-
 /**
  * @ScannerContext has all methods package visible. To properly update the context progress for our scanners we
  * need this helper
  */
 public class ScannerContextUtil {
    public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
        for (Cell cell : cells) {
-            sc.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+            sc.incrementSizeProgress(CellUtil.estimatedSerializedSizeOf(cell),
+                    CellUtil.estimatedHeapSizeOf(cell));
        }
    }
 
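
A sketch of a caller of the updated helper, assuming an HBase 2.0 classpath; the class and variable names are illustrative. As the javadoc above notes, ScannerContext's mutators are package-visible, so ScannerContextUtil itself has to live in org.apache.hadoop.hbase.regionserver:

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

    final class ProgressAccounting {
        private ProgressAccounting() {}

        static void addBatchProgress(ScannerContext context, List<Cell> batch) {
            // Adds both the serialized size and the heap size of every cell in the
            // batch to the context's running progress totals.
            ScannerContextUtil.incrementSizeProgress(context, batch);
        }
    }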
(file name not shown in this view)

@@ -1325,7 +1325,6 @@ public static byte[] serialize(List<Expression> selectExpressions) {
        }
    }
 
-
    /*
     * TODO: use waitForFlushes PHOENIX-4352
     */
(file name not shown in this view)

@@ -39,7 +39,6 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;

@@ -50,6 +49,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

@@ -541,7 +541,7 @@ public static void wrapResultUsingOffset(final RegionCoprocessorEnvironment envi
            } else {
                TableName dataTable =
                        TableName.valueOf(MetaDataUtil.getLocalIndexUserTableName(
-                            environment.getRegion().getTableDesc().getTableName().getNameAsString()));
+                            environment.getRegion().getTableDescriptor().getTableName().getNameAsString()));
                Table table = null;
                try {
                    table = environment.getConnection().getTable(dataTable);

@@ -749,10 +749,10 @@ public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2
    }
 
    public static boolean isLocalIndexStore(Store store) {
-        return store.getFamily().getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+        return store.getColumnFamilyDescriptor().getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
    }
 
-    public static PTable getPDataTable(Connection conn, HTableDescriptor tableDesc) throws SQLException {
+    public static PTable getPDataTable(Connection conn, TableDescriptor tableDesc) throws SQLException {
        String dataTableName = Bytes.toString(tableDesc.getValue(MetaDataUtil.DATA_TABLE_NAME_PROP_BYTES));
        String physicalTableName = tableDesc.getTableName().getNameAsString();
        PTable pDataTable = null;
