Skip to content

Commit

Permalink
DRILL-2143: Part 2 - fix interpreter and add QueryDateTimeInfo inject…
Browse files Browse the repository at this point in the history
…able to fill in holes from removing RecordBatch from UDFs - Move list of accepted injectable types into the UdfUtilities interface for easier maintenance.

Fix one more new function to remove RecordBatch from setup method.

Add back UdfUtilities interface to FragmentContext lost in rebasing.

Remove unneeded testcase in TestExampleQueries, remove commented out code in InterpreterGenerator
previously used to handle DrillBuf injectables, now being handled by reflection to set the DrillBuf
rather than the previously generated code that would use the direct reference to the incoming
RecordBatch to get a buffer.

Fix docs on UdfUtilities and package docs for drill/exec/expr, belongs with 2143 part 2 patch.

Fix belongs with 2143, change interpreter to output a ValueHolder instead of a ValueVector in the case of a constant expression.

2143 update - Use reflection to remove boilerplate for adding new injectable types for UDFs.
  • Loading branch information
jaltekruse committed Mar 17, 2015
1 parent bff7b9e commit 1c5decc
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 65 deletions.
Expand Up @@ -32,7 +32,9 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
import org.apache.drill.exec.expr.holders.TimeStampTZHolder;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.record.MaterializedField;
Expand All @@ -43,8 +45,10 @@
import org.apache.drill.exec.store.mock.MockScanBatchCreator;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTime;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -94,8 +98,42 @@ public void interpreterCaseExpr() throws Exception {
doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues);
}

@Test
public void interpreterDateTest() throws Exception {
String[] colNames = {"col1"};
TypeProtos.MajorType[] colTypes = {Types.optional(TypeProtos.MinorType.INT)};
String expressionStr = "now()";
BitControl.PlanFragment planFragment = BitControl.PlanFragment.getDefaultInstance();
QueryDateTimeInfo dateTime = new QueryDateTimeInfo(planFragment.getQueryStartTime(), planFragment.getTimeZone());
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);

long queryStartDate = now.getMillis();
int timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());

TimeStampTZHolder out = new TimeStampTZHolder();

out.value = queryStartDate;
out.index = timezoneIndex;

ByteBuffer buffer = ByteBuffer.allocate(12);
buffer.putLong(out.value);
buffer.putInt(out.index);
long l = buffer.getLong(0);
DateTime t = new DateTime(l);

String[] expectedFirstTwoValues = {t.toString(), t.toString()};

doTest(expressionStr, colNames, colTypes, expectedFirstTwoValues, planFragment);
}


protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues) throws Exception {
doTest(expressionStr, colNames, colTypes, expectFirstTwoValues, BitControl.PlanFragment.getDefaultInstance());
}

protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues, BitControl.PlanFragment planFragment) throws Exception {
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();

Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
Expand All @@ -114,7 +152,7 @@ protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorT
MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns);
MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry));

RecordBatch batch = createMockScanBatch(bit1, scanPOP);
RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);

batch.next();

Expand All @@ -134,12 +172,12 @@ protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorT
}


private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP) {
private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
List<RecordBatch> children = Lists.newArrayList();
MockScanBatchCreator creator = new MockScanBatchCreator();

try {
FragmentContext context = new FragmentContext(bit.getContext(), BitControl.PlanFragment.getDefaultInstance(), null, bit.getContext().getFunctionImplementationRegistry());
FragmentContext context = new FragmentContext(bit.getContext(), planFragment, null, bit.getContext().getFunctionImplementationRegistry());
return creator.getBatch(context,scanPOP, children);
} catch (Exception ex) {
throw new DrillRuntimeException("Error when setup fragment context" + ex);
Expand Down
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;

import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression;
Expand All @@ -39,6 +41,8 @@
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionCostCategory;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -132,7 +136,16 @@ protected JVar[] declareWorkspaceVariables(ClassGenerator<?> g) {
}

if (ref.isInject()) {
g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], g.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer"));
if (UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType()) != null) {
g.getBlock(BlockType.SETUP).assign(
workspaceJVars[i],
g.getMappingSet().getIncoming().invoke("getContext").invoke(
UdfUtilities.INJECTABLE_GETTER_METHODS.get(ref.getType())
));
} else {
// Invalid injectable type provided, this should have been caught in FunctionConverter
throw new DrillRuntimeException("Invalid injectable type requested in UDF: " + ref.getType().getSimpleName());
}
} else {
//g.getBlock(BlockType.SETUP).assign(workspaceJVars[i], JExpr._new(jtype));
}
Expand Down
Expand Up @@ -17,21 +17,22 @@
*/
package org.apache.drill.exec.expr.fn;

