DRILL-450: Add exchange rules, move from BasicOptimizer to Optiq
Jinfeng Ni authored and jacques-n committed Apr 20, 2014
1 parent 22c4190 commit 6b517daad2a817231df9959f2f3e5c53bd728117
Showing 118 changed files with 2,913 additions and 552 deletions.
@@ -40,19 +40,20 @@ Nullable: 'nullable';
 Repeat: 'repeat';
 As: 'as';
 
-INT : 'int';
-BIGINT : 'bigint';
-FLOAT4 : 'float4';
-FLOAT8 : 'float8';
-VARCHAR : 'varchar';
-VARBINARY: 'varbinary';
-DATE : 'date';
-TIMESTAMP: 'timestamp';
-TIME : 'time';
-TIMESTAMPTZ: 'timestamptz';
-INTERVAL : 'interval';
-INTERVALYEAR : 'intervalyear';
-INTERVALDAY : 'intervalday';
+INT : 'int' | 'INT';
+BIGINT : 'bigint' | 'BIGINT';
+FLOAT4 : 'float4' | 'FLOAT4';
+FLOAT8 : 'float8' | 'FLOAT8';
+VARCHAR : 'varchar' | 'VARCHAR';
+VARBINARY: 'varbinary' | 'VARBINARY';
+DATE : 'date' | 'DATE';
+TIMESTAMP: 'timestamp' | 'TIMESTAMP';
+TIME : 'time' | 'TIME';
+TIMESTAMPTZ: 'timestamptz' | 'TIMESTAMPTZ';
+INTERVAL : 'interval' | 'INTERVAL';
+INTERVALYEAR : 'intervalyear' | 'INTERVALYEAR';
+INTERVALDAY : 'intervalday' | 'INTERVALDAY';
+
 
 Or : '||' | 'or' | 'OR' | 'Or';
 And : '&&' | 'and' | 'AND' ;
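Note that the alternation form makes each type keyword accept its all-lowercase and all-uppercase spellings, but nothing in between; mixed-case forms such as `Int` still fail to lex, since an ANTLR literal alternation matches only the exact strings listed. A minimal illustration of that matching behavior (plain Java, not ANTLR; the spellings mirror the `INT` rule above):

```java
import java.util.Set;

public class KeywordCase {
  // Mirrors INT : 'int' | 'INT'; exactly two literal spellings are accepted.
  static final Set<String> INT_SPELLINGS = Set.of("int", "INT");

  public static void main(String[] args) {
    System.out.println(INT_SPELLINGS.contains("int")); // true
    System.out.println(INT_SPELLINGS.contains("INT")); // true
    System.out.println(INT_SPELLINGS.contains("Int")); // false: mixed case is still rejected
  }
}
```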
@@ -100,7 +100,7 @@ public LogicalExpression getExpr() {
 public String getOrder() {
 
 switch(direction){
-case Descending: return "DESC";
+case DESCENDING: return "DESC";
 default: return "ASC";
 }
 }
@@ -140,10 +140,10 @@ public Order internalBuild() {
 }
 
 public static Direction getDirectionFromString(String direction){
-return "DESC".equalsIgnoreCase(direction) ? Direction.Descending : Direction.Ascending;
+return "DESC".equalsIgnoreCase(direction) ? Direction.DESCENDING : Direction.ASCENDING;
 }
 
 public static String getStringFromDirection(Direction direction){
-return direction == Direction.Descending ? "DESC" : "ASC";
+return direction == Direction.DESCENDING ? "DESC" : "ASC";
 }
 }
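The constant rename from Descending/Ascending to DESCENDING/ASCENDING tracks the Optiq-style enum spelling this commit moves toward; the string helpers keep their round-trip behavior. A sketch of that invariant, assuming the statics live on the class shown above:

```java
// Hedged sketch: exercises the two helpers' round-trip contract.
Direction d = getDirectionFromString("desc");        // match is case-insensitive
assert d == Direction.DESCENDING;
assert "DESC".equals(getStringFromDirection(d));

// Any string other than "DESC" (in any case) falls back to ascending.
assert getDirectionFromString("ascending") == Direction.ASCENDING;
assert "ASC".equals(getStringFromDirection(Direction.ASCENDING));
```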
@@ -25,7 +25,6 @@
 import com.google.common.collect.Lists;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
@@ -70,7 +69,6 @@ public List<SchemaPath> getColumns() {
 private String tableName;
 private HBaseStoragePlugin storagePlugin;
 private HBaseStoragePluginConfig storagePluginConfig;
-private final FieldReference ref;
 private List<EndpointAffinity> endpointAffinities;
 private List<SchemaPath> columns;
 
@@ -80,24 +78,21 @@ public List<SchemaPath> getColumns() {
 public HBaseGroupScan(@JsonProperty("entries") List<HTableReadEntry> entries,
 @JsonProperty("storage") HBaseStoragePluginConfig storageEngineConfig,
 @JsonProperty("columns") List<SchemaPath> columns,
-@JacksonInject StoragePluginRegistry engineRegistry,
-@JsonProperty("ref") FieldReference ref
+@JacksonInject StoragePluginRegistry engineRegistry
 )throws IOException, ExecutionSetupException {
 Preconditions.checkArgument(entries.size() == 1);
 engineRegistry.init(DrillConfig.create());
 this.storagePlugin = (HBaseStoragePlugin) engineRegistry.getEngine(storageEngineConfig);
 this.storagePluginConfig = storageEngineConfig;
 this.tableName = entries.get(0).getTableName();
-this.ref = ref;
 this.columns = columns;
 getRegionInfos();
 }
 
-public HBaseGroupScan(String tableName, HBaseStoragePlugin storageEngine, FieldReference ref, List<SchemaPath> columns) throws IOException {
+public HBaseGroupScan(String tableName, HBaseStoragePlugin storageEngine, List<SchemaPath> columns) throws IOException {
 this.storagePlugin = storageEngine;
-this.storagePluginConfig = storageEngine.getEngineConfig();
+this.storagePluginConfig = storageEngine.getConfig();
 this.tableName = tableName;
-this.ref = ref;
 this.columns = columns;
 getRegionInfos();
 }
@@ -167,12 +162,9 @@ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {

 @Override
 public HBaseSubScan getSpecificScan(int minorFragmentId) {
-return new HBaseSubScan(storagePlugin, storagePluginConfig, mappings.get(minorFragmentId), ref, columns);
+return new HBaseSubScan(storagePlugin, storagePluginConfig, mappings.get(minorFragmentId), columns);
 }
 
-public FieldReference getRef() {
-return ref;
-}
 
 @Override
 public int getMaxParallelizationWidth() {
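With the ref field, constructor parameter, and getter gone, a minor-fragment sub-scan is fully described by the plugin, its config, the region assignments, and the projected columns. An illustrative call site (the fragment id is arbitrary):

```java
// Hedged sketch: per-fragment scan creation after the ref removal.
HBaseSubScan sub = groupScan.getSpecificScan(0 /* minorFragmentId */);
// Projection is now driven entirely by the columns list serialized with the scan.
```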
@@ -35,11 +35,11 @@
 public class HBaseSchemaFactory implements SchemaFactory {
 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSchemaFactory.class);
 
-final HBaseStoragePluginConfig configuration;
 final String schemaName;
-
-public HBaseSchemaFactory(HBaseStoragePluginConfig configuration, String name) throws IOException {
-this.configuration = configuration;
+final HBaseStoragePlugin plugin;
+
+public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOException {
+this.plugin = plugin;
 this.schemaName = name;
 }
 
@@ -65,7 +65,7 @@ public void setHolder(SchemaPlus plusOfThis) {

 @Override
 public Schema getSubSchema(String name) {
-throw new UnsupportedOperationException();
+return null;
 }
 
 @Override
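Returning null rather than throwing lets callers probe for a subschema that may not exist, which is presumably the contract the Optiq schema machinery this commit adopts expects. Caller-side effect, sketched:

```java
// Hedged sketch: lookups no longer need a try/catch around missing subschemas.
Schema sub = schema.getSubSchema("does_not_exist");
if (sub == null) {
  // Not found; this path previously surfaced as an UnsupportedOperationException.
}
```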
@@ -76,13 +76,12 @@ public Set<String> getSubSchemaNames() {
 @Override
 public DrillTable getTable(String name) {
 Object selection = new HTableReadEntry(name);
-return new DynamicDrillTable(schemaName, selection, configuration);
+return new DynamicDrillTable(plugin, schemaName, selection, plugin.getConfig());
 }
 
 @Override
 public Set<String> getTableNames() {
-try {
-HBaseAdmin admin = new HBaseAdmin(configuration.conf);
+try(HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().conf)) {
 HTableDescriptor[] tables = admin.listTables();
 Set<String> tableNames = Sets.newHashSet();
 for (HTableDescriptor table : tables) {
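The try-with-resources form guarantees the HBaseAdmin connection is closed on every path, including when listTables() throws. The hunk ends mid-method; a sketch of the likely remainder under that assumption (the accessor name and the catch clause are guesses, not part of the commit):

```java
// Hedged completion of getTableNames(); everything past listTables() is assumed.
try (HBaseAdmin admin = new HBaseAdmin(plugin.getConfig().conf)) {
  HTableDescriptor[] tables = admin.listTables();
  Set<String> tableNames = Sets.newHashSet();
  for (HTableDescriptor table : tables) {
    tableNames.add(table.getNameAsString()); // assumed accessor
  }
  return tableNames;
} catch (IOException e) {
  throw new RuntimeException(e); // hypothetical error handling
}
```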
@@ -18,18 +18,17 @@
 package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 
 import net.hydromatic.optiq.Schema;
 import net.hydromatic.optiq.SchemaPlus;
-import org.apache.drill.common.logical.data.Scan;
 
+import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 
 import com.google.common.base.Preconditions;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class HBaseStoragePlugin extends AbstractStoragePlugin {
 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePlugin.class);
@@ -42,15 +41,11 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
 public HBaseStoragePlugin(HBaseStoragePluginConfig configuration, DrillbitContext context, String name)
 throws IOException {
 this.context = context;
-this.schemaFactory = new HBaseSchemaFactory(configuration, name);
+this.schemaFactory = new HBaseSchemaFactory(this, name);
 this.engineConfig = configuration;
 this.name = name;
 }
 
-public HBaseStoragePluginConfig getEngineConfig() {
-return engineConfig;
-}
-
 public DrillbitContext getContext() {
 return this.context;
 }
@@ -61,16 +56,20 @@ public boolean supportsRead() {
 }
 
 @Override
-public HBaseGroupScan getPhysicalScan(Scan scan) throws IOException {
-HTableReadEntry readEntry = scan.getSelection().getListWith(new ObjectMapper(),
+public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+HTableReadEntry readEntry = selection.getListWith(new ObjectMapper(),
 new TypeReference<HTableReadEntry>() {});
-
-return new HBaseGroupScan(readEntry.getTableName(), this, scan.getOutputReference(), null);
+return new HBaseGroupScan(readEntry.getTableName(), this, null);
 }
 
 @Override
 public Schema createAndAddSchema(SchemaPlus parent) {
 return schemaFactory.add(parent);
 }
+
+@Override
+public HBaseStoragePluginConfig getConfig() {
+return engineConfig;
+}
 
 }
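getPhysicalScan now receives the raw JSON selection instead of the logical Scan node, so the output FieldReference (formerly scan.getOutputReference()) no longer reaches the group scan at all, matching the ref removal throughout the HBase classes above. A sketch of the deserialization step in isolation, assuming the selection JSON carries just the table name (the JSON shape and property name are assumptions):

```java
// Hedged sketch of the selection-to-entry step.
ObjectMapper mapper = new ObjectMapper();
HTableReadEntry entry = mapper.readValue(
    "{\"tableName\": \"myTable\"}",
    new TypeReference<HTableReadEntry>() {});
System.out.println(entry.getTableName()); // myTable
```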
@@ -51,28 +51,24 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
 @JsonIgnore
 private final HBaseStoragePlugin hbaseStoragePlugin;
 private final List<HBaseSubScanReadEntry> rowGroupReadEntries;
-private final FieldReference ref;
 private final List<SchemaPath> columns;
 
 @JsonCreator
 public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("storage") StoragePluginConfig storage,
 @JsonProperty("rowGroupReadEntries") LinkedList<HBaseSubScanReadEntry> rowGroupReadEntries,
-@JsonProperty("columns") List<SchemaPath> columns,
-@JsonProperty("ref") FieldReference ref) throws ExecutionSetupException {
+@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
 hbaseStoragePlugin = (HBaseStoragePlugin) registry.getEngine(storage);
 this.rowGroupReadEntries = rowGroupReadEntries;
 this.storage = storage;
-this.ref = ref;
 this.columns = columns;
 }
 
 public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
-List<HBaseSubScanReadEntry> regionInfoList, FieldReference ref,
+List<HBaseSubScanReadEntry> regionInfoList,
 List<SchemaPath> columns) {
 hbaseStoragePlugin = plugin;
 storage = config;
 this.rowGroupReadEntries = regionInfoList;
-this.ref = ref;
 this.columns = columns;
 }

@@ -94,11 +90,6 @@ public OperatorCost getCost() {
 return null;
 }
 
-
-public FieldReference getRef() {
-return ref;
-}
-
 @Override
 public Size getSize() {
 return null;
@@ -122,7 +113,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
 @Override
 public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
 Preconditions.checkArgument(children.isEmpty());
-return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, rowGroupReadEntries, ref, columns);
+return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, rowGroupReadEntries, columns);
 }
 
 @Override
@@ -106,6 +106,23 @@ public void eval() {
 }
 }
 
+@FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+public static class NullableVarCharHash implements DrillSimpleFunc {
+
+@Param NullableVarCharHolder in;
+@Output IntHolder out;
+
+public void setup(RecordBatch incoming) {
+}
+
+public void eval() {
+if (in.isSet == 0)
+out.value = 0;
+else
+out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(), 0);
+}
+}
+
 @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
 public static class NullableBigIntHash implements DrillSimpleFunc {
 
@@ -154,6 +171,20 @@ public void eval() {
 out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
 }
 }
+
+@FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+public static class VarCharHash implements DrillSimpleFunc {
+
+@Param VarCharHolder in;
+@Output IntHolder out;
+
+public void setup(RecordBatch incoming) {
+}
+
+public void eval() {
+out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+}
+}
 
 @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
 public static class HashBigInt implements DrillSimpleFunc {
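The two new varchar hash functions follow the pattern of the existing numeric ones: the non-nullable variant hashes the value's [start, end) slice, while the nullable variant maps SQL NULL (isSet == 0) to hash 0; note that the nullable version hashes in.buffer.nioBuffer() without the slice bounds the other functions pass. A plain-Java stand-in for the combined eval() logic (holder types replaced by a nullable ByteBuffer; HashHelper.hash(ByteBuffer, int) is the signature visible in the hunks above):

```java
import java.nio.ByteBuffer;

public class VarCharHashSketch {
  // Hedged stand-in: null plays the role of isSet == 0 on the nullable holder.
  static int hashNullableVarChar(ByteBuffer value) {
    if (value == null) {
      return 0; // all NULLs collide by design, landing on one hash partition
    }
    return org.apache.drill.exec.expr.fn.impl.HashHelper.hash(value, 0);
  }
}
```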
@@ -21,6 +21,7 @@

 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -85,4 +86,7 @@ public DrillSchemaFactory getFactory(){
 return drillbitContext.getSchemaFactory();
 }
 
+public FunctionImplementationRegistry getFunctionRegistry(){
+return drillbitContext.getFunctionImplementationRegistry();
+}
 }
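QueryContext now delegates to DrillbitContext for the function registry, mirroring the existing getFactory() accessor, so per-query planning code (such as the Optiq rules this commit adds) can resolve function implementations without holding the drillbit context directly. Illustrative call site:

```java
// Hedged sketch: reaching the registry through the per-query context.
FunctionImplementationRegistry registry = queryContext.getFunctionRegistry();
```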
@@ -129,7 +129,7 @@ public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object

 if(groupBy.getKeys().length > 0){
 for(NamedExpression e : groupBy.getKeys()){
-orderDefs.add(new Ordering(Direction.Ascending, e.getExpr(), NullDirection.FIRST));
+orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
 }
 input = new Sort(input, orderDefs, false);
 }
@@ -161,15 +161,15 @@ public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerExcep
 PhysicalOperator leftOp = join.getLeft().accept(this, value);
 List<Ordering> leftOrderDefs = Lists.newArrayList();
 for(JoinCondition jc : join.getConditions()){
-leftOrderDefs.add(new Ordering(Direction.Ascending, jc.getLeft()));
+leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
 }
 leftOp = new Sort(leftOp, leftOrderDefs, false);
 leftOp = new SelectionVectorRemover(leftOp);
 
 PhysicalOperator rightOp = join.getRight().accept(this, value);
 List<Ordering> rightOrderDefs = Lists.newArrayList();
 for(JoinCondition jc : join.getConditions()){
-rightOrderDefs.add(new Ordering(Direction.Ascending, jc.getRight()));
+rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
 }
 rightOp = new Sort(rightOp, rightOrderDefs, false);
 rightOp = new SelectionVectorRemover(rightOp);
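Both join inputs are prepared identically: sort ascending on that side's join keys, then strip the selection vector the sort produces. A sketch of that symmetric pattern factored as a helper (the helper itself is illustrative, not part of the commit; it uses only constructors visible in the hunk above):

```java
// Hedged sketch of the per-side sort-then-remove preparation.
private static PhysicalOperator sortedOnKeys(PhysicalOperator input,
    List<JoinCondition> conditions, boolean leftSide) {
  List<Ordering> orderDefs = Lists.newArrayList();
  for (JoinCondition jc : conditions) {
    orderDefs.add(new Ordering(Direction.ASCENDING,
        leftSide ? jc.getLeft() : jc.getRight()));
  }
  // Sort, then remove the selection vector so downstream sees a plain batch.
  return new SelectionVectorRemover(new Sort(input, orderDefs, false));
}
```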
@@ -19,6 +19,7 @@

 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.base.Preconditions;
 
@@ -43,4 +44,9 @@ public boolean isExecutable() {
 return true;
 }
 
+@Override
+public SelectionVectorMode getSVMode() {
+return SelectionVectorMode.NONE;
+}
+
 }
@@ -23,6 +23,7 @@
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.collect.Iterators;
 
@@ -67,4 +68,9 @@ public Iterator<PhysicalOperator> iterator() {
 return Iterators.emptyIterator();
 }
 
+@Override
+public SelectionVectorMode getSVMode() {
+return SelectionVectorMode.NONE;
+}
+
 }
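Both operators now declare SelectionVectorMode.NONE, advertising that their output batches carry no SV2/SV4 indirection, so downstream code can branch on the declared mode. A sketched consumer-side check:

```java
// Hedged sketch: branching on an operator's declared selection-vector mode.
if (op.getSVMode() == SelectionVectorMode.NONE) {
  // Records can be read positionally; no selection-vector remapping is needed.
}
```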
