Skip to content

Commit

Permalink
DRILL-6381: Address code review comments (part 3).
Browse files Browse the repository at this point in the history
DRILL-6381: Add missing joinControl logic for INTERSECT_DISTINCT.

- Modified HashJoin's probe phase to process INTERSECT_DISTINCT.

- NOTE: For build phase, the functionality will be same as for SemiJoin when it is added later.

DRILL-6381: Address code review comment for intersect_distinct.

DRILL-6381: Rebase on latest master and fix compilation issues.

DRILL-6381: Generate protobuf files for C++ native client.

DRILL-6381: Use shaded Guava classes.  Add more comments and Javadoc.
  • Loading branch information
Aman Sinha committed Oct 25, 2018
1 parent 387bc4f commit 7571d52
Show file tree
Hide file tree
Showing 35 changed files with 248 additions and 136 deletions.
Expand Up @@ -17,8 +17,8 @@
*/ */
package org.apache.drill.exec.planner.index; package org.apache.drill.exec.planner.index;


import com.google.common.collect.Maps; import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import com.google.common.collect.Sets; import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
Expand Down
Expand Up @@ -100,7 +100,7 @@ private IndexCollection getTableIndexFromMFS(String tableName) {
for (IndexDesc idx : indexes) { for (IndexDesc idx : indexes) {
DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx); DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx);
if (hbaseIdx == null) { if (hbaseIdx == null) {
//not able to build a valid index based on the index info from MFS // not able to build a valid index based on the index info from MFS
logger.error("Not able to build index for {}", idx.toString()); logger.error("Not able to build index for {}", idx.toString());
continue; continue;
} }
Expand Down Expand Up @@ -233,9 +233,9 @@ TypeProtos.MajorType getDrillType(String typeStr) {
} }


private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException { private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException {
//get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax // get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax
String castTypeStr = getDrillTypeStr(type); String castTypeStr = getDrillTypeStr(type);
if(castTypeStr == null) {//no cast if(castTypeStr == null) { // no cast
throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + "for field " + field); throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + "for field " + field);
} }
try { try {
Expand All @@ -255,7 +255,7 @@ private LogicalExpression castFunctionSQLSyntax(String field, String type) throw
private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException { private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException {
final String fieldName = desc.getFieldPath().asPathString(); final String fieldName = desc.getFieldPath().asPathString();
final String functionDef = desc.getFunctionName(); final String functionDef = desc.getFunctionName();
if ((functionDef != null)) {//this is a function if ((functionDef != null)) { // this is a function
String[] tokens = functionDef.split("\\s+"); String[] tokens = functionDef.split("\\s+");
if (tokens[0].equalsIgnoreCase("cast")) { if (tokens[0].equalsIgnoreCase("cast")) {
if (tokens.length != 3) { if (tokens.length != 3) {
Expand All @@ -270,7 +270,7 @@ private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws Invalid
throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef); throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef);
} }
} }
//else it is a schemaPath // else it is a schemaPath
return fieldName2SchemaPath(fieldName); return fieldName2SchemaPath(fieldName);
} }


Expand All @@ -285,7 +285,7 @@ private List<LogicalExpression> field2SchemaPath(Collection<IndexFieldDesc> desc


private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) { private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) {
List<RelFieldCollation> fieldCollations = new ArrayList<>(); List<RelFieldCollation> fieldCollations = new ArrayList<>();
int i=0; int i = 0;
for (IndexFieldDesc field : descCollection) { for (IndexFieldDesc field : descCollection) {
RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ? RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ?
RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ? RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ?
Expand Down
Expand Up @@ -53,6 +53,9 @@ public boolean isValid(Integer paramValue) {


public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32; public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;


public static final int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
public static final int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;

public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB"; public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128; public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;


Expand Down
Expand Up @@ -66,13 +66,15 @@ public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fs
connection = ConnectionFactory.createConnection(hbaseConf); connection = ConnectionFactory.createConnection(hbaseConf);
jsonTableCache = new MapRDBTableCache(context.getConfig()); jsonTableCache = new MapRDBTableCache(context.getConfig());
int scanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB); int scanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB);
if (scanRangeSizeMBConfig < 32 || scanRangeSizeMBConfig > 8192) { if (scanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN ||
scanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) {
logger.warn("Invalid scan size {} for MapR-DB tables, using default", scanRangeSizeMBConfig); logger.warn("Invalid scan size {} for MapR-DB tables, using default", scanRangeSizeMBConfig);
scanRangeSizeMBConfig = PluginConstants.JSON_TABLE_SCAN_SIZE_MB_DEFAULT; scanRangeSizeMBConfig = PluginConstants.JSON_TABLE_SCAN_SIZE_MB_DEFAULT;
} }


int restrictedScanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB); int restrictedScanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB);
if (restrictedScanRangeSizeMBConfig < 32 || restrictedScanRangeSizeMBConfig > 8192) { if (restrictedScanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN ||
restrictedScanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) {
logger.warn("Invalid restricted scan size {} for MapR-DB tables, using default", restrictedScanRangeSizeMBConfig); logger.warn("Invalid restricted scan size {} for MapR-DB tables, using default", restrictedScanRangeSizeMBConfig);
restrictedScanRangeSizeMBConfig = PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT; restrictedScanRangeSizeMBConfig = PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT;
} }
Expand Down
Expand Up @@ -51,8 +51,9 @@ private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description)


