diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0a294fa65a17d..69fdd21b11c32 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -149,6 +149,19 @@ under the License.
com.twitter
chill_${scala.binary.version}
${chill.version}
+
+
+
+ com.esotericsoftware
+ kryo-shaded
+
+
+
+
+
+
+ com.esotericsoftware.kryo
+ kryo
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 6f93722b39827..80c0a211d416f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -998,6 +998,44 @@ public void testJavaCollectionsWithinPojos() throws Exception {
compareResultAsText(result, expected);
}
+ @Test
+ public void testJavaArraysAsListCollectionsWithinPojos() throws Exception {
+ /*
+ * Test Java collections created via Arrays.asList() method within pojos ( == test kryo)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataSet ds = CollectionDataSets.getPojoWithArraysAsListCollection(env);
+ // f0.f0 is first integer
+ DataSet reduceDs = ds.groupBy("key")
+ .reduceGroup(new GroupReducer7());
+ List result = reduceDs.collect();
+
+ String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+
+ compareResultAsText(result, expected);
+ }
+
+ @Test
+ public void testJavaUnmodifiableCollectionsWithinPojos() throws Exception {
+ /*
+ * Test Java collections created via Collections.unmodifiable...() methods within pojos ( == test kryo)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataSet ds = CollectionDataSets.getPojoWithUnmodifiableCollection(env);
+ // f0.f0 is first integer
+ DataSet reduceDs = ds.groupBy("key")
+ .reduceGroup(new GroupReducer7());
+ List result = reduceDs.collect();
+
+ String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+
+ compareResultAsText(result, expected);
+ }
+
public static class GroupReducer7 implements GroupReduceFunction {
@Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index ba48e121a168d..2bc232a555fb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -23,6 +23,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@@ -46,10 +48,10 @@
/**
* #######################################################################################################
- *
- * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ *
+ * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
* IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
- *
+ *
* #######################################################################################################
*/
public class CollectionDataSets {
@@ -201,7 +203,7 @@ public static DataSet, String, Integer>> getGrou
return env.fromCollection(data, type);
}
-
+
public static DataSet> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
List> data = new ArrayList<>();
data.add(new Tuple2<>(new byte[]{0, 4}, 1));
@@ -210,12 +212,12 @@ public static DataSet> getTuple2WithByteArrayDataSet(Exe
data.add(new Tuple2<>(new byte[]{2, 1}, 3));
data.add(new Tuple2<>(new byte[]{0}, 0));
data.add(new Tuple2<>(new byte[]{2, 0}, 1));
-
+
TupleTypeInfo> type = new TupleTypeInfo<>(
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
return env.fromCollection(data, type);
}
@@ -347,13 +349,13 @@ public static DataSet(3, "Third", 30, 300, 3000L, "Three", 30000L));
return env.fromCollection(data);
}
-
+
public static DataSet> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
List> data = new ArrayList<>();
data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First"));
data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second"));
data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third"));
-
+
return env.fromCollection(data);
}
@@ -610,22 +612,22 @@ public static class PojoWithDateAndEnum {
public Date date;
public Category cat;
}
-
+
public static DataSet getPojoWithDateAndEnum(ExecutionEnvironment env) {
List data = new ArrayList<>();
-
+
PojoWithDateAndEnum one = new PojoWithDateAndEnum();
one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
data.add(one);
-
+
PojoWithDateAndEnum two = new PojoWithDateAndEnum();
two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A;
data.add(two);
-
+
PojoWithDateAndEnum three = new PojoWithDateAndEnum();
three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B;
data.add(three);
-
+
return env.fromCollection(data);
}
@@ -693,7 +695,7 @@ public static DataSet getPojoWithCollection(ExecutionEnviron
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;
-
+
// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
@@ -710,7 +712,103 @@ public static DataSet getPojoWithCollection(ExecutionEnviron
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
-
+
+ GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+ pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
+ List data = new ArrayList<>();
+
+ List pojosList1 = Arrays.asList(
+ new Pojo1("a", "aa"),
+ new Pojo1("b", "bb")
+ );
+
+ List pojosList2 = Arrays.asList(
+ new Pojo1("a2", "aa2"),
+ new Pojo1("b2", "bb2")
+ );
+
+ PojoWithCollection pwc1 = new PojoWithCollection();
+ pwc1.pojos = pojosList1;
+ pwc1.key = 0;
+ pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ pwc1.scalaBigInt = BigInt.int2bigInt(10);
+ pwc1.bigDecimalKeepItNull = null;
+
+ // use calendar to make it stable across time zones
+ GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+ pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+ Map map = new HashMap<>();
+ map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
+ pwc1.mixed = Arrays.asList(
+ map,
+ new File(""),
+ "uhlala",
+ Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4) // to test Arrays.asList() with primitives
+ );
+
+ PojoWithCollection pwc2 = new PojoWithCollection();
+ pwc2.pojos = pojosList2;
+ pwc2.key = 0;
+ pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+ pwc2.bigDecimalKeepItNull = null;
+
+ GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+ pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet getPojoWithUnmodifiableCollection(ExecutionEnvironment env) {
+ List data = new ArrayList<>();
+
+ List pojosList1 = new ArrayList<>();
+ pojosList1.add(new Pojo1("a", "aa"));
+ pojosList1.add(new Pojo1("b", "bb"));
+
+ List pojosList2 = new ArrayList<>();
+ pojosList2.add(new Pojo1("a2", "aa2"));
+ pojosList2.add(new Pojo1("b2", "bb2"));
+
+ PojoWithCollection pwc1 = new PojoWithCollection();
+ pwc1.pojos = Collections.unmodifiableList(pojosList1);
+ pwc1.key = 0;
+ pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ pwc1.scalaBigInt = BigInt.int2bigInt(10);
+ pwc1.bigDecimalKeepItNull = null;
+
+ // use calendar to make it stable across time zones
+ GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+ pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+ ArrayList