Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4078] [java, DataSet] Introduce missing calls to closure cleaner #2116

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public CoGroupOperatorSetsPredicate where(String... fields) {
*/
public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
    // The key type is derived from the selector exactly as the caller supplied it.
    TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());

    // Only the selector that is stored in the keys goes through input1.clean(...), the
    // closure-cleaner call this change introduces. This must be the sole return statement:
    // an earlier return that passes the raw keyExtractor would make this one unreachable
    // and silently skip the cleaning step.
    return new CoGroupOperatorSetsPredicate(
            new SelectorFunctionKeys<>(input1.clean(keyExtractor), input1.getType(), keyType));
}

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -520,7 +520,7 @@ public CoGroupOperatorWithoutFunction equalTo(String... fields) {
*/
public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
    // The key type is derived from the selector exactly as the caller supplied it.
    TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());

    // This selector extracts keys from the second input, so it is cleaned through input2,
    // matching the input2.getType() arguments used everywhere else in this method.
    // This must be the sole return statement: an earlier return that passes the raw
    // keyExtractor would make this one unreachable and silently skip the cleaning step.
    return createCoGroupOperator(
            new SelectorFunctionKeys<>(input2.clean(keyExtractor), input2.getType(), keyType));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.test.javaApiOperators;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
Expand Down Expand Up @@ -308,7 +309,7 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
}

@Test
public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
/*
* CoGroup with multiple key fields
*/
Expand All @@ -334,6 +335,124 @@ public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception
compareResultAsTuples(result, expected);
}

@Test
public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually write inline multi-line comments using `// ...` rather than `/* ... */`.

* CoGroup with multiple key fields, test working closure cleaner for inner classes
*/

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);

DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
return new Tuple2<Integer, Long>(t.f0, t.f4);
}
}).
equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {

@Override
public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
return new Tuple2<Integer, Long>(t.f0, t.f1);
}
}).
with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
@Override
public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
Iterable<Tuple3<Integer, Long, String>> second,
Collector<Tuple3<Integer, Long, String>> out)
{
List<String> strs = new ArrayList<String>();

for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
strs.add(t.f3);
}

for(Tuple3<Integer, Long, String> t : second) {
for(String s : strs) {
out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
}
}
}
});

List<Tuple3<Integer, Long, String>> result = coGrouped.collect();

String expected = "1,1,Hallo\n" +
"2,2,Hallo Welt\n" +
"3,2,Hallo Welt wie gehts?\n" +
"3,2,ABC\n" +
"5,3,HIJ\n" +
"5,3,IJK\n";

compareResultAsTuples(result, expected);
}

@Test(expected = InvalidProgramException.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to test this more specifically, e.g. wrap the program in try { ... } catch (InvalidProgramException e) { } and check that the root cause of e is a NotSerializableException. Otherwise, a closure-cleaner flag that is not being respected could be masked by an exception thrown for an unrelated reason.

public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
    // CoGroup with multiple key fields: with the closure cleaner disabled, using anonymous
    // inner classes as key extractors / co-group function is expected to fail with an
    // InvalidProgramException (see the "expected" attribute on this test's annotation).
    //
    // To make sure the test does not pass because of some unrelated InvalidProgramException,
    // the exception is inspected before it is rethrown: its cause chain must contain a
    // NotSerializableException. If it does not, an AssertionError is raised instead, which
    // is not the expected exception type and therefore fails the test.

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableClosureCleaner();

    try {
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);

        DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2)
                .where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
                        return new Tuple2<Integer, Long>(t.f0, t.f4);
                    }
                })
                .equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
                        return new Tuple2<Integer, Long>(t.f0, t.f1);
                    }
                })
                .with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
                            Iterable<Tuple3<Integer, Long, String>> second,
                            Collector<Tuple3<Integer, Long, String>> out) {
                        List<String> strs = new ArrayList<String>();

                        for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
                            strs.add(t.f3);
                        }

                        for (Tuple3<Integer, Long, String> t : second) {
                            for (String s : strs) {
                                out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
                            }
                        }
                    }
                });

        // Still trigger execution so that a failure which only surfaces at run time is
        // observed as well. The result itself is irrelevant: if no exception is thrown at
        // all, the test fails because the expected exception is missing.
        coGrouped.collect();
    } catch (InvalidProgramException e) {
        boolean causedBySerialization = false;
        for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
            if (cause instanceof java.io.NotSerializableException) {
                causedBySerialization = true;
                break;
            }
        }

        if (!causedBySerialization) {
            throw new AssertionError(
                    "Expected a NotSerializableException in the cause chain, but got: " + e, e);
        }

        // Rethrow so that the expected-exception check on the test annotation still applies.
        throw e;
    }
}

public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
Expand Down