@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final ScanPrel scan = (ScanPrel) call.rel(1); final FilterPrel filter = call.rel(0);
final FilterPrel filter = (FilterPrel) call.rel(0); final ScanPrel scan = call.rel(1);

final RexNode condition = filter.getCondition(); final RexNode condition = filter.getCondition();


if (scan.getGroupScan() instanceof BinaryTableGroupScan) { if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
Expand Down Expand Up @@ -80,9 +81,9 @@ public boolean matches(RelOptRuleCall call) {


@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final ScanPrel scan = (ScanPrel) call.rel(2); final FilterPrel filter = call.rel(0);
final ProjectPrel project = (ProjectPrel) call.rel(1); final ProjectPrel project = call.rel(1);
final FilterPrel filter = (FilterPrel) call.rel(0); final ScanPrel scan = call.rel(2);


// convert the filter to one that references the child of the project // convert the filter to one that references the child of the project
final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
Expand Down Expand Up @@ -134,13 +135,13 @@ protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call,
final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp); final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp);
final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree(); final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
if (newScanSpec == null) { if (newScanSpec == null) {
return; //no filter pushdown ==> No transformation. return; // no filter pushdown ==> No transformation.
} }


final JsonTableGroupScan newGroupsScan = (JsonTableGroupScan) groupScan.clone(newScanSpec); final JsonTableGroupScan newGroupsScan = (JsonTableGroupScan) groupScan.clone(newScanSpec);
newGroupsScan.setFilterPushedDown(true); newGroupsScan.setFilterPushedDown(true);


final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());


// Depending on whether is a project in the middle, assign either scan or copy of project to childRel. // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel)); final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
Expand Down Expand Up @@ -186,7 +187,7 @@ protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call,
groupScan.getTableStats()); groupScan.getTableStats());
newGroupsScan.setFilterPushedDown(true); newGroupsScan.setFilterPushedDown(true);


final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable()); final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());


// Depending on whether is a project in the middle, assign either scan or copy of project to childRel. // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
Expand Down
Expand Up @@ -47,8 +47,8 @@ private MapRDBPushLimitIntoScan(RelOptRuleOperand operand, String description) {


@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(1);
final LimitPrel limit = call.rel(0); final LimitPrel limit = call.rel(0);
final ScanPrel scan = call.rel(1);
doPushLimitIntoGroupScan(call, limit, null, scan, scan.getGroupScan()); doPushLimitIntoGroupScan(call, limit, null, scan, scan.getGroupScan());
} }


Expand Down
Expand Up @@ -17,7 +17,7 @@
*/ */
package org.apache.drill.exec.store.mapr.db; package org.apache.drill.exec.store.mapr.db;


import com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTrait;
Expand All @@ -30,18 +30,21 @@
import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel; import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.util.Utilities;


import java.util.List; import java.util.List;


