From 0dc237e3161cf284212cc63f740b229d4fee8fdf Mon Sep 17 00:00:00 2001 From: Jinfeng Ni Date: Fri, 21 Apr 2017 17:34:15 -0700 Subject: [PATCH] DRILL-5459: Extend physical operator test framework to test mini plans consisting of multiple operators. This closes #823 --- .../store/hive/HiveTestDataGenerator.java | 4 +- .../exec/physical/base/AbstractBase.java | 9 +- .../apache/drill/exec/util/TestUtilities.java | 51 ++ .../org/apache/drill/DrillTestWrapper.java | 35 +- .../drill/exec/TestRepeatedReaders.java | 2 +- .../drill/exec/cache/TestWriteToDisk.java | 2 +- .../TestCorruptParquetDateCorrection.java | 2 +- .../impl/writer/TestParquetWriter.java | 2 +- .../writer/TestParquetWriterEmptyFiles.java | 2 +- .../exec/physical/impl/writer/TestWriter.java | 2 +- .../unit/BasicPhysicalOpUnitTest.java | 1 + .../physical/unit/MiniPlanUnitTestBase.java | 442 ++++++++++++++++++ .../physical/unit/PhysicalOpUnitTestBase.java | 117 ++--- .../exec/physical/unit/TestMiniPlan.java | 206 ++++++++ .../exec/store/dfs/TestDrillFileSystem.java | 2 +- .../parquet/TestParquetFilterPushDown.java | 2 +- .../drill/test/rowSet/SchemaBuilder.java | 13 +- 17 files changed, 821 insertions(+), 73 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index 435c66b3f6c..580cf78f1de 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -70,7 +70,7 @@ private HiveTestDataGenerator(final String dbDir, final String whDir) { config.put("hive.metastore.uris", ""); config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir)); config.put("hive.metastore.warehouse.dir", whDir); - config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); } /** @@ -115,7 +115,7 @@ private void generateTestData() throws Exception { HiveConf conf = new HiveConf(SessionState.class); conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir)); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); conf.set("hive.metastore.warehouse.dir", whDir); conf.set("mapred.job.tracker", "local"); conf.set(ConfVars.SCRATCHDIR.varname, getTempDir("scratch_dir")); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index 526d728cfa8..a547e26a088 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -25,10 +25,13 @@ public abstract class AbstractBase implements PhysicalOperator{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class); - private final String userName; + public static long INIT_ALLOCATION = 1_000_000L; + public static long MAX_ALLOCATION = 10_000_000_000L; + + protected long 
initialAllocation = INIT_ALLOCATION;
+  protected long maxAllocation = MAX_ALLOCATION;
 
-  protected long initialAllocation = 1_000_000L;
-  protected long maxAllocation = 10_000_000_000L;
+  private final String userName;
 
   private int id;
   private double cost;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index 3532956fab0..0200dc5a0f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -18,14 +18,25 @@
 package org.apache.drill.exec.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
 
 import com.google.common.io.Files;
+import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 
 /**
  * This class contains utility methods to speed up tests. Some of the production code currently calls this method
@@ -95,4 +106,44 @@ public static void makeDfsTmpSchemaImmutable(final StoragePluginRegistry pluginR
     pluginRegistry.createOrUpdate(dfsPluginName, dfsPluginConfig, true);
   }
+
+  /**
+   * Create JSONRecordReaders from input strings.
+   * @param jsonBatches : list of input strings; each element represents one batch. Each string can either
+   *                    be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}".
+   * @param fragContext : fragment context
+   * @param columnsToRead : list of schema paths to read from the JSON reader.
+   * @return an iterator of JSONRecordReaders, one per input batch.
+   */
+  public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+    ObjectMapper mapper = new ObjectMapper();
+    List<RecordReader> readers = new ArrayList<>();
+    for (String batchJson : jsonBatches) {
+      JsonNode records;
+      try {
+        records = mapper.readTree(batchJson);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      readers.add(new JSONRecordReader(fragContext, records, null, columnsToRead));
+    }
+    return readers.iterator();
+  }
+
+  /**
+   * Create JSONRecordReaders from files on a file system.
+   * @param fs : file system.
+   * @param inputPaths : list of .json file paths.
+   * @param fragContext : fragment context
+   * @param columnsToRead : list of schema paths to read from the JSON reader.
+   * @return an iterator of JSONRecordReaders, one per input file.
+   */
+  public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+    List<RecordReader> readers = new ArrayList<>();
+    for (String inputPath : inputPaths) {
+      readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
+    }
+    return readers.iterator();
+  }
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index f217632eab2..64aeef8022d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -33,6 +33,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -117,9 +118,9 @@ public interface TestServices {
   private int expectedNumBatches;
 
   public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
-                          String baselineOptionSettingQueries, String testOptionSettingQueries,
-                          QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
-                          List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
+                          String baselineOptionSettingQueries, String testOptionSettingQueries,
+                          QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
+                          List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
     this.testBuilder = testBuilder;
     this.services = services;
     this.query = query;
@@ -150,7 +151,7 @@ private BufferAllocator getAllocator() {
   }
 
   private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
-                                   Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
+                                   Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
     for (String s : expectedRecords.keySet()) {
       assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s));
       assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
@@ -224,7 +225,7 @@ private static String printNearbyRecords(Map<String, List<Object>> expectedRecor
   }
 
   private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
-                                  final RecordBatchLoader loader)
+                                  final RecordBatchLoader loader)
       throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
@@ -307,12 +308,28 @@ public void close() throws Exception {
   }
 
   /**
+   * Iterate over the batches and combine them into a map, where the key is the schema path and the
+   * value is the list of column values across all the batches.
    * @param batches
    * @return
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
   public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+      throws SchemaChangeException, UnsupportedEncodingException {
+    return addToCombinedVectorResults(batches, null);
+  }
+
+  /**
+   * Add to result vectors and compare each batch's schema against the expected schema while iterating batches.
+   * @param batches
+   * @param expectedSchema : the expected schema the batches should have. Throws SchemaChangeException
+   *                       if a batch with a different schema is encountered.
+   * @return a map from schema path to the list of column values across all the batches.
+   * @throws SchemaChangeException
+   * @throws UnsupportedEncodingException
+   */
+  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
       throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, List<Object>> combinedVectors = new TreeMap<>();
@@ -320,6 +337,14 @@ public static Map> addToCombinedVectorResults(Iterable data() {
 
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
     fs = FileSystem.get(conf);
 
     test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
index 2848b68d53f..d57605b7418 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java
@@ -33,7 +33,7 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
index f4d505db499..5f306c6327f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestWriter.java
@@ -49,7 +49,7 @@ public class TestWriter extends BaseTestQuery {
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
 
     fs = FileSystem.get(conf);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index a0ea6fe5f43..e39a6443ed6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -195,6 +195,7 @@ public void testExternalSort() {
         "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
     opTestBuilder()
       .physicalOperator(sortConf)
+      .maxAllocation(15_000_000L)
      .inputDataStreamJson(inputJsonBatches)
      .baselineColumns("a", "b")
      .baselineValues(5l, 1l)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
new file mode 100644
index 00000000000..302d0e507c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import mockit.NonStrictExpectations;
+import org.apache.drill.DrillTestWrapper;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.util.TestUtilities;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION;
+import static org.apache.drill.exec.physical.base.AbstractBase.MAX_ALLOCATION;
+import static org.apache.drill.exec.physical.unit.TestMiniPlan.fs;
+
+/**
+ * MiniPlanUnitTestBase extends PhysicalOpUnitTestBase to construct a mini plan (a.k.a. plan fragment)
+ * in the form of a physical operator tree, and to verify both the expected schema and the output row results.
+ * Steps to construct a unit test:
+ * 1. Call PopBuilder / ScanPopBuilder to construct the mini plan.
+ * 2. Create a MiniPlanTestBuilder, and specify the expected schema and baseline values, or indicate
+ * that no batch is expected.
+ */
+
+public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
+
+  private final ExecutorService scanExecutor = Executors.newFixedThreadPool(2, new NamedThreadFactory("scan-"));
+
+  public static class MiniPlanTestBuilder {
+    protected List<Map<String, Object>> baselineRecords;
+    protected RecordBatch root;
+    protected boolean expectedZeroBatch;
+    protected BatchSchema expectedSchema;
+
+    /**
+     * Specify the root operator for a mini plan.
+     * @param root
+     * @return this builder
+     */
+    public MiniPlanTestBuilder root(RecordBatch root) {
+      this.root = root;
+      return this;
+    }
+
+    /**
+     * Specify the expected batch schema.
+     * @param batchSchema
+     * @return this builder
+     */
+    public MiniPlanTestBuilder expectedSchema(BatchSchema batchSchema) {
+      this.expectedSchema = batchSchema;
+      return this;
+    }
+
+    /**
+     * Specify one row of expected values. The number of values must be the same as the number of fields in the expected batch schema.
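+     * For example, assuming an expected schema of (a BIGINT, b BIGINT), a call would look like
+     * {@code .baselineValues(5L, 1L)}: one value per field, in schema order.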
+     * @param baselineValues the expected values, one per field in the expected schema
+     * @return this builder
+     */
+    public MiniPlanTestBuilder baselineValues(Object ... baselineValues) {
+      if (baselineRecords == null) {
+        baselineRecords = new ArrayList<>();
+      }
+
+      Map<String, Object> ret = new HashMap<>();
+      int i = 0;
+      Preconditions.checkArgument(expectedSchema != null, "Expected schema should be set before specifying baseline values.");
+      Preconditions.checkArgument(baselineValues.length == expectedSchema.getFieldCount(),
+          "Must supply the same number of baseline values as columns in expected schema.");
+
+      for (MaterializedField field : expectedSchema) {
+        ret.put(SchemaPath.getSimplePath(field.getPath()).toExpr(), baselineValues[i]);
+        i++;
+      }
+
+      this.baselineRecords.add(ret);
+      return this;
+    }
+
+    /**
+     * Specify the special case where the operator tree is expected to return zero batches.
+     * @param expectedZeroBatch
+     * @return this builder
+     */
+    public MiniPlanTestBuilder expectZeroBatch(boolean expectedZeroBatch) {
+      this.expectedZeroBatch = expectedZeroBatch;
+      return this;
+    }
+
+    public void go() throws Exception {
+      final BatchIterator batchIterator = new BatchIterator(root);
+
+      // Verify the special case where zero batches are expected.
+      if (expectedZeroBatch) {
+        if (batchIterator.iterator().hasNext()) {
+          throw new AssertionError("Expected zero batches from the scan, but it returned at least one batch!");
+        } else {
+          return; // successful
+        }
+      }
+
+      Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectedSchema);
+      Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+      DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+    }
+  }
+
+  /**
+   * Similar to {@link OperatorTestBuilder}, builds a physical operator (RecordBatch) and specifies its input record batches.
+   * An input record batch can be a non-scan operator, added by calling {@link PopBuilder#addInputAsChild},
+   * or a scan operator, added by calling {@link PopBuilder#addJsonScanAsChild()}.
+   *
+   * A mini plan rooted at a join operator, like the one below, could be constructed in either of the following ways:
+   *
+   * 


+   *                 Join
+   *                /    \
+   *          JSON_T1    Filter
+   *                       \
+   *                     JSON_T2
+   * 
+ * + *

+   * new PopBuilder()
+   *  .physicalOperator(joinPopConfig)
+   *  .addScanAsChild()
+   *      .fileSystem(..)
+   *      .columnsToRead(...)
+   *      .inputPath(...)
+   *      .buildAddAsInput()
+   *  .addInputAsChild()
+   *      .physicalOperator(filterPopConfig)
+   *      .addScanAsChild()
+   *          .fileSystem(...)
+   *          .columnsToRead(...)
+   *          .inputPath(...)
+   *          .buildAddAsInput()
+   *      .buildAddAsInput()
+   *  .build();
+   * 
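+   * Or, equivalently, each RecordBatch can be built separately and then wired together: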
+ * + *

+   *   RecordBatch scan1 = new ScanPopBuilder()
+   *                          .fileSystem(...)
+   *                          .columnsToRead(..)
+   *                          .inputPath(...)
+   *                          .build();
+   *   RecordBatch scan2 = ... ;
+   *
+   *   RecordBatch filter = new PopBuilder()
+   *                          .physicalOperator(filterPopConfig)
+   *                          .addInput(scan2);
+   *   RecordBatch join = new PopBuilder()
+   *                          .physicalOperator(joinPopConfig)
+   *                          .addInput(scan1)
+   *                          .addInput(filter)
+   *                          .build();
+   *
+   *
+   */
+
+  public class PopBuilder {
+    private PhysicalOperator popConfig;
+    protected long initReservation = INIT_ALLOCATION;
+    protected long maxAllocation = MAX_ALLOCATION;
+
+    final private List<RecordBatch> inputs = Lists.newArrayList();
+    final PopBuilder parent;
+
+    public PopBuilder() {
+      this.parent = null;
+    }
+
+    public PopBuilder(PopBuilder parent) {
+      this.parent = parent;
+    }
+
+    public PopBuilder physicalOperator(PhysicalOperator popConfig) {
+      this.popConfig = popConfig;
+      return this;
+    }
+
+    /**
+     * Set the initial memory reservation used by this operator's allocator. Default is {@link AbstractBase#INIT_ALLOCATION}.
+     * @param initReservation
+     * @return this builder
+     */
+    public PopBuilder initReservation(long initReservation) {
+      this.initReservation = initReservation;
+      return this;
+    }
+
+    /**
+     * Set the maximum memory allocation used by this operator's allocator. Default is {@link AbstractBase#MAX_ALLOCATION}.
+     * @param maxAllocation
+     * @return this builder
+     */
+    public PopBuilder maxAllocation(long maxAllocation) {
+      this.maxAllocation = maxAllocation;
+      return this;
+    }
+
+    /**
+     * Return a JsonScanBuilder to build a scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return JsonScanBuilder
+     */
+    public JsonScanBuilder addJsonScanAsChild() {
+      return new JsonScanBuilder(this);
+    }
+
+    /**
+     * Return a ParquetScanBuilder to build a scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return ParquetScanBuilder
+     */
+    public ParquetScanBuilder addParquetScanAsChild() {
+      return new ParquetScanBuilder(this);
+    }
+
+    /**
+     * Return a nested PopBuilder to build a non-scan RecordBatch, which will be added as an input batch after
+     * calling {@link PopBuilder#buildAddAsInput()}.
+     * @return a nested PopBuilder for a non-scan RecordBatch.
+     */
+    public PopBuilder addInputAsChild() {
+      return new PopBuilder(this) {
+      };
+    }
+
+    public PopBuilder addInput(RecordBatch batch) {
+      inputs.add(batch);
+      return this;
+    }
+
+    public PopBuilder buildAddAsInput() throws Exception {
+      mockOpContext(initReservation, maxAllocation);
+      BatchCreator opCreator = (BatchCreator) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+      RecordBatch batch = opCreator.getBatch(fragContext, popConfig, inputs);
+      return parent.addInput(batch);
+    }
+
+    public RecordBatch build() throws Exception {
+      mockOpContext(initReservation, maxAllocation);
+      BatchCreator opCreator = (BatchCreator) getOpCreatorReg().getOperatorCreator(popConfig.getClass());
+      return opCreator.getBatch(fragContext, popConfig, inputs);
+    }
+  }
+
+  public abstract class ScanPopBuider<T extends ScanPopBuider> extends PopBuilder {
+    List<SchemaPath> columnsToRead = Collections.singletonList(SchemaPath.getSimplePath("*"));
+    DrillFileSystem fs = null;
+
+    public ScanPopBuider() {
+      super(null); // Scan is the root operator.
+    }
+
+    public ScanPopBuider(PopBuilder parent) {
+      super(parent);
+    }
+
+    public T fileSystem(DrillFileSystem fs) {
+      this.fs = fs;
+      return (T) this;
+    }
+
+    public T columnsToRead(SchemaPath ... columnsToRead) {
+      this.columnsToRead = Lists.newArrayList(columnsToRead);
+      return (T) this;
+    }
+
+    public T columnsToRead(String ... columnsToRead) {
+      this.columnsToRead = Lists.newArrayList();
+
+      for (String column : columnsToRead) {
+        this.columnsToRead.add(SchemaPath.getSimplePath(column));
+      }
+      return (T) this;
+    }
+
+  }
+
+  /**
+   * Builder for a JSON scan RecordBatch.
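+   * For example, a hypothetical single-batch scan over an inline JSON string could be built as
+   * {@code new JsonScanBuilder().jsonBatches(Lists.newArrayList("[{\"a\": 1}]")).build()}.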
+   */
+  public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> {
+    List<String> jsonBatches = null;
+    List<String> inputPaths = Collections.EMPTY_LIST;
+
+    public JsonScanBuilder(PopBuilder parent) {
+      super(parent);
+    }
+
+    public JsonScanBuilder() {
+      super();
+    }
+
+    public JsonScanBuilder jsonBatches(List<String> jsonBatches) {
+      this.jsonBatches = jsonBatches;
+      return this;
+    }
+
+    public JsonScanBuilder inputPaths(List<String> inputPaths) {
+      this.inputPaths = inputPaths;
+      return this;
+    }
+
+    public PopBuilder buildAddAsInput() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      RecordBatch scanBatch = getScanBatch();
+      return parent.addInput(scanBatch);
+    }
+
+    public RecordBatch build() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      return getScanBatch();
+    }
+
+    private RecordBatch getScanBatch() throws Exception {
+      Iterator<RecordReader> readers = null;
+
+      if (jsonBatches != null) {
+        readers = TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, columnsToRead);
+      } else {
+        readers = TestUtilities.getJsonReadersFromInputFiles(fs, inputPaths, fragContext, columnsToRead);
+      }
+
+      RecordBatch scanBatch = new ScanBatch(null, fragContext, readers);
+      return scanBatch;
+    }
+  }
+
+  /**
+   * Builder for a Parquet scan RecordBatch.
+   */
+  public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> {
+    List<String> inputPaths = Collections.EMPTY_LIST;
+
+    public ParquetScanBuilder() {
+      super();
+    }
+
+    public ParquetScanBuilder(PopBuilder parent) {
+      super(parent);
+    }
+
+    public ParquetScanBuilder inputPaths(List<String> inputPaths) {
+      this.inputPaths = inputPaths;
+      return this;
+    }
+
+    public PopBuilder buildAddAsInput() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      RecordBatch scanBatch = getScanBatch();
+      return parent.addInput(scanBatch);
+    }
+
+    public RecordBatch build() throws Exception {
+      mockOpContext(this.initReservation, this.maxAllocation);
+      return getScanBatch();
+    }
+
+    private RecordBatch getScanBatch() throws Exception {
+      List<RecordReader> readers = Lists.newArrayList();
+
+      for (String path : inputPaths) {
+        ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));
+
+        for (int i = 0; i < footer.getBlocks().size(); i++) {
+          readers.add(new ParquetRecordReader(fragContext,
+              path,
+              i,
+              fs,
+              CodecFactory.createDirectCodecFactory(fs.getConf(),
+                  new ParquetDirectByteBufferAllocator(opContext.getAllocator()), 0),
+              footer,
+              columnsToRead,
+              ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION));
+        }
+      }
+
+      RecordBatch scanBatch = new ScanBatch(null, fragContext, readers.iterator());
+      return scanBatch;
+    }
+  } // end of ParquetScanBuilder
+
+  @Override
+  protected void mockOpContext(long initReservation, long maxAllocation) throws Exception {
+    super.mockOpContext(initReservation, maxAllocation);
+
+    // mock ScanExecutor used by parquet reader.
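+    // Without this expectation the JMockit-injected OperatorContext would return null from
+    // getScanExecutor(); record a real thread pool for the readers to run against.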
+ new NonStrictExpectations() { + { + opContext.getScanExecutor();result = scanExecutor; + } + }; + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java index ad7367f5352..7d09ca52b7e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.physical.unit; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import mockit.Delegate; @@ -56,6 +54,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; @@ -66,8 +65,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.easy.json.JSONRecordReader; import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.drill.exec.util.TestUtilities; +import org.junit.Before; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -78,10 +78,14 @@ import java.util.List; import java.util.Map; +import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION; + /** * Look! Doesn't extend BaseTestQuery!! */ public class PhysicalOpUnitTestBase extends ExecTest { +// public static long INIT_ALLOCATION = 1_000_000l; +// public static long MAX_ALLOCATION = 10_000_000L; @Injectable FragmentContext fragContext; @Injectable OperatorContext opContext; @@ -97,6 +101,11 @@ public class PhysicalOpUnitTestBase extends ExecTest { private final TemplateClassDefinition templateClassDefinition = new TemplateClassDefinition(Projector.class, ProjectorTemplate.class); private final OperatorCreatorRegistry opCreatorReg = new OperatorCreatorRegistry(classpathScan); + @Before + public void setup() throws Exception { + mockFragmentContext(); + } + @Override protected LogicalExpression parseExpr(String expr) { ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr)); @@ -128,37 +137,7 @@ protected List parseExprs(String... 
expressionsAndOutputNames) return ret; } - @SuppressWarnings("resource") - void runTest(OperatorTestBuilder testBuilder) { - BatchCreator opCreator; - RecordBatch testOperator; - try { - mockFragmentContext(testBuilder.initReservation, testBuilder.maxAllocation); - opCreator = (BatchCreator) - opCreatorReg.getOperatorCreator(testBuilder.popConfig.getClass()); - List incomingStreams = Lists.newArrayList(); - for (List batchesJson : testBuilder.inputStreamsJSON) { - incomingStreams.add(new ScanBatch(null, fragContext, - getRecordReadersForJsonBatches(batchesJson, fragContext))); - } - testOperator = opCreator.getBatch(fragContext, testBuilder.popConfig, incomingStreams); - - Map> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator)); - Map> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(testBuilder.baselineRecords); - DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors); - - } catch (ExecutionSetupException e) { - throw new RuntimeException(e); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } catch (SchemaChangeException e) { - throw new RuntimeException(e); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static class BatchIterator implements Iterable { + protected static class BatchIterator implements Iterable { private RecordBatch operator; public BatchIterator(RecordBatch operator) { @@ -213,11 +192,40 @@ protected class OperatorTestBuilder { private String[] baselineColumns; private List> baselineRecords; private List> inputStreamsJSON; - private long initReservation = 10000000; - private long maxAllocation = 15000000; + private long initReservation = AbstractBase.INIT_ALLOCATION; + private long maxAllocation = AbstractBase.MAX_ALLOCATION; public void go() { - runTest(this); + BatchCreator opCreator; + RecordBatch testOperator; + try { + mockOpContext(initReservation, maxAllocation); + + opCreator = (BatchCreator) + opCreatorReg.getOperatorCreator(popConfig.getClass()); + List incomingStreams = Lists.newArrayList(); + if (inputStreamsJSON != null) { + for (List batchesJson : inputStreamsJSON) { + incomingStreams.add(new ScanBatch(null, fragContext, + getRecordReadersForJsonBatches(batchesJson, fragContext))); + } + } + + testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams); + + Map> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator)); + Map> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords); + DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors); + + } catch (ExecutionSetupException e) { + throw new RuntimeException(e); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } catch (SchemaChangeException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } } public OperatorTestBuilder physicalOperator(PhysicalOperator batch) { @@ -276,9 +284,9 @@ public OperatorTestBuilder baselineValues(Object ... 
baselineValues) { } } - private void mockFragmentContext(long initReservation, long maxAllocation) throws Exception{ + protected void mockFragmentContext() throws Exception{ final CodeCompiler compiler = new CodeCompiler(drillConf, optionManager); - final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation); +// final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation); new NonStrictExpectations() { { // optManager.getOption(withAny(new TypeValidators.BooleanValidator("", false))); result = false; @@ -320,27 +328,28 @@ Object getImplementationClass(ClassGenerator gen) throws IOException, Cl } catch (IOException e) { throw new RuntimeException(e); } + } + }; + } + + protected void mockOpContext(long initReservation, long maxAllocation) throws Exception{ + final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation); + new NonStrictExpectations() { + { opContext.getStats();result = opStats; opContext.getAllocator(); result = allocator; - fragContext.newOperatorContext(withAny(popConf)); - result = opContext; + fragContext.newOperatorContext(withAny(popConf));result = opContext; } }; } - @SuppressWarnings("resource") + protected OperatorCreatorRegistry getOpCreatorReg() { + return opCreatorReg; + } + private Iterator getRecordReadersForJsonBatches(List jsonBatches, FragmentContext fragContext) { - ObjectMapper mapper = new ObjectMapper(); - List readers = new ArrayList<>(); - for (String batchJason : jsonBatches) { - JsonNode records; - try { - records = mapper.readTree(batchJason); - } catch (IOException e) { - throw new RuntimeException(e); - } - readers.add(new JSONRecordReader(fragContext, records, null, Collections.singletonList(SchemaPath.getSimplePath("*")))); - } - return readers.iterator(); + return TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*"))); } + + } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java new file mode 100644 index 00000000000..d0a64f45568 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class contains examples showing how to use MiniPlanTestBuilder to test a specific plan
+ * fragment (mini plan). Each test case requires: 1) a RecordBatch built from PopBuilder / ScanBuilder;
+ * 2) an expected schema and baseline values; or 3) an indication that no batch is expected.
+ */
+public class TestMiniPlan extends MiniPlanUnitTestBase {
+
+  protected static DrillFileSystem fs;
+
+  @BeforeClass
+  public static void initFS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    fs = new DrillFileSystem(conf);
+  }
+
+  @Test
+  @Ignore("DRILL-5464: A bug in JsonRecordReader handling empty file")
+  public void testEmptyJsonInput() throws Exception {
+    String emptyFile = FileUtils.getResourceAsFile("/project/pushdown/empty.json").toURI().toString();
+
+    RecordBatch scanBatch = new JsonScanBuilder()
+        .fileSystem(fs)
+        .inputPaths(Lists.newArrayList(emptyFile))
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectZeroBatch(true)
+        .go();
+  }
+
+  @Test
+  public void testSimpleParquetScan() throws Exception {
+    String file = FileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+
+    RecordBatch scanBatch = new ParquetScanBuilder()
+        .fileSystem(fs)
+        .columnsToRead("R_REGIONKEY")
+        .inputPaths(Lists.newArrayList(file))
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("R_REGIONKEY", TypeProtos.MinorType.BIGINT)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectedSchema(expectedSchema)
+        .baselineValues(0L)
+        .baselineValues(1L)
+        .go();
+  }
+
+  @Test
+  public void testSimpleJson() throws Exception {
+    List<String> jsonBatches = Lists.newArrayList(
+        "{\"a\":100}"
+    );
+
+    RecordBatch scanBatch = new JsonScanBuilder()
+        .jsonBatches(jsonBatches)
+        .build();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("a", TypeProtos.MinorType.BIGINT)
+        .build();
+
+    new MiniPlanTestBuilder()
+        .root(scanBatch)
+        .expectedSchema(expectedSchema)
+        .baselineValues(100L)
+        .go();
+  }
+
+  @Test
+  public void testUnionFilter() throws Exception {
+    List<String> leftJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 1 }]",
+        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
+        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
+
+    List<String> rightJsonBatches = Lists.newArrayList(
+        "[{\"a\": 5, \"b\" : 10 }]",
+        "[{\"a\": 50, \"b\" : 100}]");
+
+    RecordBatch batch = new PopBuilder()
+      
.physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch + .addInputAsChild() + .physicalOperator(new Filter(null, parseExpr("a=5"), 1.0f)) + .addJsonScanAsChild() + .jsonBatches(leftJsonBatches) + .columnsToRead("a", "b") + .buildAddAsInput() + .buildAddAsInput() + .addInputAsChild() + .physicalOperator(new Filter(null, parseExpr("a=50"), 1.0f)) + .addJsonScanAsChild() + .jsonBatches(rightJsonBatches) + .columnsToRead("a", "b") + .buildAddAsInput() + .buildAddAsInput() + .build(); + + BatchSchema expectedSchema = new SchemaBuilder() + .addNullable("a", TypeProtos.MinorType.BIGINT) + .addNullable("b", TypeProtos.MinorType.BIGINT) + .withSVMode(BatchSchema.SelectionVectorMode.NONE) + .build(); + + new MiniPlanTestBuilder() + .root(batch) + .expectedSchema(expectedSchema) + .baselineValues(5l, 1l) + .baselineValues(5l, 5l) + .baselineValues(50l, 100l) + .go(); + } + + @Test + @Ignore ("DRILL-5327: A bug in UnionAll handling empty inputs from both sides") + public void testUnionFilterAll() throws Exception { + List leftJsonBatches = Lists.newArrayList( + "[{\"a\": 5, \"b\" : 1 }]"); + + List rightJsonBatches = Lists.newArrayList( + "[{\"a\": 50, \"b\" : 10 }]"); + + RecordBatch leftScan = new JsonScanBuilder() + .jsonBatches(leftJsonBatches) + .columnsToRead("a", "b") + .build(); + + RecordBatch leftFilter = new PopBuilder() + .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f)) + .addInput(leftScan) + .build(); + + RecordBatch rightScan = new JsonScanBuilder() + .jsonBatches(rightJsonBatches) + .columnsToRead("a", "b") + .build(); + + RecordBatch rightFilter = new PopBuilder() + .physicalOperator(new Filter(null, parseExpr("a < 0"), 1.0f)) + .addInput(rightScan) + .build(); + + RecordBatch batch = new PopBuilder() + .physicalOperator(new UnionAll(Collections.EMPTY_LIST)) // Children list is provided through RecordBatch + .addInput(leftFilter) + .addInput(rightFilter) + .build(); + + BatchSchema expectedSchema = new SchemaBuilder() + .addNullable("a", TypeProtos.MinorType.BIGINT) + .addNullable("b", TypeProtos.MinorType.BIGINT) + .withSVMode(BatchSchema.SelectionVectorMode.NONE) + .build(); + + new MiniPlanTestBuilder() + .root(batch) + .expectedSchema(expectedSchema) + .go(); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java index 550f56f440c..7d66795dd50 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java @@ -66,7 +66,7 @@ public void testIOStats() throws Exception { DrillFileSystem dfs = null; InputStream is = null; Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/); OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java index 782973d5f77..277e6f935e6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java @@ -52,7 +52,7 @@ public static void initFSAndCreateFragContext() throws Exception { BitControl.PlanFragment.getDefaultInstance(), null, bits[0].getContext().getFunctionImplementationRegistry()); Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local"); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); fs = FileSystem.get(conf); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java index 48657c7482e..b946ab99427 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/SchemaBuilder.java @@ -84,9 +84,15 @@ public SchemaBuilder buildMap() { parent.finishMap(col); return parent; } + + @Override + public SchemaBuilder withSVMode(SelectionVectorMode svMode) { + throw new IllegalStateException("Cannot set SVMode for a nested schema"); + } } protected List columns = new ArrayList<>( ); + private SelectionVectorMode svMode = SelectionVectorMode.NONE; public SchemaBuilder() { } @@ -128,8 +134,13 @@ public MapBuilder addMap(String pathName) { return new MapBuilder(this, pathName); } + public SchemaBuilder withSVMode(SelectionVectorMode svMode) { + this.svMode = svMode; + return this; + } + public BatchSchema build() { - return new BatchSchema(SelectionVectorMode.NONE, columns); + return new BatchSchema(svMode, columns); } void finishMap(MaterializedField map) {