DRILL-5830: Resolve regressions to MapR DB from DRILL-5546
- Back out HBase changes
- Code cleanup
- Test utilities
- Fix for DRILL-5829

closes #968
Paul Rogers committed Oct 11, 2017
1 parent fe84713 commit 42f7af2
Showing 23 changed files with 691 additions and 155 deletions.
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,17 +17,22 @@
*/
package org.apache.drill.exec.store.hbase;

-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -50,24 +55,18 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.util.Bytes;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;

@JsonTypeName("hbase-scan")
public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
@@ -144,8 +143,8 @@ private HBaseGroupScan(HBaseGroupScan that) {
@Override
public GroupScan clone(List<SchemaPath> columns) {
HBaseGroupScan newScan = new HBaseGroupScan(this);
-newScan.columns = columns == null ? ALL_COLUMNS : columns;;
-newScan.verifyColumnsAndConvertStar();
+newScan.columns = columns == null ? ALL_COLUMNS : columns;
+newScan.verifyColumns();
return newScan;
}

@@ -177,37 +176,19 @@ private void init() {
} catch (IOException e) {
throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
}
-verifyColumnsAndConvertStar();
+verifyColumns();
}

-private void verifyColumnsAndConvertStar() {
-boolean hasStarCol = false;
-LinkedHashSet<SchemaPath> requestedColumns = new LinkedHashSet<>();
-
+private void verifyColumns() {
+if (Utilities.isStarQuery(columns)) {
+return;
+}
for (SchemaPath column : columns) {
-// convert * into [row_key, cf1, cf2, ..., cf_n].
-if (column.equals(Utilities.STAR_COLUMN)) {
-hasStarCol = true;
-Set<byte[]> families = hTableDesc.getFamiliesKeys();
-requestedColumns.add(ROW_KEY_PATH);
-for (byte[] family : families) {
-SchemaPath colFamily = SchemaPath.getSimplePath(Bytes.toString(family));
-requestedColumns.add(colFamily);
-}
-} else {
-if (!(column.equals(ROW_KEY_PATH) ||
-hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
-column.getRootSegment().getPath(), hTableDesc.getNameAsString());
-}
-requestedColumns.add(column);
+if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
+column.getRootSegment().getPath(), hTableDesc.getNameAsString());
}
}
-
-// since star column has been converted, reset this.cloumns.
-if (hasStarCol) {
-this.columns = new ArrayList<>(requestedColumns);
-}
}

@Override
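The hunk above backs out the star-column expansion that DRILL-5546 added: the group scan no longer rewrites * into [row_key, cf1, ..., cf_n]; it returns early for star queries and only validates explicitly requested columns against the table's families. A minimal standalone sketch of the restored check, with plain strings standing in for Drill's SchemaPath and Utilities helpers (all names here are illustrative, not Drill's API):

import java.util.List;
import java.util.Set;

// Illustrative stand-in for HBaseGroupScan.verifyColumns(): star queries pass
// through untouched; explicit columns must be the row key or a known family.
class ColumnVerifier {
  static final String ROW_KEY = "row_key";

  static void verify(List<String> columns, Set<String> families) {
    if (columns.contains("*")) {       // stand-in for Utilities.isStarQuery(columns)
      return;                          // no expansion into [row_key, cf1, ..., cf_n]
    }
    for (String column : columns) {
      String family = column.split("\\.", 2)[0];   // root segment of the path
      if (!ROW_KEY.equals(column) && !families.contains(family)) {
        throw new IllegalArgumentException(String.format(
            "The column family '%s' does not exist in the HBase table", family));
      }
    }
  }
}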
@@ -126,12 +126,10 @@ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns
HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()));
}
} else {
-throw new IllegalArgumentException("HBaseRecordReader does not allow column *. Column * should have been converted to list of <row_key, column family1, column family2, ..., column family_n");
-// rowKeyOnly = false;
-// transformed.add(ROW_KEY_PATH);
+rowKeyOnly = false;
+transformed.add(ROW_KEY_PATH);
}
-

return transformed;
}

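With the group scan no longer expanding the wildcard, the record reader above once again handles star queries itself: rather than throwing, it clears rowKeyOnly and adds ROW_KEY_PATH so the row key is materialized along with every column family. A rough standalone sketch of that behavior (plain strings instead of SchemaPath; names are illustrative):

import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative sketch: an empty projection list is treated like SELECT *;
// the reader keeps scanning whole rows and re-adds the row key column itself.
class ReaderColumns {
  static final String ROW_KEY = "row_key";

  static Set<String> transformColumns(Set<String> requested) {
    Set<String> transformed = new LinkedHashSet<>();
    if (!requested.isEmpty()) {
      transformed.addAll(requested);   // family/qualifier handling omitted here
    } else {
      transformed.add(ROW_KEY);        // rowKeyOnly = false in the real reader
    }
    return transformed;
  }
}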
@@ -93,5 +93,4 @@ public String toString() {
+ ", filter=" + (filter == null ? null : filter.toString())
+ "]";
}
-
}
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -94,7 +94,5 @@ public Set<String> getTableNames() {
public String getTypeName() {
return HBaseStoragePluginConfig.NAME;
}
-
}
-
}
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -154,7 +154,5 @@ public boolean equals(Object obj) {
private HBaseStoragePlugin getHBaseStoragePlugin() {
return HBaseStoragePlugin.this;
}
-
}
-
}
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -41,7 +41,10 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;

-// Class containing information for reading a single HBase region
+/**
+ * Contains information for reading a single HBase region
+ */
+
@JsonTypeName("hbase-region-scan")
public class HBaseSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
@@ -210,12 +213,10 @@ public String toString() {
+ ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString())
+ ", regionServer=" + regionServer + "]";
}

}

@Override
public int getOperatorType() {
return CoreOperatorType.HBASE_SUB_SCAN_VALUE;
}
-
}
@@ -38,7 +38,7 @@

public class BaseHBaseTest extends BaseTestQuery {

-private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";
+public static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";

protected static Configuration conf = HBaseConfiguration.create();

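Widening HBASE_STORAGE_PLUGIN_NAME from private to public lines up with the commit's "test utilities" note: helpers outside BaseHBaseTest can now reference the registered plugin name instead of hard-coding "hbase". A hypothetical usage (the helper class and table name are invented for illustration):

// Hypothetical test helper; assumes BaseHBaseTest is on the classpath.
public class HBaseTestQueries {
  static String selectAllFrom(String table) {
    return "SELECT * FROM " + BaseHBaseTest.HBASE_STORAGE_PLUGIN_NAME + ".`" + table + "`";
  }
}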
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -71,35 +71,44 @@ public void queryCompleted(QueryState state) {
}

@Override
+@SuppressWarnings("resource")
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
final QueryData header = result.getHeader();
final DrillBuf data = result.getData();

-if (data != null) {
-count.addAndGet(header.getRowCount());
-try {
-loader.load(header.getDef(), data);
-// TODO: Clean: DRILL-2933: That load(...) no longer throws
-// SchemaChangeException, so check/clean catch clause below.
-} catch (SchemaChangeException e) {
-submissionFailed(UserException.systemError(e).build(logger));
-}
+try {
+if (data != null) {
+count.addAndGet(header.getRowCount());
+try {
+loader.load(header.getDef(), data);
+// TODO: Clean: DRILL-2933: That load(...) no longer throws
+// SchemaChangeException, so check/clean catch clause below.
+} catch (SchemaChangeException e) {
+submissionFailed(UserException.systemError(e).build(logger));
+}

-switch(format) {
-case TABLE:
-VectorUtil.showVectorAccessibleContent(loader, columnWidth);
-break;
-case TSV:
-VectorUtil.showVectorAccessibleContent(loader, "\t");
-break;
-case CSV:
-VectorUtil.showVectorAccessibleContent(loader, ",");
-break;
+try {
+switch(format) {
+case TABLE:
+VectorUtil.showVectorAccessibleContent(loader, columnWidth);
+break;
+case TSV:
+VectorUtil.showVectorAccessibleContent(loader, "\t");
+break;
+case CSV:
+VectorUtil.showVectorAccessibleContent(loader, ",");
+break;
+default:
+throw new IllegalStateException(format.toString());
+}
+} finally {
+loader.clear();
+}
}
-loader.clear();
}
-
-result.release();
+finally {
+result.release();
+}
}

@Override
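The rewritten dataArrived() wraps the whole body in try/finally so that result.release() runs on every path and loader.clear() runs even when printing throws; it also adds a default case that fails fast on an unknown format. A self-contained illustration of the same cleanup shape, using generic JDK resources rather than Drill's API:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReleaseOnAllPaths {
  // Mirrors the patch's structure: the inner finally resets intermediate
  // state even if the "print" step throws; the outer finally releases the
  // underlying resource no matter what happened before it.
  static void consume(InputStream in) throws IOException {
    StringBuilder loader = new StringBuilder();
    try {
      int b;
      while ((b = in.read()) != -1) {
        loader.append((char) b);
      }
      try {
        System.out.println(loader);    // analogue of showVectorAccessibleContent()
      } finally {
        loader.setLength(0);           // analogue of loader.clear()
      }
    } finally {
      in.close();                      // analogue of result.release()
    }
  }

  public static void main(String[] args) throws IOException {
    consume(new ByteArrayInputStream("hello".getBytes()));
  }
}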
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -33,9 +33,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("union-exchange")
-public class UnionExchange extends AbstractExchange{
-
-static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
+public class UnionExchange extends AbstractExchange {

public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
super(child);
@@ -76,5 +74,4 @@ public Receiver getReceiver(int minorFragmentId) {
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new UnionExchange(child);
}
-
}
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -181,9 +181,6 @@ public IterOutcome next() {
return IterOutcome.OUT_OF_MEMORY;
}

-
-// logger.debug("Next received batch {}", batch);
-
final RecordBatchDef rbd = batch.getHeader().getDef();
final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
// TODO: Clean: DRILL-2933: That load(...) no longer throws
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,8 +28,8 @@
import org.apache.drill.exec.work.batch.RawBatchBuffer;

public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{
-static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class);

+@SuppressWarnings("resource")
@Override
public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
throws ExecutionSetupException {
@@ -42,6 +42,4 @@ public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceive
RawBatchBuffer buffer = buffers[0];
return new UnorderedReceiverBatch(context, buffer, receiver);
}
-
-
}
@@ -22,7 +22,6 @@
import java.util.List;

import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
-import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalProject;
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -136,6 +136,21 @@ public boolean equals(Object obj) {
return true;
}

+public boolean isEquivalent(BatchSchema other) {
+if (fields == null || other.fields == null) {
+return fields == other.fields;
+}
+if (fields.size() != other.fields.size()) {
+return false;
+}
+for (int i = 0; i < fields.size(); i++) {
+if (! fields.get(i).isEquivalent(other.fields.get(i))) {
+return false;
+}
+}
+return true;
+}
+
/**
* We treat fields with same set of Subtypes as equal, even if they are in a different order
* @param t1
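The new isEquivalent() gives BatchSchema a null-safe, order-sensitive, field-by-field comparison that delegates to each field's own isEquivalent() instead of equals(). The same shape written generically over lists (illustrative only, not Drill code):

import java.util.List;

public class Equivalence {
  // Null-safe, size-checked, order-sensitive element comparison; the real
  // method calls fields.get(i).isEquivalent(...) where this uses equals().
  static <T> boolean isEquivalent(List<T> a, List<T> b) {
    if (a == null || b == null) {
      return a == b;                   // equivalent only when both are null
    }
    if (a.size() != b.size()) {
      return false;
    }
    for (int i = 0; i < a.size(); i++) {
      if (!a.get(i).equals(b.get(i))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isEquivalent(List.of("a", "b"), List.of("a", "b"))); // true
    System.out.println(isEquivalent(null, List.of("a")));                   // false
  }
}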
