Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ under the License.
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>${chill.version}</version>
<exclusions>
<!-- Exclude Kryo dependency from Chill -->
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Include our own version of Kryo -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,44 @@ public void testJavaCollectionsWithinPojos() throws Exception {
compareResultAsText(result, expected);
}

@Test
public void testJavaArraysAsListCollectionsWithinPojos() throws Exception {
/*
* Test Java collections created via Arrays.asList() method within pojos ( == test kryo)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithArraysAsListCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("key")
.reduceGroup(new GroupReducer7());
List<String> result = reduceDs.collect();

String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";

compareResultAsText(result, expected);
}

@Test
public void testJavaUnmodifiableCollectionsWithinPojos() throws Exception {
/*
* Test Java collections created via Collections.unmodifiable...() methods within pojos ( == test kryo)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithUnmodifiableCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("key")
.reduceGroup(new GroupReducer7());
List<String> result = reduceDs.collect();

String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";

compareResultAsText(result, expected);
}

public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
Expand All @@ -46,10 +48,10 @@

/**
* #######################################################################################################
*
* BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
*
* BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
* IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
*
*
* #######################################################################################################
*/
public class CollectionDataSets {
Expand Down Expand Up @@ -201,7 +203,7 @@ public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGrou

return env.fromCollection(data, type);
}

public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
List<Tuple2<byte[], Integer>> data = new ArrayList<>();
data.add(new Tuple2<>(new byte[]{0, 4}, 1));
Expand All @@ -210,12 +212,12 @@ public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(Exe
data.add(new Tuple2<>(new byte[]{2, 1}, 3));
data.add(new Tuple2<>(new byte[]{0}, 0));
data.add(new Tuple2<>(new byte[]{2, 0}, 1));

TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>(
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);

return env.fromCollection(data, type);
}

Expand Down Expand Up @@ -347,13 +349,13 @@ public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Lo
data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L));
return env.fromCollection(data);
}

public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>();
data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First"));
data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second"));
data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third"));

return env.fromCollection(data);
}

Expand Down Expand Up @@ -610,22 +612,22 @@ public static class PojoWithDateAndEnum {
public Date date;
public Category cat;
}

public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
List<PojoWithDateAndEnum> data = new ArrayList<>();

PojoWithDateAndEnum one = new PojoWithDateAndEnum();
one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
data.add(one);

PojoWithDateAndEnum two = new PojoWithDateAndEnum();
two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A;
data.add(two);

PojoWithDateAndEnum three = new PojoWithDateAndEnum();
three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B;
data.add(three);

return env.fromCollection(data);
}

Expand Down Expand Up @@ -693,7 +695,7 @@ public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnviron
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;

// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
Expand All @@ -710,7 +712,103 @@ public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnviron
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;


GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976


data.add(pwc1);
data.add(pwc2);

return env.fromCollection(data);
}

public static DataSet<PojoWithCollection> getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these still needed any more by the tests?

Copy link
Author

@chermenin chermenin Oct 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be saved. Just on the safe side.

List<PojoWithCollection> data = new ArrayList<>();

List<Pojo1> pojosList1 = Arrays.asList(
new Pojo1("a", "aa"),
new Pojo1("b", "bb")
);

List<Pojo1> pojosList2 = Arrays.asList(
new Pojo1("a2", "aa2"),
new Pojo1("b2", "bb2")
);

PojoWithCollection pwc1 = new PojoWithCollection();
pwc1.pojos = pojosList1;
pwc1.key = 0;
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;

// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
Map<String, Integer> map = new HashMap<>();
map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
pwc1.mixed = Arrays.asList(
map,
new File(""),
"uhlala",
Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4) // to test Arrays.asList() with primitives
);

PojoWithCollection pwc2 = new PojoWithCollection();
pwc2.pojos = pojosList2;
pwc2.key = 0;
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;

GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976


data.add(pwc1);
data.add(pwc2);

return env.fromCollection(data);
}

public static DataSet<PojoWithCollection> getPojoWithUnmodifiableCollection(ExecutionEnvironment env) {
List<PojoWithCollection> data = new ArrayList<>();

List<Pojo1> pojosList1 = new ArrayList<>();
pojosList1.add(new Pojo1("a", "aa"));
pojosList1.add(new Pojo1("b", "bb"));

List<Pojo1> pojosList2 = new ArrayList<>();
pojosList2.add(new Pojo1("a2", "aa2"));
pojosList2.add(new Pojo1("b2", "bb2"));

PojoWithCollection pwc1 = new PojoWithCollection();
pwc1.pojos = Collections.unmodifiableList(pojosList1);
pwc1.key = 0;
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;

// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
ArrayList<Object> mixedList = new ArrayList<>();
Map<String, Integer> map = new HashMap<>();
map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
mixedList.add(map);
mixedList.add(new File("/this/is/wrong"));
mixedList.add("uhlala");

pwc1.mixed = Collections.unmodifiableList(mixedList);

PojoWithCollection pwc2 = new PojoWithCollection();
pwc2.pojos = Collections.unmodifiableList(pojosList2);
pwc2.key = 0;
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;

GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976

Expand Down
Loading