DRILL-4279: Improve query performance when no column is required from scan operator.

jinfengni committed Jan 28, 2016
1 parent d70cf36 commit 665ee7a
Showing 10 changed files with 132 additions and 20 deletions.
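In short: for queries whose scan requires no real columns, such as SELECT count(*) FROM hive.`default`.kv or SELECT 1 + 1 AS val FROM cp.`tpch/region.parquet`, the planner now hands the scan an empty projection list (shown as "columns" : [ ] in the plan), and each record reader substitutes a cheap, storage-specific default: the JSON reader reads nothing, the text readers read only columns[0], and the Parquet reader reads a single dummy column.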
@@ -97,6 +97,14 @@ public void testStarProject() throws Exception {
testHelper(query, 5, expectedColNames);
}

@Test
public void testHiveCountStar() throws Exception {
String query = "SELECT count(*) as cnt FROM hive.`default`.kv";
String expectedColNames = "\"columns\" : [ ]";

testHelper(query, 1, expectedColNames);
}

@Test
public void projectPushDownOnHiveParquetTable() throws Exception {
try {
@@ -82,13 +82,8 @@ public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
super(DRILL_LOGICAL, cluster, traits, table);
this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
this.rowType = rowType;
if (columns == null) { // planner asks to scan all of the columns
this.columns = ColumnList.all();
} else if (columns.size() == 0) { // planner asks to skip all of the columns
this.columns = ColumnList.none();
} else { // planner asks to scan some columns
this.columns = ColumnList.some(columns);
}
Preconditions.checkNotNull(columns);
this.columns = columns;
this.partitionFilterPushdown = partitionFilterPushdown;
try {
this.groupScan = drillTable.getGroupScan().clone(this.columns);
@@ -23,16 +23,15 @@
import java.util.List;
import java.util.Set;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.ArraySegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;

import org.apache.calcite.plan.RelOptPlanner;
@@ -18,11 +18,14 @@
package org.apache.drill.exec.store;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.planner.logical.ColumnList;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.vector.ValueVector;

@@ -31,10 +34,14 @@
import com.google.common.collect.Iterables;

public abstract class AbstractRecordReader implements RecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);

private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
private static final String COL_EMPTY_ERROR = "Readers need at least one column to read.";
public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");

// For text reader, the default columns to read is "columns[0]".
protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));

private Collection<SchemaPath> columns = null;
private boolean isStarQuery = false;
private boolean isSkipQuery = false;
@@ -47,14 +54,22 @@ public String toString() {
+ ", isSkipQuery = " + isSkipQuery + "]";
}

protected final void setColumns(Collection<SchemaPath> projected) {
assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR;
if (projected instanceof ColumnList) {
final ColumnList columns = ColumnList.class.cast(projected);
isSkipQuery = columns.getMode() == ColumnList.Mode.SKIP_ALL;
protected final void setColumns(List<SchemaPath> projected) {
Preconditions.checkNotNull(projected, COL_NULL_ERROR);
isSkipQuery = projected.isEmpty();
List<SchemaPath> columnsToRead = projected;

// If no column is required (SkipQuery), by default use getDefaultColumnsToRead().
// Handling a SkipQuery is storage-plugin specific: the JSON, text and Parquet readers
// override the default in order to improve query performance.
if (projected.isEmpty()) {
columnsToRead = getDefaultColumnsToRead();
}
isStarQuery = isStarQuery(projected);
columns = transformColumns(projected);

isStarQuery = isStarQuery(columnsToRead);
columns = transformColumns(columnsToRead);

logger.debug("columns to read : {}", columns);
}

protected Collection<SchemaPath> getColumns() {
@@ -92,4 +107,9 @@ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException
v.allocateNew();
}
}

protected List<SchemaPath> getDefaultColumnsToRead() {
return GroupScan.ALL_COLUMNS;
}

}
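For a third-party storage plugin, adopting this change amounts to overriding the new getDefaultColumnsToRead() hook. Below is a minimal sketch of such a reader; FooRecordReader and its foo_rowid column are hypothetical names chosen for illustration, and the setup/next/close bodies are stubs standing in for real vector wiring:

import java.util.List;

import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;

public class FooRecordReader extends AbstractRecordReader {

  // Hypothetical: the cheapest single column this format can materialize.
  // It is used when the projection list is empty (e.g. SELECT count(*)).
  private static final List<SchemaPath> CHEAP_COLS =
      ImmutableList.of(SchemaPath.getSimplePath("foo_rowid"));

  public FooRecordReader(final List<SchemaPath> projectedColumns) {
    setColumns(projectedColumns); // triggers the skip-query / default-column logic
  }

  @Override
  protected List<SchemaPath> getDefaultColumnsToRead() {
    return CHEAP_COLS;
  }

  @Override
  public void setup(final OperatorContext context, final OutputMutator output)
      throws ExecutionSetupException {
    // A real reader allocates value vectors for getColumns() here.
  }

  @Override
  public int next() {
    return 0; // A real reader returns the number of records written into the batch.
  }

  @Override
  public void close() {
    // Release format-specific resources here.
  }
}

A reader that can produce row counts without materializing any column at all can instead return an empty list, as the JSON reader in this commit does.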
@@ -21,13 +21,15 @@
import java.io.InputStream;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -140,6 +142,10 @@ public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException
}
}

@Override
protected List<SchemaPath> getDefaultColumnsToRead() {
return ImmutableList.of();
}

private void setupParser() throws IOException {
if(hadoopPath != null){
jsonReader.setSource(stream);
@@ -28,7 +28,6 @@

import javax.annotation.Nullable;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -90,6 +89,11 @@ public boolean apply(@Nullable SchemaPath path) {
return super.isStarQuery();
}

@Override
protected List<SchemaPath> getDefaultColumnsToRead() {
return DEFAULT_TEXT_COLS_TO_READ;
}

/**
* Performs the initial setup required for the record reader.
* Initializes the input stream, handling of the output record batch
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -69,6 +70,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;

// When no column is required by the downstream operator, ask the SCAN to return a DEFAULT column. If such a column
// does not exist, it is returned as a nullable-int column; if it happens to exist, that actual column is returned.
protected static final List<SchemaPath> DEFAULT_COLS_TO_READ = ImmutableList.of(SchemaPath.getSimplePath("_DEFAULT_COL_TO_READ_"));

// TODO - should probably find a smarter way to set this, currently 1 megabyte
public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;

@@ -508,4 +513,10 @@ public void close() {
parquetReaderStats=null;
}
}

@Override
protected List<SchemaPath> getDefaultColumnsToRead() {
return DEFAULT_COLS_TO_READ;
}

}
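A note on the Parquet fallback above: requesting a column that is absent from the file is nearly free, because Drill's schema-on-read materializes it as a nullable-int vector of nulls, so the scan still emits batches with the correct row counts for count(*) without decoding real Parquet pages. If a file happens to contain a column literally named _DEFAULT_COL_TO_READ_, that column is read instead, which remains correct, merely less cheap.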
@@ -106,6 +106,11 @@ public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext
}
}

@Override
protected List<SchemaPath> getDefaultColumnsToRead() {
return DEFAULT_TEXT_COLS_TO_READ;
}

@Override
public boolean isStarQuery() {
return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
@@ -208,6 +208,61 @@ public void testProjectPastJoinPastFilterPastJoinPushDown() throws Exception {

}

@Test
public void testEmptyColProjectInTextScan() throws Exception {
final String sql = "SELECT count(*) cnt from cp.`store/text/data/d1/regions.csv`";
final String expected = "\"columns\" : [ ]";
// Verify plan
testPushDown(new PushDownTestInstance(sql, new String[] {expected}));

// Verify execution result.
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues((long) 5)
.build()
.run();
}

@Test
public void testEmptyColProjectInJsonScan() throws Exception {
final String sql = "SELECT count(*) cnt from cp.`employee.json`";
final String expected = "\"columns\" : [ ]";

testPushDown(new PushDownTestInstance(sql, new String[] {expected}));

// Verify execution result.
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues((long) 1155)
.build()
.run();
}

@Test
public void testEmptyColProjectInParquetScan() throws Exception {
final String sql = "SELECT 1 + 1 as val from cp.`tpch/region.parquet`";
final String expected = "\"columns\" : [ ]";

testPushDown(new PushDownTestInstance(sql, new String[] {expected}));

// Verify execution result.
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("val")
.baselineValues(2)
.baselineValues(2)
.baselineValues(2)
.baselineValues(2)
.baselineValues(2)
.build()
.run();
}

@Test
public void testSimpleProjectPastJoinPastFilterPastJoinPushDown() throws Exception {
// String sql = "select * " +
@@ -262,4 +262,13 @@ public void testLinkedList() throws Exception {
final String sql = "select * from dfs_test.`" + file + "`";
test(sql);
}

@Test
public void testCountStar() throws Exception {

final String file = AvroTestUtil.generateStringAndUtf8Data();
final String sql = "select count(*) from dfs_test.`" + file + "`";
test(sql);
}

}
