Skip to content

Commit

Permalink
moved filter logic to the server side
Browse files Browse the repository at this point in the history
  • Loading branch information
yanxinyi committed Nov 20, 2020
1 parent 620df93 commit 6b5bb0e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void testMixedCase() throws Exception {

String schemaName = rs.getString(TABLE_SCHEM);
long viewIndexId = rs.getLong(VIEW_INDEX_ID);
if (schemaName.equals(schema) && viewIndexId != 0) {
if (schemaName != null && schemaName.equals(schema) && viewIndexId != 0) {
int viewIndexIdDataType = rs.getInt(VIEW_INDEX_ID_DATA_TYPE);
String tableName = rs.getString(TABLE_NAME);
if (tableName.equals(viewIndexName1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,18 +770,6 @@ protected QueryPlan compileSingleFlatQuery(
if(plan instanceof BaseQueryPlan){
((BaseQueryPlan) plan).setApplicable(isApplicable);
}
if (context.getScan().getFilter() != null &&
table.toString().equalsIgnoreCase(SYSTEM_CATALOG_NAME)) {
context.getScan().setAttribute(SYSCATA_COPROC_IGNORE_TAG,
TRUE_BYTES);
try {
projector.getColumnIndex(VIEW_INDEX_ID);
ScanUtil.andFilterAtBeginning(context.getScan(),
new SyscatViewIndexIdFilter());
} catch (Exception e) {
// VIEW_INDEX_ID is not presenting.
}
}
return plan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,18 @@
*/
package org.apache.phoenix.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.ViewIndexIdRetrieveUtil;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.filter.SyscatViewIndexIdFilter;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;

import java.io.IOException;
import java.sql.Types;
import java.util.Collections;
import java.util.List;

import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYSTEM_CATALOG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.NULL_DATA_TYPE_VALUE;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.SYSCATA_COPROC_IGNORE_TAG;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN;
import static org.apache.phoenix.util.ScanUtil.UNKNOWN_CLIENT_VERSION;

/**
* Coprocessor that checks whether the VIEW_INDEX_ID needs to retrieve.
Expand All @@ -64,146 +43,12 @@ public class SyscatRegionObserver extends BaseRegionObserver {
}

@Override
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
RegionScanner s) throws IOException {
return new SyscatRegionScanner(c.getEnvironment(), scan, s);
}

/**
* A region scanner that retrieve the right data type back to the client
*/
private static class SyscatRegionScanner implements RegionScanner {

private final RegionScanner scanner;
private final Scan scan;
private final Region region;
private final boolean ignore;

public SyscatRegionScanner(RegionCoprocessorEnvironment env, Scan scan,
RegionScanner scanner) throws IOException {
this.scan = scan;
this.scanner = scanner;
this.region = env.getRegion();
this.ignore = this.scan.getAttribute(SYSCATA_COPROC_IGNORE_TAG) != null;

byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN);
if (txnScn != null) {
TimeRange timeRange = scan.getTimeRange();
scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
}
}

@Override public int getBatch() {
return scanner.getBatch();
}

@Override public long getMaxResultSize() {
return scanner.getMaxResultSize();
}

@Override public boolean next(List<Cell> result) throws IOException {
return doNext(result, false);
}

@Override public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
return doHBaseScanNext(result, false);
}

@Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
return doHBaseScanNext(result, true);
}

// we don't want to modify the cell when we have the HBase scan
private boolean doHBaseScanNext(List<Cell> result, boolean raw) throws IOException {
boolean hasMore;
try {
hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
if (result.isEmpty()) {
return false;
}
}catch (Throwable t) {
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}

return hasMore;
}

@Override public void close() throws IOException {
scanner.close();
}

@Override public HRegionInfo getRegionInfo() {
return scanner.getRegionInfo();
}

@Override public boolean isFilterDone() throws IOException {
return scanner.isFilterDone();
}

@Override public boolean reseek(byte[] row) throws IOException {
return scanner.reseek(row);
}

@Override public long getMvccReadPoint() {
return scanner.getMvccReadPoint();
}

@Override public boolean nextRaw(List<Cell> result) throws IOException {
return doNext(result, true);
}

private boolean doNext(List<Cell> result, boolean raw) throws IOException {
boolean hasMore;
try {
hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
if (result.isEmpty()) {
return false;
}
// logic to change the cell
if (!ignore) {
Cell viewIndexIdCell = KeyValueUtil.getColumnLatest(
GenericKeyValueBuilder.INSTANCE, result,
DEFAULT_COLUMN_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
if (viewIndexIdCell != null) {
int type = NULL_DATA_TYPE_VALUE;
Cell viewIndexIdDataTypeCell = KeyValueUtil.getColumnLatest(
GenericKeyValueBuilder.INSTANCE, result,
DEFAULT_COLUMN_FAMILY_BYTES, VIEW_INDEX_ID_DATA_TYPE_BYTES);
if (viewIndexIdDataTypeCell != null) {
type = (Integer) PInteger.INSTANCE.toObject(
viewIndexIdDataTypeCell.getValueArray(),
viewIndexIdDataTypeCell.getValueOffset(),
viewIndexIdDataTypeCell.getValueLength(),
PInteger.INSTANCE,
SortOrder.ASC);
}
if (ScanUtil.getClientVersion(this.scan) < MIN_SPLITTABLE_SYSTEM_CATALOG) {
// pre-splittable client should always using SMALLINT
if (type == NULL_DATA_TYPE_VALUE && viewIndexIdCell.getValueLength() >
VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInShortDataFormat(viewIndexIdCell);
Collections.replaceAll(result, viewIndexIdCell, keyValue);
}
} else {
// post-splittable client should always using BIGINT
if (type != Types.BIGINT && viewIndexIdCell.getValueLength() <
VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInLongDataFormat(viewIndexIdCell);
Collections.replaceAll(result, viewIndexIdCell, keyValue);
}
}
}
}
} catch (Throwable t) {
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}
return hasMore;
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner s) throws IOException {
int clientVersion = ScanUtil.getClientVersion(scan);
if (clientVersion != UNKNOWN_CLIENT_VERSION) {
ScanUtil.andFilterAtBeginning(scan, new SyscatViewIndexIdFilter(clientVersion));
}
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,40 @@
import java.util.List;


import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYSTEM_CATALOG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.NULL_DATA_TYPE_VALUE;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN;

public class SyscatViewIndexIdFilter extends FilterBase implements Writable {
private int clientVersion;

public SyscatViewIndexIdFilter() {
}

public SyscatViewIndexIdFilter(int clientVersion) {
this.clientVersion = clientVersion;
}

@Override
public ReturnCode filterKeyValue(Cell keyValue) {
return ReturnCode.INCLUDE_AND_NEXT_COL;
}

@Override
public boolean hasFilterRow() {
return true;
}

@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
Cell viewIndexIdCell = KeyValueUtil.getColumnLatest(
GenericKeyValueBuilder.INSTANCE, kvs,
DEFAULT_COLUMN_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);

if (viewIndexIdCell != null) {
int type = NULL_DATA_TYPE_VALUE;
Cell viewIndexIdDataTypeCell = KeyValueUtil.getColumnLatest(
Expand All @@ -70,13 +83,22 @@ public void filterRowCells(List<Cell> kvs) throws IOException {
PInteger.INSTANCE,
SortOrder.ASC);
}

// post-splittable client should always using BIGINT
if (type != Types.BIGINT && viewIndexIdCell.getValueLength() <
VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInLongDataFormat(viewIndexIdCell);
Collections.replaceAll(kvs, viewIndexIdCell, keyValue);
if (this.clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG) {
// pre-splittable client should always using SMALLINT
if (type == NULL_DATA_TYPE_VALUE && viewIndexIdCell.getValueLength() >
VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInShortDataFormat(viewIndexIdCell);
Collections.replaceAll(kvs, viewIndexIdCell, keyValue);
}
} else {
// post-splittable client should always using BIGINT
if (type != Types.BIGINT && viewIndexIdCell.getValueLength() <
VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInLongDataFormat(viewIndexIdCell);
Collections.replaceAll(kvs, viewIndexIdCell, keyValue);
}
}
}
}
Expand All @@ -98,12 +120,12 @@ public byte[] toByteArray() throws IOException {

@Override
public void readFields(DataInput input) throws IOException {
input.readByte();
this.clientVersion = input.readInt();
}

@Override
public void write(DataOutput output) throws IOException {
output.writeByte(1);
output.writeInt(this.clientVersion);
}

}

0 comments on commit 6b5bb0e

Please sign in to comment.