forked from apache/pig
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PIG-2547: Easier UDFs: Convenient EvalFunc super-classes (billgraham,…
… dvryaboy) git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1337429 13f79535-47bb-0310-9956-ffa450edef68
- Loading branch information
William W. Graham Jr
committed
May 12, 2012
1 parent
ef40976
commit 8977c70
Showing
10 changed files
with
863 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.pig; | ||
|
||
/** | ||
* An interface that captures a unit of work against an item where an exception might be thrown. | ||
* | ||
* @param <S> The argument type for the function. | ||
* @param <T> The return type for the function. | ||
* @param <E> The exception type that the function throws. | ||
* | ||
*/ | ||
public interface ExceptionalFunction<S, T, E extends Exception> { | ||
|
||
/** | ||
* Performs a unit of work on item, possibly throwing {@code E} in the process. | ||
* | ||
* @param item The item to perform work against. | ||
* @return The result of the computation. | ||
* @throws E if there was a problem performing the work. | ||
*/ | ||
public T apply(S item) throws E; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.pig; | ||
|
||
/** | ||
* A convenience typedef that ties into both google's {@code Function} and | ||
* {@code ExceptionalFunction}. | ||
* | ||
* @param <S> The argument type for the function. | ||
* @param <T> The return type for the function. | ||
* | ||
*/ | ||
public interface Function<S, T> | ||
extends ExceptionalFunction<S, T, RuntimeException>, com.google.common.base.Function<S, T> { | ||
|
||
@Override | ||
public T apply(S item); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.pig; | ||
|
||
import org.apache.pig.data.Tuple; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
|
||
/** | ||
* Base class for simple Pig UDFs that are functions of primitive types IN to OUT. Handles | ||
* marshalling objects, basic error checking, etc. Extend this class and implement the | ||
* <pre>OUT exec(IN input)</pre> method when writting a UDF that operates on only the first input | ||
* (of expected type IN) from the Tuple. | ||
*/ | ||
public abstract class PrimitiveEvalFunc<IN, OUT> extends TypedOutputEvalFunc<OUT> { | ||
|
||
protected Class<IN> inTypeClass = null; | ||
|
||
public Class<IN> getInputTypeClass() { return inTypeClass; } | ||
|
||
@SuppressWarnings("unchecked") | ||
public PrimitiveEvalFunc() { | ||
List<?> typeArgs = getTypeArguments(PrimitiveEvalFunc.class, getClass()); | ||
inTypeClass = (Class<IN>) typeArgs.get(0); | ||
outTypeClass = (Class<OUT>) typeArgs.get(1); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public PrimitiveEvalFunc(Class inTypeClass, Class outTypeClass) { | ||
this.inTypeClass = inTypeClass; | ||
this.outTypeClass = outTypeClass; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public OUT exec(Tuple tuple) throws IOException { | ||
verifyUdfInput(getCounterGroup(), tuple, 1); | ||
|
||
IN input = (IN) tuple.get(0); | ||
if (input == null) { | ||
// Default behavior of null input should be null output. | ||
return null; | ||
} | ||
|
||
return exec(input); | ||
} | ||
|
||
public abstract OUT exec(IN input) throws IOException; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.pig; | ||
|
||
import com.google.common.collect.Maps; | ||
import org.apache.hadoop.mapreduce.Counter; | ||
import org.apache.pig.data.Tuple; | ||
import org.apache.pig.tools.pigstats.PigStatusReporter; | ||
|
||
import java.io.IOException; | ||
import java.lang.reflect.*; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Base class for Pig UDFs that are functions from Tuples to generic type OUT. Handles marshalling | ||
* objects, basic error checking, etc. Also infers outputSchema and provides a function to verify | ||
* the input Tuple. | ||
* <P></P> | ||
* Extend this class and implement the <pre>OUT exec(Tuple input)</pre> method when writting a UDF | ||
* that operates on multiple inputs from the Tuple. | ||
*/ | ||
public abstract class TypedOutputEvalFunc<OUT> extends EvalFunc<OUT> { | ||
|
||
// Used to implement outputSchema below. | ||
protected Class<OUT> outTypeClass = null; | ||
|
||
public Class<OUT> getOutputTypeClass() { return outTypeClass; } | ||
|
||
@SuppressWarnings("unchecked") | ||
public TypedOutputEvalFunc() { | ||
outTypeClass = (Class<OUT>) getTypeArguments(TypedOutputEvalFunc.class, getClass()).get(0); | ||
} | ||
|
||
// Increment Hadoop counters for bad inputs which are either null or too small. | ||
protected void verifyInput(Tuple input, int minimumSize) throws IOException { | ||
verifyUdfInput(getCounterGroup(), input, minimumSize); | ||
} | ||
|
||
/** | ||
* Incremented counters will use this as the counter group. Typically this works fine, since | ||
* the subclass name is enough to identify the UDF. In some cases though (i.e. a UDF wrapper that | ||
* is a facade to a number of different transformation functions), a more specific group name is | ||
* needed. | ||
*/ | ||
protected String getCounterGroup() { | ||
return getClass().getName(); | ||
} | ||
|
||
/** | ||
* Get the actual type arguments a child class has used to extend a generic base class. | ||
* | ||
* @param baseClass the base class | ||
* @param childClass the child class | ||
* @return a list of the raw classes for the actual type arguments. | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
protected static <T> List<Class<?>> getTypeArguments(Class<T> baseClass, | ||
Class<? extends T> childClass) { | ||
Map<Type, Type> resolvedTypes = Maps.newHashMap(); | ||
Type type = childClass; | ||
// start walking up the inheritance hierarchy until we hit baseClass | ||
while (!getClass(type).equals(baseClass)) { | ||
if (type instanceof Class) { | ||
// there is no useful information for us in raw types, so just keep going. | ||
type = ((Class) type).getGenericSuperclass(); | ||
} else { | ||
ParameterizedType parameterizedType = (ParameterizedType) type; | ||
Class<?> rawType = (Class) parameterizedType.getRawType(); | ||
|
||
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); | ||
TypeVariable<?>[] typeParameters = rawType.getTypeParameters(); | ||
for (int i = 0; i < actualTypeArguments.length; i++) { | ||
resolvedTypes.put(typeParameters[i], actualTypeArguments[i]); | ||
} | ||
|
||
if (!rawType.equals(baseClass)) { | ||
type = rawType.getGenericSuperclass(); | ||
} | ||
} | ||
} | ||
|
||
// finally, for each actual type argument provided to baseClass, determine (if possible) | ||
// the raw class for that type argument. | ||
Type[] actualTypeArguments; | ||
if (type instanceof Class) { | ||
actualTypeArguments = ((Class) type).getTypeParameters(); | ||
} else { | ||
actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments(); | ||
} | ||
List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>(); | ||
// resolve types by chasing down type variables. | ||
for (Type baseType : actualTypeArguments) { | ||
while (resolvedTypes.containsKey(baseType)) { | ||
baseType = resolvedTypes.get(baseType); | ||
} | ||
typeArgumentsAsClasses.add(getClass(baseType)); | ||
} | ||
return typeArgumentsAsClasses; | ||
} | ||
|
||
/** | ||
* Get the underlying class for a type, or null if the type is a variable type. | ||
* | ||
* @param type the type | ||
* @return the underlying class | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
private static Class<?> getClass(Type type) { | ||
if (type instanceof Class) { | ||
return (Class) type; | ||
} else if (type instanceof ParameterizedType) { | ||
return getClass(((ParameterizedType) type).getRawType()); | ||
} else if (type instanceof GenericArrayType) { | ||
Type componentType = ((GenericArrayType) type).getGenericComponentType(); | ||
Class<?> componentClass = getClass(componentType); | ||
if (componentClass != null) { | ||
return Array.newInstance(componentClass, 0).getClass(); | ||
} else { | ||
return null; | ||
} | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
/** Increment Hadoop counters for bad inputs which are either null or too small. | ||
* | ||
* @param klass the name of the calling class, for recording purposes | ||
* @param input the tuple passed to the UDF. | ||
* @param minimumSize the minimum size required of the tuple. | ||
*/ | ||
protected static void verifyUdfInput(String klass, Tuple input, int minimumSize) throws IOException { | ||
if (input == null) { | ||
safeIncrCounter(klass, "NullInput", 1L); | ||
throw new IOException("Null input to UDF " + klass); | ||
} else if (input.size() < minimumSize) { | ||
String reason = "TooFewArguments_Got_" + input.size() + "_NeededAtLeast_" + minimumSize; | ||
safeIncrCounter(klass, reason, 1L); | ||
throw new IOException("Not enough arguments to " + klass + ": got " + input.size() + | ||
", expected at least " + minimumSize); | ||
} else { | ||
safeIncrCounter(klass, "ValidInput", 1L); | ||
} | ||
} | ||
|
||
protected static void safeIncrCounter(String group, String name, Long increment) { | ||
Counter counter = PigStatusReporter.getInstance().getCounter(group, name); | ||
if (counter != null) { | ||
counter.increment(increment); | ||
} | ||
} | ||
} |
Oops, something went wrong.