Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public abstract class SerializerTestBase<T> {
public void testInstantiate() {
try {
TypeSerializer<T> serializer = getSerializer();

if(serializer.getClass().getName().endsWith("KryoSerializer")) {
// the kryo serializer will return null. We ignore this test for Kryo.
return;
}
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.api.common.typeutils;

import org.apache.flink.api.common.typeutils.TypeSerializer;


public class SerializerTestInstance<T> extends SerializerTestBase<T> {

Expand Down
6 changes: 6 additions & 0 deletions flink-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ under the License.
<artifactId>asm</artifactId>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_2.10</artifactId>
<version>0.5.1</version>
</dependency>

<!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,16 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;

import java.util.Collection;
import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;


public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {

private final Class<T> typeClass;
private final static Class<?>[] unsupportedByAvro = new Class[] {Collection.class};


public GenericTypeInfo(Class<T> typeClass) {
this.typeClass = typeClass;
for (Class<?> unsupported: unsupportedByAvro) {
if(unsupported.isAssignableFrom(typeClass)) {
throw new RuntimeException("The type '"+typeClass+"' is currently not supported " +
"by the Avro Serializer that Flink is using for serializing " +
"arbitrary objects");
}
}
}

@Override
Expand Down Expand Up @@ -76,10 +66,7 @@ public boolean isKeyType() {

@Override
public TypeSerializer<T> createSerializer() {
// NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here
// in particular classes implementing GenericContainer are handled as GenericTypeInfos
// (this will probably not work with Kryo)
return new AvroSerializer<T>(this.typeClass);
return new KryoSerializer<T>(this.typeClass);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Set;

import org.apache.avro.generic.GenericContainer;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
Expand Down Expand Up @@ -232,29 +231,6 @@ private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnTy
// get info from hierarchy
return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}


/**
* @param curT : start type
* @return Type The immediate child of the top class
*/
private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals(
stopAtClass))
&& !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) {
typeHierarchy.add(curT);

// parameterized type
if (curT instanceof ParameterizedType) {
curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
}
// class
else {
curT = ((Class<?>) curT).getGenericSuperclass();
}
}
return curT;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
Expand Down Expand Up @@ -330,7 +306,7 @@ private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(Arr
int fieldCount = countFieldsInClass(tAsClass);
if(fieldCount != tupleSubTypes.length) {
// the class is not a real tuple because it contains additional fields. treat as a pojo
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
}

return new TupleTypeInfo(tAsClass, tupleSubTypes);
Expand Down Expand Up @@ -396,23 +372,11 @@ else if (t instanceof ParameterizedType) { //TODO
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
return privateGetForClass((Class<OUT>) t, new ArrayList<Type>());
return privateGetForClass((Class<OUT>) t, typeHierarchy);
}

throw new InvalidTypesException("Type Information could not be created.");
}

private int countFieldsInClass(Class<?> clazz) {
int fieldCount = 0;
for(Field field : clazz.getFields()) { // get all fields
if( !Modifier.isStatic(field.getModifiers()) &&
!Modifier.isTransient(field.getModifiers())
) {
fieldCount++;
}
}
return fieldCount;
}

private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
Expand All @@ -427,6 +391,11 @@ private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> r
returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
}

// no input information exists
if (in1TypeInfo == null && in2TypeInfo == null) {
return null;
}

// create a new type hierarchy for the input
ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
// copy the function part of the type hierarchy
Expand Down Expand Up @@ -753,6 +722,34 @@ else if (typeInfo instanceof GenericTypeInfo<?>) {
// Utility methods
// --------------------------------------------------------------------------------------------

/**
* @param curT : start type
* @return Type The immediate child of the top class
*/
private Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
// skip first one
if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
curT = typeToClass(curT).getGenericSuperclass();
}
while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}
return curT;
}

private int countFieldsInClass(Class<?> clazz) {
int fieldCount = 0;
for(Field field : clazz.getFields()) { // get all fields
if( !Modifier.isStatic(field.getModifiers()) &&
!Modifier.isTransient(field.getModifiers())
) {
fieldCount++;
}
}
return fieldCount;
}

private static Type removeGenericWrapper(Type t) {
if(t instanceof ParameterizedType &&
(Collector.class.isAssignableFrom(typeToClass(t))
Expand Down Expand Up @@ -949,16 +946,16 @@ private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type
// special case handling for Class, this should not be handled by the POJO logic
return new GenericTypeInfo<X>(clazz);
}
if(GenericContainer.class.isAssignableFrom(clazz)) {
// this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro.
return new GenericTypeInfo<X>(clazz);
}

try {
TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
if (pojoType != null) {
return pojoType;
}
} catch (InvalidTypesException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e);
}
// ignore and create generic type info
}

Expand Down Expand Up @@ -992,12 +989,12 @@ private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHi
}
for(Method m : clazz.getMethods()) {
// check for getter
if( // The name should be "get<FieldName>" or "<fieldName>" (for scala).
(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
if( // The name should be "get<FieldName>" or "<fieldName>" (for scala) or "is<fieldName>" for boolean fields.
(m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals("is"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) &&
// no arguments for the getter
m.getParameterTypes().length == 0 &&
// return type is same as field type (or the generic variant of it)
(m.getReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
(m.getGenericReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
) {
if(hasGetter) {
throw new IllegalStateException("Detected more than one getter");
Expand All @@ -1007,7 +1004,7 @@ private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHi
// check for setters (<FieldName>_$eq for scala)
if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) &&
m.getParameterTypes().length == 1 && // one parameter of the field's type
( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
( m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
// return type is void.
m.getReturnType().equals(Void.TYPE)
) {
Expand All @@ -1032,12 +1029,12 @@ private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHi
}

private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
// try to create Type hierarchy, if the incoming one is empty.
if(typeHierarchy.size() == 0) {
recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
// try to create Type hierarchy, if the incoming only contains the most bottom one or none.
if(typeHierarchy.size() <= 1) {
getTypeHierarchy(typeHierarchy, clazz, Object.class);
}
if(clazzTypeHint != null) {
recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
}

List<Field> fields = getAllDeclaredFields(clazz);
Expand All @@ -1049,12 +1046,15 @@ private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeH
return null;
}
try {
typeHierarchy.add(fieldType);
pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
fieldTypeHierarchy.add(fieldType);
pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
} catch (InvalidTypesException e) {
//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""
+ "\nThe system is internally using the Avro serializer which is not able to handle that type.", e);
Class<?> genericClass = Object.class;
if(isClassType(fieldType)) {
genericClass = typeToClass(fieldType);
}
pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass )));
}
}

Expand Down
Loading