import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.DrillFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
Expand All @@ -42,6 +43,8 @@
import org.apache.drill.exec.expr.fn.DrillFuncHolder.WorkspaceReference;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterGenerator;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.codehaus.commons.compiler.CompileException;
Expand Down Expand Up @@ -189,8 +192,9 @@ public <T extends DrillFunc> DrillFuncHolder getHolder(Class<T> clazz) {
} else {
// workspace work.
boolean isInject = inject != null;
if (isInject && !field.getType().equals(DrillBuf.class)) {
return failure(String.format("Only DrillBuf is allowed to be injected. You attempted to inject %s.", field.getType()), clazz, field);
if (isInject && UdfUtilities.INJECTABLE_GETTER_METHODS.get(field.getType()) == null) {
return failure(String.format("A %s cannot be injected into a %s, available injectable classes are: %s.",
field.getType(), DrillFunc.class.getSimpleName(), Joiner.on(",").join(UdfUtilities.INJECTABLE_GETTER_METHODS.keySet())), clazz, field);
}
WorkspaceReference wsReference = new WorkspaceReference(field.getType(), field.getName(), isInject);

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.expr.holders.TimeStampTZHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.ops.QueryDateTimeInfo;

public class DateTypeFunctions {

Expand Down Expand Up @@ -213,14 +214,15 @@ public void eval() {
public static class CurrentDate implements DrillSimpleFunc {
@Workspace long queryStartDate;
@Output DateHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {

// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).
// withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).
withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
}

public void eval() {
Expand All @@ -234,14 +236,15 @@ public static class CurrentTimeStamp implements DrillSimpleFunc {
@Workspace long queryStartDate;
@Workspace int timezoneIndex;
@Output TimeStampTZHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {

// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
// queryStartDate = now.getMillis();
// timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
queryStartDate = now.getMillis();
timezoneIndex = org.apache.drill.exec.expr.fn.impl.DateUtility.getIndex(now.getZone().toString());
}

public void eval() {
Expand Down Expand Up @@ -290,11 +293,12 @@ public void eval() {
public static class LocalTimeStamp implements DrillSimpleFunc {
@Workspace long queryStartDate;
@Output TimeStampHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {

// org.joda.time.DateTime now = (new org.joda.time.DateTime(incoming.getContext().getQueryStartTime())).withZoneRetainFields(org.joda.time.DateTimeZone.UTC);
// queryStartDate = now.getMillis();
org.joda.time.DateTime now = (new org.joda.time.DateTime(dateTime)).withZoneRetainFields(org.joda.time.DateTimeZone.UTC);
queryStartDate = now.getMillis();
}

public void eval() {
Expand All @@ -306,16 +310,17 @@ public void eval() {
public static class CurrentTime implements DrillSimpleFunc {
@Workspace int queryStartTime;
@Output TimeHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {

// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
// queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
// (now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
// (now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
// (now.getMillisOfSecond()));
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
queryStartTime= (int) ((now.getHourOfDay() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) +
(now.getMinuteOfHour() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) +
(now.getSecondOfMinute() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) +
(now.getMillisOfSecond()));
}

public void eval() {
Expand Down Expand Up @@ -399,12 +404,13 @@ public static class AgeTimeStamp2Function implements DrillSimpleFunc {
@Param TimeStampHolder right;
@Workspace long queryStartDate;
@Output IntervalHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {
// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
}

public void eval() {
Expand Down Expand Up @@ -441,12 +447,13 @@ public static class AgeDate2Function implements DrillSimpleFunc {
@Param DateHolder right;
@Workspace long queryStartDate;
@Output IntervalHolder out;
@Inject QueryDateTimeInfo dateTime;

public void setup() {
// int timeZoneIndex = incoming.getContext().getRootFragmentTimeZone();
// org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
// org.joda.time.DateTime now = new org.joda.time.DateTime(incoming.getContext().getQueryStartTime(), timeZone);
// queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
int timeZoneIndex = dateTime.getRootFragmentTimeZone();
org.joda.time.DateTimeZone timeZone = org.joda.time.DateTimeZone.forID(org.apache.drill.exec.expr.fn.impl.DateUtility.getTimeZone(timeZoneIndex));
org.joda.time.DateTime now = new org.joda.time.DateTime(dateTime.getQueryStartTime(), timeZone);
queryStartDate = (new org.joda.time.DateMidnight(now.getYear(), now.getMonthOfYear(), now.getDayOfMonth(), timeZone)).getMillis();
}

public void eval() {
Expand Down Expand Up @@ -477,10 +484,11 @@ public void eval() {
public static class UnixTimeStamp implements DrillSimpleFunc {
@Output BigIntHolder out;
@Workspace long queryStartDate;
@Inject QueryDateTimeInfo dateTime;

@Override
public void setup() {
// queryStartDate = incoming.getContext().getQueryStartTime();
queryStartDate = dateTime.getQueryStartTime();
}

@Override
Expand Down
Expand Up @@ -843,7 +843,7 @@ public static class ConcatOperator implements DrillSimpleFunc{
@Output VarCharHolder out;
@Inject DrillBuf buffer;

public void setup(RecordBatch incoming) {
public void setup() {
}

public void eval() {
Expand Down
Expand Up @@ -23,7 +23,7 @@

public interface DrillSimpleFuncInterpreter extends DrillFuncInterpreter {

public void doSetup(ValueHolder[] args, RecordBatch incoming);
public void doSetup(ValueHolder[] args);

public ValueHolder doEval(ValueHolder [] args) ;

Expand Down

0 comments on commit 1c5decc

Please sign in to comment.