DRILL-5459: Extend physical operator test framework to test mini plans consisting of multiple operators.

This closes #823
jinfengni authored and parthchandra committed May 13, 2017
1 parent cb9547a commit 0dc237e
Showing 17 changed files with 821 additions and 73 deletions.
@@ -70,7 +70,7 @@ private HiveTestDataGenerator(final String dbDir, final String whDir) {
config.put("hive.metastore.uris", "");
config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
config.put("hive.metastore.warehouse.dir", whDir);
config.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
config.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
}

/**
@@ -115,7 +115,7 @@ private void generateTestData() throws Exception {
HiveConf conf = new HiveConf(SessionState.class);

conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
conf.set("hive.metastore.warehouse.dir", whDir);
conf.set("mapred.job.tracker", "local");
conf.set(ConfVars.SCRATCHDIR.varname, getTempDir("scratch_dir"));
Expand Down
Expand Up @@ -25,10 +25,13 @@
public abstract class AbstractBase implements PhysicalOperator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);

private final String userName;
public static long INIT_ALLOCATION = 1_000_000L;
public static long MAX_ALLOCATION = 10_000_000_000L;

protected long initialAllocation = INIT_ALLOCATION;
protected long maxAllocation = MAX_ALLOCATION;

protected long initialAllocation = 1_000_000L;
protected long maxAllocation = 10_000_000_000L;
private final String userName;
private int id;
private double cost;

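Because INIT_ALLOCATION and MAX_ALLOCATION are public, mutable statics and the instance fields are initialized from them, a test can tighten the memory budget of operators constructed afterwards. A minimal sketch, not part of this commit:

// Override the framework-wide allocation cap before building the operator under test.
long savedMax = AbstractBase.MAX_ALLOCATION;
AbstractBase.MAX_ALLOCATION = 15_000_000L;   // shrink from the 10 GB default
try {
  // ... construct and exercise the physical operator under test ...
} finally {
  AbstractBase.MAX_ALLOCATION = savedMax;    // restore so other tests are unaffected
}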
@@ -18,14 +18,25 @@
package org.apache.drill.exec.util;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;

import com.google.common.io.Files;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;

/**
* This class contains utility methods to speed up tests. Some of the production code currently calls this method
@@ -95,4 +106,44 @@ public static void makeDfsTmpSchemaImmutable(final StoragePluginRegistry pluginR

pluginRegistry.createOrUpdate(dfsPluginName, dfsPluginConfig, true);
}

/**
* Create JSONRecordReaders from input strings.
* @param jsonBatches : list of input strings, each element representing one batch. Each string may be
* either a JSON array of records, "[{...}, {...}, ..., {...}]", or a single record, "{...}".
* @param fragContext : fragment context
* @param columnsToRead : list of schema paths to read from the JSON reader.
* @return an iterator of RecordReaders, one per input batch.
*/
public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
ObjectMapper mapper = new ObjectMapper();
List<RecordReader> readers = new ArrayList<>();
for (String batchJson : jsonBatches) {
JsonNode records;
try {
records = mapper.readTree(batchJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
readers.add(new JSONRecordReader(fragContext, records, null, columnsToRead));
}
return readers.iterator();
}

/**
* Create JSONRecordReaders from files on a file system.
* @param fs : file system.
* @param inputPaths : list of .json file paths.
* @param fragContext : fragment context
* @param columnsToRead : list of schema paths to read from the JSON reader.
* @return an iterator of RecordReaders, one per input file.
*/
public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
List<RecordReader> readers = new ArrayList<>();
for (String inputPath : inputPaths) {
readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
}
return readers.iterator();
}

}
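A hedged usage sketch for the two helpers above. It assumes the enclosing utility class is named TestUtilities (the file name is not visible in this diff) and that fragContext, fs, and the input path come from the test fixture:

// Readers from inline JSON: the first batch is an array of two records,
// the second uses the single-record "{...}" form.
List<String> jsonBatches = Arrays.asList(
    "[{\"a\": 5, \"b\": 1}, {\"a\": 40, \"b\": 3}]",
    "{\"a\": 13, \"b\": 100}");
List<SchemaPath> columns = Arrays.asList(
    SchemaPath.getSimplePath("a"), SchemaPath.getSimplePath("b"));
Iterator<RecordReader> readers =
    TestUtilities.getJsonReadersFromBatchString(jsonBatches, fragContext, columns);

// Readers from .json files on a DrillFileSystem (path is illustrative only).
Iterator<RecordReader> fileReaders = TestUtilities.getJsonReadersFromInputFiles(
    fs, Arrays.asList("/tmp/input.json"), fragContext, columns);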
@@ -33,6 +33,7 @@
import java.util.Set;
import java.util.TreeMap;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
@@ -117,9 +118,9 @@ public interface TestServices {
private int expectedNumBatches;

public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
String baselineOptionSettingQueries, String testOptionSettingQueries,
QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
String baselineOptionSettingQueries, String testOptionSettingQueries,
QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
this.testBuilder = testBuilder;
this.services = services;
this.query = query;
@@ -150,7 +151,7 @@ private BufferAllocator getAllocator() {
}

private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
for (String s : expectedRecords.keySet()) {
assertNotNull("Expected column '" + s + "' not found.", actualRecords.get(s));
assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
@@ -224,7 +225,7 @@ private static String printNearbyRecords(Map<String, List<Object>> expectedRecor
}

private Map<String, HyperVectorValueIterator> addToHyperVectorMap(final List<QueryDataBatch> records,
final RecordBatchLoader loader)
final RecordBatchLoader loader)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, HyperVectorValueIterator> combinedVectors = new TreeMap<>();
@@ -307,19 +308,43 @@ public void close() throws Exception {
}

/**
* Iterate over batches and combine them into a map, keyed by schema path, whose value is
* the list of that column's values across all the batches.
* @param batches : the batches to iterate over.
* @return a map from schema path to the column's values across all batches.
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
throws SchemaChangeException, UnsupportedEncodingException {
return addToCombinedVectorResults(batches, null);
}

/**
* Add to result vectors and compare batch schema against expected schema while iterating batches.
* @param batches : the batches to iterate over.
* @param expectedSchema : the schema the batches should conform to. A SchemaChangeException is
* thrown if a batch with a different schema is encountered.
* @return a map from schema path to the column's values across all batches.
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema)
throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, List<Object>> combinedVectors = new TreeMap<>();

long totalRecords = 0;
BatchSchema schema = null;
for (VectorAccessible loader : batches) {
if (expectedSchema != null) {
if (! expectedSchema.equals(loader.getSchema())) {
throw new SchemaChangeException(String.format("Batch schema does not match expected schema\n" +
"Actual schema: %s. Expected schema : %s",
loader.getSchema(), expectedSchema));
}
}

// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throws clause above.
if (schema == null) {
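A hedged sketch of the schema-checking overload above; batches (an Iterable<VectorAccessible>) and expectedSchema are assumed to come from the surrounding test:

// Collect column values across all batches, failing fast on a schema mismatch.
// The call declares SchemaChangeException and UnsupportedEncodingException.
Map<String, List<Object>> actual =
    DrillTestWrapper.addToCombinedVectorResults(batches, expectedSchema);
List<Object> columnA = actual.get("a");  // values of a hypothetical column `a`, in batch order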
@@ -32,7 +32,7 @@ public class TestRepeatedReaders extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);

fs = FileSystem.get(conf);
}
@@ -97,7 +97,7 @@ public void test() throws Exception {
batch, context.getAllocator());

Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);

final VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(
context.getAllocator());
@@ -112,7 +112,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
fs = FileSystem.get(conf);
path = new Path(getDfsTestTmpSchemaLocation());

@@ -120,7 +120,7 @@ public static Collection<Object[]> data() {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);

fs = FileSystem.get(conf);
test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
@@ -33,7 +33,7 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);

fs = FileSystem.get(conf);

@@ -49,7 +49,7 @@ public class TestWriter extends BaseTestQuery {
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);

fs = FileSystem.get(conf);
}
@@ -195,6 +195,7 @@ public void testExternalSort() {
"[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
opTestBuilder()
.physicalOperator(sortConf)
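// cap this operator's memory for the test (AbstractBase.MAX_ALLOCATION defaults to 10 GB)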
.maxAllocation(15_000_000L)
.inputDataStreamJson(inputJsonBatches)
.baselineColumns("a", "b")
.baselineValues(5l, 1l)