DRILL-2173: Partition queries to drive dynamic pruning
Adds a new interface on the QueryContext, as well as on individual schemas, for exploring partitions of tables.
Adds an injectable type for the partition explorer for use in UDFs. This is hooked into both expression
materialization and interpreted evaluation. The FragmentContext throws an exception telling users to turn on
constant folding if a UDF that uses the PartitionExplorer makes it past planning.

2173 update - Address Chris' review comments.

Change the PartitionExplorer to return an Iterable<String> instead of String[]

Add interface level description to PartitionExplorer and StoragePluginPartitionExplorer.

New inner class in FileSystemPlugin to fulfill the new Iterable interface for partitions.

Formatting/cleanup fixes

Clean up error reporting code in MaxDir UDF. Remove method to get a string from a DrillBuf, as it was already defined in StringFunctionHelpers. Add new utility method to specifically convert a VarCharHolder to a string to remove some boilerplate.

Fixed an errant copy paste in a comment and removed unused imports.

Fix docs in FileSystemPlugin, belongs with the 2173 changes.

Fix references in Javadoc to properly use @link instead of @see.

2173 fixes, correctly return an empty list of sub-partitions if the path requested in the partition explorer interface is a file. Fix a few docs.
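The file-versus-directory behaviour described above can be illustrated with a minimal sketch. It is not the code from this commit: it uses `java.nio.file` rather than the Hadoop file system API, the class name `SubDirListingSketch` is hypothetical, and it assumes that only directories count as sub-partitions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone sketch; not a class in Drill.
class SubDirListingSketch {

  // Returns the names of the directories directly below 'path'.
  // If 'path' is a regular file, there is nothing below it, so the result is empty.
  // Note: a missing path is also treated as empty here; this sketch does not
  // model the PartitionNotFoundException used in the commit.
  static List<String> subDirectories(Path path) {
    if (!Files.isDirectory(path)) {
      return Collections.emptyList();
    }
    List<String> result = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
      for (Path child : stream) {
        if (Files.isDirectory(child)) {
          result.add(child.getFileName().toString());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Directory streams have no guaranteed order, so sort for stable output.
    Collections.sort(result);
    return result;
  }
}
```

Returning an empty list rather than throwing lets a caller treat "no sub-partitions" uniformly, whether the path is a leaf directory or a file.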

More 2173, finishing Chris' comments

2173 update - Add validation for PartitionExplorer injectable in UdfUtilities.

small change to fix refactored unit tests.

cleanup for 2173

Fix maxdir UDF so it can compile in runtime generated code as well as the interpreted expression system (needed to fully qualify classes and interfaces). It still fails to execute, as we prevent requesting a schema from a non-root fragment. We do not expect these types of functions to ever be used without constant folding so this should not be an issue.

Update error message in the case where the partition explorer is being used outside of planning.

Adding FreeMarker-generated maxdir, imaxdir, mindir and imindir

remove import that violates build checks, fix typo in new test class name

Separate out SubDirectoryList from FileSystemSchemaFactory.

Fix unit test to correctly test all four functions.

Update partition explorer to take List instead of Collection. As the lists are used in parallel it should be explicit that these are expected to be ordered (which Collections do not guarantee).
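To illustrate why ordering matters for the two parallel lists, here is a small sketch that pairs each partition column with the value at the same index. The class and method names are hypothetical, and the `dir0`/`dir1` column names are only sample inputs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; not a class in Drill.
class ParallelListsSketch {

  // Pairs columns.get(i) with values.get(i). This only makes sense when both
  // arguments have a defined iteration order, which List guarantees and
  // Collection does not.
  static String describe(List<String> columns, List<String> values) {
    if (columns.size() != values.size()) {
      throw new IllegalArgumentException("column and value lists must have the same length");
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < columns.size(); i++) {
      if (i > 0) {
        sb.append('/');
      }
      sb.append(columns.get(i)).append('=').append(values.get(i));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // prints "dir0=2015/dir1=04"
    System.out.println(describe(Arrays.asList("dir0", "dir1"), Arrays.asList("2015", "04")));
  }
}
```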

Drop the extra file generated due to the header in the FreeMarker template and fix a typo and remove an unused import.
jaltekruse committed Apr 6, 2015
1 parent ca73990 commit af7a52b
Showing 20 changed files with 650 additions and 37 deletions.
@@ -62,7 +62,7 @@ public void setHolder(SchemaPlus plusOfThis) {
}

@Override
- public Schema getSubSchema(String name) {
+ public AbstractSchema getSubSchema(String name) {
return null;
}

@@ -202,7 +202,7 @@ public HiveSchema(String name) {
}

@Override
- public Schema getSubSchema(String name) {
+ public AbstractSchema getSubSchema(String name) {
List<String> tables;
try {
List<String> dbs = databases.get(DATABASES);
@@ -136,7 +136,7 @@ public MongoSchema(String name) {
}

@Override
- public Schema getSubSchema(String name) {
+ public AbstractSchema getSubSchema(String name) {
List<String> tables;
try {
if (! schemaMap.containsKey(name)) {
107 changes: 107 additions & 0 deletions exec/java-exec/src/main/codegen/templates/DirectoryExplorers.java
@@ -0,0 +1,107 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

<@pp.dropOutputFile />

<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/DirectoryExplorers.java" />

<#include "/@includes/license.ftl" />

package org.apache.drill.exec.expr.fn.impl;

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.holders.VarCharHolder;

import javax.inject.Inject;

/**
* This file is generated with Freemarker using the template exec/java-exec/src/main/codegen/templates/DirectoryExplorers.java
*/
public class DirectoryExplorers {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectoryExplorers.class);

<#list [ { "name" : "\"maxdir\"", "functionClassName" : "MaxDir", "comparison" : "compareTo(curr) < 0", "goal" : "maximum", "comparisonType" : "case-sensitive"},
{ "name" : "\"imaxdir\"", "functionClassName" : "IMaxDir", "comparison" : "compareToIgnoreCase(curr) < 0", "goal" : "maximum", "comparisonType" : "case-insensitive"},
{ "name" : "\"mindir\"", "functionClassName" : "MinDir", "comparison" : "compareTo(curr) > 0", "goal" : "minimum", "comparisonType" : "case-sensitive"},
{ "name" : "\"imindir\"", "functionClassName" : "IMinDir", "comparison" : "compareToIgnoreCase(curr) > 0", "goal" : "minimum", "comparisonType" : "case-insensitive"}
] as dirAggrProps>


@FunctionTemplate(name = ${dirAggrProps.name}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
public static class ${dirAggrProps.functionClassName} implements DrillSimpleFunc {

@Param VarCharHolder schema;
@Param VarCharHolder table;
@Output VarCharHolder out;
@Inject DrillBuf buffer;
@Inject org.apache.drill.exec.store.PartitionExplorer partitionExplorer;

public void setup() {
}

public void eval() {
Iterable<String> subPartitions;
try {
subPartitions = partitionExplorer.getSubPartitions(
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(schema),
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(table),
new java.util.ArrayList<String>(),
new java.util.ArrayList<String>());
} catch (org.apache.drill.exec.store.PartitionNotFoundException e) {
throw new RuntimeException(
String.format("Error in %s function: Table %s does not exist in schema %s ",
${dirAggrProps.name},
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(table),
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(schema))
);
}
java.util.Iterator partitionIterator = subPartitions.iterator();
if (!partitionIterator.hasNext()) {
throw new RuntimeException(
String.format("Error in %s function: Table %s in schema %s does not contain sub-partitions.",
${dirAggrProps.name},
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(table),
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.getStringFromVarCharHolder(schema)
)
);
}
String subPartitionStr = (String) partitionIterator.next();
String curr;
// find the ${dirAggrProps.goal} directory in the list using a ${dirAggrProps.comparisonType} string comparison
while (partitionIterator.hasNext()){
curr = (String) partitionIterator.next();
if (subPartitionStr.${dirAggrProps.comparison}) {
subPartitionStr = curr;
}
}
String[] subPartitionParts = subPartitionStr.split(java.io.File.separator);
subPartitionStr = subPartitionParts[subPartitionParts.length - 1];
byte[] result = subPartitionStr.getBytes();
out.buffer = buffer = buffer.reallocIfNeeded(result.length);

out.buffer.setBytes(0, subPartitionStr.getBytes(), 0, result.length);
out.start = 0;
out.end = result.length;
}
}
</#list>
}
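The selection loop in the template above can be read in isolation. The following standalone sketch (hypothetical class `DirPickSketch`, not part of this commit) reproduces the four variants with plain Java comparators and shows how case-sensitive and case-insensitive ordering can pick different directories. It assumes `/` as the path separator instead of `java.io.File.separator`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical standalone sketch; DirPickSketch is not a class in Drill.
class DirPickSketch {

  // Scans the candidates once and keeps the winner under the given ordering,
  // mirroring the while-loop in eval(). wantMax=true corresponds to
  // maxdir/imaxdir, wantMax=false to mindir/imindir.
  static String pick(Iterable<String> dirs, Comparator<String> order, boolean wantMax) {
    Iterator<String> it = dirs.iterator();
    if (!it.hasNext()) {
      throw new IllegalArgumentException("no sub-partitions to choose from");
    }
    String best = it.next();
    while (it.hasNext()) {
      String curr = it.next();
      int cmp = order.compare(best, curr);
      if (wantMax ? cmp < 0 : cmp > 0) {
        best = curr;
      }
    }
    return best;
  }

  // Keeps only the final path segment. Assumes '/' as the separator.
  static String lastSegment(String path) {
    int idx = path.lastIndexOf('/');
    return idx < 0 ? path : path.substring(idx + 1);
  }

  public static void main(String[] args) {
    List<String> dirs = Arrays.asList("/logs/a", "/logs/B");
    Comparator<String> caseSensitive = Comparator.naturalOrder();
    Comparator<String> caseInsensitive = String.CASE_INSENSITIVE_ORDER;

    System.out.println(lastSegment(pick(dirs, caseSensitive, true)));    // prints "a" (like maxdir)
    System.out.println(lastSegment(pick(dirs, caseInsensitive, true)));  // prints "B" (like imaxdir)
    System.out.println(lastSegment(pick(dirs, caseSensitive, false)));   // prints "B" (like mindir)
    System.out.println(lastSegment(pick(dirs, caseInsensitive, false))); // prints "a" (like imindir)
  }
}
```

Uppercase letters sort before lowercase ones in the case-sensitive ordering, which is why the two maximum variants disagree on this input.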
@@ -18,14 +18,12 @@
package org.apache.drill.exec.expr.fn;

import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -43,7 +41,6 @@
import org.apache.drill.exec.expr.fn.DrillFuncHolder.ValueReference;
import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -34,10 +34,6 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
import org.apache.drill.common.expression.ValueExpressions.DateExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.DrillFuncHolderExpr;
@@ -49,21 +45,15 @@
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.DateHolder;
import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
import org.apache.drill.exec.expr.holders.NullableBitHolder;
import org.apache.drill.exec.expr.holders.TimeHolder;
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.exec.vector.ValueVector;

import javax.inject.Inject;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class InterpreterEvaluator {
@@ -35,6 +35,7 @@
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -45,6 +46,7 @@
import org.apache.drill.exec.server.options.FragmentOptionManager;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.work.batch.IncomingBuffers;

import com.google.common.collect.Maps;
@@ -316,4 +318,11 @@ public DrillBuf getManagedBuffer() {
public DrillBuf getManagedBuffer(int size) {
return bufferManager.getManagedBuffer(size);
}

@Override
public PartitionExplorer getPartitionExplorer() {
throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " +
"in functions that can be evaluated at planning time. Make sure that the %s configuration " +
"option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName()));
}
}
@@ -36,6 +36,8 @@
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.store.PartitionExplorerImpl;
import org.apache.drill.exec.store.StoragePluginRegistry;

// TODO except for a couple of tests, this is only created by Foreman
@@ -152,6 +154,11 @@ public DrillBuf getManagedBuffer() {
return bufferManager.getManagedBuffer();
}

@Override
public PartitionExplorer getPartitionExplorer() {
return new PartitionExplorerImpl(getRootSchema());
}

@Override
public void close() throws Exception {
try {
@@ -19,6 +19,7 @@

import com.google.common.collect.ImmutableMap;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.store.PartitionExplorer;

/**
* Defines the query state and shared resources available to UDFs through
@@ -34,6 +35,7 @@ public interface UdfUtilities {
new ImmutableMap.Builder<Class, String>()
.put(DrillBuf.class, "getManagedBuffer")
.put(QueryDateTimeInfo.class, "getQueryDateTimeInfo")
.put(PartitionExplorer.class, "getPartitionExplorer")
.build();

/**
@@ -54,4 +56,26 @@ public interface UdfUtilities {
* for memory management
*/
DrillBuf getManagedBuffer();

/**
* A partition explorer allows UDFs to view the sub-partitions below a
* particular partition. This allows for the implementation of UDFs to
* query against the partition information, without having to read
* the actual data contained in the partition. This interface is designed
* for UDFs that take only constant inputs, as this interface will only
* be useful if we can evaluate the constant UDF at planning time.
*
* Any function defined to use this interface that is not evaluated
* at planning time by the constant folding rule will be querying
* the storage plugin for meta-data for each record processed.
*
* Be sure to check the query plans to see that this expression has already
* been evaluated during planning if you write UDFs against this interface.
*
* See {@link org.apache.drill.exec.expr.fn.impl.DirectoryExplorers} for
* example usages of this interface.
*
* @return - an object for exploring partitions of all available schemas
*/
PartitionExplorer getPartitionExplorer();
}
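The map in the hunk above associates each injectable type with the name of a getter on the context. One way such a map could be consumed is sketched below with reflection. This is an illustration only, not Drill's actual injection code; `InjectableLookupSketch`, `Context` and `getScratch` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch; not a class in Drill.
class InjectableLookupSketch {

  // Stand-in for a context object that exposes injectable resources through getters.
  public static class Context {
    public StringBuilder getScratch() {
      return new StringBuilder("scratch");
    }
  }

  // Maps an injectable field type to the name of the getter that supplies it,
  // in the same spirit as the ImmutableMap in UdfUtilities.
  static final Map<Class<?>, String> GETTERS = new HashMap<>();
  static {
    GETTERS.put(StringBuilder.class, "getScratch");
  }

  // Validates that the requested type is injectable, then calls the getter reflectively.
  static Object resolve(Object context, Class<?> fieldType) {
    String getter = GETTERS.get(fieldType);
    if (getter == null) {
      throw new IllegalArgumentException("Type " + fieldType.getName() + " is not injectable");
    }
    try {
      Method method = context.getClass().getMethod(getter);
      return method.invoke(context);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not call " + getter, e);
    }
  }

  public static void main(String[] args) {
    Object injected = resolve(new Context(), StringBuilder.class);
    System.out.println(injected); // prints "scratch"
  }
}
```

Rejecting unknown types up front gives the author of a function a clear error at registration time rather than a failure during evaluation.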
@@ -34,7 +34,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;

- public abstract class AbstractSchema implements Schema{
+ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSchema.class);

protected final List<String> schemaPath;
@@ -48,6 +48,17 @@ public AbstractSchema(List<String> parentSchemaPath, String name) {
this.name = name;
}

@Override
public Iterable<String> getSubPartitions(String table,
List<String> partitionColumns,
List<String> partitionValues
) throws PartitionNotFoundException {
throw new UnsupportedOperationException(
String.format("Schema of type: %s " +
"does not support retrieving sub-partition information.",
this.getClass().getSimpleName()));
}

public String getName() {
return name;
}
@@ -96,7 +107,7 @@ public Set<String> getFunctionNames() {
}

@Override
- public Schema getSubSchema(String name) {
+ public AbstractSchema getSubSchema(String name) {
return null;
}
