From e75fa28b8c213973fbbf40552c327770264716a6 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 10 Dec 2014 22:02:07 +0100 Subject: [PATCH 1/5] Fix invalid type hierarchy creation by Pojo logic --- .../api/java/typeutils/TypeExtractor.java | 87 +++++++++---------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 67b2a51303f5f..3bceac5dcd8d9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -232,29 +232,6 @@ private TypeInformation privateCreateTypeInfo(Type returnTy // get info from hierarchy return (TypeInformation) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type); } - - - /** - * @param curT : start type - * @return Type The immediate child of the top class - */ - private Type recursivelyGetTypeHierarchy(ArrayList typeHierarchy, Type curT, Class stopAtClass) { - while (!(curT instanceof ParameterizedType && ((Class) ((ParameterizedType) curT).getRawType()).equals( - stopAtClass)) - && !(curT instanceof Class && ((Class) curT).equals(stopAtClass))) { - typeHierarchy.add(curT); - - // parameterized type - if (curT instanceof ParameterizedType) { - curT = ((Class) ((ParameterizedType) curT).getRawType()).getGenericSuperclass(); - } - // class - else { - curT = ((Class) curT).getGenericSuperclass(); - } - } - return curT; - } @SuppressWarnings({ "unchecked", "rawtypes" }) private TypeInformation createTypeInfoWithTypeHierarchy(ArrayList typeHierarchy, Type t, @@ -330,7 +307,7 @@ private TypeInformation createTypeInfoWithTypeHierarchy(Arr int fieldCount = countFieldsInClass(tAsClass); if(fieldCount != tupleSubTypes.length) { // the class is not a real tuple because it contains additional fields. treat as a pojo - return (TypeInformation) analyzePojo(tAsClass, new ArrayList(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. + return (TypeInformation) analyzePojo(tAsClass, new ArrayList(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class. } return new TupleTypeInfo(tAsClass, tupleSubTypes); @@ -396,23 +373,11 @@ else if (t instanceof ParameterizedType) { //TODO } // no tuple, no TypeVariable, no generic type else if (t instanceof Class) { - return privateGetForClass((Class) t, new ArrayList()); + return privateGetForClass((Class) t, typeHierarchy); } throw new InvalidTypesException("Type Information could not be created."); } - - private int countFieldsInClass(Class clazz) { - int fieldCount = 0; - for(Field field : clazz.getFields()) { // get all fields - if( !Modifier.isStatic(field.getModifiers()) && - !Modifier.isTransient(field.getModifiers()) - ) { - fieldCount++; - } - } - return fieldCount; - } private TypeInformation createTypeInfoFromInputs(TypeVariable returnTypeVar, ArrayList returnTypeHierarchy, TypeInformation in1TypeInfo, TypeInformation in2TypeInfo) { @@ -427,6 +392,11 @@ private TypeInformation createTypeInfoFromInputs(TypeVariable r returnTypeVar = (TypeVariable) matReturnTypeVar; } + // no input information exists + if (in1TypeInfo == null && in2TypeInfo == null) { + return null; + } + // create a new type hierarchy for the input ArrayList inputTypeHierarchy = new ArrayList(); // copy the function part of the type hierarchy @@ -753,6 +723,34 @@ else if (typeInfo instanceof GenericTypeInfo) { // Utility methods // -------------------------------------------------------------------------------------------- + /** + * @param curT : start type + * @return Type The immediate child of the top class + */ + private Type getTypeHierarchy(ArrayList typeHierarchy, Type curT, Class stopAtClass) { + // skip first one + if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) { + curT = typeToClass(curT).getGenericSuperclass(); + } + while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) { + typeHierarchy.add(curT); + curT = typeToClass(curT).getGenericSuperclass(); + } + return curT; + } + + private int countFieldsInClass(Class clazz) { + int fieldCount = 0; + for(Field field : clazz.getFields()) { // get all fields + if( !Modifier.isStatic(field.getModifiers()) && + !Modifier.isTransient(field.getModifiers()) + ) { + fieldCount++; + } + } + return fieldCount; + } + private static Type removeGenericWrapper(Type t) { if(t instanceof ParameterizedType && (Collector.class.isAssignableFrom(typeToClass(t)) @@ -954,7 +952,7 @@ private TypeInformation privateGetForClass(Class clazz, ArrayList(clazz); } try { - TypeInformation pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint); + TypeInformation pojoType = analyzePojo(clazz, new ArrayList(typeHierarchy), clazzTypeHint); if (pojoType != null) { return pojoType; } @@ -1032,12 +1030,12 @@ private boolean isValidPojoField(Field f, Class clazz, ArrayList typeHi } private TypeInformation analyzePojo(Class clazz, ArrayList typeHierarchy, ParameterizedType clazzTypeHint) { - // try to create Type hierarchy, if the incoming one is empty. - if(typeHierarchy.size() == 0) { - recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class); + // try to create Type hierarchy, if the incoming only contains the most bottom one or none. + if(typeHierarchy.size() <= 1) { + getTypeHierarchy(typeHierarchy, clazz, Object.class); } if(clazzTypeHint != null) { - recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class); + getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class); } List fields = getAllDeclaredFields(clazz); @@ -1049,8 +1047,9 @@ private TypeInformation analyzePojo(Class clazz, ArrayList typeH return null; } try { - typeHierarchy.add(fieldType); - pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) )); + ArrayList fieldTypeHierarchy = new ArrayList(typeHierarchy); + fieldTypeHierarchy.add(fieldType); + pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) )); } catch (InvalidTypesException e) { //pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+"" From baac4e837134832c364e8cc390fdef81a7c96b7b Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 16 Dec 2014 11:30:52 +0100 Subject: [PATCH 2/5] [FLINK-610] Replace Avro by Kryo as the GenericType serializer The performance of data-intensive jobs using Kryo is probably going to be slow. --- .../common/typeutils/SerializerTestBase.java | 5 +- .../typeutils/SerializerTestInstance.java | 2 - flink-java/pom.xml | 6 + .../api/java/typeutils/GenericTypeInfo.java | 19 +- .../api/java/typeutils/TypeExtractor.java | 17 +- .../typeutils/runtime/KryoSerializer.java | 52 +++--- .../extractor/PojoTypeExtractionTest.java | 166 ++++++++++-------- .../AbstractGenericTypeSerializerTest.java | 2 +- .../KryoGenericTypeComparatorTest.java | 2 +- .../KryoGenericTypeSerializerTest.java | 44 ++++- flink-scala/pom.xml | 8 + .../KryoGenericTypeSerializerTest.scala | 128 ++++++++++++++ .../javaApiOperators/GroupReduceITCase.java | 71 +++++++- .../util/CollectionDataSets.java | 77 ++++++++ 14 files changed, 475 insertions(+), 124 deletions(-) create mode 100644 flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index d50928400441a..5122af975dd4e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -59,7 +59,10 @@ public abstract class SerializerTestBase { public void testInstantiate() { try { TypeSerializer serializer = getSerializer(); - + if(serializer.getClass().getName().endsWith("KryoSerializer")) { + // the kryo serializer will return null. We ignore this test for Kryo. + return; + } T instance = serializer.createInstance(); assertNotNull("The created instance must not be null.", instance); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java index c48e8796a1560..7f6599553372a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java @@ -18,8 +18,6 @@ package org.apache.flink.api.common.typeutils; -import org.apache.flink.api.common.typeutils.TypeSerializer; - public class SerializerTestInstance extends SerializerTestBase { diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 22826d895b117..14cb46926c8d2 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -58,6 +58,12 @@ under the License. asm + + com.twitter + chill_2.10 + 0.5.1 + + com.google.guava diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java index 8a1406bd55470..5bc6cb949520f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java @@ -22,26 +22,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AvroSerializer; import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator; - -import java.util.Collection; +import org.apache.flink.api.java.typeutils.runtime.KryoSerializer; public class GenericTypeInfo extends TypeInformation implements AtomicType { private final Class typeClass; - private final static Class[] unsupportedByAvro = new Class[] {Collection.class}; - + public GenericTypeInfo(Class typeClass) { this.typeClass = typeClass; - for (Class unsupported: unsupportedByAvro) { - if(unsupported.isAssignableFrom(typeClass)) { - throw new RuntimeException("The type '"+typeClass+"' is currently not supported " + - "by the Avro Serializer that Flink is using for serializing " + - "arbitrary objects"); - } - } } @Override @@ -76,10 +66,7 @@ public boolean isKeyType() { @Override public TypeSerializer createSerializer() { - // NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here - // in particular classes implementing GenericContainer are handled as GenericTypeInfos - // (this will probably not work with Kryo) - return new AvroSerializer(this.typeClass); + return new KryoSerializer(this.typeClass); } @SuppressWarnings("unchecked") diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 3bceac5dcd8d9..e52e2af1b6abc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Set; -import org.apache.avro.generic.GenericContainer; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; @@ -947,16 +946,16 @@ private TypeInformation privateGetForClass(Class clazz, ArrayList(clazz); } - if(GenericContainer.class.isAssignableFrom(clazz)) { - // this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro. - return new GenericTypeInfo(clazz); - } + try { TypeInformation pojoType = analyzePojo(clazz, new ArrayList(typeHierarchy), clazzTypeHint); if (pojoType != null) { return pojoType; } } catch (InvalidTypesException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e); + } // ignore and create generic type info } @@ -1051,9 +1050,11 @@ private TypeInformation analyzePojo(Class clazz, ArrayList typeH fieldTypeHierarchy.add(fieldType); pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) )); } catch (InvalidTypesException e) { - //pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this - throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+"" - + "\nThe system is internally using the Avro serializer which is not able to handle that type.", e); + Class genericClass = Object.class; + if(isClassType(fieldType)) { + genericClass = typeToClass(fieldType); + } + pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass ))); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 7a98abf6e970e..3eda9caf56580 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -22,17 +22,19 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.twitter.chill.ScalaKryoInstantiator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; public class KryoSerializer extends TypeSerializer { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final Class type; - private final Class typeToInstantiate; private transient Kryo kryo; private transient T copyInstance; @@ -44,21 +46,13 @@ public class KryoSerializer extends TypeSerializer { private transient Output output; public KryoSerializer(Class type){ - this(type,type); - } - - public KryoSerializer(Class type, Class typeToInstantiate){ - if(type == null || typeToInstantiate == null){ + if(type == null){ throw new NullPointerException("Type class cannot be null."); } - this.type = type; - this.typeToInstantiate = typeToInstantiate; - kryo = new Kryo(); - kryo.setAsmEnabled(true); - kryo.register(type); } + @Override public boolean isImmutableType() { return false; @@ -71,20 +65,32 @@ public boolean isStateful() { @Override public T createInstance() { - checkKryoInitialized(); - return kryo.newInstance(typeToInstantiate); + return null; } @Override public T copy(T from) { + if(from == null) { + return null; + } checkKryoInitialized(); - return kryo.copy(from); + // We have to do a serialization round because not all Kryo (/Chill) serializers have the copy() method implemented. + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + Output output = new Output(baout); + + kryo.writeObject(output, from); + + output.flush(); + + ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); + Input input = new Input(bain); + + return (T)kryo.readObject(input, from.getClass()); } @Override public T copy(T from, T reuse) { - checkKryoInitialized(); - return kryo.copy(from); + return copy(from); } @Override @@ -101,7 +107,7 @@ public void serialize(T record, DataOutputView target) throws IOException { previousOut = target; } - kryo.writeObject(output, record); + kryo.writeClassAndObject(output, record); output.flush(); } @@ -113,7 +119,7 @@ public T deserialize(DataInputView source) throws IOException { input = new NoFetchingInput(inputStream); previousIn = source; } - return kryo.readObject(input, typeToInstantiate); + return (T) kryo.readClassAndObject(input); } @Override @@ -136,14 +142,14 @@ public void copy(DataInputView source, DataOutputView target) throws IOException @Override public int hashCode() { - return type.hashCode() + 31 * typeToInstantiate.hashCode(); + return type.hashCode(); } @Override public boolean equals(Object obj) { if (obj != null && obj instanceof KryoSerializer) { KryoSerializer other = (KryoSerializer) obj; - return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate; + return other.type == this.type; } else { return false; } @@ -153,8 +159,8 @@ public boolean equals(Object obj) { private void checkKryoInitialized() { if (this.kryo == null) { - this.kryo = new Kryo(); - this.kryo.setAsmEnabled(true); + this.kryo = new ScalaKryoInstantiator().newKryo(); + this.kryo.setRegistrationRequired(false); this.kryo.register(type); } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index e5ac1cac3c22f..893e63c321149 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -45,7 +45,7 @@ /** * Pojo Type tests - * + * * A Pojo is a bean-style class with getters, setters and empty ctor * OR a class with all fields public (or for every private field, there has to be a public getter/setter) * everything else is a generic type (that can't be used for field selection) @@ -55,12 +55,12 @@ public class PojoTypeExtractionTest { public static class HasDuplicateField extends WC { private int count; // duplicate } - + @Test(expected=RuntimeException.class) public void testDuplicateFieldException() { TypeExtractor.createTypeInfo(HasDuplicateField.class); } - + // test with correct pojo types public static class WC { // is a pojo public ComplexNestedClass complex; // is a pojo @@ -84,6 +84,7 @@ public static class ComplexNestedClass { // pojo public Tuple3 word; //Tuple Type with three basic types public Object nothing; // generic type public MyWritable hadoopCitizen; // writableType + public List collection; } // all public test @@ -92,7 +93,7 @@ public static class AllPublic extends ComplexNestedClass { public HashMultiset fancyIds; // generic type public String[] fancyArray; // generic type } - + public static class ParentSettingGenerics extends PojoWithGenerics { public String field3; } @@ -101,16 +102,16 @@ public static class PojoWithGenerics { public T1 field1; public T2 field2; } - + public static class ComplexHierarchyTop extends ComplexHierarchy> {} public static class ComplexHierarchy extends PojoWithGenerics {} - + // extends from Tuple and adds a field public static class FromTuple extends Tuple3 { private static final long serialVersionUID = 1L; public int special; } - + public static class IncorrectPojo { private int isPrivate; public int getIsPrivate() { @@ -118,7 +119,7 @@ public int getIsPrivate() { } // setter is missing (intentional) } - + // correct pojo public static class BeanStylePojo { public String abc; @@ -136,7 +137,7 @@ public WrongCtorPojo(int a) { this.a = a; } } - + // in this test, the location of the getters and setters is mixed across the type hierarchy. public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck { public void setPackageProtected(String in) { @@ -149,50 +150,64 @@ public T getPackageProtected() { return packageProtected; } } - + @Test public void testIncorrectPojos() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class); Assert.assertTrue(typeForClass instanceof GenericTypeInfo); - + typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class); Assert.assertTrue(typeForClass instanceof GenericTypeInfo); } - + @Test public void testCorrectPojos() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class); Assert.assertTrue(typeForClass instanceof PojoTypeInfo); - + typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class); Assert.assertTrue(typeForClass instanceof PojoTypeInfo); } - + @Test public void testPojoWC() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(WC.class); checkWCPojoAsserts(typeForClass); - + WC t = new WC(); t.complex = new ComplexNestedClass(); TypeInformation typeForObject = TypeExtractor.getForObject(t); checkWCPojoAsserts(typeForObject); } - + private void checkWCPojoAsserts(TypeInformation typeInfo) { Assert.assertFalse(typeInfo.isBasicType()); Assert.assertFalse(typeInfo.isTupleType()); - Assert.assertEquals(9, typeInfo.getTotalFields()); + Assert.assertEquals(10, typeInfo.getTotalFields()); Assert.assertTrue(typeInfo instanceof PojoTypeInfo); PojoTypeInfo pojoType = (PojoTypeInfo) typeInfo; - + List ffd = new ArrayList(); - String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing", - "complex.someFloat", "complex.someNumber", "complex.word.f0", - "complex.word.f1", "complex.word.f2"}; - int[] positions = {8,0,1,2, - 3,4,5, - 6,7}; + String[] fields = {"count", + "complex.date", + "complex.hadoopCitizen", + "complex.collection", + "complex.nothing", + "complex.someFloat", + "complex.someNumber", + "complex.word.f0", + "complex.word.f1", + "complex.word.f2"}; + int[] positions = {9, + 1, + 2, + 0, + 3, + 4, + 5, + 6, + 7, + 8}; Assert.assertEquals(fields.length, positions.length); for(int i = 0; i < fields.length; i++) { pojoType.getKey(fields[i], 0, ffd); @@ -200,86 +215,93 @@ private void checkWCPojoAsserts(TypeInformation typeInfo) { Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); ffd.clear(); } - + pojoType.getKey("complex.word.*", 0, ffd); Assert.assertEquals(3, ffd.size()); // check if it returns 5,6,7 for(FlatFieldDescriptor ffdE : ffd) { final int pos = ffdE.getPosition(); - Assert.assertTrue(pos <= 7 ); - Assert.assertTrue(5 <= pos ); - if(pos == 5) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); - } + Assert.assertTrue(pos <= 8 ); + Assert.assertTrue(6 <= pos ); if(pos == 6) { Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); } if(pos == 7) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 8) { Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); } } ffd.clear(); - + // scala style full tuple selection for pojos pojoType.getKey("complex.word._", 0, ffd); Assert.assertEquals(3, ffd.size()); ffd.clear(); - + pojoType.getKey("complex.*", 0, ffd); - Assert.assertEquals(8, ffd.size()); + Assert.assertEquals(9, ffd.size()); // check if it returns 0-7 for(FlatFieldDescriptor ffdE : ffd) { final int pos = ffdE.getPosition(); - Assert.assertTrue(ffdE.getPosition() <= 7 ); + Assert.assertTrue(ffdE.getPosition() <= 8 ); Assert.assertTrue(0 <= ffdE.getPosition() ); + if(pos == 0) { - Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(List.class, ffdE.getType().getTypeClass()); } if(pos == 1) { - Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(Date.class, ffdE.getType().getTypeClass()); } if(pos == 2) { - Assert.assertEquals(Object.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass()); } if(pos == 3) { - Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(Object.class, ffdE.getType().getTypeClass()); } if(pos == 4) { - Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(Float.class, ffdE.getType().getTypeClass()); } if(pos == 5) { - Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); } if(pos == 6) { Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); } if(pos == 7) { + Assert.assertEquals(Long.class, ffdE.getType().getTypeClass()); + } + if(pos == 8) { Assert.assertEquals(String.class, ffdE.getType().getTypeClass()); } + if(pos == 9) { + Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); + } } ffd.clear(); - + pojoType.getKey("*", 0, ffd); - Assert.assertEquals(9, ffd.size()); + Assert.assertEquals(10, ffd.size()); // check if it returns 0-8 for(FlatFieldDescriptor ffdE : ffd) { - Assert.assertTrue(ffdE.getPosition() <= 8 ); + Assert.assertTrue(ffdE.getPosition() <= 9 ); Assert.assertTrue(0 <= ffdE.getPosition() ); - if(ffdE.getPosition() == 8) { + if(ffdE.getPosition() == 9) { Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass()); } } ffd.clear(); - + TypeInformation typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo); - - Assert.assertEquals(6, typeComplexNested.getArity()); - Assert.assertEquals(8, typeComplexNested.getTotalFields()); + + Assert.assertEquals(7, typeComplexNested.getArity()); + Assert.assertEquals(9, typeComplexNested.getTotalFields()); PojoTypeInfo pojoTypeComplexNested = (PojoTypeInfo) typeComplexNested; - + boolean dateSeen = false, intSeen = false, floatSeen = false, - tupleSeen = false, objectSeen = false, writableSeen = false; + tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false; for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) { PojoField field = pojoTypeComplexNested.getPojoFieldAt(i); String name = field.field.getName(); @@ -330,6 +352,13 @@ private void checkWCPojoAsserts(TypeInformation typeInfo) { writableSeen = true; Assert.assertEquals(new WritableTypeInfo(MyWritable.class), field.type); Assert.assertEquals(MyWritable.class, field.type.getTypeClass()); + } else if(name.equals("collection")) { + if(collectionSeen) { + Assert.fail("already seen"); + } + collectionSeen = true; + Assert.assertEquals(new GenericTypeInfo(List.class), field.type); + } else { Assert.fail("field "+field+" is not expected"); } @@ -340,29 +369,29 @@ private void checkWCPojoAsserts(TypeInformation typeInfo) { Assert.assertTrue("Field was not present", tupleSeen); Assert.assertTrue("Field was not present", objectSeen); Assert.assertTrue("Field was not present", writableSeen); - + Assert.assertTrue("Field was not present", collectionSeen); + TypeInformation typeAtOne = pojoType.getTypeAt(1); // int count Assert.assertTrue(typeAtOne instanceof BasicTypeInfo); - + Assert.assertEquals(typeInfo.getTypeClass(), WC.class); Assert.assertEquals(typeInfo.getArity(), 2); } // Kryo is required for this, so disable for now. - @Ignore @Test public void testPojoAllPublic() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(AllPublic.class); checkAllPublicAsserts(typeForClass); - + TypeInformation typeForObject = TypeExtractor.getForObject(new AllPublic() ); checkAllPublicAsserts(typeForObject); } - + private void checkAllPublicAsserts(TypeInformation typeInformation) { Assert.assertTrue(typeInformation instanceof PojoTypeInfo); - Assert.assertEquals(9, typeInformation.getArity()); - Assert.assertEquals(11, typeInformation.getTotalFields()); + Assert.assertEquals(10, typeInformation.getArity()); + Assert.assertEquals(12, typeInformation.getTotalFields()); // check if the three additional fields are identified correctly boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false; PojoTypeInfo pojoTypeForClass = (PojoTypeInfo) typeInformation; @@ -390,9 +419,9 @@ private void checkAllPublicAsserts(TypeInformation typeInformation) { strArraySeen = true; Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type); Assert.assertEquals(String[].class, field.type.getTypeClass()); - } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) { + } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { // ignore these, they are inherited from the ComplexNestedClass - } + } else { Assert.fail("field "+field+" is not expected"); } @@ -401,18 +430,18 @@ private void checkAllPublicAsserts(TypeInformation typeInformation) { Assert.assertTrue("Field was not present", multisetSeen); Assert.assertTrue("Field was not present", strArraySeen); } - + @Test public void testPojoExtendingTuple() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(FromTuple.class); checkFromTuplePojo(typeForClass); - + FromTuple ft = new FromTuple(); ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L; TypeInformation typeForObject = TypeExtractor.getForObject(ft); checkFromTuplePojo(typeForObject); } - + private void checkFromTuplePojo(TypeInformation typeInformation) { Assert.assertTrue(typeInformation instanceof PojoTypeInfo); Assert.assertEquals(4, typeInformation.getTotalFields()); @@ -431,7 +460,7 @@ private void checkFromTuplePojo(TypeInformation typeInformation) { } } } - + @Test public void testPojoWithGenerics() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class); @@ -453,13 +482,12 @@ public void testPojoWithGenerics() { } } } - + /** * Test if the TypeExtractor is accepting untyped generics, * making them GenericTypes */ @Test - @Ignore // kryo needed. public void testPojoWithGenericsSomeFieldsGeneric() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class); Assert.assertTrue(typeForClass instanceof PojoTypeInfo); @@ -478,8 +506,8 @@ public void testPojoWithGenericsSomeFieldsGeneric() { } } } - - + + @Test public void testPojoWithComplexHierarchy() { TypeInformation typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class); @@ -554,10 +582,10 @@ public VertexTyped(Long l, Double d) { public VertexTyped() { } } - + @Test public void testGetterSetterWithVertex() { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0)); } -} +} \ No newline at end of file diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java index cacc05bbe59b3..d60410507bcf7 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java @@ -143,7 +143,7 @@ public void testNestedInterfaces() { } } - private final void runTests(T... instances) { + protected final void runTests(T... instances) { if (instances == null || instances.length == 0) { throw new IllegalArgumentException(); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java index c6ef4db01a02a..37dba4e075138 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java @@ -25,4 +25,4 @@ public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparator protected TypeSerializer createSerializer(Class type) { return new KryoSerializer(type); } -} +} \ No newline at end of file diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java index f6fc9870d7cc1..3c22b15cf9742 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java @@ -18,11 +18,53 @@ package org.apache.flink.api.java.typeutils.runtime; + import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest { + + @Test + public void testJavaList(){ + Collection a = new ArrayList(); + + fillCollection(a); + + runTests(a); + } + + @Test + public void testJavaSet(){ + Collection b = new HashSet(); + + fillCollection(b); + + runTests(b); + } + + @Test + public void testJavaDequeue(){ + Collection c = new LinkedList(); + + fillCollection(c); + + runTests(c); + } + + private void fillCollection(Collection coll){ + coll.add(42); + coll.add(1337); + coll.add(49); + coll.add(1); + } + @Override protected TypeSerializer createSerializer(Class type) { return new KryoSerializer(type); } -} +} \ No newline at end of file diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index ae16876a6d2b9..edeb7f29bff18 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -95,6 +95,14 @@ under the License. test + + org.apache.flink + flink-core + ${project.version} + test + test-jar + + diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala new file mode 100644 index 0000000000000..ddbe32235c10f --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.runtime + +import org.apache.flink.api.common.typeutils.SerializerTestInstance +import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.junit.Test + +import scala.reflect._ + +class KryoGenericTypeSerializerTest { + + @Test + def testScalaListSerialization: Unit = { + val a = List(42,1,49,1337) + + runTests(a) + } + + @Test + def testScalaMutablelistSerialization: Unit = { + val a = scala.collection.mutable.ListBuffer(42,1,49,1337) + + runTests(a) + } + + @Test + def testScalaMapSerialization: Unit = { + val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337)) + + runTests(a) + } + + @Test + def testMutableMapSerialization: Unit ={ + val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3")) + + runTests(a) + } + + @Test + def testScalaListComplexTypeSerialization: Unit = { + val a = ComplexType("1234", 42, List(1,2,3,4)) + val b = ComplexType("4321", 24, List(4,3,2,1)) + val c = ComplexType("1337", 1, List(1)) + val list = List(a, b, c) + + runTests(list) + } + + @Test + def testHeterogenousScalaList: Unit = { + val a = new DerivedType("foo", "bar") + val b = new BaseType("foobar") + val c = new DerivedType2("bar", "foo") + val list = List(a,b,c) + + runTests(list) + } + + case class ComplexType(id: String, number: Int, values: List[Int]){ + override def equals(obj: Any): Boolean ={ + if(obj != null && obj.isInstanceOf[ComplexType]){ + val complexType = obj.asInstanceOf[ComplexType] + id.equals(complexType.id) && number.equals(complexType.number) && values.equals( + complexType.values) + }else{ + false + } + } + } + + class BaseType(val name: String){ + override def equals(obj: Any): Boolean = { + if(obj != null && obj.isInstanceOf[BaseType]){ + obj.asInstanceOf[BaseType].name.equals(name) + }else{ + false + } + } + } + + class DerivedType(name: String, val sub: String) extends BaseType(name){ + override def equals(obj: Any): Boolean = { + if(obj != null && obj.isInstanceOf[DerivedType]){ + super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub) + }else{ + false + } + } + } + + class DerivedType2(name: String, val sub: String) extends BaseType(name){ + override def equals(obj: Any): Boolean = { + if(obj != null && obj.isInstanceOf[DerivedType2]){ + super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub) + }else{ + false + } + } + } + + def runTests[T : ClassTag](objects: T *): Unit ={ + val clsTag = classTag[T] + val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]]) + val serializer = typeInfo.createSerializer() + val typeClass = typeInfo.getTypeClass + + val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*) + + instance.testAll() + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 60a0d891783be..8994ba94ae9fb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -53,7 +53,7 @@ @RunWith(Parameterized.class) public class GroupReduceITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 26; + private static int NUM_PROGRAMS = 28; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -763,7 +763,74 @@ public void reduce( // return expected result return "b\nccc\nee\n"; } - + + case 27: { + /* + * Test Java collections within pojos ( == test kryo) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet ds = CollectionDataSets.getPojoWithCollection(env); + // f0.f0 is first integer + DataSet reduceDs = ds.groupBy("key") + .reduceGroup(new GroupReduceFunction() { + @Override + public void reduce( + Iterable values, + Collector out) throws Exception { + StringBuilder concat = new StringBuilder(); + concat.append("call"); + for(CollectionDataSets.PojoWithCollection value : values) { + concat.append("For key "+value.key+" we got: "); + for(CollectionDataSets.Pojo1 p :value.pojos) { + concat.append("pojo.a="+p.a); + } + } + out.collect(concat.toString()); + } + }); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n"; + } + + case 28: { + /* + * Group by generic type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet ds = CollectionDataSets.getPojoWithCollection(env); + // f0.f0 is first integer + DataSet reduceDs = ds.groupBy("bigInt") + .reduceGroup(new GroupReduceFunction() { + @Override + public void reduce( + Iterable values, + Collector out) throws Exception { + StringBuilder concat = new StringBuilder(); + concat.append("call"); + for(CollectionDataSets.PojoWithCollection value : values) { + concat.append("\nFor key "+value.bigInt+" we got:\n"+value); + } + out.collect(concat.toString()); + } + }); + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "call\n" + + "For key 92233720368547758070 we got:\n" + + "PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" + + "For key 92233720368547758070 we got:\n" + + "PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n"; + } + default: { throw new IllegalArgumentException("Invalid program id"); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 1f812d995aa04..895e996d98f81 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -18,11 +18,17 @@ package org.apache.flink.test.javaApiOperators.util; +import java.io.File; import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; +import java.util.Hashtable; import java.util.List; +import java.util.Map; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; @@ -33,6 +39,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.hadoop.io.IntWritable; +import scala.math.BigInt; /** * ####################################################################################################### @@ -496,6 +503,13 @@ public static DataSet> getTupleContainingPojo public static class Pojo1 { public String a; public String b; + + public Pojo1() {} + + public Pojo1(String a, String b) { + this.a = a; + this.b = b; + } } public static class Pojo2 { @@ -561,5 +575,68 @@ public static DataSet getPojoWithDateAndEnum(ExecutionEnvir return env.fromCollection(data); } + public static class PojoWithCollection { + public List pojos; + public int key; + public java.sql.Date sqlDate; + public BigInteger bigInt; + public BigDecimal bigDecimalKeepItNull; + public BigInt scalaBigInt; + public List mixed; + + @Override + public String toString() { + return "PojoWithCollection{" + + "pojos.size()=" + pojos.size() + + ", key=" + key + + ", sqlDate=" + sqlDate + + ", bigInt=" + bigInt + + ", bigDecimalKeepItNull=" + bigDecimalKeepItNull + + ", scalaBigInt=" + scalaBigInt + + ", mixed=" + mixed + + '}'; + } + } + + public static DataSet getPojoWithCollection(ExecutionEnvironment env) { + List data = new ArrayList(); + + List pojosList1 = new ArrayList(); + pojosList1.add(new Pojo1("a", "aa")); + pojosList1.add(new Pojo1("b", "bb")); + + List pojosList2 = new ArrayList(); + pojosList2.add(new Pojo1("a2", "aa2")); + pojosList2.add(new Pojo1("b2", "bb2")); + + PojoWithCollection pwc1 = new PojoWithCollection(); + pwc1.pojos = pojosList1; + pwc1.key = 0; + pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); + pwc1.scalaBigInt = BigInt.int2bigInt(10); + pwc1.bigDecimalKeepItNull = null; + pwc1.sqlDate = new java.sql.Date(2000000000000L); // 2033 ;) + pwc1.mixed = new ArrayList(); + Map map = new HashMap(); + map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3); + pwc1.mixed.add(map); + pwc1.mixed.add(new File("/this/is/wrong")); + pwc1.mixed.add("uhlala"); + + PojoWithCollection pwc2 = new PojoWithCollection(); + pwc2.pojos = pojosList2; + pwc2.key = 0; + pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); + pwc2.scalaBigInt = BigInt.int2bigInt(31104000); + pwc2.bigDecimalKeepItNull = null; + pwc2.sqlDate = new java.sql.Date(200000000000L); // 1976 + + + data.add(pwc1); + data.add(pwc2); + + return env.fromCollection(data); + } + } From f80fc7d5d8701df8071b4b5f61431c25016892e3 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 16 Dec 2014 22:00:50 +0100 Subject: [PATCH 3/5] [FLINK-1333] Fixed getter/setter recognition for POJOs --- .../api/java/typeutils/TypeExtractor.java | 8 ++--- .../extractor/PojoTypeExtractionTest.java | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index e52e2af1b6abc..b528d00d95a2e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -989,12 +989,12 @@ private boolean isValidPojoField(Field f, Class clazz, ArrayList typeHi } for(Method m : clazz.getMethods()) { // check for getter - if( // The name should be "get" or "" (for scala). - (m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) && + if( // The name should be "get" or "" (for scala) or "is" for boolean fields. + (m.getName().toLowerCase().equals("get"+fieldNameLow) || m.getName().toLowerCase().equals("is"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow)) && // no arguments for the getter m.getParameterTypes().length == 0 && // return type is same as field type (or the generic variant of it) - (m.getReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) ) + (m.getGenericReturnType().equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) ) ) { if(hasGetter) { throw new IllegalStateException("Detected more than one getter"); @@ -1004,7 +1004,7 @@ private boolean isValidPojoField(Field f, Class clazz, ArrayList typeHi // check for setters (_$eq for scala) if((m.getName().toLowerCase().equals("set"+fieldNameLow) || m.getName().toLowerCase().equals(fieldNameLow+"_$eq")) && m.getParameterTypes().length == 1 && // one parameter of the field's type - ( m.getParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&& + ( m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&& // return type is void. m.getReturnType().equals(Void.TYPE) ) { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 893e63c321149..7cff856646b83 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Date; import java.util.List; @@ -138,6 +139,34 @@ public WrongCtorPojo(int a) { } } + public static class PojoWithGenericFields { + private Collection users; + private boolean favorited; + + public boolean isFavorited() { + return favorited; + } + + public void setFavorited(boolean favorited) { + this.favorited = favorited; + } + + public Collection getUsers() { + return users; + } + + public void setUsers(Collection users) { + this.users = users; + } + } + @Test + public void testPojoWithGenericFields() { + TypeInformation typeForClass = TypeExtractor.createTypeInfo(PojoWithGenericFields.class); + + Assert.assertTrue(typeForClass instanceof PojoTypeInfo); + } + + // in this test, the location of the getters and setters is mixed across the type hierarchy. public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck { public void setPackageProtected(String in) { From e29e9af0ed3fbae931f3f713b0fee1888df943e4 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Wed, 17 Dec 2014 12:16:49 +0100 Subject: [PATCH 4/5] Set correct classloader --- .../apache/flink/api/java/typeutils/runtime/KryoSerializer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 3eda9caf56580..c52ec9840e485 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -162,6 +162,7 @@ private void checkKryoInitialized() { this.kryo = new ScalaKryoInstantiator().newKryo(); this.kryo.setRegistrationRequired(false); this.kryo.register(type); + this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } } } From 546ac1be7c0503d4046948608ae123aa2b441294 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Wed, 17 Dec 2014 14:08:34 +0100 Subject: [PATCH 5/5] try to use Kryo.copy() with fallback to serialization copy --- .../typeutils/runtime/KryoSerializer.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index c52ec9840e485..f2c5848e6c2b3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.typeutils.runtime; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -74,18 +75,22 @@ public T copy(T from) { return null; } checkKryoInitialized(); - // We have to do a serialization round because not all Kryo (/Chill) serializers have the copy() method implemented. - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - Output output = new Output(baout); + try { + return kryo.copy(from); + } catch(KryoException ke) { + // kryo was unable to copy it, so we do it through serialization: + ByteArrayOutputStream baout = new ByteArrayOutputStream(); + Output output = new Output(baout); - kryo.writeObject(output, from); + kryo.writeObject(output, from); - output.flush(); + output.flush(); - ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); - Input input = new Input(bain); + ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray()); + Input input = new Input(bain); - return (T)kryo.readObject(input, from.getClass()); + return (T)kryo.readObject(input, from.getClass()); + } } @Override