Skip to content

Commit

Permalink
using CellUtil to get viewIndexId and DataType Cells
Browse files Browse the repository at this point in the history
  • Loading branch information
yanxinyi committed Nov 18, 2020
1 parent 9d7983d commit 6a5a89e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public void testUpdatedSplitPolicyForSysTask() throws Exception {
}

@Test
public void testViewIndexIdCreatedByOldCliet() throws Exception {
public void testViewIndexIdCreatedWithOldClient() throws Exception {
executeQueryWithClientVersion(compatibleClientVersion, ADD_VIEW_INDEX, zkQuorum);
try (org.apache.hadoop.hbase.client.Connection conn =
hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
Expand All @@ -337,52 +337,11 @@ public void testViewIndexIdCreatedByOldCliet() throws Exception {
+ " has been added with compatible client version: "
+ compatibleClientVersion, tableDescriptor.hasCoprocessor(
SyscatRegionObserver.class.getName()));
}

executeQueriesWithCurrentVersion(QUERY_VIEW_INDEX, url, NONE);
assertExpectedOutput(QUERY_VIEW_INDEX);
}

/*
Ideally, it should test has syscat coproc before and after; however, it always failed
on my local env. Including this test against the Hadoop QA for now.
*/
@Test
public void testViewIndexIdCreatedByOldClient() throws Exception {
executeQueryWithClientVersion(compatibleClientVersion, ADD_VIEW_INDEX, zkQuorum);
try (org.apache.hadoop.hbase.client.Connection conn =
hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
HTableDescriptor tableDescriptor = admin.getTableDescriptor(
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
assertFalse("Coprocessor " + SyscatRegionObserver.class.getName()
+ " has been added with compatible client version: "
+ compatibleClientVersion, tableDescriptor.hasCoprocessor(
SyscatRegionObserver.class.getName()));
}

executeQueriesWithCurrentVersion(QUERY_VIEW_INDEX, url, NONE);
assertExpectedOutput(QUERY_VIEW_INDEX);

try (org.apache.hadoop.hbase.client.Connection conn =
hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
HTableDescriptor tableDescriptor = admin.getTableDescriptor(
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
assertTrue("Coprocessor " + SyscatRegionObserver.class.getName()
+ " has been added with compatible client version: "
+ compatibleClientVersion, tableDescriptor.hasCoprocessor(
SyscatRegionObserver.class.getName()));
}
}

@Test
public void testViewIndexIdCreatedByOldClientAndSyscatCoproCheckAfter() throws Exception {
executeQueryWithClientVersion(compatibleClientVersion, ADD_VIEW_INDEX, zkQuorum);
executeQueriesWithCurrentVersion(QUERY_VIEW_INDEX, url, NONE);
assertExpectedOutput(QUERY_VIEW_INDEX);
executeQueriesWithCurrentVersion(QUERY_VIEW_INDEX, url, NONE);
assertExpectedOutput(QUERY_VIEW_INDEX);

try (org.apache.hadoop.hbase.client.Connection conn =
hbaseTestUtil.getConnection(); Admin admin = conn.getAdmin()) {
HTableDescriptor tableDescriptor = admin.getTableDescriptor(
tableDescriptor = admin.getTableDescriptor(
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
assertTrue("Coprocessor " + SyscatRegionObserver.class.getName()
+ " has been added with compatible client version: "
Expand All @@ -392,7 +351,7 @@ public void testViewIndexIdCreatedByOldClientAndSyscatCoproCheckAfter() throws E
}

@Test
public void testViewIndexIdCreatedByNewClient() throws Exception {
public void testViewIndexIdCreatedWithNewClient() throws Exception {
executeQueriesWithCurrentVersion(ADD_VIEW_INDEX, url, NONE);
executeQueryWithClientVersion(compatibleClientVersion, QUERY_VIEW_INDEX, zkQuorum);
assertExpectedOutput(QUERY_VIEW_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.ViewIndexIdRetrieveUtil;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
Expand All @@ -37,23 +39,20 @@

import java.io.IOException;
import java.sql.Types;
import java.util.Collections;
import java.util.List;

import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYSTEM_CATALOG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN;
import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN;

/**
* Coprocessor that checks whether the VIEW_INDEX_ID needs to retrieve.
*/
public class SyscatRegionObserver extends BaseRegionObserver {
public static final String VIEW_INDEX_ID_CQ = DEFAULT_COLUMN_FAMILY + ":" + VIEW_INDEX_ID;
public static final String VIEW_INDEX_ID_DATA_TYPE_CQ =
DEFAULT_COLUMN_FAMILY + ":" + VIEW_INDEX_ID_DATA_TYPE;
public static final int VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN = 9;
public static final int VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN = 3;
public static final int OFF_RANGE_POS = -1;
public static final int NULL_DATA_TYPE_VALUE = 0;

@Override public void start(CoprocessorEnvironment e) throws IOException {
Expand Down Expand Up @@ -106,14 +105,27 @@ public SyscatRegionScanner(RegionCoprocessorEnvironment env, Scan scan,

@Override public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
throw new IOException(
"next with scannerContext should not be called in Phoenix environment");
return doHBaseScanNext(result, false);
}

@Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
throw new IOException(
"NextRaw with scannerContext should not be called in Phoenix environment");
return doHBaseScanNext(result, true);
}

private boolean doHBaseScanNext(List<Cell> result, boolean raw) throws IOException {
boolean hasMore;
try {
hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
if (result.isEmpty()) {
return false;
}
}catch (Throwable t) {
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
return false; // impossible
}

return hasMore;
}

@Override public void close() throws IOException {
Expand Down Expand Up @@ -148,42 +160,38 @@ private boolean doNext(List<Cell> result, boolean raw) throws IOException {
return false;
}
// logic to change the cell
int pos = OFF_RANGE_POS;
int type = NULL_DATA_TYPE_VALUE;
for (int i = 0; i < result.size(); i++) {
Cell cell = result.get(i);
if (cell.toString().contains(VIEW_INDEX_ID_DATA_TYPE_CQ)) {
if (cell.getValueArray().length > 0) {
type = (Integer) PInteger.INSTANCE.toObject(
cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength(), PInteger.INSTANCE, SortOrder.ASC);
}
} else if (cell.toString().contains(VIEW_INDEX_ID_CQ)) {
pos = i;
Cell viewIndexIdCell = KeyValueUtil.getColumnLatest(
GenericKeyValueBuilder.INSTANCE, result,
DEFAULT_COLUMN_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
Cell viewIndexIdDataTypeCell = KeyValueUtil.getColumnLatest(
GenericKeyValueBuilder.INSTANCE, result,
DEFAULT_COLUMN_FAMILY_BYTES, VIEW_INDEX_ID_DATA_TYPE_BYTES);

if (viewIndexIdCell != null) {
if (viewIndexIdDataTypeCell != null) {
type = (Integer) PInteger.INSTANCE.toObject(
viewIndexIdDataTypeCell.getValueArray(),
viewIndexIdDataTypeCell.getValueOffset(),
viewIndexIdDataTypeCell.getValueLength(),
PInteger.INSTANCE,
SortOrder.ASC);
}
}
if (pos != OFF_RANGE_POS) {
// only 3 cases VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE, CLIENT
// SMALLINT, NULL, PRE-4.15
// SMALLINT, 5, POST-4.15
// BIGINT, -5, POST-4.15

// if VIEW_INDEX_ID IS presenting
if (ScanUtil.getClientVersion(this.scan) < MIN_SPLITTABLE_SYSTEM_CATALOG) {
// pre-splittable client should always using SMALLINT
if (type == NULL_DATA_TYPE_VALUE && result.get(pos).getValueLength() >
if (type == NULL_DATA_TYPE_VALUE && viewIndexIdCell.getValueLength() >
VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInShortDataFormat(result.get(pos));
result.set(pos, keyValue);
getViewIndexIdKeyValueInShortDataFormat(viewIndexIdCell);
Collections.replaceAll(result, viewIndexIdCell, keyValue);
}
} else {
// post-splittable client should always using BIGINT
if (type != Types.BIGINT && result.get(pos).getValueLength() <
if (type != Types.BIGINT && viewIndexIdCell.getValueLength() <
VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN) {
Cell keyValue = ViewIndexIdRetrieveUtil.
getViewIndexIdKeyValueInLongDataFormat(result.get(pos));
result.set(pos, keyValue);
getViewIndexIdKeyValueInLongDataFormat(viewIndexIdCell);
Collections.replaceAll(result, viewIndexIdCell, keyValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.phoenix.schema.types.PSmallint;

public final class ViewIndexIdRetrieveUtil {
public static final int VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN = 9;
public static final int VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN = 3;

private ViewIndexIdRetrieveUtil() {

}
Expand All @@ -32,10 +35,14 @@ public static Cell getViewIndexIdKeyValueInLongDataFormat(Cell viewIndexIdCell)
Short valueInShort = (Short) PSmallint.INSTANCE.toObject(
viewIndexIdCell.getValueArray(), viewIndexIdCell.getValueOffset(),
viewIndexIdCell.getValueLength(), PSmallint.INSTANCE, SortOrder.ASC);
byte[] valueBytesInLong = new byte[9];
byte[] valueBytesInLong = new byte[VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN];
byte[] valueBytes = PLong.INSTANCE.toBytes(valueInShort);
System.arraycopy(valueBytes, 0, valueBytesInLong, 0, 8);
System.arraycopy(valueBytes, 0, valueBytesInLong,
0, VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN - 1);
return buildNewCell(viewIndexIdCell, valueBytesInLong);
}

public static Cell buildNewCell(Cell viewIndexIdCell, byte[] newVal) {
KeyValue keyValue = new KeyValue(
viewIndexIdCell.getRowArray(), viewIndexIdCell.getRowOffset(),
viewIndexIdCell.getRowLength(),
Expand All @@ -44,31 +51,20 @@ public static Cell getViewIndexIdKeyValueInLongDataFormat(Cell viewIndexIdCell)
viewIndexIdCell.getQualifierArray(), viewIndexIdCell.getQualifierOffset(),
viewIndexIdCell.getQualifierLength(),
viewIndexIdCell.getTimestamp(),KeyValue.Type.Put,
valueBytesInLong, 0,9);
newVal, 0,newVal.length);
keyValue.setSequenceId(viewIndexIdCell.getSequenceId());

return keyValue;
}

public static Cell getViewIndexIdKeyValueInShortDataFormat(Cell viewIndexIdCell) {
Long valueInShort = (Long) PLong.INSTANCE.toObject(
viewIndexIdCell.getValueArray(), viewIndexIdCell.getValueOffset(),
viewIndexIdCell.getValueLength(), PLong.INSTANCE, SortOrder.ASC);
byte[] valueBytesInShort = new byte[3];
byte[] valueBytesInShort = new byte[VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN];
byte[] valueBytes = PSmallint.INSTANCE.toBytes(valueInShort);
System.arraycopy(valueBytes, 0, valueBytesInShort, 0, 2);

KeyValue keyValue = new KeyValue(
viewIndexIdCell.getRowArray(), viewIndexIdCell.getRowOffset(),
viewIndexIdCell.getRowLength(),
viewIndexIdCell.getFamilyArray(), viewIndexIdCell.getFamilyOffset(),
viewIndexIdCell.getFamilyLength(),
viewIndexIdCell.getQualifierArray(), viewIndexIdCell.getQualifierOffset(),
viewIndexIdCell.getQualifierLength(),
viewIndexIdCell.getTimestamp(),KeyValue.Type.Put,
valueBytesInShort, 0,3);
keyValue.setSequenceId(viewIndexIdCell.getSequenceId());
System.arraycopy(valueBytes, 0, valueBytesInShort,
0, VIEW_INDEX_ID_SMALLINT_TYPE_VALUE_LEN - 1);

return keyValue;
return buildNewCell(viewIndexIdCell, valueBytesInShort);
}
}

0 comments on commit 6a5a89e

Please sign in to comment.