Skip to content

Commit

Permalink
DRILL-5743: Handling column family and column scan for hbase
Browse files Browse the repository at this point in the history
  • Loading branch information
prasadns14 committed Oct 7, 2017
1 parent 9df8102 commit f665485
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 6 deletions.
Expand Up @@ -94,9 +94,21 @@ public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec su
setColumns(projectedColumns);
}

/**
* Provides the projected column information to the HBase Scan instance. If the
* projected columns list contains a column family and also a column in the
* column family, only the column family is passed to the Scan instance.
*
* For example, if the projection list is {cf1, cf1.col1, cf2.col1} then we only
* pass {cf1, cf2.col1} to the Scan instance.
*
* @param columns collection of projected columns
* @return collection of projected column family names
*/
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
Set<String> completeFamilies = Sets.newHashSet();
rowKeyOnly = true;
if (!isStarQuery()) {
for (SchemaPath column : columns) {
Expand All @@ -109,11 +121,14 @@ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns
byte[] family = root.getPath().getBytes();
transformed.add(SchemaPath.getSimplePath(root.getPath()));
PathSegment child = root.getChild();
if (child != null && child.isNamed()) {
byte[] qualifier = child.getNameSegment().getPath().getBytes();
hbaseScan.addColumn(family, qualifier);
} else {
hbaseScan.addFamily(family);
if (!completeFamilies.contains(new String(family, StandardCharsets.UTF_8).toLowerCase())) {
if (child != null && child.isNamed()) {
byte[] qualifier = child.getNameSegment().getPath().getBytes();
hbaseScan.addColumn(family, qualifier);
} else {
hbaseScan.addFamily(family);
completeFamilies.add(new String(family, StandardCharsets.UTF_8).toLowerCase());
}
}
}
/* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
Expand Down
Expand Up @@ -19,6 +19,9 @@

import org.junit.Test;

import static org.apache.drill.TestBuilder.listOf;
import static org.apache.drill.TestBuilder.mapOf;

public class HBaseRecordReaderTest extends BaseHBaseTest {

@Test
Expand All @@ -39,4 +42,52 @@ public void testLocalDistributedFamilySelect() throws Exception {
runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1.getNameAsString(), 4);
}

/**
 * Selects the row key (cast to VARCHAR) together with the whole column family
 * {@code f} from {@code TestTable2}, ordered by the nested column {@code f.c1},
 * and verifies that rows come back in the order a2, a1, a3 with every
 * qualifier (c1, c2, c3) of the family present in the returned map.
 */
@Test
public void testOrderBy() throws Exception {
  final String query =
      "select cast(row_key AS VARCHAR) as row_key, t.f from hbase.`TestTable2` t order by t.f.c1";

  testBuilder()
      .sqlQuery(query)
      .ordered()
      .baselineColumns("row_key", "f")
      // Expected order follows the c1 values: 11 (a2), 21 (a1), 31 (a3).
      .baselineValues("a2", mapOf("c1", "11".getBytes(), "c2", "12".getBytes(), "c3", "13".getBytes()))
      .baselineValues("a1", mapOf("c1", "21".getBytes(), "c2", "22".getBytes(), "c3", "23".getBytes()))
      .baselineValues("a3", mapOf("c1", "31".getBytes(), "c2", "32".getBytes(), "c3", "33".getBytes()))
      .go();
}

/**
 * Runs a star query against {@code TestTableMultiCF} and verifies, without
 * regard to row order, that each of the rows a1, a2 and a3 is returned with the
 * columns {@code row_key}, {@code F} and {@code f0}, where both map columns
 * hold only the qualifier {@code c3}.
 *
 * NOTE(review): the expected column name {@code f0} presumably comes from how
 * column families differing only by case are disambiguated - confirm.
 */
@Test
public void testMultiCFDifferentCase() throws Exception {
  final String query = "select * from hbase.`TestTableMultiCF` t";

  testBuilder()
      .sqlQuery(query)
      .unOrdered()
      .baselineColumns("row_key", "F", "f0")
      .baselineValues("a1".getBytes(), mapOf("c3", "23".getBytes()), mapOf("c3", "23".getBytes()))
      .baselineValues("a2".getBytes(), mapOf("c3", "13".getBytes()), mapOf("c3", "13".getBytes()))
      .baselineValues("a3".getBytes(), mapOf("c3", "33".getBytes()), mapOf("c3", "33".getBytes()))
      .go();
}

}
Expand Up @@ -62,7 +62,9 @@ public class HBaseTestsSuite {
private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;

protected static final TableName TEST_TABLE_1 = TableName.valueOf("TestTable1");
protected static final TableName TEST_TABLE_2 = TableName.valueOf("TestTable2");
protected static final TableName TEST_TABLE_3 = TableName.valueOf("TestTable3");
protected static final TableName TEST_TABLE_MULTI_CF_DIFFERENT_CASE = TableName.valueOf("TestTableMultiCF");
protected static final TableName TEST_TABLE_COMPOSITE_DATE = TableName.valueOf("TestTableCompositeDate");
protected static final TableName TEST_TABLE_COMPOSITE_TIME = TableName.valueOf("TestTableCompositeTime");
protected static final TableName TEST_TABLE_COMPOSITE_INT = TableName.valueOf("TestTableCompositeInt");
Expand Down Expand Up @@ -164,7 +166,9 @@ public static HBaseTestingUtility getHBaseTestingUtility() {
}

private static boolean tablesExist() throws IOException {
return admin.tableExists(TEST_TABLE_1) && admin.tableExists(TEST_TABLE_3)
return admin.tableExists(TEST_TABLE_1) && admin.tableExists(TEST_TABLE_2)
&& admin.tableExists(TEST_TABLE_3)
&& admin.tableExists(TEST_TABLE_MULTI_CF_DIFFERENT_CASE)
&& admin.tableExists(TEST_TABLE_COMPOSITE_DATE)
&& admin.tableExists(TEST_TABLE_COMPOSITE_TIME)
&& admin.tableExists(TEST_TABLE_COMPOSITE_INT)
Expand All @@ -188,7 +192,9 @@ private static void createTestTables() throws Exception {
* Will revert to multiple region once the issue is resolved.
*/
TestTableGenerator.generateHBaseDataset1(conn, admin, TEST_TABLE_1, 2);
TestTableGenerator.generateHBaseDatasetSingleSchema(conn, admin, TEST_TABLE_2, 1);
TestTableGenerator.generateHBaseDataset3(conn, admin, TEST_TABLE_3, 1);
TestTableGenerator.generateHBaseDatasetMultiCF(conn, admin, TEST_TABLE_MULTI_CF_DIFFERENT_CASE, 1);
TestTableGenerator.generateHBaseDatasetCompositeKeyDate(conn, admin, TEST_TABLE_COMPOSITE_DATE, 1);
TestTableGenerator.generateHBaseDatasetCompositeKeyTime(conn, admin, TEST_TABLE_COMPOSITE_TIME, 1);
TestTableGenerator.generateHBaseDatasetCompositeKeyInt(conn, admin, TEST_TABLE_COMPOSITE_INT, 1);
Expand All @@ -206,8 +212,12 @@ private static void createTestTables() throws Exception {
private static void cleanupTestTables() throws IOException {
admin.disableTable(TEST_TABLE_1);
admin.deleteTable(TEST_TABLE_1);
admin.disableTable(TEST_TABLE_2);
admin.deleteTable(TEST_TABLE_2);
admin.disableTable(TEST_TABLE_3);
admin.deleteTable(TEST_TABLE_3);
admin.disableTable(TEST_TABLE_MULTI_CF_DIFFERENT_CASE);
admin.deleteTable(TEST_TABLE_MULTI_CF_DIFFERENT_CASE);
admin.disableTable(TEST_TABLE_COMPOSITE_DATE);
admin.deleteTable(TEST_TABLE_COMPOSITE_DATE);
admin.disableTable(TEST_TABLE_COMPOSITE_TIME);
Expand Down
Expand Up @@ -133,6 +133,81 @@ public static void generateHBaseDataset1(Connection conn, Admin admin, TableName
table.close();
}

/**
 * (Re)creates {@code tableName} with a single column family {@code f} and
 * loads three rows (a1, a2, a3), each carrying the qualifiers c1, c2 and c3.
 *
 * If the table already exists it is disabled and deleted first, so the
 * resulting content is always exactly the three rows written here.
 *
 * @param conn          connection used to obtain the {@code BufferedMutator}
 * @param admin         admin handle used to drop and create the table
 * @param tableName     name of the table to (re)create
 * @param numberRegions when greater than 1, the table is pre-split using the
 *                      first {@code numberRegions - 1} entries of SPLIT_KEYS;
 *                      otherwise it is created without split keys
 * @throws Exception if any admin or mutation call fails
 */
public static void generateHBaseDatasetSingleSchema(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor("f"));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when a mutate()
  // call throws; previously close() was only reached on the success path.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    Put p = new Put("a1".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "23".getBytes());
    table.mutate(p);

    p = new Put("a2".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "13".getBytes());
    table.mutate(p);

    p = new Put("a3".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes());
    p.addColumn("f".getBytes(), "c3".getBytes(), "33".getBytes());
    table.mutate(p);
  }
}

/**
 * (Re)creates {@code tableName} with two column families whose names differ
 * only by case, {@code f} and {@code F}, and loads three rows (a1, a2, a3).
 * Each row stores qualifiers c1 and c2 in family {@code f} and qualifier c3 in
 * family {@code F}.
 *
 * If the table already exists it is disabled and deleted first, so the
 * resulting content is always exactly the three rows written here.
 *
 * @param conn          connection used to obtain the {@code BufferedMutator}
 * @param admin         admin handle used to drop and create the table
 * @param tableName     name of the table to (re)create
 * @param numberRegions when greater than 1, the table is pre-split using the
 *                      first {@code numberRegions - 1} entries of SPLIT_KEYS;
 *                      otherwise it is created without split keys
 * @throws Exception if any admin or mutation call fails
 */
public static void generateHBaseDatasetMultiCF(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  HTableDescriptor desc = new HTableDescriptor(tableName);
  desc.addFamily(new HColumnDescriptor("f"));
  desc.addFamily(new HColumnDescriptor("F"));
  if (numberRegions > 1) {
    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions - 1));
  } else {
    admin.createTable(desc);
  }

  // try-with-resources guarantees the mutator is closed even when a mutate()
  // call throws; previously close() was only reached on the success path.
  try (BufferedMutator table = conn.getBufferedMutator(tableName)) {
    Put p = new Put("a1".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "21".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "22".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "23".getBytes());
    table.mutate(p);

    p = new Put("a2".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "11".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "12".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "13".getBytes());
    table.mutate(p);

    p = new Put("a3".getBytes());
    p.addColumn("f".getBytes(), "c1".getBytes(), "31".getBytes());
    p.addColumn("f".getBytes(), "c2".getBytes(), "32".getBytes());
    p.addColumn("F".getBytes(), "c3".getBytes(), "33".getBytes());
    table.mutate(p);
  }
}

public static void generateHBaseDataset2(Connection conn, Admin admin, TableName tableName, int numberRegions) throws Exception {
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.util;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

Expand Down Expand Up @@ -58,6 +59,13 @@ public boolean equals(Object obj) {
return false;
}
}
if (this.get(key) instanceof byte[]) {
if (other.get(key) instanceof byte[]) {
return Arrays.equals((byte[]) this.get(key), (byte[]) other.get(key));
} else {
return false;
}
}
if ( ! this.get(key).equals(other.get(key))) {
return false;
}
Expand Down

0 comments on commit f665485

Please sign in to comment.