DRILL-4199: Add Support for HBase 1.X
adityakishore committed Sep 9, 2016
1 parent 5fa9ba3 commit 1882d93
Showing 8 changed files with 67 additions and 48 deletions.
2 changes: 1 addition & 1 deletion contrib/format-maprdb/pom.xml
@@ -32,7 +32,7 @@

<properties>
<maprdb-storage-plugin.mapr.version>5.1.0.37817-mapr</maprdb-storage-plugin.mapr.version>
<maprdb-storage-plugin.hbase.version>0.98.12-mapr-1506</maprdb-storage-plugin.hbase.version>
<maprdb-storage-plugin.hbase.version>1.1.1-mapr-1602-m7-5.1.0</maprdb-storage-plugin.hbase.version>
<maprdb-storage-plugin.hadoop.version>2.7.0-mapr-1602</maprdb-storage-plugin.hadoop.version>
<maprdb.TestSuite>**/MaprDBTestsSuite.class</maprdb.TestSuite>
</properties>
@@ -35,6 +35,9 @@
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableSet;
@@ -44,11 +47,16 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class);

private final MapRDBFormatMatcher matcher;
private final Configuration hbaseConf;
private final Connection connection;

public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) {
StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) throws IOException {
super(name, context, fsConf, storageConfig, formatConfig);
matcher = new MapRDBFormatMatcher(this);
hbaseConf = HBaseConfiguration.create(fsConf);
hbaseConf.set(ConnectionFactory.DEFAULT_DB, ConnectionFactory.MAPR_ENGINE2);
connection = ConnectionFactory.createConnection(hbaseConf);
}

@Override
@@ -79,4 +87,14 @@ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
}
}

@JsonIgnore
public Configuration getHBaseConf() {
return hbaseConf;
}

@JsonIgnore
public Connection getConnection() {
return connection;
}

}
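
Context for the change above: MapRDBFormatPlugin now builds an HBaseConfiguration and caches a single Connection created through ConnectionFactory. In the HBase 1.x client, the Connection is the heavyweight, thread-safe object to share, while Table handles are cheap and created per operation. A minimal sketch of that consumer-side pattern, assuming a standalone program and a placeholder table name (not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class ConnectionUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // One Connection per process (or per plugin, as in the diff); share it.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         // Table handles are lightweight; open and close them per operation.
         Table table = connection.getTable(TableName.valueOf("my_table"))) {
      Result row = table.get(new Get(Bytes.toBytes("row1")));
      System.out.println(row.isEmpty() ? "row not found" : "row found");
    }
  }
}
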
@@ -49,9 +49,9 @@
public abstract class MapRDBGroupScan extends AbstractGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);

private FileSystemPlugin storagePlugin;
protected FileSystemPlugin storagePlugin;

private MapRDBFormatPlugin formatPlugin;
protected MapRDBFormatPlugin formatPlugin;

protected MapRDBFormatPluginConfig formatPluginConfig;

@@ -29,8 +29,6 @@
import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -42,11 +40,11 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
Configuration conf = HBaseConfiguration.create();
for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
readers.add(new HBaseRecordReader(subScan.getFormatPlugin().getConnection(),
getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
} else {
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
}
@@ -44,37 +44,37 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);

@JsonProperty
public final StoragePluginConfig storage;
public final StoragePluginConfig storageConfig;
@JsonIgnore
private final MapRDBFormatPluginConfig fsFormatPluginConfig;
private final FileSystemPlugin fsStoragePlugin;
private final MapRDBFormatPluginConfig formatPluginConfig;
private final FileSystemPlugin storagePlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
private final String tableType;

private final MapRDBFormatPlugin formatPlugin;

@JsonCreator
public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
@JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("storageConfig") StoragePluginConfig storage,
@JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("tableType") String tableType) throws ExecutionSetupException {
super(userName);
this.fsFormatPluginConfig = formatPluginConfig;
this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
this.storage = storage;
this.columns = columns;
this.tableType = tableType;
this(userName, formatPluginConfig,
(FileSystemPlugin) registry.getPlugin(storage),
storage, regionScanSpecList, columns, tableType);
}

public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig storageConfig,
List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
super(userName);
fsFormatPluginConfig = formatPluginConfig;
fsStoragePlugin = storagePlugin;
storage = config;
this.storageConfig = storageConfig;
this.storagePlugin = storagePlugin;
this.formatPluginConfig = formatPluginConfig;
this.formatPlugin = (MapRDBFormatPlugin) storagePlugin.getFormatPlugin(formatPluginConfig);

this.regionScanSpecList = maprSubScanSpecs;
this.columns = columns;
this.tableType = tableType;
@@ -101,7 +101,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
return new MapRDBSubScan(getUserName(), formatPluginConfig, storagePlugin, storageConfig, regionScanSpecList, columns, tableType);
}

@Override
@@ -119,7 +119,12 @@ public String getTableType() {
}

public MapRDBFormatPluginConfig getFormatPluginConfig() {
return fsFormatPluginConfig;
return formatPluginConfig;
}

@JsonIgnore
public MapRDBFormatPlugin getFormatPlugin() {
return formatPlugin;
}

}
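
The MapRDBSubScan rework above renames the serialized property to storageConfig and routes the Jackson creator through the new delegating constructor, with the plugin registry injected rather than serialized. A simplified, hypothetical sketch of that @JsonCreator/@JacksonInject pattern (stand-in classes, not Drill's):

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

class Registry { }                               // stands in for StoragePluginRegistry

class SubScanLike {
  @JsonProperty
  public final String storageConfig;             // serialized under the new name
  @JsonIgnore
  private final Registry registry;               // injected at read time, never serialized

  @JsonCreator
  public SubScanLike(@JacksonInject Registry registry,
                     @JsonProperty("storageConfig") String storageConfig) {
    this.registry = registry;
    this.storageConfig = storageConfig;
  }
}

class SubScanJsonDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(Registry.class, new Registry()));
    SubScanLike scan = mapper.readValue("{\"storageConfig\":\"dfs\"}", SubScanLike.class);
    System.out.println(scan.storageConfig);      // dfs
  }
}
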
@@ -21,8 +21,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -45,11 +43,12 @@
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.codehaus.jackson.annotate.JsonCreator;

import com.fasterxml.jackson.annotation.JacksonInject;
@@ -112,23 +111,22 @@ public GroupScan clone(List<SchemaPath> columns) {

private void init() {
logger.debug("Getting region locations");
try {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, hbaseScanSpec.getTableName());
tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
this.hTableDesc = table.getTableDescriptor();
NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
table.close();
TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
try (Admin admin = formatPlugin.getConnection().getAdmin();
RegionLocator locator = formatPlugin.getConnection().getRegionLocator(tableName)) {
hTableDesc = admin.getTableDescriptor(tableName);
tableStats = new MapRDBTableStats(getHBaseConf(), hbaseScanSpec.getTableName());

boolean foundStartRegion = false;
regionsToScan = new TreeMap<TabletFragmentInfo, String>();
for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
HRegionInfo regionInfo = mapEntry.getKey();
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
for (HRegionLocation regionLocation : regionLocations) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
continue;
}
foundStartRegion = true;
regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
regionsToScan.put(new TabletFragmentInfo(regionInfo), regionLocation.getHostname());
if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
break;
}
@@ -191,7 +189,7 @@ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {

@JsonIgnore
public Configuration getHBaseConf() {
return HBaseConfiguration.create();
return getFormatPlugin().getHBaseConf();
}

@JsonIgnore
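
The init() rewrite above replaces the old HTable-based region lookup (deprecated in the 1.x client) with Admin and RegionLocator handles obtained from the shared Connection. A self-contained sketch of that HBase 1.x pattern, assuming a placeholder table name:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;

class RegionListingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("my_table");   // placeholder
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin();
         RegionLocator locator = connection.getRegionLocator(tableName)) {
      HTableDescriptor desc = admin.getTableDescriptor(tableName);
      // getAllRegionLocations() replaces HTable.getRegionLocations():
      // each entry pairs the region with the server currently hosting it.
      List<HRegionLocation> locations = locator.getAllRegionLocations();
      for (HRegionLocation location : locations) {
        System.out.println(location.getRegionInfo().getRegionNameAsString()
            + " -> " + location.getHostname());
      }
    }
  }
}
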
@@ -40,7 +40,7 @@
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.hadoop.hbase.util.Order;
import org.apache.hadoop.hbase.util.PositionedByteRange;
import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;

import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
import org.apache.hadoop.hbase.HConstants;
@@ -256,7 +256,7 @@ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression val
case "DOUBLE_OBD":
if (valueArg instanceof DoubleExpression) {
bb = newByteBuf(9, true);
PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
@@ -271,7 +271,7 @@ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression val
case "FLOAT_OBD":
if (valueArg instanceof FloatExpression) {
bb = newByteBuf(5, true);
PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
@@ -286,7 +286,7 @@ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression val
case "BIGINT_OBD":
if (valueArg instanceof LongExpression) {
bb = newByteBuf(9, true);
PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 9);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
((LongExpression)valueArg).getLong(), Order.DESCENDING);
@@ -301,7 +301,7 @@ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression val
case "INT_OBD":
if (valueArg instanceof IntExpression) {
bb = newByteBuf(5, true);
PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, 5);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
((IntExpression)valueArg).getInt(), Order.DESCENDING);
@@ -317,7 +317,7 @@ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression val
if (valueArg instanceof QuotedString) {
int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
bb = newByteBuf(stringLen + 2, true);
PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
PositionedByteRange br = new SimplePositionedMutableByteRange(bb.array(), 0, stringLen + 2);
if (encodingType.endsWith("_OBD")) {
org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
((QuotedString)valueArg).value, Order.DESCENDING);
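
The substitutions above track an HBase util API change: OrderedBytes needs a writable range, and in the 1.x client that is SimplePositionedMutableByteRange. A small sketch of the encode/decode round trip, using an arbitrary value and ascending order (the filter builder above picks Order.DESCENDING when the encoding type ends with "_OBD"):

import org.apache.hadoop.hbase.util.Order;
import org.apache.hadoop.hbase.util.OrderedBytes;
import org.apache.hadoop.hbase.util.PositionedByteRange;
import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;

class OrderedBytesRoundTrip {
  public static void main(String[] args) {
    // 9 bytes: 1 header byte plus 8 payload bytes for a float64 encoding,
    // matching the newByteBuf(9, true) sizing in the diff above.
    byte[] buf = new byte[9];
    PositionedByteRange range = new SimplePositionedMutableByteRange(buf, 0, 9);
    OrderedBytes.encodeFloat64(range, 123.45d, Order.ASCENDING);

    range.setPosition(0);                       // rewind before decoding
    double decoded = OrderedBytes.decodeFloat64(range);
    System.out.println(decoded);                // 123.45
  }
}
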
@@ -22,7 +22,7 @@
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.hbase.GuavaPatcher;
import org.apache.drill.exec.util.GuavaPatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
