From 8b5642353505d1001d7ec3590a07ad1144ecf7f3 Mon Sep 17 00:00:00 2001
From: Arina Ielchiieva
Date: Thu, 20 Jul 2017 19:26:44 +0300
Subject: [PATCH] DRILL-4735: ConvertCountToDirectScan rule enhancements

1. The ConvertCountToDirectScan rule is now applicable to 2 or more COUNT aggregates.
To achieve this, DynamicPojoRecordReader was added, which accepts any number of columns,
in contrast to PojoRecordReader, which depends on class fields. AbstractPojoRecordReader
was added to factor out common logic for these two readers.

2. ConvertCountToDirectScan now distinguishes between missing, directory and implicit
columns. For missing columns the count is set to 0; for implicit columns it is set to
the total record count, since implicit columns are based on files and there is no data
without a file. If a directory column is encountered, the rule is not applied.
CountsCollector class was introduced to encapsulate counts collection logic.

3. MetadataDirectGroupScan class was introduced to indicate to the user when metadata
was used during calculation and to which files it was applied.

DRILL-4735: Changes after code review.

close #900
---
 .../impl/project/ProjectRecordBatch.java      |   4 +-
 .../physical/ConvertCountToDirectScan.java    | 242 +++++++++-----
 .../drill/exec/planner/sql/DirectPlan.java    |  16 +-
 .../planner/sql/handlers/ShowFileHandler.java |   2 +-
 ...olumnExplorer.java => ColumnExplorer.java} |  60 +++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java |   7 +-
 .../exec/store/direct/DirectGroupScan.java    |   7 +-
 .../store/direct/MetadataDirectGroupScan.java |  86 +++++
 .../ischema/InfoSchemaRecordGenerator.java    |  12 +-
 .../exec/store/parquet/ParquetGroupScan.java  |   4 +-
 .../parquet/ParquetScanBatchCreator.java      |   6 +-
 .../store/pojo/AbstractPojoRecordReader.java  | 157 ++++++++++
 ...actWriter.java => AbstractPojoWriter.java} |  24 +-
 .../store/pojo/DynamicPojoRecordReader.java   |  71 +++++
 .../exec/store/pojo/PojoRecordReader.java     | 187 +++--------
 .../drill/exec/store/pojo/PojoWriter.java     |  38 ++-
 .../drill/exec/store/pojo/PojoWriters.java    | 296 ++++++++++++++++++
 .../apache/drill/exec/store/pojo/Writers.java | 274 ----------------
 .../store/sys/SystemTableBatchCreator.java    |   6 +-
 .../TestFunctionsWithTypeExpoQueries.java     |   6 +-
 .../logical/TestConvertCountToDirectScan.java |  82 ++++-
 21 files changed, 1026 insertions(+), 561 deletions(-)
 rename exec/java-exec/src/main/java/org/apache/drill/exec/store/{ImplicitColumnExplorer.java => ColumnExplorer.java} (79%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
 rename exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/{AbstractWriter.java => AbstractPojoWriter.java} (75%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
 delete mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 676849a5293..6baf070d4ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -58,7 +58,7 @@ import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.store.ImplicitColumnExplorer; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; @@ -500,7 +500,7 @@ protected boolean setupNewSchema() throws SchemaChangeException { } private boolean isImplicitFileColumn(ValueVector vvIn) { - return ImplicitColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null; + return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null; } private List getExpressionList() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java index 879d0f7ebd0..961816e2757 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,15 +18,21 @@ package org.apache.drill.exec.planner.physical; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import com.google.common.collect.ImmutableMap; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeFieldImpl; import org.apache.calcite.rel.type.RelRecordType; @@ -35,37 +41,41 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.DrillProjectRel; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.RelOptHelper; -import org.apache.drill.exec.store.direct.DirectGroupScan; -import org.apache.drill.exec.store.pojo.PojoRecordReader; +import org.apache.drill.exec.store.ColumnExplorer; -import com.google.common.collect.Lists; +import org.apache.drill.exec.store.direct.MetadataDirectGroupScan; +import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; /** - * This rule will convert - * " select count(*) as mycount from table " - * or " select count( not-nullable-expr) as mycount from table " - * into - * + *

+ * This rule will convert " select count(*) as mycount from table "
+ * or " select count(not-nullable-expr) as mycount from table " into
+

  *    Project(mycount)
  *         \
  *    DirectGroupScan ( PojoRecordReader ( rowCount ))
- *
- * or
- *    " select count(column) as mycount from table "
- *    into
+ *
+ * or " select count(column) as mycount from table " into + *
  *      Project(mycount)
  *           \
  *            DirectGroupScan (PojoRecordReader (columnValueCount))
+ *
+ * The rule can also be applied if the query contains multiple count expressions:
+ * " select count(column1), count(column2), count(*) from table "
+
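+ *
+ * In that case the converted plan carries one constant per aggregate, roughly
+ * (an illustrative sketch of the plan shape, not exact operator output):
+ *
+ *      Project(count(column1), count(column2), count(*))
+ *           \
+ *            DirectGroupScan (DynamicPojoRecordReader (columnValueCount, columnValueCount, rowCount))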

* + *

 * Currently, only parquet group scan has the exact row count and column value count,
 * obtained from parquet row group info. This will save the cost to
 * scan the whole parquet files.
+ *

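+ *
+ * For example, for a parquet table with a known total of 100 rows and complete
+ * statistics for column col1, " select count(*), count(col1) from table " can be
+ * answered with the single precomputed row (100, 100) without reading the data
+ * pages (figures and column name are illustrative only).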
 */
-
 public class ConvertCountToDirectScan extends Prule {

   public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScan(
@@ -77,6 +87,8 @@ public class ConvertCountToDirectScan extends Prule {
       RelOptHelper.some(DrillAggregateRel.class, RelOptHelper.any(DrillScanRel.class)), "Agg_on_scan");

+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScan.class);
+
   /** Creates a SplunkPushDownRule. */
   protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
     super(rule, "ConvertCountToDirectScan:" + id);
@@ -85,40 +97,85 @@ protected ConvertCountToDirectScan(RelOptRuleOperand rule, String id) {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel agg = (DrillAggregateRel) call.rel(0);
-    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length -1);
-    final DrillProjectRel proj = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
+    final DrillScanRel scan = (DrillScanRel) call.rel(call.rels.length - 1);
+    final DrillProjectRel project = call.rels.length == 3 ? (DrillProjectRel) call.rel(1) : null;
     final GroupScan oldGrpScan = scan.getGroupScan();
     final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());

-    // Only apply the rule when :
+    // Only apply the rule when:
     // 1) scan knows the exact row count in getSize() call,
     // 2) No GroupBY key,
-    // 3) only one agg function (Check if it's count(*) below).
-    // 4) No distinct agg call.
+    // 3) No distinct agg call.
     if (!(oldGrpScan.getScanStats(settings).getGroupScanProperty().hasExactRowCount()
         && agg.getGroupCount() == 0
-        && agg.getAggCallList().size() == 1
         && !agg.containsDistinctCall())) {
       return;
     }

-    AggregateCall aggCall = agg.getAggCallList().get(0);
+    Map<String, Long> result = collectCounts(settings, agg, scan, project);
+    logger.trace("Calculated the following aggregate counts: {}", result);
+    // if the counts could not be determined, the rule won't be applied
+    if (result.isEmpty()) {
+      return;
+    }
+
+    final RelDataType scanRowType = constructDataType(agg, result.keySet());
+
+    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+        buildSchema(scanRowType.getFieldNames()),
+        Collections.singletonList((List<Long>) new ArrayList<>(result.values())));

-    if (aggCall.getAggregation().getName().equals("COUNT") ) {
+    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
+    final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);
+
+    final ScanPrel newScan = ScanPrel.create(scan,
+        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan,
+        scanRowType);
+
+    final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
+        .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
+
+    call.transformTo(newProject);
+  }
+
+  /**
+   * Collects counts for each aggregation call.
+   * Returns an empty result map if the count could not be determined for at least one aggregation call.
+   *
+   * For each aggregate call determines if its count can be calculated. Collects counts only for the COUNT function.
+   * For star and not-null expressions and for implicit columns, sets the count to the total record number.
+   * For other cases, obtains counts from the group scan operator. Counts cannot be calculated for partition columns.
+   *
+   * @param agg aggregate relational expression
+   * @param scan scan relational expression
+   * @param project project relational expression
+   * @return result map where key is count column name, value is count value
+   */
+  private Map<String, Long> collectCounts(PlannerSettings settings, DrillAggregateRel agg, DrillScanRel scan, DrillProjectRel project) {
+    final Set<String> implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+    final GroupScan oldGrpScan = scan.getGroupScan();
+    final long totalRecordCount = oldGrpScan.getScanStats(settings).getRecordCount();
+    final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
+
+    for (int i = 0; i < agg.getAggCallList().size(); i++) {
+      AggregateCall aggCall = agg.getAggCallList().get(i);
+      long cnt;
+
+      // the rule can be applied only to the count function, otherwise return empty counts
+      if (!"count".equalsIgnoreCase(aggCall.getAggregation().getName())) {
+        return ImmutableMap.of();
+      }
+
+      if (containsStarOrNotNullInput(aggCall, agg)) {
+        cnt = totalRecordCount;

-      long cnt = 0;
-      //  count(*)  == >  empty arg  ==>  rowCount
-      //  count(Not-null-input) ==> rowCount
-      if (aggCall.getArgList().isEmpty() ||
-          (aggCall.getArgList().size() == 1 &&
-          ! agg.getInput().getRowType().getFieldList().get(aggCall.getArgList().get(0).intValue()).getType().isNullable())) {
-        cnt = (long) oldGrpScan.getScanStats(settings).getRecordCount();
       } else if (aggCall.getArgList().size() == 1) {
-        // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
+        // count(columnName) ==> Agg ( Scan ) ==> columnValueCount
         int index = aggCall.getArgList().get(0);

-        if (proj != null) {
+        if (project != null) {
           // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef in Project, we find the index of Scan's field.
           // For instance,
           // Agg - count($0)
           //  \
           //  Proj - Exp={$1}
           //    \
           //   Scan (col1, col2).
           // return count of "col2" in Scan's metadata, if found.
-          if (proj.getProjects().get(index) instanceof RexInputRef) {
-            index = ((RexInputRef) proj.getProjects().get(index)).getIndex();
-          } else {
-            return; // do not apply for all other cases.
+          if (!(project.getProjects().get(index) instanceof RexInputRef)) {
+            return ImmutableMap.of(); // do not apply for all other cases.
           }
+
+          index = ((RexInputRef) project.getProjects().get(index)).getIndex();
         }

         String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();

-        cnt = oldGrpScan.getColumnValueCount(SchemaPath.getSimplePath(columnName));
-        if (cnt == GroupScan.NO_COLUMN_STATS) {
-          // if column stats are not available don't apply this rule
-          return;
+        // for an implicit column the count will be the same as the total record count
+        if (implicitColumnsNames.contains(columnName)) {
+          cnt = totalRecordCount;
+        } else {
+          SchemaPath simplePath = SchemaPath.getSimplePath(columnName);
+
+          if (ColumnExplorer.isPartitionColumn(settings.getOptions(), simplePath)) {
+            return ImmutableMap.of();
+          }
+
+          cnt = oldGrpScan.getColumnValueCount(simplePath);
+          if (cnt == GroupScan.NO_COLUMN_STATS) {
+            // if column stats are not available, don't apply this rule and return empty counts
+            return ImmutableMap.of();
+          }
         }
       } else {
-        return; // do nothing.
+        return ImmutableMap.of();
       }

-    RelDataType scanRowType = getCountDirectScanRowType(agg.getCluster().getTypeFactory());
-
-    final ScanPrel newScan = ScanPrel.create(scan,
-        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), getCountDirectScan(cnt),
-        scanRowType);
-
-    List<RexNode> exprs = Lists.newArrayList();
-    exprs.add(RexInputRef.of(0, scanRowType));
-
-    final ProjectPrel newProj = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
-        .plus(DrillDistributionTrait.SINGLETON), newScan, exprs, agg.getRowType());
-
-    call.transformTo(newProj);
+      String name = "count" + i + "$" + (aggCall.getName() == null ? aggCall.toString() : aggCall.getName());
+      result.put(name, cnt);
     }
+    return ImmutableMap.copyOf(result);
   }

   /**
-   * Class to represent the count aggregate result.
+   * Checks if aggregate call contains star or non-null expression:
+   * <pre>
+   * count(*)  ==>  empty arg  ==>  rowCount
+   * count(Not-null-input) ==> rowCount
+   * </pre>
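+   *
+   * For instance, in " select count(*), count(col_nn) from t " both calls resolve to
+   * the total row count, provided col_nn is a non-nullable column (hypothetical name).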
+   *
+   * @param aggregateCall aggregate call
+   * @param aggregate aggregate relation expression
+   * @return true if aggregate call contains star or non-null expression
    */
-  public static class CountQueryResult {
-    public long count;
-
-    public CountQueryResult(long cnt) {
-      this.count = cnt;
-    }
+  private boolean containsStarOrNotNullInput(AggregateCall aggregateCall, DrillAggregateRel aggregate) {
+    return aggregateCall.getArgList().isEmpty() ||
+        (aggregateCall.getArgList().size() == 1 &&
+            !aggregate.getInput().getRowType().getFieldList().get(aggregateCall.getArgList().get(0)).getType().isNullable());
   }

-  private RelDataType getCountDirectScanRowType(RelDataTypeFactory typeFactory) {
-    List<RelDataTypeField> fields = Lists.newArrayList();
-    fields.add(new RelDataTypeFieldImpl("count", 0, typeFactory.createSqlType(SqlTypeName.BIGINT)));
-
+  /**
+   * For each aggregate call creates a field based on its name with bigint type.
+   * Constructs a record type for the created fields.
+   *
+   * @param aggregateRel aggregate relation expression
+   * @param fieldNames field names
+   * @return record type
+   */
+  private RelDataType constructDataType(DrillAggregateRel aggregateRel, Collection<String> fieldNames) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    Iterator<String> fieldNamesIterator = fieldNames.iterator();
+    int fieldIndex = 0;
+    while (fieldNamesIterator.hasNext()) {
+      RelDataTypeField field = new RelDataTypeFieldImpl(
+          fieldNamesIterator.next(),
+          fieldIndex++,
+          aggregateRel.getCluster().getTypeFactory().createSqlType(SqlTypeName.BIGINT));
+      fields.add(field);
+    }
     return new RelRecordType(fields);
   }

-  private GroupScan getCountDirectScan(long cnt) {
-    CountQueryResult res = new CountQueryResult(cnt);
-
-    PojoRecordReader<CountQueryResult> reader = new PojoRecordReader<>(CountQueryResult.class,
-        Collections.singleton(res).iterator());
+  /**
+   * Builds schema based on given field names.
+   * Type for each field is set to long.class.
+   *
+   * @param fieldNames field names
+   * @return schema
+   */
+  private LinkedHashMap<String, Class<?>> buildSchema(List<String> fieldNames) {
+    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+    for (String fieldName : fieldNames) {
+      schema.put(fieldName, long.class);
+    }
+    return schema;
+  }

-    return new DirectGroupScan(reader);
+  /**
+   * For each field creates a row expression.
+   *
+   * @param rowType row type
+   * @return list of row expressions
+   */
+  private List<RexNode> prepareFieldExpressions(RelDataType rowType) {
+    List<RexNode> expressions = new ArrayList<>();
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      expressions.add(RexInputRef.of(i, rowType));
+    }
+    return expressions;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
index d40e0d7e5f2..3e1d6c79832 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DirectPlan.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -17,9 +17,6 @@ */ package org.apache.drill.exec.planner.sql; -import java.util.Collections; -import java.util.Iterator; - import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder; @@ -33,6 +30,9 @@ import org.apache.drill.exec.store.direct.DirectGroupScan; import org.apache.drill.exec.store.pojo.PojoRecordReader; +import java.util.Collections; +import java.util.List; + public class DirectPlan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectPlan.class); @@ -43,12 +43,12 @@ public static PhysicalPlan createDirectPlan(QueryContext context, boolean result @SuppressWarnings("unchecked") public static PhysicalPlan createDirectPlan(QueryContext context, T obj){ - Iterator iter = (Iterator) Collections.singleton(obj).iterator(); - return createDirectPlan(context.getCurrentEndpoint(), iter, (Class) obj.getClass()); + return createDirectPlan(context.getCurrentEndpoint(), Collections.singletonList(obj), (Class) obj.getClass()); } - public static PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, Iterator iterator, Class clazz){ - PojoRecordReader reader = new PojoRecordReader(clazz, iterator); + + public static PhysicalPlan createDirectPlan(DrillbitEndpoint endpoint, List records, Class clazz){ + PojoRecordReader reader = new PojoRecordReader<>(clazz, records); DirectGroupScan scan = new DirectGroupScan(reader); Screen screen = new Screen(scan, endpoint); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java index 5e6af7ce6ee..307b01dd52c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java @@ -102,6 +102,6 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv fileStatus.getAccessTime(), fileStatus.getModificationTime()); rows.add(result); } - return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows.iterator(), ShowFilesCommandResult.class); + return DirectPlan.createDirectPlan(context.getCurrentEndpoint(), rows, ShowFilesCommandResult.class); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java similarity index 79% rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index 42ff82728be..ccd622b93e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -35,7 +35,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class ImplicitColumnExplorer { +public class ColumnExplorer { private final String partitionDesignator; private final List columns; @@ -51,7 +51,7 @@ public class ImplicitColumnExplorer { * between actual table columns, partition columns and implicit file columns. * Also populates map with implicit columns names as keys and their values */ - public ImplicitColumnExplorer(FragmentContext context, List columns) { + public ColumnExplorer(FragmentContext context, List columns) { this(context.getOptions(), columns); } @@ -60,7 +60,7 @@ public ImplicitColumnExplorer(FragmentContext context, List columns) * between actual table columns, partition columns and implicit file columns. * Also populates map with implicit columns names as keys and their values */ - public ImplicitColumnExplorer(OptionManager optionManager, List columns) { + public ColumnExplorer(OptionManager optionManager, List columns) { this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; this.columns = columns; this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns); @@ -86,6 +86,32 @@ public static Map initImplicitFileColumns(OptionMan return map; } + /** + * Checks if given column is partition or not. + * + * @param optionManager options + * @param column column + * @return true if given column is partition, false otherwise + */ + public static boolean isPartitionColumn(OptionManager optionManager, SchemaPath column){ + String partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + String path = column.getAsUnescapedPath(); + return isPartitionColumn(partitionDesignator, path); + } + + /** + * Checks if given column is partition or not. + * + * @param partitionDesignator partition designator + * @param path column path + * @return true if given column is partition, false otherwise + */ + public static boolean isPartitionColumn(String partitionDesignator, String path){ + Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator)); + Matcher matcher = pattern.matcher(path); + return matcher.matches(); + } + /** * Compares selection root and actual file path to determine partition columns values. * Adds implicit file columns according to columns list. @@ -132,6 +158,24 @@ public List getTableColumns() { return tableColumns; } + /** + * Checks if current column selection contains partition columns. + * + * @return true if partition columns are present, false otherwise + */ + public boolean containsPartitionColumns() { + return !selectedPartitionColumns.isEmpty(); + } + + /** + * Checks if current column selection contains implicit columns. + * + * @return true if implicit columns are present, false otherwise + */ + public boolean containsImplicitColumns() { + return !selectedImplicitColumns.isEmpty(); + } + /** * If it is not star query, sorts out columns into three categories: * 1. 
table columns @@ -142,11 +186,9 @@ private void init() { if (isStarQuery) { selectedImplicitColumns.putAll(allImplicitColumns); } else { - Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator)); for (SchemaPath column : columns) { String path = column.getAsUnescapedPath(); - Matcher m = pattern.matcher(path); - if (m.matches()) { + if (isPartitionColumn(partitionDesignator, path)) { selectedPartitionColumns.add(Integer.parseInt(path.substring(partitionDesignator.length()))); } else if (allImplicitColumns.get(path) != null) { selectedImplicitColumns.put(path, allImplicitColumns.get(path)); @@ -169,7 +211,7 @@ public enum ImplicitFileColumns { FQN (ExecConstants.IMPLICIT_FQN_COLUMN_LABEL) { @Override public String getValue(Path path) { - return path.toString(); + return path.toUri().getPath(); } }, @@ -179,7 +221,7 @@ public String getValue(Path path) { FILEPATH (ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL) { @Override public String getValue(Path path) { - return path.getParent().toString(); + return path.getParent().toUri().getPath(); } }, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 776d806bea9..1f7bce937b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -41,7 +41,7 @@ import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.ImplicitColumnExplorer; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.StoragePluginOptimizerRule; @@ -52,7 +52,6 @@ import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -128,7 +127,7 @@ public abstract RecordReader getRecordReader(FragmentContext context, DrillFileS @SuppressWarnings("resource") CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { - final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, scan.getColumns()); + final ColumnExplorer columnExplorer = new ColumnExplorer(context, scan.getColumns()); if (!columnExplorer.isStarQuery()) { scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java index a4b2fadf38f..67b2e5ce68f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -33,10 +33,9 @@ @JsonTypeName("direct-scan") public class DirectGroupScan extends AbstractGroupScan { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectGroupScan.class); - private final RecordReader reader; - private final ScanStats stats; + protected final RecordReader reader; + protected final ScanStats stats; public DirectGroupScan(RecordReader reader) { this(reader, ScanStats.TRIVIAL_TABLE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java new file mode 100644 index 00000000000..505d68e78be --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.direct; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.store.RecordReader; + +import java.util.Collection; +import java.util.List; + +/** + * Represents direct scan based on metadata information. + * For example, for parquet files it can be obtained from parquet footer (total row count) + * or from parquet metadata files (column counts). + * Contains reader, statistics and list of scanned files if present. + */ +@JsonTypeName("metadata-direct-scan") +public class MetadataDirectGroupScan extends DirectGroupScan { + + private final Collection files; + + public MetadataDirectGroupScan(RecordReader reader, Collection files) { + super(reader); + this.files = files; + } + + public MetadataDirectGroupScan(RecordReader reader, Collection files, ScanStats stats) { + super(reader, stats); + this.files = files; + } + + @Override + public PhysicalOperator getNewWithChildren(List children) throws ExecutionSetupException { + assert children == null || children.isEmpty(); + return new MetadataDirectGroupScan(reader, files, stats); + } + + @Override + public GroupScan clone(List columns) { + return this; + } + + /** + *

+ * Returns string representation of group scan data. + * Includes list of files if present. + *

+ * + *

+ * Example: [files = [/tmp/0_0_0.parquet], numFiles = 1] + *
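+ * An illustrative multi-file variant of the same format:
+ * [files = [/tmp/0_0_0.parquet, /tmp/0_0_1.parquet], numFiles = 2]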

+ * + * @return string representation of group scan data + */ + @Override + public String getDigest() { + if (files != null) { + StringBuilder builder = new StringBuilder(); + builder.append("files = ").append(files).append(", "); + builder.append("numFiles = ").append(files.size()).append(", "); + return builder.append(super.getDigest()).toString(); + } + return super.getDigest(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java index aee3dc17e41..e96ec68ace9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -250,7 +250,7 @@ public Catalogs(OptionManager optionManager) { @Override public PojoRecordReader getRecordReader() { - return new PojoRecordReader<>(Records.Catalog.class, records.iterator()); + return new PojoRecordReader<>(Records.Catalog.class, records); } @Override @@ -269,7 +269,7 @@ public Schemata(OptionManager optionManager) { @Override public PojoRecordReader getRecordReader() { - return new PojoRecordReader<>(Records.Schema.class, records.iterator()); + return new PojoRecordReader<>(Records.Schema.class, records); } @Override @@ -290,7 +290,7 @@ public Tables(OptionManager optionManager) { @Override public PojoRecordReader getRecordReader() { - return new PojoRecordReader<>(Records.Table.class, records.iterator()); + return new PojoRecordReader<>(Records.Table.class, records); } @Override @@ -341,7 +341,7 @@ public Views(OptionManager optionManager) { @Override public PojoRecordReader getRecordReader() { - return new PojoRecordReader<>(Records.View.class, records.iterator()); + return new PojoRecordReader<>(Records.View.class, records); } @Override @@ -362,7 +362,7 @@ public Columns(OptionManager optionManager) { @Override public PojoRecordReader getRecordReader() { - return new PojoRecordReader<>(Records.Column.class, records.iterator()); + return new PojoRecordReader<>(Records.Column.class, records); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 30f607d674a..c333a3ee943 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -54,7 +54,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.ImplicitColumnExplorer; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; @@ -1063,7 +1063,7 @@ public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtili ParquetFilterPredicate filterPredicate = null; for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { - final ImplicitColumnExplorer 
columnExplorer = new ImplicitColumnExplorer(optionManager, this.columns); + final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns); Map implicitColValues = columnExplorer.populateImplicitColumns(file.getPath(), selectionRoot); for (RowGroupMetadata rowGroup : file.getRowGroups()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 5e22458009d..490a5a086f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,7 +32,7 @@ import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.store.ImplicitColumnExplorer; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; @@ -63,7 +63,7 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS Preconditions.checkArgument(children.isEmpty()); OperatorContext oContext = context.newOperatorContext(rowGroupScan); - final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, rowGroupScan.getColumns()); + final ColumnExplorer columnExplorer = new ColumnExplorer(context, rowGroupScan.getColumns()); if (!columnExplorer.isStarQuery()) { rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java new file mode 100644 index 00000000000..0c1144a71ab --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.pojo;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parent class for all pojo readers. Pojo readers can be based on a java class (field list is predefined) or dynamic.
+ * Contains general logic for initializing writers and reading values from each row's fields.
+ */
+public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader implements Iterable<T> {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
+  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
+
+  protected final List<T> records;
+  protected List<PojoWriter> writers;
+
+  private Iterator<T> currentIterator;
+  private OperatorContext operatorContext;
+
+  protected AbstractPojoRecordReader(List<T> records) {
+    this.records = records;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    operatorContext = context;
+    writers = setupWriters(output);
+    currentIterator = records.iterator();
+  }
+
+  @Override
+  public int next() {
+    boolean allocated = false;
+    injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
+
+    int recordCount = 0;
+    while (currentIterator.hasNext()) {
+      if (!allocated) {
+        allocate();
+        allocated = true;
+      }
+
+      T row = currentIterator.next();
+      for (int i = 0; i < writers.size(); i++) {
+        PojoWriter writer = writers.get(i);
+        writer.writeField(getFieldValue(row, i), recordCount);
+      }
+      recordCount++;
+    }
+
+    if (recordCount != 0) {
+      setValueCount(recordCount);
+    }
+    return recordCount;
+  }
+
+  @Override
+  public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
+    for (final ValueVector v : vectorMap.values()) {
+      AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return records.iterator();
+  }
+
+  /**
+   * Creates a writer based on the input class type and then initializes it.
+   *
+   * @param type class type
+   * @param fieldName field name
+   * @param output output mutator
+   * @return pojo writer
+   */
+  protected PojoWriter initWriter(Class<?> type, String fieldName, OutputMutator output) throws ExecutionSetupException {
+    PojoWriter writer = PojoWriters.getWriter(type, fieldName, output.getManagedBuffer());
+    try {
+      writer.init(output);
+      return writer;
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException("Failure while setting up schema for AbstractPojoRecordReader.", e);
+    }
+  }
+
+  /**
+   * Allocates buffers for each writer.
+   */
+  private void allocate() {
+    for (PojoWriter writer : writers) {
+      writer.allocate();
+    }
+  }
+
+  /**
+   * Sets number of written records for each writer.
+   *
+   * @param recordCount number of records written
+   */
+  private void setValueCount(int recordCount) {
+    for (PojoWriter writer : writers) {
+      writer.setValueCount(recordCount);
+    }
+  }
+
+  /**
+   * Sets up writers for each field in the row.
+   *
+   * @param output output mutator
+   * @return list of pojo writers
+   */
+  protected abstract List<PojoWriter> setupWriters(OutputMutator output) throws ExecutionSetupException;
+
+  /**
+   * Retrieves the field value to be written based on the given row and field position.
+   *
+   * @param row current row
+   * @param fieldPosition current field position
+   * @return field value to be written for given row
+   */
+  protected abstract Object getFieldValue(T row, int fieldPosition);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
similarity index 75%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
index c41a07aa7af..a2a4644ba65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.pojo;

-import java.lang.reflect.Field;
-
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -26,20 +24,24 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;

-abstract class AbstractWriter<V extends ValueVector> implements PojoWriter{
+/**
+ * Parent class for all pojo writers created for each field.
+ * Contains common logic for initializing the value vector; stores the field name and its type.
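+ *
+ * A concrete subclass only implements writeField; for example, PojoWriters.LongWriter
+ * (defined later in this patch) simply calls vector.getMutator().setSafe(outboundIndex, (long) value).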
+ */ +public abstract class AbstractPojoWriter implements PojoWriter { - protected final Field field; protected V vector; - protected final MajorType type; + private final String fieldName; + private final MajorType type; - public AbstractWriter(Field field, MajorType type){ - this.field = field; + public AbstractPojoWriter(String fieldName, MajorType type) { + this.fieldName = fieldName; this.type = type; } @Override public void init(OutputMutator output) throws SchemaChangeException { - MaterializedField mf = MaterializedField.create(field.getName(), type); + MaterializedField mf = MaterializedField.create(fieldName, type); @SuppressWarnings("unchecked") Class valueVectorClass = (Class) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()); this.vector = output.addField(mf, valueVectorClass); @@ -50,7 +52,8 @@ public void allocate() { vector.allocateNew(); } - public void setValueCount(int valueCount){ + @Override + public void setValueCount(int valueCount) { vector.getMutator().setValueCount(valueCount); } @@ -58,5 +61,4 @@ public void setValueCount(int valueCount){ public void cleanup() { } - } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java new file mode 100644 index 00000000000..82383f084ac --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pojo; + +import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.physical.impl.OutputMutator; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Dynamically reads values from the given list of records. + * Creates writers based on given schema. + * + * @param type of given values, if contains various types, use Object class + */ +public class DynamicPojoRecordReader extends AbstractPojoRecordReader> { + + private final LinkedHashMap> schema; + + public DynamicPojoRecordReader(LinkedHashMap> schema, List> records) { + super(records); + Preconditions.checkState(schema != null && !schema.isEmpty(), "Undefined schema is not allowed."); + this.schema = schema; + } + + /** + * Initiates writers based on given schema which contains field name and its type. 
+ * + * @param output output mutator + * @return list of pojo writers + */ + @Override + protected List setupWriters(OutputMutator output) throws ExecutionSetupException { + List writers = new ArrayList<>(); + for (Map.Entry> field : schema.entrySet()) { + writers.add(initWriter(field.getValue(), field.getKey(), output)); + } + return writers; + } + + @Override + protected Object getFieldValue(List row, int fieldPosition) { + return row.get(fieldPosition); + } + + @Override + public String toString() { + return "DynamicPojoRecordReader{" + + "records = " + records + + "}"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java index baf07a46c7b..c3b6883a843 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,173 +17,66 @@ */ package org.apache.drill.exec.store.pojo; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.sql.Timestamp; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.pojo.Writers.BitWriter; -import org.apache.drill.exec.store.pojo.Writers.DoubleWriter; -import org.apache.drill.exec.store.pojo.Writers.EnumWriter; -import org.apache.drill.exec.store.pojo.Writers.IntWriter; -import org.apache.drill.exec.store.pojo.Writers.LongWriter; -import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter; -import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter; -import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter; -import org.apache.drill.exec.store.pojo.Writers.NIntWriter; -import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter; -import org.apache.drill.exec.store.pojo.Writers.StringWriter; -import org.apache.drill.exec.testing.ControlsInjector; -import org.apache.drill.exec.testing.ControlsInjectorFactory; -import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.ValueVector; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.List; -public class PojoRecordReader extends AbstractRecordReader implements Iterable { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class); - private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PojoRecordReader.class); +/** + * Reads values from the given list of pojo instances. + * Fields writers are determined based on pojo field class types. 
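+ * For example, DirectPlan wraps a single result object as roughly
+ * new PojoRecordReader<>(obj.getClass(), Collections.singletonList(obj)).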
+ * + * @param pojo class type + */ +public class PojoRecordReader extends AbstractPojoRecordReader { private final Class pojoClass; - private final List pojoObjects; - private PojoWriter[] writers; - private boolean doCurrent; - private T currentPojo; - private OperatorContext operatorContext; + private final List fields; - private Iterator currentIterator; - - /** - * TODO: Cleanup the callers to pass the List of POJO objects directly rather than iterator. - * @param pojoClass - * @param iterator - */ - public PojoRecordReader(Class pojoClass, Iterator iterator) { + public PojoRecordReader(Class pojoClass, List records) { + super(records); this.pojoClass = pojoClass; - this.pojoObjects = ImmutableList.copyOf(iterator); + this.fields = new ArrayList<>(); } + /** + * Creates writers based on pojo field class types. Ignores static fields. + * + * @param output output mutator + * @return list of pojo writers + */ @Override - public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { - operatorContext = context; - try { - Field[] fields = pojoClass.getDeclaredFields(); - List writers = Lists.newArrayList(); - - for (int i = 0; i < fields.length; i++) { - Field f = fields[i]; - - if (Modifier.isStatic(f.getModifiers())) { - continue; - } - - Class type = f.getType(); - PojoWriter w = null; - if(type == int.class) { - w = new IntWriter(f); - } else if(type == Integer.class) { - w = new NIntWriter(f); - } else if(type == Long.class) { - w = new NBigIntWriter(f); - } else if(type == Boolean.class) { - w = new NBooleanWriter(f); - } else if(type == double.class) { - w = new DoubleWriter(f); - } else if(type == Double.class) { - w = new NDoubleWriter(f); - } else if(type.isEnum()) { - w = new EnumWriter(f, output.getManagedBuffer()); - } else if(type == boolean.class) { - w = new BitWriter(f); - } else if(type == long.class) { - w = new LongWriter(f); - } else if(type == String.class) { - w = new StringWriter(f, output.getManagedBuffer()); - } else if (type == Timestamp.class) { - w = new NTimeStampWriter(f); - } else { - throw new ExecutionSetupException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type)); - } - writers.add(w); - w.init(output); + protected List setupWriters(OutputMutator output) throws ExecutionSetupException { + List writers = new ArrayList<>(); + Field[] declaredFields = pojoClass.getDeclaredFields(); + for (Field field : declaredFields) { + if (Modifier.isStatic(field.getModifiers())) { + continue; } - - this.writers = writers.toArray(new PojoWriter[writers.size()]); - - } catch(SchemaChangeException e) { - throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e); - } - - currentIterator = pojoObjects.iterator(); - } - - @Override - public void allocate(Map vectorMap) throws OutOfMemoryException { - for (final ValueVector v : vectorMap.values()) { - AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10); - } - } - - private void allocate() { - for (PojoWriter writer : writers) { - writer.allocate(); - } - } - - private void setValueCount(int i) { - for (PojoWriter writer : writers) { - writer.setValueCount(i); + writers.add(initWriter(field.getType(), field.getName(), output)); + fields.add(field); } + return writers; } @Override - public int next() { - boolean allocated = false; - injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger); + protected Object getFieldValue(T row, int fieldPosition) { try { - int i =0; - while (doCurrent || 
currentIterator.hasNext()) { - if (doCurrent) { - doCurrent = false; - } else { - currentPojo = currentIterator.next(); - } - - if (!allocated) { - allocate(); - allocated = true; - } - - for (PojoWriter writer : writers) { - writer.writeField(currentPojo, i); - } - i++; - } - - if (i != 0 ) { - setValueCount(i); - } - return i; + return fields.get(fieldPosition).get(row); } catch (IllegalArgumentException | IllegalAccessException e) { - throw new RuntimeException("Failure while trying to use PojoRecordReader.", e); + throw new DrillRuntimeException("Failure while trying to use PojoRecordReader.", e); } } @Override - public Iterator iterator() { - return pojoObjects.iterator(); - } - - @Override - public void close() { + public String toString() { + return "PojoRecordReader{" + + "pojoClass = " + pojoClass + + ", recordCount = " + records.size() + + "}"; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java index 31748f485e1..335bfb14b60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,10 +20,40 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.physical.impl.OutputMutator; -interface PojoWriter{ - void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException ; +/** + * Pojo writer interface for writers based on types supported for pojo. + */ +public interface PojoWriter { + + /** + * Writes given value to the given position of the bit to set. + * + * @param value values to be written + * @param outboundIndex position of the bit + */ + void writeField(Object value, int outboundIndex); + + /** + * Initializes value vector. + * + * @param output output mutator + */ void init(OutputMutator output) throws SchemaChangeException; + + /** + * Allocates new buffer for value vector. + */ void allocate(); - void setValueCount(int i); + + /** + * Sets number of written records. + * + * @param recordCount record count + */ + void setValueCount(int recordCount); + + /** + * Performs clean up if needed. + */ void cleanup(); } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java new file mode 100644 index 00000000000..090f32f6f87 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
new file mode 100644
index 00000000000..090f32f6f87
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriters.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pojo;
+
+import io.netty.buffer.DrillBuf;
+
+import java.sql.Timestamp;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.BitVector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+
+import com.google.common.base.Charsets;
+
+public class PojoWriters {
+
+  /**
+   * Creates matching writer to the given field type.
+   *
+   * @param type field type
+   * @param fieldName field name
+   * @param buffer drill buffer
+   * @return pojo writer
+   * @throws ExecutionSetupException in case if writer was not found for the given type
+   */
+  public static PojoWriter getWriter(Class<?> type, String fieldName, DrillBuf buffer) throws ExecutionSetupException {
+
+    if (type == Integer.class) {
+      return new NIntWriter(fieldName);
+    } else if (type == Long.class) {
+      return new NBigIntWriter(fieldName);
+    } else if (type == Boolean.class) {
+      return new NBooleanWriter(fieldName);
+    } else if (type == Double.class) {
+      return new NDoubleWriter(fieldName);
+    } else if (type.isEnum()) {
+      return new EnumWriter(fieldName, buffer);
+    } else if (type == String.class) {
+      return new StringWriter(fieldName, buffer);
+    } else if (type == Timestamp.class) {
+      return new NTimeStampWriter(fieldName);
+    // primitives
+    } else if (type == int.class) {
+      return new IntWriter(fieldName);
+    } else if (type == double.class) {
+      return new DoubleWriter(fieldName);
+    } else if (type == boolean.class) {
+      return new BitWriter(fieldName);
+    } else if (type == long.class) {
+      return new LongWriter(fieldName);
+    }
+
+    throw new ExecutionSetupException(String.format("PojoRecordReader doesn't yet support conversions from the type [%s].", type));
+  }
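getWriter() above is a straight type-to-writer dispatch: boxed types, String, enums and Timestamp map to writers over optional (nullable) vectors, while primitives map to required ones. The same idea expressed as a lookup table, purely for illustration (the strings stand in for writer instances):

    import java.sql.Timestamp;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DispatchSketch {
      public static void main(String[] args) {
        Map<Class<?>, String> writers = new LinkedHashMap<>();
        writers.put(Integer.class, "NIntWriter (optional INT)");
        writers.put(Long.class, "NBigIntWriter (optional BIGINT)");
        writers.put(String.class, "StringWriter (optional VARCHAR)");
        writers.put(Timestamp.class, "NTimeStampWriter (optional TIMESTAMP)");
        writers.put(int.class, "IntWriter (required INT)");
        writers.put(long.class, "LongWriter (required BIGINT)");

        for (Class<?> type : new Class<?>[] { int.class, Long.class, String.class }) {
          System.out.println(type.getSimpleName() + " -> " + writers.get(type));
        }
      }
    }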
+
+  /**
+   * Pojo writer for int. Does not expect to write null value.
+   */
+  public static class IntWriter extends AbstractPojoWriter<IntVector> {
+
+    public IntWriter(String fieldName) {
+      super(fieldName, Types.required(MinorType.INT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      vector.getMutator().setSafe(outboundIndex, (int) value);
+    }
+  }
+
+  /**
+   * Pojo writer for boolean. Does not expect to write null value.
+   */
+  public static class BitWriter extends AbstractPojoWriter<BitVector> {
+
+    public BitWriter(String fieldName) {
+      super(fieldName, Types.required(MinorType.BIT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      vector.getMutator().setSafe(outboundIndex, (boolean) value ? 1 : 0);
+    }
+
+  }
+
+  /**
+   * Pojo writer for long. Does not expect to write null value.
+   */
+  public static class LongWriter extends AbstractPojoWriter<BigIntVector> {
+
+    public LongWriter(String fieldName) {
+      super(fieldName, Types.required(MinorType.BIGINT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      vector.getMutator().setSafe(outboundIndex, (long) value);
+    }
+
+  }
+
+  /**
+   * Pojo writer for double. Does not expect to write null value.
+   */
+  public static class DoubleWriter extends AbstractPojoWriter<Float8Vector> {
+
+    public DoubleWriter(String fieldName) {
+      super(fieldName, Types.required(MinorType.FLOAT8));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      vector.getMutator().setSafe(outboundIndex, (double) value);
+    }
+
+  }
+
+  /**
+   * Parent class for String and Enum writers. Writes data using nullable varchar holder.
+   */
+  private abstract static class AbstractStringWriter extends AbstractPojoWriter<NullableVarCharVector> {
+    private DrillBuf data;
+    private final NullableVarCharHolder holder = new NullableVarCharHolder();
+
+    public AbstractStringWriter(String fieldName, DrillBuf managedBuf) {
+      super(fieldName, Types.optional(MinorType.VARCHAR));
+      this.data = managedBuf;
+      ensureLength(100);
+    }
+
+    void ensureLength(int len) {
+      data = data.reallocIfNeeded(len);
+    }
+
+    public void writeString(String s, int outboundIndex) {
+      holder.isSet = 1;
+      byte[] bytes = s.getBytes(Charsets.UTF_8);
+      ensureLength(bytes.length);
+      data.clear();
+      data.writeBytes(bytes);
+      holder.buffer = data;
+      holder.start = 0;
+      holder.end = bytes.length;
+      vector.getMutator().setSafe(outboundIndex, holder);
+    }
+  }
+
+  /**
+   * Pojo writer for Enum. If null is encountered does not write it.
+   */
+  public static class EnumWriter extends AbstractStringWriter{
+    public EnumWriter(String fieldName, DrillBuf managedBuf) {
+      super(fieldName, managedBuf);
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value == null) {
+        return;
+      }
+      writeString(((Enum) value).name(), outboundIndex);
+    }
+  }
+
+  /**
+   * Pojo writer for String. If null is encountered does not write it.
+   */
+  public static class StringWriter extends AbstractStringWriter {
+    public StringWriter(String fieldName, DrillBuf managedBuf) {
+      super(fieldName, managedBuf);
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        writeString((String) value, outboundIndex);
+      }
+    }
+  }
+
+  /**
+   * Pojo writer for Integer. If null is encountered does not write it.
+   */
+  public static class NIntWriter extends AbstractPojoWriter<NullableIntVector> {
+
+    public NIntWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.INT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, (Integer) value);
+      }
+    }
+
+  }
+
+  /**
+   * Pojo writer for Long. If null is encountered does not write it.
+   */
+  public static class NBigIntWriter extends AbstractPojoWriter<NullableBigIntVector> {
+
+    public NBigIntWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.BIGINT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, (Long) value);
+      }
+    }
+
+  }
+
+  /**
+   * Pojo writer for Boolean. If null is encountered does not write it.
+   */
+  public static class NBooleanWriter extends AbstractPojoWriter<NullableBitVector> {
+
+    public NBooleanWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.BIT));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, (Boolean) value ? 1 : 0);
+      }
+    }
+
+  }
+
+  /**
+   * Pojo writer for Double. If null is encountered does not write it.
+   */
+  public static class NDoubleWriter extends AbstractPojoWriter<NullableFloat8Vector> {
+
+    public NDoubleWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.FLOAT8));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, (Double) value);
+      }
+    }
+
+  }
+
+  /**
+   * Pojo writer for Timestamp. If null is encountered does not write it.
+   */
+  public static class NTimeStampWriter extends AbstractPojoWriter<NullableTimeStampVector> {
+
+    public NTimeStampWriter(String fieldName) {
+      super(fieldName, Types.optional(MinorType.TIMESTAMP));
+    }
+
+    @Override
+    public void writeField(Object value, int outboundIndex) {
+      if (value != null) {
+        vector.getMutator().setSafe(outboundIndex, ((Timestamp) value).getTime());
+      }
+    }
+  }
+}
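All the N-prefixed writers share one convention: a null value is simply skipped, leaving the slot unset so the optional vector reports SQL NULL. A plain-Java analogue of that unset-bit bookkeeping (illustration only, no Drill types):

    public class NullSkipSketch {
      public static void main(String[] args) {
        Integer[] column = { 10, null, 30 };
        boolean[] isSet = new boolean[column.length];   // the "set" bits of an optional vector
        int[] values = new int[column.length];

        for (int i = 0; i < column.length; i++) {
          if (column[i] != null) {                      // mirrors NIntWriter.writeField
            values[i] = column[i];
            isSet[i] = true;
          }
        }
        for (int i = 0; i < column.length; i++) {
          System.out.println(i + ": " + (isSet[i] ? String.valueOf(values[i]) : "NULL"));
        }
      }
    }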
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
deleted file mode 100644
index e52384e2afc..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pojo;
-
-import io.netty.buffer.DrillBuf;
-
-import java.lang.reflect.Field;
-import java.sql.Timestamp;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.BitVector;
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-
-import com.google.common.base.Charsets;
-
-public class Writers {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Writers.class);
-
-  public static class IntWriter extends AbstractWriter<IntVector> {
-
-    public IntWriter(Field field) {
-      super(field, Types.required(MinorType.INT));
-      if (field.getType() != int.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      int i = field.getInt(pojo);
-      vector.getMutator().setSafe(outboundIndex, i);
-    }
-
-  }
-
-  public static class BitWriter extends AbstractWriter<BitVector>{
-
-    public BitWriter(Field field) {
-      super(field, Types.required(MinorType.BIT));
-      if (field.getType() != boolean.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      boolean b = field.getBoolean(pojo);
-      vector.getMutator().setSafe(outboundIndex, b ? 1 : 0);
-    }
-
-  }
-
-  public static class LongWriter extends AbstractWriter<BigIntVector>{
-
-    public LongWriter(Field field) {
-      super(field, Types.required(MinorType.BIGINT));
-      if (field.getType() != long.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      long l = field.getLong(pojo);
-      vector.getMutator().setSafe(outboundIndex, l);
-    }
-
-  }
-
-  public static class DoubleWriter extends AbstractWriter<Float8Vector>{
-
-    public DoubleWriter(Field field) {
-      super(field, Types.required(MinorType.FLOAT8));
-      if (field.getType() != double.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      double d = field.getDouble(pojo);
-
-      vector.getMutator().setSafe(outboundIndex, d);
-    }
-
-  }
-
-  private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
-    private DrillBuf data;
-    private final NullableVarCharHolder h = new NullableVarCharHolder();
-
-    public AbstractStringWriter(Field field, DrillBuf managedBuf) {
-      super(field, Types.optional(MinorType.VARCHAR));
-      this.data = managedBuf;
-      ensureLength(100);
-    }
-
-    void ensureLength(int len) {
-      data = data.reallocIfNeeded(len);
-    }
-
-    @Override
-    public void cleanup() {
-    }
-
-    public void writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      if (s == null) {
-        return;
-      } else {
-        h.isSet = 1;
-        byte[] bytes = s.getBytes(Charsets.UTF_8);
-        ensureLength(bytes.length);
-        data.clear();
-        data.writeBytes(bytes);
-        h.buffer = data;
-        h.start = 0;
-        h.end = bytes.length;
-        vector.getMutator().setSafe(outboundIndex, h);
-      }
-    }
-
-  }
-
-  public static class EnumWriter extends AbstractStringWriter{
-    public EnumWriter(Field field, DrillBuf managedBuf) {
-      super(field, managedBuf);
-      if (!field.getType().isEnum()) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Enum e= ((Enum) field.get(pojo));
-      if (e == null) {
-        return;
-      }
-      writeString(e.name(), outboundIndex);
-    }
-  }
-
-  public static class StringWriter extends AbstractStringWriter {
-    public StringWriter(Field field, DrillBuf managedBuf) {
-      super(field, managedBuf);
-      if (field.getType() != String.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      String s = (String) field.get(pojo);
-      writeString(s, outboundIndex);
-    }
-  }
-
-  public static class NIntWriter extends AbstractWriter<NullableIntVector>{
-
-    public NIntWriter(Field field) {
-      super(field, Types.optional(MinorType.INT));
-      if (field.getType() != Integer.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Integer i = (Integer) field.get(pojo);
-      if (i != null) {
-        vector.getMutator().setSafe(outboundIndex, i);
-      }
-    }
-
-  }
-
-  public static class NBigIntWriter extends AbstractWriter<NullableBigIntVector>{
-
-    public NBigIntWriter(Field field) {
-      super(field, Types.optional(MinorType.BIGINT));
-      if (field.getType() != Long.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Long o = (Long) field.get(pojo);
-      if (o != null) {
-        vector.getMutator().setSafe(outboundIndex, o);
-      }
-    }
-
-  }
-
-  public static class NBooleanWriter extends AbstractWriter<NullableBitVector>{
-
-    public NBooleanWriter(Field field) {
-      super(field, Types.optional(MinorType.BIT));
-      if (field.getType() != Boolean.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Boolean o = (Boolean) field.get(pojo);
-      if (o != null) {
-        vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
-      }
-    }
-
-  }
-  public static class NDoubleWriter extends AbstractWriter<NullableFloat8Vector>{
-
-    public NDoubleWriter(Field field) {
-      super(field, Types.optional(MinorType.FLOAT8));
-      if (field.getType() != Double.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Double o = (Double) field.get(pojo);
-      if (o != null) {
-        vector.getMutator().setSafe(outboundIndex, o);
-      }
-    }
-
-  }
-
-  public static class NTimeStampWriter extends AbstractWriter<NullableTimeStampVector>{
-
-    public NTimeStampWriter(Field field) {
-      super(field, Types.optional(MinorType.TIMESTAMP));
-      if (field.getType() != Timestamp.class) {
-        throw new IllegalStateException();
-      }
-    }
-
-    @Override
-    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
-      Timestamp o = (Timestamp) field.get(pojo);
-      if (o != null) {
-        vector.getMutator().setSafe(outboundIndex, o.getTime());
-      }
-    }
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 58bf433fc0d..2b0ef3f7801 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -35,7 +36,6 @@
  * Local system tables do not require a full-fledged query because these records are present on every Drillbit.
  */
 public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableBatchCreator.class);
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
@@ -44,7 +44,7 @@ public ScanBatch getBatch(final FragmentContext context, final SystemTableScan s
       throws ExecutionSetupException {
     final SystemTable table = scan.getTable();
     final Iterator<Object> iterator = table.getIterator(context);
-    final RecordReader reader = new PojoRecordReader(table.getPojoClass(), iterator);
+    final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
 
     return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
   }
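The caller-side change here is mechanical: PojoRecordReader no longer accepts an Iterator, so call sites materialize it up front with Guava's ImmutableList.copyOf, which fully drains the iterator at construction time. A self-contained illustration:

    import com.google.common.collect.ImmutableList;

    import java.util.Iterator;
    import java.util.List;

    public class CopyOfSketch {
      public static void main(String[] args) {
        Iterator<String> iterator = ImmutableList.of("a", "b", "c").iterator();
        List<String> records = ImmutableList.copyOf(iterator); // iterator is exhausted here
        System.out.println(records);            // [a, b, c]
        System.out.println(iterator.hasNext()); // false
      }
    }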
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
index 43b594bc82a..46a48234779 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestFunctionsWithTypeExpoQueries.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -256,7 +256,7 @@ public void testMetaDataExposeType() throws Exception {
         "where concat(a, 'asdf') = 'asdf'", root);
 
     // Validate the plan
-    final String[] expectedPlan = {"Scan.*a.parquet.*numFiles=1"};
+    final String[] expectedPlan = {"Scan.*a.parquet.*numFiles = 1"};
     final String[] excludedPlan = {"Filter"};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
 
@@ -265,7 +265,7 @@ public void testMetaDataExposeType() throws Exception {
         .sqlQuery(query)
         .ordered()
         .baselineColumns("col")
-        .baselineValues(1l)
+        .baselineValues(1L)
         .build()
         .run();
   }
See the NOTICE file * distributed with this work for additional information @@ -18,15 +18,16 @@ package org.apache.drill.exec.planner.logical; import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.ExecConstants; import org.junit.Test; public class TestConvertCountToDirectScan extends PlanTestBase { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestConvertCountToDirectScan.class); @Test public void ensureCaseDoesntConvertToDirectScan() throws Exception { testPlanMatchingPatterns( - "select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt from dfs.`${WORKING_PATH}/src/test/resources/directcount.parquet`", + "select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt\n" + + "from dfs.`${WORKING_PATH}/src/test/resources/directcount.parquet`", new String[] { "CASE" }, new String[]{}); } @@ -36,7 +37,7 @@ public void ensureConvertSimpleCountToDirectScan() throws Exception { final String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`"; testPlanMatchingPatterns( sql, - new String[] { "PojoRecordReader" }, + new String[] { "DynamicPojoRecordReader" }, new String[]{}); testBuilder() @@ -45,7 +46,6 @@ public void ensureConvertSimpleCountToDirectScan() throws Exception { .baselineColumns("cnt") .baselineValues(25L) .go(); - } @Test @@ -53,7 +53,7 @@ public void ensureConvertSimpleCountConstToDirectScan() throws Exception { final String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`"; testPlanMatchingPatterns( sql, - new String[] { "PojoRecordReader" }, + new String[] { "DynamicPojoRecordReader" }, new String[]{}); testBuilder() @@ -62,7 +62,6 @@ public void ensureConvertSimpleCountConstToDirectScan() throws Exception { .baselineColumns("cnt") .baselineValues(25L) .go(); - } @Test @@ -70,7 +69,39 @@ public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception { final String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`"; testPlanMatchingPatterns( sql, - new String[] { "PojoRecordReader" }, + new String[] { "DynamicPojoRecordReader" }, + new String[]{}); + + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(25L) + .go(); + } + + @Test + public void ensureDoesNotConvertForDirectoryColumns() throws Exception { + final String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`"; + testPlanMatchingPatterns( + sql, + new String[] { "ParquetGroupScan" }, + new String[]{}); + + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("cnt") + .baselineValues(0L) + .go(); + } + + @Test + public void ensureConvertForImplicitColumns() throws Exception { + final String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`"; + testPlanMatchingPatterns( + sql, + new String[] { "DynamicPojoRecordReader" }, new String[]{}); testBuilder() @@ -79,7 +110,42 @@ public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception { .baselineColumns("cnt") .baselineValues(25L) .go(); + } + + @Test + public void ensureConvertForSeveralColumns() throws Exception { + test("use %s", TEMP_SCHEMA); + final String tableName = "parquet_table_counts"; + + try { + final String newFqnColumnName = "new_fqn"; + test("alter session set `%s` = '%s'", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName); + test("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName); + test("refresh table metadata %s", tableName); + + final String sql = 
+
+  @Test
+  public void ensureConvertForSeveralColumns() throws Exception {
+    test("use %s", TEMP_SCHEMA);
+    final String tableName = "parquet_table_counts";
+
+    try {
+      final String newFqnColumnName = "new_fqn";
+      test("alter session set `%s` = '%s'", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
+      test("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
+      test("refresh table metadata %s", tableName);
+
+      final String sql = String.format("select\n" +
+          "count(%s) as implicit_count,\n" +
+          "count(*) as star_count,\n" +
+          "count(col_int) as int_column_count,\n" +
+          "count(col_vrchr) as vrchr_column_count\n" +
+          "from %s", newFqnColumnName, tableName);
+
+      testPlanMatchingPatterns(
+        sql,
+        new String[] { "DynamicPojoRecordReader" },
+        new String[]{});
+
+      testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("implicit_count", "star_count", "int_column_count", "vrchr_column_count")
+        .baselineValues(6L, 6L, 2L, 3L)
+        .go();
+    } finally {
+      test("alter session reset `%s`", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
+      test("drop table if exists %s", tableName);
+    }
+  }
 }
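Taken together, the last test shows what the enhanced rule produces for several COUNT aggregates at once: a single precomputed row of counts served by DynamicPojoRecordReader instead of a full scan plus aggregation. A sketch of that row's shape, reusing the baseline numbers from the test (illustration only, not the rule's actual data structure):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DirectCountsSketch {
      public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("implicit_count", 6L);    // implicit column: total record count
        counts.put("star_count", 6L);        // count(*): total record count
        counts.put("int_column_count", 2L);  // from column value counts in metadata
        counts.put("vrchr_column_count", 3L);
        System.out.println(counts);          // the single row replacing scan + agg
      }
    }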