From cb20120ccbaac1ebf3c2f8aafd4d3e82e5fe9fc1 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Thu, 1 Jun 2017 13:17:25 +0200 Subject: [PATCH 1/6] [FLINK-6783] Changed passing index of type argument while extracting return type. --- .../api/java/typeutils/TypeExtractor.java | 443 +++++++++++++----- .../org/apache/flink/cep/PatternStream.java | 24 +- .../flink/graph/asm/translate/Translate.java | 44 +- .../api/datastream/AllWindowedStream.java | 56 ++- .../api/datastream/AsyncDataStream.java | 13 +- .../streaming/api/datastream/DataStream.java | 16 +- .../streaming/api/datastream/KeyedStream.java | 16 +- .../api/datastream/WindowedStream.java | 60 ++- .../windowing/WindowTranslationTest.java | 232 ++++----- 9 files changed, 604 insertions(+), 300 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 112ca38c8a2ce..1aa5d867d27f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -116,6 +116,8 @@ public class TypeExtractor { private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); + public static final int[] NO_OUTPUT_INDEX = new int[] {}; + protected TypeExtractor() { // only create instances for special use cases } @@ -161,9 +163,18 @@ public static TypeInformation getMapReturnTypes(MapFunction TypeInformation getMapReturnTypes(MapFunction mapInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) mapInterface, + MapFunction.class, + 0, + 1, + new int[]{0}, + NO_OUTPUT_INDEX, + inType, + functionName, + allowMissing); } - + @PublicEvolving public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapInterface, TypeInformation inType) { @@ -174,7 +185,16 @@ public static TypeInformation getFlatMapReturnTypes(FlatMapFuncti public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) flatMapInterface, + FlatMapFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1, 0}, + inType, + functionName, + allowMissing); } /** @@ -194,7 +214,16 @@ public static TypeInformation getFoldReturnTypes(FoldFunction TypeInformation getFoldReturnTypes(FoldFunction foldInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) foldInterface, + FoldFunction.class, + 0, + 1, + new int[]{1}, + NO_OUTPUT_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -228,7 +257,16 @@ public static TypeInformation getMapPartitionReturnTypes(MapParti public static TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, 
inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) mapPartitionInterface, + MapPartitionFunction.class, + 0, + 1, + new int[]{0, 1}, + new int[]{0, 1}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -240,7 +278,16 @@ public static TypeInformation getGroupReduceReturnTypes(GroupRedu public static TypeInformation getGroupReduceReturnTypes(GroupReduceFunction groupReduceInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) groupReduceInterface, + GroupReduceFunction.class, + 0, + 1, + new int[]{0, 1}, + new int[]{0, 1}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -252,7 +299,16 @@ public static TypeInformation getGroupCombineReturnTypes(GroupCom public static TypeInformation getGroupCombineReturnTypes(GroupCombineFunction combineInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) combineInterface, + GroupCombineFunction.class, + 0, + 1, + new int[]{0, 1}, + new int[]{0, 1}, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -324,7 +380,16 @@ public static TypeInformation getKeySelectorTypes(KeySelector TypeInformation getKeySelectorTypes(KeySelector selectorInterface, TypeInformation inType, String functionName, boolean allowMissing) { - return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing); + return getUnaryOperatorReturnType( + (Function) selectorInterface, + KeySelector.class, + 0, + 1, + new int[]{0}, + NO_OUTPUT_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -336,8 +401,8 @@ public static TypeInformation getPartitionerTypes(Partitioner partitio public static TypeInformation getPartitionerTypes(Partitioner partitioner, String functionName, boolean allowMissing) { return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null); } - - + + @SuppressWarnings("unchecked") @PublicEvolving public static TypeInformation getInputFormatTypes(InputFormat inputFormatInterface) { @@ -364,9 +429,11 @@ public static TypeInformation getInputFormatTypes(InputFormat in * @param Input type * @param Output type * @return TypeInformation of the return type of the function + * @deprecated Use version explicitly specyifing types indices. */ @SuppressWarnings("unchecked") @PublicEvolving + @Deprecated public static TypeInformation getUnaryOperatorReturnType( Function function, Class baseClass, @@ -379,8 +446,10 @@ public static TypeInformation getUnaryOperatorReturnType( return getUnaryOperatorReturnType( function, baseClass, - hasIterable ? 0 : -1, - hasCollector ? 0 : -1, + 0, + 1, + hasIterable ? new int[]{0, 0} : new int[]{0}, + hasCollector ? 
new int[]{1, 0} : NO_OUTPUT_INDEX, inType, functionName, allowMissing); @@ -414,6 +483,55 @@ public static TypeInformation getUnaryOperatorReturnType( TypeInformation inType, String functionName, boolean allowMissing) { + return getUnaryOperatorReturnType( + function, + baseClass, + inputTypeArgumentIndex, + outputTypeArgumentIndex, + new int[]{0}, + new int[]{}, + inType, + functionName, + allowMissing); + } + + /** + * Returns the unary operator's return type. + * + *

NOTE: lambda type indices allow extraction of Type from lambdas. To extract input type IN
+	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 *
+	 * <pre>
+	 * <code>
+	 * OUT apply(Map<String, List<IN>> value)
+	 * </code>
+	 * </pre>
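+	 *
+	 * <p>For example, given the signature above, {@code new int[] {0,1,0}} is resolved step by step:
+	 * index 0 selects the first method parameter ({@code Map<String, List<IN>>}), index 1 selects its
+	 * second type argument ({@code List<IN>}), and the final index 0 selects that argument's first
+	 * type argument, i.e. {@code IN}.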
+ * + * @param function Function to extract the return type from + * @param baseClass Base class of the function + * @param inputTypeArgumentIndex Index of input type in the class specification + * @param outputTypeArgumentIndex Index of output type in the class specification + * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example. + * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example. + * @param inType Type of the input elements (In case of an iterable, it is the element type) + * @param functionName Function name + * @param allowMissing Can the type information be missing + * @param Input type + * @param Output type + * @return TypeInformation of the return type of the function + */ + @SuppressWarnings("unchecked") + @PublicEvolving + public static TypeInformation getUnaryOperatorReturnType( + Function function, + Class baseClass, + int inputTypeArgumentIndex, + int outputTypeArgumentIndex, + int[] lambdaInputTypeArgumentIndices, + int[] lambdaOutputTypeArgumentIndices, + TypeInformation inType, + String functionName, + boolean allowMissing) { try { final LambdaExecutable exec; try { @@ -428,6 +546,9 @@ public static TypeInformation getUnaryOperatorReturnType( // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function final int paramLen = exec.getParameterTypes().length - 1; + final Method sam = getSingleAbstractMethod(baseClass); + final int baseParametersLen = sam.getParameterCount(); + // executable references "this" implicitly if (paramLen < 0) { // executable declaring class can also be a super class of the input type @@ -435,24 +556,30 @@ public static TypeInformation getUnaryOperatorReturnType( validateInputContainsExecutable(exec, inType); } else { - final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; - validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType); + final Type input = extractType(exec, lambdaInputTypeArgumentIndices, paramLen, baseParametersLen); + validateInputType(input, inType); } if (function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? 
extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), - inType, - null); - } - else { - validateInputType(baseClass, function.getClass(), 0, inType); + + final Type output; + if (lambdaOutputTypeArgumentIndices.length > 0) { + output = extractType(exec, lambdaOutputTypeArgumentIndices, paramLen, baseParametersLen); + } else { + output = exec.getReturnType(); + } + + return new TypeExtractor().privateCreateTypeInfo(output, inType, null); + } else { + Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided"); + Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Input type argument index was not provided"); + validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null); + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null); } } catch (InvalidTypesException e) { @@ -464,6 +591,32 @@ public static TypeInformation getUnaryOperatorReturnType( } } + private static Type extractType( + LambdaExecutable exec, + int[] lambdaTypeArgumentIndices, + int paramLen, + int baseParametersLen) { + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]]; + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) { + output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]); + } + return output; + } + + private static Method getSingleAbstractMethod(Class baseClass) { + Method sam = null; + for (Method method : baseClass.getMethods()) { + if (Modifier.isAbstract(method.getModifiers())) { + if (sam == null) { + sam = method; + } else { + throw new InvalidTypesException("Lambda type does not match provided baseClass: " + baseClass); + } + } + } + return sam; + } + /** * Returns the binary operator's return type. * @@ -495,8 +648,12 @@ public static TypeInformation getBinaryOperatorReturnType( return getBinaryOperatorReturnType( function, baseClass, - hasIterables ? 0 : -1, - hasCollector ? 0 : -1, + 0, + 1, + 2, + hasIterables ? new int[]{0, 0} : new int[]{0}, + hasIterables ? new int[]{1, 0} : new int[]{1}, + hasCollector ? new int[]{2, 0} : NO_OUTPUT_INDEX, in1Type, in2Type, functionName, @@ -507,14 +664,23 @@ public static TypeInformation getBinaryOperatorReturnType( /** * Returns the binary operator's return type. * + *

NOTE: lambda type indices allow extraction of Type from lambdas. To extract input type IN
+	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 *
+	 * <pre>
+	 * <code>
+	 * OUT apply(Map<String, List<IN>> value)
+	 * </code>
+	 * </pre>
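+	 *
+	 * <p>For a binary function such as {@code void join(IN1 first, IN2 second, Collector<OUT> out)}, the
+	 * corresponding arrays would be {@code new int[] {0}}, {@code new int[] {1}} and {@code new int[] {2, 0}}:
+	 * the two inputs are the first and second parameters themselves, and OUT is the first type argument of
+	 * the Collector passed as the third parameter.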
+ * * @param function Function to extract the return type from * @param baseClass Base class of the function - * @param inputTypeArgumentIndex Index of the type argument of function's first parameter - * specifying the input type if it is wrapped (Iterable, Map, - * etc.). Otherwise -1. - * @param outputTypeArgumentIndex Index of the type argument of functions second parameter - * specifying the output type if it is wrapped in a Collector. - * Otherwise -1. + * @param input1TypeArgumentIndex Index of first input type in the class specification + * @param input2TypeArgumentIndex Index of second input type in the class specification + * @param outputTypeArgumentIndex Index of output type in the class specification + * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example. + * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example. + * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example. * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type) * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type) * @param functionName Function name @@ -529,8 +695,12 @@ public static TypeInformation getBinaryOperatorReturnType( public static TypeInformation getBinaryOperatorReturnType( Function function, Class baseClass, - int inputTypeArgumentIndex, + int input1TypeArgumentIndex, + int input2TypeArgumentIndex, int outputTypeArgumentIndex, + int[] lambdaInput1TypeArgumentIndices, + int[] lambdaInput2TypeArgumentIndices, + int[] lambdaOutputTypeArgumentIndices, TypeInformation in1Type, TypeInformation in2Type, String functionName, @@ -545,28 +715,41 @@ public static TypeInformation getBinaryOperatorReturnType( if (exec != null) { // check for lambda type erasure validateLambdaGenericParameters(exec); - + + final Method sam = getSingleAbstractMethod(baseClass); + final int baseParametersLen = sam.getParameterCount(); + // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function final int paramLen = exec.getParameterTypes().length - 1; - final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1]; - final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; - validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type); - validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type); + + final Type input1 = extractType(exec, lambdaInput1TypeArgumentIndices, paramLen, baseParametersLen); + final Type input2 = extractType(exec, lambdaInput2TypeArgumentIndices, paramLen, baseParametersLen); + + validateInputType(input1, in1Type); + validateInputType(input2, in2Type); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); } + + final Type output; + if (lambdaOutputTypeArgumentIndices.length > 0) { + output = extractType(exec, lambdaOutputTypeArgumentIndices, paramLen, baseParametersLen); + } else { + output = exec.getReturnType(); + } + return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? 
extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), + output, in1Type, in2Type); } else { - validateInputType(baseClass, function.getClass(), 0, in1Type); - validateInputType(baseClass, function.getClass(), 1, in2Type); + validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type); + validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type); + return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type); } } catch (InvalidTypesException e) { @@ -577,7 +760,7 @@ public static TypeInformation getBinaryOperatorReturnType( } } } - + // -------------------------------------------------------------------------------------------- // Create type information // -------------------------------------------------------------------------------------------- @@ -586,7 +769,7 @@ public static TypeInformation getBinaryOperatorReturnType( public static TypeInformation createTypeInfo(Class type) { return (TypeInformation) createTypeInfo((Type) type); } - + public static TypeInformation createTypeInfo(Type t) { TypeInformation ti = new TypeExtractor().privateCreateTypeInfo(t); if (ti == null) { @@ -628,46 +811,46 @@ public static TypeInformation createTypeInfo(Class baseC } return ti; } - + // ----------------------------------- private methods ---------------------------------------- - + private TypeInformation privateCreateTypeInfo(Type t) { ArrayList typeHierarchy = new ArrayList(); typeHierarchy.add(t); return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null); } - + // for (Rich)Functions @SuppressWarnings("unchecked") private TypeInformation privateCreateTypeInfo(Class baseClass, Class clazz, int returnParamPos, TypeInformation in1Type, TypeInformation in2Type) { ArrayList typeHierarchy = new ArrayList(); Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos); - + TypeInformation typeInfo; - + // return type is a variable -> try to get the type info from the input directly if (returnType instanceof TypeVariable) { typeInfo = (TypeInformation) createTypeInfoFromInputs((TypeVariable) returnType, typeHierarchy, in1Type, in2Type); - + if (typeInfo != null) { return typeInfo; } } - + // get info from hierarchy return (TypeInformation) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - + // for LambdaFunctions @SuppressWarnings("unchecked") private TypeInformation privateCreateTypeInfo(Type returnType, TypeInformation in1Type, TypeInformation in2Type) { ArrayList typeHierarchy = new ArrayList(); - + // get info from hierarchy return (TypeInformation) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - + @SuppressWarnings({ "unchecked", "rawtypes" }) private TypeInformation createTypeInfoWithTypeHierarchy(ArrayList typeHierarchy, Type t, TypeInformation in1Type, TypeInformation in2Type) { @@ -680,29 +863,29 @@ private TypeInformation createTypeInfoWithTypeHierarchy(Arr // check if type is a subclass of tuple else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) { Type curT = t; - + // do not allow usage of Tuple as type if (typeToClass(t).equals(Tuple.class)) { throw new InvalidTypesException( "Usage of class Tuple as a 
type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead."); } - + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) - // collect the types while moving up for a later top-down + // collect the types while moving up for a later top-down while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) { typeHierarchy.add(curT); curT = typeToClass(curT).getGenericSuperclass(); } - + if(curT == Tuple0.class) { return new TupleTypeInfo(Tuple0.class); } - + // check if immediate child of Tuple has generics if (curT instanceof Class) { throw new InvalidTypesException("Tuple needs to be parameterized by using generics."); } - + typeHierarchy.add(curT); // create the type information for the subtypes @@ -718,13 +901,13 @@ else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) { } // return tuple info return new TupleTypeInfo(typeToClass(t), subTypesInfo); - + } // type depends on another type // e.g. class MyMapper extends MapFunction else if (t instanceof TypeVariable) { Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable) t); - + if (!(typeVar instanceof TypeVariable)) { return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type); } @@ -741,12 +924,12 @@ else if (t instanceof TypeVariable) { } } } - // arrays with generics + // arrays with generics else if (t instanceof GenericArrayType) { GenericArrayType genericArray = (GenericArrayType) t; - + Type componentType = genericArray.getGenericComponentType(); - + // due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class if (componentType instanceof Class) { Class componentClass = (Class) componentType; @@ -775,11 +958,11 @@ else if (t instanceof ParameterizedType) { else if (t instanceof Class) { return privateGetForClass((Class) t, typeHierarchy); } - + throw new InvalidTypesException("Type Information could not be created."); } - private TypeInformation createTypeInfoFromInputs(TypeVariable returnTypeVar, ArrayList returnTypeHierarchy, + private TypeInformation createTypeInfoFromInputs(TypeVariable returnTypeVar, ArrayList returnTypeHierarchy, TypeInformation in1TypeInfo, TypeInformation in2TypeInfo) { Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar); @@ -791,12 +974,12 @@ private TypeInformation createTypeInfoFromInputs(TypeVariable r else { returnTypeVar = (TypeVariable) matReturnTypeVar; } - + // no input information exists if (in1TypeInfo == null && in2TypeInfo == null) { return null; } - + // create a new type hierarchy for the input ArrayList inputTypeHierarchy = new ArrayList(); // copy the function part of the type hierarchy @@ -809,7 +992,7 @@ private TypeInformation createTypeInfoFromInputs(TypeVariable r } } ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1); - + TypeInformation info = null; if (in1TypeInfo != null) { // find the deepest type variable that describes the type of input 1 @@ -898,18 +1081,18 @@ else if (inTypeInfo instanceof ObjectArrayTypeInfo) { // the input is a tuple else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) { ParameterizedType tupleBaseClass; - + // get tuple from possible tuple subclass while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) { inputTypeHierarchy.add(inType); inType = 
typeToClass(inType).getGenericSuperclass(); } inputTypeHierarchy.add(inType); - + // we can assume to be parameterized since we // already did input validation tupleBaseClass = (ParameterizedType) inType; - + Type[] tupleElements = tupleBaseClass.getActualTypeArguments(); // go thru all tuple elements and search for type variables for (int i = 0; i < tupleElements.length; i++) { @@ -1068,13 +1251,13 @@ private TypeInformation createTypeInfoFromFactory( public static Type getParameterType(Class baseClass, Class clazz, int pos) { return getParameterType(baseClass, null, clazz, pos); } - + private static Type getParameterType(Class baseClass, ArrayList typeHierarchy, Class clazz, int pos) { if (typeHierarchy != null) { typeHierarchy.add(clazz); } Type[] interfaceTypes = clazz.getGenericInterfaces(); - + // search in interfaces for base class for (Type t : interfaceTypes) { Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); @@ -1082,18 +1265,18 @@ private static Type getParameterType(Class baseClass, ArrayList typeHie return parameter; } } - + // search in superclass for base class Type t = clazz.getGenericSuperclass(); Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos); if (parameter != null) { return parameter; } - - throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + + + throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + "Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point"); } - + private static Type getParameterTypeFromGenericType(Class baseClass, ArrayList typeHierarchy, Type t, int pos) { // base class if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) { @@ -1109,7 +1292,7 @@ else if (t instanceof ParameterizedType && baseClass.isAssignableFrom((Class) typeHierarchy.add(t); } return getParameterType(baseClass, typeHierarchy, (Class) ((ParameterizedType) t).getRawType(), pos); - } + } else if (t instanceof Class && baseClass.isAssignableFrom((Class) t)) { if (typeHierarchy != null) { typeHierarchy.add(t); @@ -1118,11 +1301,11 @@ else if (t instanceof Class && baseClass.isAssignableFrom((Class) t)) { } return null; } - + // -------------------------------------------------------------------------------------------- // Validate input // -------------------------------------------------------------------------------------------- - + private static void validateInputType(Type t, TypeInformation inType) { ArrayList typeHierarchy = new ArrayList(); try { @@ -1132,7 +1315,7 @@ private static void validateInputType(Type t, TypeInformation inType) { throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } - + private static void validateInputType(Class baseClass, Class clazz, int inputParamPos, TypeInformation inTypeInfo) { ArrayList typeHierarchy = new ArrayList(); @@ -1152,21 +1335,21 @@ private static void validateInputType(Class baseClass, Class clazz, int in throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e); } } - + @SuppressWarnings("unchecked") private static void validateInfo(ArrayList typeHierarchy, Type type, TypeInformation typeInfo) { if (type == null) { throw new InvalidTypesException("Unknown Error. Type is null."); } - + if (typeInfo == null) { throw new InvalidTypesException("Unknown Error. 
TypeInformation is null."); } - + if (!(type instanceof TypeVariable)) { // check for Java Basic Types if (typeInfo instanceof BasicTypeInfo) { - + TypeInformation actual; // check if basic type at all if (!(type instanceof Class) || (actual = BasicTypeInfo.getInfoFor((Class) type)) == null) { @@ -1176,7 +1359,7 @@ private static void validateInfo(ArrayList typeHierarchy, Type type, TypeI if (!typeInfo.equals(actual)) { throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'."); } - + } // check for Java SQL time types else if (typeInfo instanceof SqlTimeTypeInfo) { @@ -1198,36 +1381,36 @@ else if (typeInfo instanceof TupleTypeInfo) { if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) { throw new InvalidTypesException("Tuple type expected."); } - + // do not allow usage of Tuple as type if (isClassType(type) && typeToClass(type).equals(Tuple.class)) { throw new InvalidTypesException("Concrete subclass of Tuple expected."); } - + // go up the hierarchy until we reach immediate child of Tuple (with or without generics) while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) { typeHierarchy.add(type); type = typeToClass(type).getGenericSuperclass(); } - + if(type == Tuple0.class) { return; } - + // check if immediate child of Tuple has generics if (type instanceof Class) { throw new InvalidTypesException("Parameterized Tuple type expected."); } - + TupleTypeInfo tti = (TupleTypeInfo) typeInfo; - + Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments(); - + if (subTypes.length != tti.getArity()) { throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '" + subTypes.length + "'."); } - + for (int i = 0; i < subTypes.length; i++) { validateInfo(new ArrayList(typeHierarchy), subTypes[i], tti.getTypeAt(i)); } @@ -1258,16 +1441,16 @@ else if (typeInfo instanceof BasicArrayTypeInfo) { && !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) { throw new InvalidTypesException("Array type expected."); } - + if (component instanceof TypeVariable) { component = materializeTypeVariable(typeHierarchy, (TypeVariable) component); if (component instanceof TypeVariable) { return; } } - + validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo) typeInfo).getComponentInfo()); - + } // check for object array else if (typeInfo instanceof ObjectArrayTypeInfo) { @@ -1275,7 +1458,7 @@ else if (typeInfo instanceof ObjectArrayTypeInfo) { if (!(type instanceof Class && ((Class) type).isArray()) && !(type instanceof GenericArrayType)) { throw new InvalidTypesException("Object array type expected."); } - + // check component Type component; if (type instanceof Class) { @@ -1283,14 +1466,14 @@ else if (typeInfo instanceof ObjectArrayTypeInfo) { } else { component = ((GenericArrayType) type).getGenericComponentType(); } - + if (component instanceof TypeVariable) { component = materializeTypeVariable(typeHierarchy, (TypeVariable) component); if (component instanceof TypeVariable) { return; } } - + validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo) typeInfo).getComponentInfo()); } // check for value @@ -1299,7 +1482,7 @@ else if (typeInfo instanceof ValueTypeInfo) { if (!(type instanceof Class && Value.class.isAssignableFrom((Class) type))) { throw new InvalidTypesException("Value type expected."); } - + TypeInformation actual; // check value type contents if (!((ValueTypeInfo) typeInfo).equals(actual = 
ValueTypeInfo.getValueTypeInfo((Class) type))) { @@ -1482,7 +1665,7 @@ private static Type extractTypeArgument(Type t, int index) throws InvalidTypesEx throw new InvalidTypesException("The given type " + t + " is not a parameterized type."); } } - + private static void validateLambdaGenericParameters(LambdaExecutable exec) { // check the arguments for (Type t : exec.getParameterTypes()) { @@ -1516,19 +1699,19 @@ private static Type materializeTypeVariable(ArrayList typeHierarchy, TypeV // iterate thru hierarchy from top to bottom until type variable gets a class assigned for (int i = typeHierarchy.size() - 1; i >= 0; i--) { Type curT = typeHierarchy.get(i); - + // parameterized type if (curT instanceof ParameterizedType) { Class rawType = ((Class) ((ParameterizedType) curT).getRawType()); - + for (int paramIndex = 0; paramIndex < rawType.getTypeParameters().length; paramIndex++) { - + TypeVariable curVarOfCurT = rawType.getTypeParameters()[paramIndex]; - + // check if variable names match if (sameTypeVars(curVarOfCurT, inTypeTypeVar)) { Type curVarType = ((ParameterizedType) curT).getActualTypeArguments()[paramIndex]; - + // another type variable level if (curVarType instanceof TypeVariable) { inTypeTypeVar = (TypeVariable) curVarType; @@ -1545,14 +1728,14 @@ private static Type materializeTypeVariable(ArrayList typeHierarchy, TypeV // return the type variable of the deepest level return inTypeTypeVar; } - + /** * Creates type information from a given Class such as Integer, String[] or POJOs. - * - * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. + * + * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type extraction * (a Class is a child of Type). 
- * + * * @param clazz a Class to create TypeInformation for * @return TypeInformation that describes the passed Class */ @@ -1561,7 +1744,7 @@ public static TypeInformation getForClass(Class clazz) { typeHierarchy.add(clazz); return new TypeExtractor().privateGetForClass(clazz, typeHierarchy); } - + private TypeInformation privateGetForClass(Class clazz, ArrayList typeHierarchy) { return privateGetForClass(clazz, typeHierarchy, null, null, null); } @@ -1600,13 +1783,13 @@ private TypeInformation privateGetForClass(Class clazz, if (primitiveArrayInfo != null) { return primitiveArrayInfo; } - + // basic type arrays: String[], Integer[], Double[] BasicArrayTypeInfo basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz); if (basicArrayInfo != null) { return basicArrayInfo; } - + // object arrays else { TypeInformation componentTypeInfo = createTypeInfoWithTypeHierarchy( @@ -1618,7 +1801,7 @@ private TypeInformation privateGetForClass(Class clazz, return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo); } } - + // check for writable types if (isHadoopWritable(clazz)) { return createHadoopWritableTypeInfo(clazz); @@ -1635,13 +1818,13 @@ private TypeInformation privateGetForClass(Class clazz, if (timeTypeInfo != null) { return timeTypeInfo; } - + // check for subclasses of Value if (Value.class.isAssignableFrom(clazz)) { Class valueClass = clazz.asSubclass(Value.class); return (TypeInformation) ValueTypeInfo.getValueTypeInfo(valueClass); } - + // check for subclasses of Tuple if (Tuple.class.isAssignableFrom(clazz)) { if(clazz == Tuple0.class) { @@ -1680,13 +1863,13 @@ private TypeInformation privateGetForClass(Class clazz, // return a generic type return new GenericTypeInfo(clazz); } - + /** * Checks if the given field is a valid pojo field: * - it is public * OR * - there are getter and setter methods for the field. - * + * * @param f field to check * @param clazz class of field * @param typeHierarchy type hierarchy for materializing generic types @@ -1753,7 +1936,7 @@ protected TypeInformation analyzePojo(Class clazz, Arr LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType"); return new GenericTypeInfo(clazz); } - + // add the hierarchy of the POJO itself if it is generic if (parameterizedType != null) { getTypeHierarchy(typeHierarchy, parameterizedType, Object.class); @@ -1762,7 +1945,7 @@ protected TypeInformation analyzePojo(Class clazz, Arr else if (typeHierarchy.size() <= 1) { getTypeHierarchy(typeHierarchy, clazz, Object.class); } - + List fields = getAllDeclaredFields(clazz, false); if (fields.size() == 0) { LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. 
Will be handled as GenericType"); @@ -1822,7 +2005,7 @@ else if (typeHierarchy.size() <= 1) { LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO."); return null; } - + // everything is checked, we return the pojo return pojoType; } @@ -1870,7 +2053,7 @@ public static Field getDeclaredField(Class clazz, String name) { } return null; } - + private static boolean hasFieldWithSameName(String name, List fields) { for(Field field : fields) { if(name.equals(field.getName())) { @@ -1879,7 +2062,7 @@ private static boolean hasFieldWithSameName(String name, List fields) { } return false; } - + private static TypeInformation getTypeOfPojoField(TypeInformation pojoInfo, Field field) { for (int j = 0; j < pojoInfo.getArity(); j++) { PojoField pf = ((PojoTypeInfo) pojoInfo).getPojoFieldAt(j); @@ -1911,20 +2094,20 @@ private TypeInformation privateGetForObject(X value) { Tuple t = (Tuple) value; int numFields = t.getArity(); if(numFields != countFieldsInClass(value.getClass())) { - // not a tuple since it has more fields. + // not a tuple since it has more fields. return analyzePojo((Class) value.getClass(), new ArrayList(), null, null, null); // we immediately call analyze Pojo here, because // there is currently no other type that can handle such a class. } - + TypeInformation[] infos = new TypeInformation[numFields]; for (int i = 0; i < numFields; i++) { Object field = t.getField(i); - + if (field == null) { throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. " + "Please specify the types directly."); } - + infos[i] = privateGetForObject(field); } return new TupleTypeInfo(value.getClass(), infos); @@ -2013,10 +2196,10 @@ public static TypeInformation createHadoopWritableTypeInfo(Class clazz static void validateIfWritable(TypeInformation typeInfo, Type type) { try { // try to load the writable type info - + Class writableTypeInfoClass = Class .forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader()); - + if (writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) { // this is actually a writable type info // check if the type is a writable diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 71614cf81e586..3131a9412aa11 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -83,8 +83,10 @@ public SingleOutputStreamOperator select(final PatternSelectFunction returnType = TypeExtractor.getUnaryOperatorReturnType( patternSelectFunction, PatternSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -142,8 +144,10 @@ public SingleOutputStreamOperator> select( TypeInformation leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternTimeoutFunction, PatternTimeoutFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -151,8 +155,10 @@ public SingleOutputStreamOperator> select( TypeInformation rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternSelectFunction, PatternSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{}, inputStream.getType(), null, false); @@ -184,8 +190,10 @@ public SingleOutputStreamOperator flatSelect(final PatternFlatSelectFunct TypeInformation outTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType( patternFlatSelectFunction, PatternFlatSelectFunction.class, - 1, 0, + 1, + new int[] {0, 1, 0}, + new int[] {1, 0}, inputStream.getType(), null, false); @@ -244,8 +252,10 @@ public SingleOutputStreamOperator> flatSelect( TypeInformation leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatTimeoutFunction, PatternFlatTimeoutFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{2, 0}, inputStream.getType(), null, false); @@ -253,8 +263,10 @@ public SingleOutputStreamOperator> flatSelect( TypeInformation rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatSelectFunction, PatternFlatSelectFunction.class, + 0, 1, - -1, + new int[]{0, 1, 0}, + new int[]{1, 0}, inputStream.getType(), null, false); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java index 9c4f88e9971d6..4cb4e010e09e6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java @@ -73,7 +73,16 @@ public static DataSet> translateVertexIds(DataSet Class> vertexClass = (Class>) (Class) Vertex.class; TypeInformation oldType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(0); - TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); @@ -148,7 +157,16 @@ public static DataSet> translateEdgeIds(DataSet> edgeClass = (Class>) (Class) Edge.class; TypeInformation oldType = ((TupleTypeInfo>) edges.getType()).getTypeAt(0); - TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[] {0}, + new int[] {1}, + oldType, + null, + false); TypeInformation edgeValueType = ((TupleTypeInfo>) edges.getType()).getTypeAt(2); TupleTypeInfo> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType); @@ -225,7 +243,16 @@ public static DataSet> translateVertexValues(DataSe Class> vertexClass = (Class>) (Class) Vertex.class; TypeInformation idType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(0); TypeInformation oldType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); - TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, idType, newType); @@ -300,7 +327,16 @@ public static DataSet> translateEdgeValues(DataSet> edgeClass = (Class>) (Class) Edge.class; TypeInformation idType = ((TupleTypeInfo>) edges.getType()).getTypeAt(0); TypeInformation oldType = ((TupleTypeInfo>) 
edges.getType()).getTypeAt(2); - TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false); + TypeInformation newType = TypeExtractor.getUnaryOperatorReturnType( + translator, + TranslateFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1}, + oldType, + null, + false); TupleTypeInfo> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 31dbb4f7142cb..1eddd5c2504bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -236,8 +236,7 @@ public SingleOutputStreamOperator reduce( AllWindowFunction function) { TypeInformation inType = input.getType(); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, inType, null, false); + TypeInformation resultType = getAllWindowFunctionReturnType(function, inType); return reduce(reduceFunction, function, resultType); } @@ -332,8 +331,7 @@ public SingleOutputStreamOperator reduce( ReduceFunction reduceFunction, ProcessAllWindowFunction function) { - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false); + TypeInformation resultType = getProcessAllWindowFunctionReturnType(function, input.getType()); return reduce(reduceFunction, function, resultType); } @@ -507,12 +505,41 @@ public SingleOutputStreamOperator aggregate( TypeInformation aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } + private static TypeInformation getAllWindowFunctionReturnType( + AllWindowFunction function, + TypeInformation inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + AllWindowFunction.class, + 0, + 1, + new int[]{1, 0}, + new int[]{2, 0}, + inType, + null, + false); + } + + private static TypeInformation getProcessAllWindowFunctionReturnType( + ProcessAllWindowFunction function, + TypeInformation inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + ProcessAllWindowFunction.class, + 0, + 1, + new int[]{1, 0}, + new int[]{2, 0}, + inType, + null, + false); + } + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. 
The output of the window function is @@ -643,7 +670,7 @@ public SingleOutputStreamOperator aggregate( aggFunction, input.getType(), null, false); TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false); + windowFunction, ProcessAllWindowFunction.class, 0, 1, aggResultType, null, false); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } @@ -811,8 +838,7 @@ public SingleOutputStreamOperator fold(ACC initialValue, FoldFunctio TypeInformation foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -923,8 +949,7 @@ public SingleOutputStreamOperator fold(ACC initialValue, FoldFunctio TypeInformation foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation resultType = getProcessAllWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -1032,8 +1057,7 @@ public SingleOutputStreamOperator fold(ACC initialValue, public SingleOutputStreamOperator apply(AllWindowFunction function) { String callLocation = Utils.getCallLocationName(); function = input.getExecutionEnvironment().clean(function); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation resultType = getAllWindowFunctionReturnType(function, getInputType()); return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation); } @@ -1069,8 +1093,7 @@ public SingleOutputStreamOperator apply(AllWindowFunction functi public SingleOutputStreamOperator process(ProcessAllWindowFunction function) { String callLocation = Utils.getCallLocationName(); function = input.getExecutionEnvironment().clean(function); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation resultType = getProcessAllWindowFunctionReturnType(function, getInputType()); return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation); } @@ -1160,8 +1183,7 @@ private SingleOutputStreamOperator apply(InternalWindowFunction SingleOutputStreamOperator apply(ReduceFunction reduceFunction, AllWindowFunction function) { TypeInformation inType = input.getType(); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, AllWindowFunction.class, true, true, inType, null, false); + TypeInformation resultType = getAllWindowFunctionReturnType(function, inType); return apply(reduceFunction, function, resultType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java index 8461d2c859b01..cb18a3f6e4db3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @@ -67,9 +67,16 @@ private static SingleOutputStreamOperator addOperator( int bufSize, OutputMode mode) { - TypeInformation outTypeInfo = - TypeExtractor.getUnaryOperatorReturnType(func, AsyncFunction.class, false, - true, in.getType(), Utils.getCallLocationName(), true); + TypeInformation outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + func, + AsyncFunction.class, + 0, + 1, + new int[]{0}, + new int[]{1, 0}, + in.getType(), + Utils.getCallLocationName(), + true); // create transform AsyncWaitOperator operator = new AsyncWaitOperator<>( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 0cdc9a11c2d11..66cd8e6c8a67b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -574,13 +574,15 @@ public SingleOutputStreamOperator flatMap(FlatMapFunction flatMappe public SingleOutputStreamOperator process(ProcessFunction processFunction) { TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType( - processFunction, - ProcessFunction.class, - false, - true, - getType(), - Utils.getCallLocationName(), - true); + processFunction, + ProcessFunction.class, + 0, + 1, + new int[]{0}, + new int[]{2, 0}, + getType(), + Utils.getCallLocationName(), + true); return process(processFunction, outType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 698deb8791f93..851e6141b8756 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -267,13 +267,15 @@ public DataStreamSink addSink(SinkFunction sinkFunction) { public SingleOutputStreamOperator process(ProcessFunction processFunction) { TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType( - processFunction, - ProcessFunction.class, - false, - true, - getType(), - Utils.getCallLocationName(), - true); + processFunction, + ProcessFunction.class, + 0, + 1, + new int[]{0}, + new int[]{2, 0}, + getType(), + Utils.getCallLocationName(), + true); return process(processFunction, outType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index f8a1914ab6609..bb8baab32b19e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -295,8 +295,7 @@ private SingleOutputStreamOperator reduce( LegacyWindowOperatorType legacyWindowOpType) { TypeInformation inType = input.getType(); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, 
WindowFunction.class, true, true, inType, null, false); + TypeInformation resultType = getWindowFunctionReturnType(function, inType); return reduce(reduceFunction, function, resultType, legacyWindowOpType); } @@ -396,8 +395,7 @@ private SingleOutputStreamOperator reduce( */ @PublicEvolving public SingleOutputStreamOperator reduce(ReduceFunction reduceFunction, ProcessWindowFunction function) { - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, input.getType(), null, false); + TypeInformation resultType = getProcessWindowFunctionReturnType(function, input.getType(), null); return reduce(reduceFunction, function, resultType); } @@ -544,8 +542,7 @@ public SingleOutputStreamOperator fold(ACC initialValue, FoldFunctio TypeInformation foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, foldAccumulatorType, null, false); + TypeInformation resultType = getWindowFunctionReturnType(function, foldAccumulatorType); return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType); } @@ -663,8 +660,9 @@ public SingleOutputStreamOperator fold(ACC initialValue, FoldFunctio TypeInformation foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - TypeInformation windowResultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false); + TypeInformation windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName()); + TypeExtractor.getUnaryOperatorReturnType( + windowFunction, ProcessWindowFunction.class, 0, 1, foldResultType, Utils.getCallLocationName(), false); return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType); } @@ -852,8 +850,7 @@ public SingleOutputStreamOperator aggregate( TypeInformation aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, WindowFunction.class, true, true, aggResultType, null, false); + TypeInformation resultType = getWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } @@ -981,12 +978,42 @@ public SingleOutputStreamOperator aggregate( TypeInformation aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false); + TypeInformation resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } + private static TypeInformation getWindowFunctionReturnType( + WindowFunction function, + TypeInformation inType) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + WindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + null, + false); + } + + private static TypeInformation getProcessWindowFunctionReturnType( + ProcessWindowFunction function, + TypeInformation inType, + String 
functionName) { + return TypeExtractor.getUnaryOperatorReturnType( + function, + ProcessWindowFunction.class, + 0, + 1, + new int[]{2, 0}, + new int[]{3, 0}, + inType, + functionName, + false); + } + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is @@ -1094,8 +1121,7 @@ public SingleOutputStreamOperator aggregate( * @return The data stream that is the result of applying the window function to the window. */ public SingleOutputStreamOperator apply(WindowFunction function) { - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, getInputType(), null, false); + TypeInformation resultType = getWindowFunctionReturnType(function, getInputType()); return apply(function, resultType); } @@ -1131,8 +1157,7 @@ public SingleOutputStreamOperator apply(WindowFunction functi */ @PublicEvolving public SingleOutputStreamOperator process(ProcessWindowFunction function) { - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, ProcessWindowFunction.class, true, true, getInputType(), null, false); + TypeInformation resultType = getProcessWindowFunctionReturnType(function, getInputType(), null); return process(function, resultType); } @@ -1231,8 +1256,7 @@ private SingleOutputStreamOperator apply(InternalWindowFunction SingleOutputStreamOperator apply(ReduceFunction reduceFunction, WindowFunction function) { TypeInformation inType = input.getType(); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - function, WindowFunction.class, true, true, inType, null, false); + TypeInformation resultType = getWindowFunctionReturnType(function, inType); return apply(reduceFunction, function, resultType); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index ced27b61f4247..8748ed4da3b56 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -260,23 +260,28 @@ public void testMergingWindowsWithEvictor() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream source = env.fromElements(1, 2); - DataStream> window1 = source - .keyBy(new TupleKeySelector()) + DataStream window1 = source + .keyBy(new KeySelector() { + @Override + public String getKey(Integer value) throws Exception { + return value.toString(); + } + }) .window(EventTimeSessionWindows.withGap(Time.seconds(5))) .evictor(CountEvictor.of(5)) .process(new TestProcessWindowFunction()); - OneInputTransformation, Tuple3> transform = (OneInputTransformation, Tuple3>) window1.getTransformation(); - OneInputStreamOperator, Tuple3> operator = transform.getOperator(); + final OneInputTransformation transform = (OneInputTransformation) window1.getTransformation(); + final OneInputStreamOperator operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> 
winOperator = (WindowOperator, ?, ?, ?>) operator; + WindowOperator winOperator = (WindowOperator) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); - processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, 1); } // ------------------------------------------------------------------------ @@ -604,28 +609,30 @@ public void testAggregateEventTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window1 = source - .keyBy(new TupleKeySelector()) + DataStream window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation, Tuple2> transform = - (OneInputTransformation, Tuple2>) window1.getTransformation(); + final OneInputTransformation, Integer> transform = + (OneInputTransformation, Integer>) window1.getTransformation(); - OneInputStreamOperator, Tuple2> operator = transform.getOperator(); + final OneInputStreamOperator, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -633,28 +640,30 @@ public void testAggregateProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window1 = source - .keyBy(new TupleKeySelector()) + DataStream window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .aggregate(new DummyAggregationFunction()); - OneInputTransformation, Tuple2> transform = - (OneInputTransformation, Tuple2>) window1.getTransformation(); + final OneInputTransformation, Integer> transform = + (OneInputTransformation, Integer>) window1.getTransformation(); - OneInputStreamOperator, Tuple2> operator = transform.getOperator(); + final OneInputStreamOperator, Integer> operator = 
transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -662,30 +671,32 @@ public void testAggregateWithWindowFunctionEventTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); DummyReducer reducer = new DummyReducer(); - DataStream> window = source - .keyBy(new TupleKeySelector()) + DataStream window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation, Tuple3> transform = - (OneInputTransformation, Tuple3>) window.getTransformation(); + final OneInputTransformation, String> transform = + (OneInputTransformation, String>) window.getTransformation(); - OneInputStreamOperator, Tuple3> operator = transform.getOperator(); + final OneInputStreamOperator, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -693,28 +704,30 @@ public void testAggregateWithWindowFunctionProcessingTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window = source - .keyBy(new TupleKeySelector()) + DataStream window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestWindowFunction()); - OneInputTransformation, Tuple3> transform = - (OneInputTransformation, Tuple3>) window.getTransformation(); + final OneInputTransformation, String> transform = + (OneInputTransformation, String>) window.getTransformation(); - 
OneInputStreamOperator, Tuple3> operator = transform.getOperator(); + final OneInputStreamOperator, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -722,30 +735,30 @@ public void testAggregateWithProcessWindowFunctionEventTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DummyReducer reducer = new DummyReducer(); - - DataStream> window = source - .keyBy(new TupleKeySelector()) + DataStream window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation, Tuple3> transform = - (OneInputTransformation, Tuple3>) window.getTransformation(); + final OneInputTransformation, String> transform = + (OneInputTransformation, String>) window.getTransformation(); - OneInputStreamOperator, Tuple3> operator = transform.getOperator(); + final OneInputStreamOperator, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -753,28 +766,30 @@ public void testAggregateWithProcessWindowFunctionProcessingTime() throws Except StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window = source - .keyBy(new TupleKeySelector()) + DataStream window = source + .keyBy(new Tuple3KeySelector()) .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction()); - OneInputTransformation, Tuple3> transform = - (OneInputTransformation, Tuple3>) 
window.getTransformation(); + final OneInputTransformation, String> transform = + (OneInputTransformation, String>) window.getTransformation(); - OneInputStreamOperator, Tuple3> operator = transform.getOperator(); + final OneInputStreamOperator, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor); processElementAndEnsureOutput( - operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } // ------------------------------------------------------------------------ @@ -1406,29 +1421,31 @@ public void testAggregateWithEvictor() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window1 = source - .keyBy(new TupleKeySelector()) + DataStream window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate(new DummyAggregationFunction()); - OneInputTransformation, Tuple2> transform = - (OneInputTransformation, Tuple2>) window1.getTransformation(); + final OneInputTransformation, Integer> transform = + (OneInputTransformation, Integer>) window1.getTransformation(); - OneInputStreamOperator, Tuple2> operator = transform.getOperator(); + final OneInputStreamOperator, Integer> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -1436,42 +1453,33 @@ public void testAggregateWithEvictorAndProcessFunction() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - DataStream> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + DataStream> source = env.fromElements( + Tuple3.of("hello", "hallo", 1), + Tuple3.of("hello", "hallo", 2)); - DataStream> window1 = source - .keyBy(new TupleKeySelector()) + DataStream window1 = source + .keyBy(new Tuple3KeySelector()) .window(SlidingEventTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) .aggregate( new DummyAggregationFunction(), - new ProcessWindowFunction, Tuple2, String, TimeWindow>() { - @Override - public void process( - String s, - Context context, - Iterable> elements, - Collector> out) throws Exception { - for (Tuple2 in : elements) { - out.collect(in); - } - } - }); + new TestProcessWindowFunction()); - OneInputTransformation, Tuple2> transform = - (OneInputTransformation, Tuple2>) window1.getTransformation(); + final OneInputTransformation, String> transform = + (OneInputTransformation, String>) window1.getTransformation(); - OneInputStreamOperator, Tuple2> operator = transform.getOperator(); + final OneInputStreamOperator, String> operator = transform.getOperator(); Assert.assertTrue(operator instanceof WindowOperator); - WindowOperator, ?, ?, ?> winOperator = - (WindowOperator, ?, ?, ?>) operator; + WindowOperator, ?, ?, ?> winOperator = + (WindowOperator, ?, ?, ?>) operator; Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows); Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor); processElementAndEnsureOutput( - winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); + winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1)); } @Test @@ -1683,7 +1691,7 @@ public Tuple3 fold( } private static class DummyAggregationFunction - implements AggregateFunction, Tuple2, Tuple2> { + implements AggregateFunction, Tuple2, Integer> { @Override public Tuple2 createAccumulator() { @@ -1691,14 +1699,14 @@ public Tuple2 createAccumulator() { } @Override - public void add(Tuple2 value, Tuple2 accumulator) { + public void add(Tuple3 value, Tuple2 accumulator) { accumulator.f0 = value.f0; - accumulator.f1 = value.f1; + accumulator.f1 = value.f2; } @Override - public Tuple2 getResult(Tuple2 accumulator) { - return accumulator; + public Integer getResult(Tuple2 accumulator) { + return accumulator.f1; } @Override @@ -1729,31 +1737,31 @@ public T merge(T a, T b) { } private static class TestWindowFunction - implements WindowFunction, Tuple3, String, TimeWindow> { + implements WindowFunction { @Override public void apply(String key, TimeWindow window, - Iterable> values, - Collector> out) throws Exception { + Iterable values, + Collector out) throws Exception { - for (Tuple2 in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } private static class TestProcessWindowFunction - extends ProcessWindowFunction, Tuple3, String, TimeWindow> { + extends ProcessWindowFunction { @Override public void process(String key, Context ctx, - Iterable> values, - Collector> out) throws Exception { + Iterable values, + Collector out) throws Exception { - for (Tuple2 in : values) { - out.collect(new Tuple3<>(in.f0, in.f0, in.f1)); + for (Integer in : values) { + out.collect(in.toString()); } } } @@ -1765,4 +1773,12 @@ public String getKey(Tuple2 value) throws Exception { return value.f0; } } + + private static class Tuple3KeySelector implements KeySelector, String> { + + @Override + public String getKey(Tuple3 value) throws Exception { + return value.f0; + } + } } From 4c8cbeb9020d3045641912234414f032c8b3f348 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 2 Jun 2017 11:58:41 +0200 Subject: [PATCH 
2/6] Rewritten also getBinaryOperatorReturnType --- .../api/java/typeutils/TypeExtractor.java | 227 +++++++----------- .../org/apache/flink/cep/CEPLambdaTest.java | 2 - .../cep/operator/CEPMigration11to13Test.java | 4 +- .../api/datastream/AllWindowedStream.java | 3 +- .../api/datastream/CoGroupedStreams.java | 20 +- .../api/datastream/ConnectedStreams.java | 48 +++- .../api/datastream/JoinedStreams.java | 40 +-- .../api/datastream/WindowedStream.java | 2 - 8 files changed, 160 insertions(+), 186 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 1aa5d867d27f9..408235d926265 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -234,7 +234,15 @@ public static TypeInformation getAggregateFunctionAccumulatorType boolean allowMissing) { return getUnaryOperatorReturnType( - function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing); + function, + AggregateFunction.class, + 0, + 1, + new int[]{0}, + NO_OUTPUT_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -245,7 +253,15 @@ public static TypeInformation getAggregateFunctionReturnType( boolean allowMissing) { return getUnaryOperatorReturnType( - function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing); + function, + AggregateFunction.class, + 0, + 2, + new int[]{0}, + NO_OUTPUT_INDEX, + inType, + functionName, + allowMissing); } @PublicEvolving @@ -262,8 +278,8 @@ public static TypeInformation getMapPartitionReturnTypes(MapParti MapPartitionFunction.class, 0, 1, - new int[]{0, 1}, - new int[]{0, 1}, + new int[]{0, 0}, + new int[]{1, 0}, inType, functionName, allowMissing); @@ -283,8 +299,8 @@ public static TypeInformation getGroupReduceReturnTypes(GroupRedu GroupReduceFunction.class, 0, 1, - new int[]{0, 1}, - new int[]{0, 1}, + new int[]{0, 0}, + new int[]{1, 0}, inType, functionName, allowMissing); @@ -304,8 +320,8 @@ public static TypeInformation getGroupCombineReturnTypes(GroupCom GroupCombineFunction.class, 0, 1, - new int[]{0, 1}, - new int[]{0, 1}, + new int[]{0, 0}, + new int[]{1, 0}, inType, functionName, allowMissing); @@ -322,8 +338,19 @@ public static TypeInformation getFlatJoinReturnTypes(FlatJo public static TypeInformation getFlatJoinReturnTypes(FlatJoinFunction joinInterface, TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) joinInterface, + FlatJoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -337,8 +364,19 @@ public static TypeInformation getJoinReturnTypes(JoinFuncti public static TypeInformation getJoinReturnTypes(JoinFunction joinInterface, TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) joinInterface, + JoinFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + NO_OUTPUT_INDEX,, + in1Type, + 
in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -352,8 +390,19 @@ public static TypeInformation getCoGroupReturnTypes(CoGroup public static TypeInformation getCoGroupReturnTypes(CoGroupFunction coGroupInterface, TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) coGroupInterface, + CoGroupFunction.class, + 0, + 1, + 2, + new int[]{0, 0}, + new int[]{1, 0}, + new int[]{2, 0}, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -367,8 +416,19 @@ public static TypeInformation getCrossReturnTypes(CrossFunc public static TypeInformation getCrossReturnTypes(CrossFunction crossInterface, TypeInformation in1Type, TypeInformation in2Type, String functionName, boolean allowMissing) { - return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false, - in1Type, in2Type, functionName, allowMissing); + return getBinaryOperatorReturnType( + (Function) crossInterface, + CrossFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + NO_OUTPUT_INDEX, + in1Type, + in2Type, + functionName, + allowMissing); } @PublicEvolving @@ -416,85 +476,6 @@ public static TypeInformation getInputFormatTypes(InputFormat in // Generic extraction methods // -------------------------------------------------------------------------------------------- - /** - * Returns the unary operator's return type. - * - * @param function Function to extract the return type from - * @param baseClass Base class of the function - * @param hasIterable True if the first function parameter is an iterable, otherwise false - * @param hasCollector True if the function has an additional collector parameter, otherwise false - * @param inType Type of the input elements (In case of an iterable, it is the element type) - * @param functionName Function name - * @param allowMissing Can the type information be missing - * @param Input type - * @param Output type - * @return TypeInformation of the return type of the function - * @deprecated Use version explicitly specyifing types indices. - */ - @SuppressWarnings("unchecked") - @PublicEvolving - @Deprecated - public static TypeInformation getUnaryOperatorReturnType( - Function function, - Class baseClass, - boolean hasIterable, - boolean hasCollector, - TypeInformation inType, - String functionName, - boolean allowMissing) { - - return getUnaryOperatorReturnType( - function, - baseClass, - 0, - 1, - hasIterable ? new int[]{0, 0} : new int[]{0}, - hasCollector ? new int[]{1, 0} : NO_OUTPUT_INDEX, - inType, - functionName, - allowMissing); - } - - /** - * Returns the unary operator's return type. - * - * @param function Function to extract the return type from - * @param baseClass Base class of the function - * @param inputTypeArgumentIndex Index of the type argument of function's first parameter - * specifying the input type if it is wrapped (Iterable, Map, - * etc.). Otherwise -1. - * @param outputTypeArgumentIndex Index of the type argument of function's second parameter - * specifying the output type if it is wrapped in a Collector. - * Otherwise -1. 
- * @param inType Type of the input elements (In case of an iterable, it is the element type) - * @param functionName Function name - * @param allowMissing Can the type information be missing - * @param Input type - * @param Output type - * @return TypeInformation of the return type of the function - */ - @SuppressWarnings("unchecked") - @PublicEvolving - public static TypeInformation getUnaryOperatorReturnType( - Function function, - Class baseClass, - int inputTypeArgumentIndex, - int outputTypeArgumentIndex, - TypeInformation inType, - String functionName, - boolean allowMissing) { - return getUnaryOperatorReturnType( - function, - baseClass, - inputTypeArgumentIndex, - outputTypeArgumentIndex, - new int[]{0}, - new int[]{}, - inType, - functionName, - allowMissing); - } - /** * Returns the unary operator's return type. * @@ -544,13 +525,13 @@ public static TypeInformation getUnaryOperatorReturnType( validateLambdaGenericParameters(exec); // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = exec.getParameterTypes().length - 1; + final int paramLen = exec.getParameterTypes().length; final Method sam = getSingleAbstractMethod(baseClass); final int baseParametersLen = sam.getParameterCount(); // executable references "this" implicitly - if (paramLen < 0) { + if (paramLen <= 0) { // executable declaring class can also be a super class of the input type // we only validate if the executable exists in input type validateInputContainsExecutable(exec, inType); @@ -620,56 +601,12 @@ private static Method getSingleAbstractMethod(Class baseClass) { /** * Returns the binary operator's return type. * - * @param function Function to extract the return type from - * @param baseClass Base class of the function - * @param hasIterables True if the first function parameter is an iterable, otherwise false - * @param hasCollector True if the function has an additional collector parameter, otherwise false - * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type) - * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type) - * @param functionName Function name - * @param allowMissing Can the type information be missing - * @param Left side input type - * @param Right side input type - * @param Output type - * @return TypeInformation of the return type of the function - */ - @SuppressWarnings("unchecked") - @PublicEvolving - public static TypeInformation getBinaryOperatorReturnType( - Function function, - Class baseClass, - boolean hasIterables, - boolean hasCollector, - TypeInformation in1Type, - TypeInformation in2Type, - String functionName, - boolean allowMissing) { - - return getBinaryOperatorReturnType( - function, - baseClass, - 0, - 1, - 2, - hasIterables ? new int[]{0, 0} : new int[]{0}, - hasIterables ? new int[]{1, 0} : new int[]{1}, - hasCollector ? new int[]{2, 0} : NO_OUTPUT_INDEX, - in1Type, - in2Type, - functionName, - allowMissing - ); - } - - /** - * Returns the binary operator's return type. - * - *

NOTE: lambda type indices allows extraction of Type from lambdas. To extract input type IN
-	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 * NOTE: lambda type indices allows extraction of Type from lambdas. To extract input type IN1
+	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices.
+	 *
 	 * <pre>
-	 * OUT apply(Map> value)
+	 * OUT apply(Map> value1, List value2)
 	 * </pre>
 	 *
* @@ -720,7 +657,7 @@ public static TypeInformation getBinaryOperatorReturnType( final int baseParametersLen = sam.getParameterCount(); // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = exec.getParameterTypes().length - 1; + final int paramLen = exec.getParameterTypes().length; final Type input1 = extractType(exec, lambdaInput1TypeArgumentIndices, paramLen, baseParametersLen); final Type input2 = extractType(exec, lambdaInput2TypeArgumentIndices, paramLen, baseParametersLen); diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java index 4eff03739dc7d..37bf872f6ee00 100644 --- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java +++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java @@ -53,7 +53,6 @@ public static class EventB {} * Tests that a Java8 lambda can be passed as a CEP select function. */ @Test - @Ignore public void testLambdaSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -81,7 +80,6 @@ public void testLambdaSelectFunction() { * Tests that a Java8 lambda can be passed as a CEP flat select function. */ @Test - @Ignore public void testLambdaFlatSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 69ba42fbb3c23..950b5daeaa134 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -232,7 +232,7 @@ public void testNonKeyedCEPFunctionMigration() throws Exception { NullByteKeySelector keySelector = new NullByteKeySelector(); OneInputStreamOperatorTestHarness>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedOneInputStreamOperatorTestHarness>>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, @@ -284,7 +284,7 @@ public void testNonKeyedCEPFunctionMigration() throws Exception { OperatorStateHandles snapshot = harness.snapshot(1L, 1L); harness.close(); - harness = new KeyedOneInputStreamOperatorTestHarness<>( + harness = new KeyedOneInputStreamOperatorTestHarness>>( new KeyedCEPPatternOperator<>( Event.createTypeSerializer(), false, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 1eddd5c2504bb..ae971097afa59 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -669,8 +669,7 @@ public SingleOutputStreamOperator aggregate( TypeInformation aggResultType = TypeExtractor.getAggregateFunctionReturnType( aggFunction, input.getType(), null, false); - TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( - windowFunction, 
ProcessAllWindowFunction.class, 0, 1, aggResultType, null, false); + TypeInformation resultType = getProcessAllWindowFunctionReturnType(windowFunction, aggResultType); return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index d112260631a2b..8ea882a4e438c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -231,14 +231,18 @@ public WithWindow evictor(Evictor, ? public DataStream apply(CoGroupFunction function) { TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - CoGroupFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "CoGroup", - false); + function, + CoGroupFunction.class, + 0, + 1, + 2, + new int[]{0, 0}, + new int[]{1, 0}, + new int[]{2, 0}, + input1.getType(), + input2.getType(), + "CoGroup", + false); return apply(function, resultType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 0b882c8b0e18b..dbf3768c68fd7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -203,9 +203,19 @@ public ConnectedStreams keyBy(KeySelector keySelector1, KeySel */ public SingleOutputStreamOperator map(CoMapFunction coMapper) { - TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper, - CoMapFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coMapper, + CoMapFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper))); @@ -227,9 +237,19 @@ CoMapFunction.class, false, true, getType1(), getType2(), public SingleOutputStreamOperator flatMap( CoFlatMapFunction coFlatMapper) { - TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, - CoFlatMapFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coFlatMapper, + CoFlatMapFunction.class, + 0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper))); } @@ -254,9 +274,19 @@ CoFlatMapFunction.class, false, true, getType1(), getType2(), public SingleOutputStreamOperator process( CoProcessFunction coProcessFunction) { - TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction, - CoProcessFunction.class, false, true, getType1(), getType2(), - Utils.getCallLocationName(), true); + TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + coProcessFunction, + CoProcessFunction.class, + 
0, + 1, + 2, + new int[]{0}, + new int[]{1}, + new int[]{2, 0}, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); return process(coProcessFunction, outTypeInfo); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java index e1ffe866312ae..c63b4567b9138 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -221,14 +221,18 @@ public WithWindow evictor(Evictor, ? */ public DataStream apply(JoinFunction function) { TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - JoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); + function, + JoinFunction.class, + 0, + 1, + 2, + new int[]{0, 0}, + new int[]{1, 0}, + new int[]{2, 0}, + input1.getType(), + input2.getType(), + "Join", + false); return apply(function, resultType); } @@ -300,14 +304,18 @@ public SingleOutputStreamOperator with(FlatJoinFunction functi */ public DataStream apply(FlatJoinFunction function) { TypeInformation resultType = TypeExtractor.getBinaryOperatorReturnType( - function, - FlatJoinFunction.class, - true, - true, - input1.getType(), - input2.getType(), - "Join", - false); + function, + FlatJoinFunction.class, + 0, + 1, + 2, + new int[]{0, 0}, + new int[]{1, 0}, + new int[]{2, 0}, + input1.getType(), + input2.getType(), + "Join", + false); return apply(function, resultType); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index bb8baab32b19e..a7950645ae1c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -661,8 +661,6 @@ public SingleOutputStreamOperator fold(ACC initialValue, FoldFunctio Utils.getCallLocationName(), true); TypeInformation windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName()); - TypeExtractor.getUnaryOperatorReturnType( - windowFunction, ProcessWindowFunction.class, 0, 1, foldResultType, Utils.getCallLocationName(), false); return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType); } From 3f4108b86a85221012061c60cf017779ce4466ae Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 2 Jun 2017 13:00:52 +0200 Subject: [PATCH 3/6] Fixed compilation error --- .../java/org/apache/flink/api/java/typeutils/TypeExtractor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 408235d926265..aa2564607ec8c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -372,7 +372,7 @@ public static TypeInformation getJoinReturnTypes(JoinFuncti 2, new int[]{0}, new int[]{1}, - NO_OUTPUT_INDEX,, + NO_OUTPUT_INDEX, in1Type, in2Type, functionName, From 81a89006185549094bfd5ed4f7554350fea2ddb3 Mon Sep 17 
00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 2 Jun 2017 14:09:58 +0200 Subject: [PATCH 4/6] Make the code java7 compatible --- .../org/apache/flink/api/java/typeutils/TypeExtractor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index aa2564607ec8c..a09bc9d4e8c75 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -528,7 +528,7 @@ public static TypeInformation getUnaryOperatorReturnType( final int paramLen = exec.getParameterTypes().length; final Method sam = getSingleAbstractMethod(baseClass); - final int baseParametersLen = sam.getParameterCount(); + final int baseParametersLen = sam.getParameterTypes().length; // executable references "this" implicitly if (paramLen <= 0) { @@ -654,7 +654,7 @@ public static TypeInformation getBinaryOperatorReturnType( validateLambdaGenericParameters(exec); final Method sam = getSingleAbstractMethod(baseClass); - final int baseParametersLen = sam.getParameterCount(); + final int baseParametersLen = sam.getParameterTypes().length; // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function final int paramLen = exec.getParameterTypes().length; From 262ba54f4741162d46614206c4593953462cfcff Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 2 Jun 2017 14:24:57 +0200 Subject: [PATCH 5/6] Updated preconditions --- .../api/java/typeutils/TypeExtractor.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index a09bc9d4e8c75..684f7b2aa30ca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -521,6 +521,12 @@ public static TypeInformation getUnaryOperatorReturnType( throw new InvalidTypesException("Internal error occurred.", e); } if (exec != null) { + Preconditions.checkArgument( + lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1, + "Indices for input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaOutputTypeArgumentIndices != null, + "Indices for output type arguments within lambda not provided"); // check for lambda type erasure validateLambdaGenericParameters(exec); @@ -555,7 +561,7 @@ public static TypeInformation getUnaryOperatorReturnType( return new TypeExtractor().privateCreateTypeInfo(output, inType, null); } else { Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided"); - Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Input type argument index was not provided"); + Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided"); validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable) function).getProducedType(); @@ -650,6 +656,15 @@ public static TypeInformation getBinaryOperatorReturnType( throw new InvalidTypesException("Internal error occurred.", e); } if 
(exec != null) { + Preconditions.checkArgument( + lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1, + "Indices for first input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaInput2TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1, + "Indices for second input type arguments within lambda not provided"); + Preconditions.checkArgument( + lambdaOutputTypeArgumentIndices != null, + "Indices for output type arguments within lambda not provided"); // check for lambda type erasure validateLambdaGenericParameters(exec); @@ -681,6 +696,9 @@ public static TypeInformation getBinaryOperatorReturnType( in2Type); } else { + Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided"); + Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided"); + Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided"); validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type); validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type); if(function instanceof ResultTypeQueryable) { From c55dd5149cd75994d8d92337664272470cdc7f34 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Wed, 7 Jun 2017 15:57:08 +0200 Subject: [PATCH 6/6] comments addressed --- .../api/common/functions/Partitioner.java | 6 +- .../java/typeutils/TypeExtractionUtils.java | 74 +++++++++ .../api/java/typeutils/TypeExtractor.java | 156 ++++++++++-------- .../type/lambdas/LambdaExtractionTest.java | 13 ++ .../api/datastream/CoGroupedStreams.java | 9 +- .../api/datastream/ConnectedStreams.java | 18 +- .../api/datastream/JoinedStreams.java | 10 +- 7 files changed, 189 insertions(+), 97 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java index 6c237ed57d5b3..c272d3a332620 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java @@ -22,15 +22,15 @@ /** * Function to implement a custom partition assignment for keys. - * + * * @param The type of the key to be partitioned. */ @Public -public interface Partitioner extends java.io.Serializable { +public interface Partitioner extends java.io.Serializable, Function { /** * Computes the partition for the given key. - * + * * @param key The key. * @param numPartitions The number of partitions to partition into. * @return The partition index. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java index 0aac257aa84c6..0c5ba52374989 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java @@ -20,6 +20,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; @@ -28,6 +29,8 @@ import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.InvalidTypesException; + import static org.objectweb.asm.Type.getConstructorDescriptor; import static org.objectweb.asm.Type.getMethodDescriptor; @@ -160,6 +163,77 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T } } + /** + * Extracts type from given index from lambda. It supports nested types. + * + * @param exec lambda function to extract the type from + * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy + * @param paramLen count of total parameters of the lambda (including closure parameters) + * @param baseParametersLen count of lambda interface parameters (without closure parameters) + * @return extracted type + */ + public static Type extractTypeFromLambda( + LambdaExecutable exec, + int[] lambdaTypeArgumentIndices, + int paramLen, + int baseParametersLen) { + Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]]; + for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) { + output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]); + } + return output; + } + + /** + * * This method extracts the n-th type argument from the given type. An InvalidTypesException + * is thrown if the type does not have any type arguments or if the index exceeds the number + * of type arguments. + * + * @param t Type to extract the type arguments from + * @param index Index of the type argument to extract + * @return The extracted type argument + * @throws InvalidTypesException if the given type does not have any type arguments or if the + * index exceeds the number of type arguments. + */ + public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException { + if (t instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments(); + + if (index < 0 || index >= actualTypeArguments.length) { + throw new InvalidTypesException("Cannot extract the type argument with index " + + index + " because the type has only " + actualTypeArguments.length + + " type arguments."); + } else { + return actualTypeArguments[index]; + } + } else { + throw new InvalidTypesException("The given type " + t + " is not a parameterized type."); + } + } + + /** + * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object, + * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class. 
+ * + * @param baseClass + * @throws InvalidTypesException if the given class does not implement + * @return + */ + public static Method getSingleAbstractMethod(Class baseClass) { + Method sam = null; + for (Method method : baseClass.getMethods()) { + if (Modifier.isAbstract(method.getModifiers())) { + if (sam == null) { + sam = method; + } else { + throw new InvalidTypesException( + "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM."); + } + } + } + return sam; + } + /** * Returns all declared methods of a class including methods of superclasses. */ diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 684f7b2aa30ca..c50dfc9b8115d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -116,7 +116,7 @@ public class TypeExtractor { private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class); - public static final int[] NO_OUTPUT_INDEX = new int[] {}; + public static final int[] NO_INDEX = new int[] {}; protected TypeExtractor() { // only create instances for special use cases @@ -169,7 +169,7 @@ public static TypeInformation getMapReturnTypes(MapFunction TypeInformation getFoldReturnTypes(FoldFunction TypeInformation getAggregateFunctionAccumulatorType 0, 1, new int[]{0}, - NO_OUTPUT_INDEX, + NO_INDEX, inType, functionName, allowMissing); @@ -257,8 +257,8 @@ public static TypeInformation getAggregateFunctionReturnType( AggregateFunction.class, 0, 2, - new int[]{0}, - NO_OUTPUT_INDEX, + NO_INDEX, + NO_INDEX, inType, functionName, allowMissing); @@ -372,7 +372,7 @@ public static TypeInformation getJoinReturnTypes(JoinFuncti 2, new int[]{0}, new int[]{1}, - NO_OUTPUT_INDEX, + NO_INDEX, in1Type, in2Type, functionName, @@ -424,7 +424,7 @@ public static TypeInformation getCrossReturnTypes(CrossFunc 2, new int[]{0}, new int[]{1}, - NO_OUTPUT_INDEX, + NO_INDEX, in1Type, in2Type, functionName, @@ -446,7 +446,7 @@ public static TypeInformation getKeySelectorTypes(KeySelector TypeInformation getPartitionerTypes(Partitioner partitio } @PublicEvolving - public static TypeInformation getPartitionerTypes(Partitioner partitioner, String functionName, boolean allowMissing) { - return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null); + public static TypeInformation getPartitionerTypes( + Partitioner partitioner, + String functionName, + boolean allowMissing) { + try { + final LambdaExecutable exec; + try { + exec = checkAndExtractLambda(partitioner); + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Internal error occurred.", e); + } + if (exec != null) { + // check for lambda type erasure + validateLambdaGenericParameters(exec); + + // parameters must be accessed from behind, since JVM can add additional parameters e.g. 
when using local variables inside lambda function + // paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure + final int paramLen = exec.getParameterTypes().length; + + final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class); + // number of parameters the SAM of implemented interface has, the parameter indexing aplicates to this range + final int baseParametersLen = sam.getParameterTypes().length; + + final Type keyType = TypeExtractionUtils.extractTypeFromLambda( + exec, + new int[]{0}, + paramLen, + baseParametersLen); + return new TypeExtractor().privateCreateTypeInfo(keyType, null, null); + } else { + return new TypeExtractor().privateCreateTypeInfo( + Partitioner.class, + partitioner.getClass(), + 0, + null, + null); + } + } catch (InvalidTypesException e) { + if (allowMissing) { + return (TypeInformation) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e); + } else { + throw e; + } + } } @@ -479,7 +521,7 @@ public static TypeInformation getInputFormatTypes(InputFormat in /** * Returns the unary operator's return type. * - *

NOTE: lambda type indices allows extraction of Type from lambdas. To extract input type IN
+	 * NOTE: lambda type indices allow extraction of Type from lambdas. To extract input type IN
 	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
 	 *
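The index arrays referenced in this Javadoc are best read as a path into the generic signature of the functional method: the first entry selects a parameter, every further entry selects a type argument inside it. A minimal, self-contained sketch in plain reflection (the apply signature below is invented to mirror the Javadoc example) shows how the {0, 1, 0} path resolves:

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeIndexSketch {

	// Hypothetical signature mirroring the Javadoc example: OUT apply(Map<String, List<IN>> value).
	public static Double apply(Map<String, List<Integer>> value) {
		return 0.0;
	}

	public static void main(String[] args) throws Exception {
		Method method = TypeIndexSketch.class.getMethod("apply", Map.class);

		// Path {0, 1, 0}: parameter 0, then its type argument 1 (List<Integer>), then its type argument 0.
		Type current = method.getGenericParameterTypes()[0];
		for (int index : new int[] {1, 0}) {
			current = ((ParameterizedType) current).getActualTypeArguments()[index];
		}
		System.out.println(current); // class java.lang.Integer
	}
}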

@@ -531,9 +573,12 @@ public static  TypeInformation getUnaryOperatorReturnType(
 				validateLambdaGenericParameters(exec);
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
+				// paramLen is the total number of parameters of the provided lambda, including parameters added through the closure
 				final int paramLen = exec.getParameterTypes().length;
 
-				final Method sam = getSingleAbstractMethod(baseClass);
+				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+
+				// number of parameters of the SAM of the implemented interface; the parameter indexing applies to this range
 				final int baseParametersLen = sam.getParameterTypes().length;
 
 				// executable references "this" implicitly
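The two new comments matter because javac compiles a capturing lambda into a synthetic method whose captured variables come first and whose interface parameters come last, which is why the extractor indexes parameters "from behind" using paramLen - baseParametersLen. A standalone sketch of the same arithmetic (the types below are invented; this is not Flink code):

import java.lang.reflect.Type;

public class ClosureOffsetSketch {

	public static void main(String[] args) {
		// Pretend these are exec.getParameterTypes() of a capturing lambda that implements a
		// single-argument interface: the captured String comes first, the real IN parameter last.
		Type[] syntheticParameters = new Type[] { String.class, Integer.class };

		int paramLen = syntheticParameters.length;           // 2, includes the captured variable
		int baseParametersLen = 1;                           // the SAM declares exactly one parameter
		int[] lambdaInputTypeArgumentIndices = new int[] {0};

		// Same offset the extractor uses: skip the closure parameters, then apply the index path.
		Type input = syntheticParameters[paramLen - baseParametersLen + lambdaInputTypeArgumentIndices[0]];
		System.out.println(input); // class java.lang.Integer
	}
}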
@@ -543,7 +588,11 @@ public static  TypeInformation getUnaryOperatorReturnType(
 					validateInputContainsExecutable(exec, inType);
 				}
 				else {
-					final Type input = extractType(exec, lambdaInputTypeArgumentIndices, paramLen, baseParametersLen);
+					final Type input = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaInputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
 					validateInputType(input, inType);
 				}
 
@@ -553,7 +602,11 @@ public static  TypeInformation getUnaryOperatorReturnType(
 
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
-					output = extractType(exec, lambdaOutputTypeArgumentIndices, paramLen, baseParametersLen);
+					output = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaOutputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
 				}
@@ -578,32 +631,6 @@ public static  TypeInformation getUnaryOperatorReturnType(
 		}
 	}
 
-	private static Type extractType(
-			LambdaExecutable exec,
-			int[] lambdaTypeArgumentIndices,
-			int paramLen,
-			int baseParametersLen) {
-		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
-		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
-			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
-		}
-		return output;
-	}
-
-	private static Method getSingleAbstractMethod(Class baseClass) {
-		Method sam = null;
-		for (Method method : baseClass.getMethods()) {
-			if (Modifier.isAbstract(method.getModifiers())) {
-				if (sam == null) {
-					sam = method;
-				} else {
-					throw new InvalidTypesException("Lambda type does not match provided baseClass: " + baseClass);
-				}
-			}
-		}
-		return sam;
-	}
-
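The two helpers removed here reappear in TypeExtractionUtils earlier in this commit's diff; the SAM lookup itself only scans the base interface for its single abstract method so that parameter positions can be resolved against it. A rough, hedged equivalent in plain reflection, using java.util.function.BiFunction purely as an example interface:

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.BiFunction;

public class SamLookupSketch {

	static Method findSingleAbstractMethod(Class<?> baseClass) {
		Method sam = null;
		for (Method method : baseClass.getMethods()) {
			if (Modifier.isAbstract(method.getModifiers())) {
				if (sam != null) {
					throw new IllegalArgumentException(baseClass + " declares more than one abstract method");
				}
				sam = method;
			}
		}
		if (sam == null) {
			throw new IllegalArgumentException(baseClass + " declares no abstract method");
		}
		return sam;
	}

	public static void main(String[] args) {
		// Prints the reflected apply method; default methods such as andThen are skipped.
		System.out.println(findSingleAbstractMethod(BiFunction.class));
	}
}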
 	/**
 	 * Returns the binary operator's return type.
 	 *
@@ -668,14 +695,22 @@ public static  TypeInformation getBinaryOperatorReturnType(
 				// check for lambda type erasure
 				validateLambdaGenericParameters(exec);
 
-				final Method sam = getSingleAbstractMethod(baseClass);
+				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
 				final int baseParametersLen = sam.getParameterTypes().length;
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
 				final int paramLen = exec.getParameterTypes().length;
 
-				final Type input1 = extractType(exec, lambdaInput1TypeArgumentIndices, paramLen, baseParametersLen);
-				final Type input2 = extractType(exec, lambdaInput2TypeArgumentIndices, paramLen, baseParametersLen);
+				final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
+					exec,
+					lambdaInput1TypeArgumentIndices,
+					paramLen,
+					baseParametersLen);
+				final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
+					exec,
+					lambdaInput2TypeArgumentIndices,
+					paramLen,
+					baseParametersLen);
 
 				validateInputType(input1, in1Type);
 				validateInputType(input2, in2Type);
@@ -685,7 +720,11 @@ public static  TypeInformation getBinaryOperatorReturnType(
 
 				final Type output;
 				if (lambdaOutputTypeArgumentIndices.length > 0) {
-					output = extractType(exec, lambdaOutputTypeArgumentIndices, paramLen, baseParametersLen);
+					output = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaOutputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
 				} else {
 					output = exec.getReturnType();
 				}
@@ -1594,33 +1633,6 @@ private int countFieldsInClass(Class<?> clazz) {
 		return fieldCount;
 	}
 
-	/**
-	 * * This method extracts the n-th type argument from the given type. An InvalidTypesException
-	 * is thrown if the type does not have any type arguments or if the index exceeds the number
-	 * of type arguments.
-	 *
-	 * @param t Type to extract the type arguments from
-	 * @param index Index of the type argument to extract
-	 * @return The extracted type argument
-	 * @throws InvalidTypesException if the given type does not have any type arguments or if the
-	 * index exceeds the number of type arguments.
-	 */
-	private static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
-		if(t instanceof ParameterizedType) {
-			Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
-
-			if (index < 0 || index >= actualTypeArguments.length) {
-				throw new InvalidTypesException("Cannot extract the type argument with index " +
-					index + " because the type has only " + actualTypeArguments.length +
-					" type arguments.");
-			} else {
-				return actualTypeArguments[index];
-			}
-		} else {
-			throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
-		}
-	}
-
 	private static void validateLambdaGenericParameters(LambdaExecutable exec) {
 		// check the arguments
 		for (Type t : exec.getParameterTypes()) {
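
For orientation between the two refactored extractors above: the index arrays address the parameters of the
base interface's single abstract method, and TypeExtractionUtils.extractTypeFromLambda presumably keeps the
walk of the removed extractType/extractTypeArgument pair, including the paramLen - baseParametersLen offset
that skips variables the JVM may prepend to a lambda's parameter list. The following self-contained sketch
illustrates only that walk, against a made-up SAM; it is an illustration, not the Flink implementation.

	import java.lang.reflect.Method;
	import java.lang.reflect.ParameterizedType;
	import java.lang.reflect.Type;
	import java.util.List;

	// Illustration of the index convention: indices[0] picks a SAM parameter,
	// the remaining indices descend into that parameter's generic type arguments.
	public class IndexWalkSketch {

		// stand-in SAM with the shape of a flat-join style interface
		interface SampleSam {
			void join(Integer first, String second, List<Long> out);
		}

		static Type walk(Method sam, int[] indices) {
			Type t = sam.getGenericParameterTypes()[indices[0]];
			for (int i = 1; i < indices.length; i++) {
				t = ((ParameterizedType) t).getActualTypeArguments()[indices[i]];
			}
			return t;
		}

		public static void main(String[] args) throws Exception {
			Method sam = SampleSam.class.getMethod("join", Integer.class, String.class, List.class);
			System.out.println(walk(sam, new int[]{0}));    // class java.lang.Integer
			System.out.println(walk(sam, new int[]{2, 0})); // class java.lang.Long
		}
	}

An empty output index array makes the extractor fall back to the lambda's return type, as in the branches above.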
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64ff60553ffcd..7500d73323b3f 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -261,6 +262,18 @@ public void testLambdaTypeErasure() {
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 	}
 
+	@Test
+	public void testPartitionerLambda() {
+		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
+		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
+
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
+		Assert.assertEquals(((TupleTypeInfo) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+
+	}
+
 	private static class MyType {
 		private int key;
 
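
The lambda in the new test targets Partitioner's single abstract method, int partition(K key, int numPartitions),
so the extracted type is the key type, here Tuple2<Integer, String>, which is what the assertions check. A
hypothetical anonymous-class equivalent of the test's lambda, shown only to make that SAM shape explicit:

	import org.apache.flink.api.common.functions.Partitioner;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class PartitionerSamSketch {
		// same behaviour as the lambda in testPartitionerLambda, written out against the SAM
		static final Partitioner<Tuple2<Integer, String>> PARTITIONER =
			new Partitioner<Tuple2<Integer, String>>() {
				@Override
				public int partition(Tuple2<Integer, String> key, int numPartitions) {
					return key.f1.length() % numPartitions;
				}
			};
	}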
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 8ea882a4e438c..32ced914e3447 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -230,15 +230,8 @@ public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ?
 		 */
 		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
 
-			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+			TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
 				function,
-				CoGroupFunction.class,
-				0,
-				1,
-				2,
-				new int[]{0, 0},
-				new int[]{1, 0},
-				new int[]{2, 0},
 				input1.getType(),
 				input2.getType(),
 				"CoGroup",
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index dbf3768c68fd7..e244bd26bb0c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -209,9 +209,9 @@ public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
-			new int[]{2, 0},
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),
@@ -243,9 +243,9 @@ public <R> SingleOutputStreamOperator<R> flatMap(
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
-			new int[]{2, 0},
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),
@@ -280,9 +280,9 @@ public <R> SingleOutputStreamOperator<R> process(
 			0,
 			1,
 			2,
-			new int[]{0},
-			new int[]{1},
-			new int[]{2, 0},
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
 			getType1(),
 			getType2(),
 			Utils.getCallLocationName(),
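
A plausible reading of the switch to NO_INDEX in these three call sites: the co-functions involved each declare
two abstract methods (CoMapFunction has map1 and map2, CoFlatMapFunction has flatMap1 and flatMap2), so they can
never be written as lambdas and there is no single abstract method whose parameters the index arrays could
address. Simplified shape, for reference only:

	// simplified sketch; the real CoMapFunction also extends Function and Serializable
	interface CoMapFunctionShape<IN1, IN2, OUT> {
		OUT map1(IN1 value) throws Exception;
		OUT map2(IN2 value) throws Exception;
	}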
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index c63b4567b9138..f23ebcfde2a3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -226,9 +226,9 @@ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
 				0,
 				1,
 				2,
-				new int[]{0, 0},
-				new int[]{1, 0},
-				new int[]{2, 0},
+				new int[]{0},
+				new int[]{1},
+				TypeExtractor.NO_INDEX,
 				input1.getType(),
 				input2.getType(),
 				"Join",
@@ -309,8 +309,8 @@ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
 				0,
 				1,
 				2,
-				new int[]{0, 0},
-				new int[]{1, 0},
+				new int[]{0},
+				new int[]{1},
 				new int[]{2, 0},
 				input1.getType(),
 				input2.getType(),
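
The corrected arrays line up with the single abstract methods of the two join interfaces. JoinFunction takes
IN1 and IN2 directly and returns OUT, hence {0} and {1}; NO_INDEX presumably leaves the output to be read from
the lambda's return type, as in the fallback branches above. FlatJoinFunction emits through a Collector, so its
output stays at parameter 2, type argument 0. Simplified shapes, for reference only:

	import org.apache.flink.util.Collector;

	// simplified sketches; the real interfaces also extend Function and Serializable
	interface JoinFunctionShape<IN1, IN2, OUT> {
		OUT join(IN1 first, IN2 second) throws Exception;                      // output taken from the return type
	}

	interface FlatJoinFunctionShape<IN1, IN2, OUT> {
		void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception; // output at {2, 0}
	}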