/**
* Push a physical Project into Scan. Currently, this rule is only doing projection pushdown for MapRDB-JSON tables
* since it was needed for the secondary index feature which only applies to Json tables.
* For binary tables, note that the DrillPushProjectIntoScanRule is still applicable during the logical
* planning phase.
*/
public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRule { public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushProjectIntoScan.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushProjectIntoScan.class);


Expand All @@ -53,17 +56,10 @@ private MapRDBPushProjectIntoScan(RelOptRuleOperand operand, String description)
RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushProjIntoScan:Proj_On_Scan") { RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushProjIntoScan:Proj_On_Scan") {
@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final ScanPrel scan = (ScanPrel) call.rel(1); final ProjectPrel project = call.rel(0);
final ProjectPrel project = (ProjectPrel) call.rel(0); final ScanPrel scan = call.rel(1);
if (!(scan.getGroupScan() instanceof MapRDBGroupScan)) {
return;
}
doPushProjectIntoGroupScan(call, project, scan, (MapRDBGroupScan) scan.getGroupScan());
if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
BinaryTableGroupScan groupScan = (BinaryTableGroupScan) scan.getGroupScan();


} else { if (scan.getGroupScan() instanceof JsonTableGroupScan) {
assert (scan.getGroupScan() instanceof JsonTableGroupScan);
JsonTableGroupScan groupScan = (JsonTableGroupScan) scan.getGroupScan(); JsonTableGroupScan groupScan = (JsonTableGroupScan) scan.getGroupScan();


doPushProjectIntoGroupScan(call, project, scan, groupScan); doPushProjectIntoGroupScan(call, project, scan, groupScan);
Expand All @@ -72,22 +68,23 @@ public void onMatch(RelOptRuleCall call) {


@Override @Override
public boolean matches(RelOptRuleCall call) { public boolean matches(RelOptRuleCall call) {
final ScanPrel scan = (ScanPrel) call.rel(1); final ScanPrel scan = call.rel(1);
if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
scan.getGroupScan() instanceof JsonTableGroupScan) { // See class level comments above for why only JsonGroupScan is considered
if (scan.getGroupScan() instanceof JsonTableGroupScan) {
return super.matches(call); return super.matches(call);
} }
return false; return false;
} }
}; };


protected void doPushProjectIntoGroupScan(RelOptRuleCall call, protected void doPushProjectIntoGroupScan(RelOptRuleCall call,
ProjectPrel project, ScanPrel scan, MapRDBGroupScan groupScan) { ProjectPrel project, ScanPrel scan, JsonTableGroupScan groupScan) {
try { try {


DrillRelOptUtil.ProjectPushInfo columnInfo = DrillRelOptUtil.ProjectPushInfo columnInfo =
DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects()); DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects());
if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields()) // if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields())
|| !groupScan.canPushdownProjects(columnInfo.getFields())) { || !groupScan.canPushdownProjects(columnInfo.getFields())) {
return; return;
} }
Expand Down
Expand Up @@ -17,8 +17,8 @@
*/ */
package org.apache.drill.exec.store.mapr.db; package org.apache.drill.exec.store.mapr.db;


import com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.BatchCreator;
Expand Down
Expand Up @@ -30,8 +30,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.mapr.db.Table; import com.mapr.db.Table;
import com.mapr.db.impl.ConditionImpl; import com.mapr.db.impl.ConditionImpl;
import com.mapr.db.impl.IdCodec; import com.mapr.db.impl.IdCodec;
Expand Down
Expand Up @@ -236,7 +236,7 @@ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns
idOnly = (scannedFields == null); idOnly = (scannedFields == null);
} }


if(projectWholeDocument) { if (projectWholeDocument) {
projector = new FieldProjector(projectedFieldsSet); projector = new FieldProjector(projectedFieldsSet);
} }


Expand Down
Expand Up @@ -24,8 +24,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Lists;


import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.GroupScan;
Expand Down Expand Up @@ -57,7 +57,7 @@ public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("format") MapRDBFormatPlugin formatPlugin, @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
@JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */ @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
@JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("")MapRDBStatistics statistics) { @JsonProperty("") MapRDBStatistics statistics) {
super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics); super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
} }


Expand Down

0 comments on commit 7571d52

Please sign in to comment.