Skip to content

Commit

Permalink
[FLINK-3166] [runtime] The first program in ObjectReuseITCase has the…
Browse files Browse the repository at this point in the history
… wrong expected result, and it succeeds

- TestEnvironment now honors configuration of object reuse
- Fixed reduce transformations to allow the user to modify and return either input

This closes #1464
  • Loading branch information
greghogan authored and StephanEwen committed Dec 26, 2015
1 parent 7cb25cb commit c246ff2
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 22 deletions.
Expand Up @@ -151,14 +151,20 @@ public void invoke() throws Exception {
final Collector<OT> output = this.output; final Collector<OT> output = this.output;


if (objectReuseEnabled) { if (objectReuseEnabled) {
OT reuse = serializer.createInstance(); OT reuse1 = serializer.createInstance();
OT reuse2 = serializer.createInstance();
OT reuse3 = serializer.createInstance();


// as long as there is data to read // as long as there is data to read
while (!this.taskCanceled && !format.reachedEnd()) { while (!this.taskCanceled && !format.reachedEnd()) {


OT returned; OT returned;
if ((returned = format.nextRecord(reuse)) != null) { if ((returned = format.nextRecord(reuse1)) != null) {
output.collect(returned); output.collect(returned);

reuse1 = reuse2;
reuse2 = reuse3;
reuse3 = returned;
} }
} }
} else { } else {
Expand Down
Expand Up @@ -219,6 +219,11 @@ private void sortAndCombine() throws Exception {
if (comparator.equalToReference(value)) { if (comparator.equalToReference(value)) {
// same group, reduce // same group, reduce
res = function.reduce(res, value); res = function.reduce(res, value);
if (res == reuse2) {
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;
}
} else { } else {
// new key group // new key group
break; break;
Expand Down
Expand Up @@ -135,6 +135,11 @@ public void run() throws Exception {
if (comparator.equalToReference(value)) { if (comparator.equalToReference(value)) {
// same group, reduce // same group, reduce
res = function.reduce(res, value); res = function.reduce(res, value);
if (res == reuse2) {
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;
}
} else { } else {
// new key group // new key group
break; break;
Expand Down
Expand Up @@ -49,10 +49,21 @@ public JobExecutionResult getLastJobExecutionResult() {
public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
this.executor = executor; this.executor = executor;
setParallelism(parallelism); setParallelism(parallelism);

// disabled to improve build time // disabled to improve build time
getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE); getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
} }


public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
this(executor, parallelism);

if (isObjectReuseEnabled) {
getConfig().enableObjectReuse();
} else {
getConfig().disableObjectReuse();
}
}

@Override @Override
public void startNewSession() throws Exception { public void startNewSession() throws Exception {
} }
Expand Down Expand Up @@ -89,7 +100,7 @@ public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override @Override
public ExecutionEnvironment createExecutionEnvironment() { public ExecutionEnvironment createExecutionEnvironment() {
lastEnv = new TestEnvironment(executor, getParallelism()); lastEnv = new TestEnvironment(executor, getParallelism(), getConfig().isObjectReuseEnabled());
return lastEnv; return lastEnv;
} }
}; };
Expand Down
Expand Up @@ -104,11 +104,12 @@ public static String runProgram(int progId, String resultPath) throws Exception
switch(progId) { switch(progId) {


case 1: { case 1: {
// Grouped reduce


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();


DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);

DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() { DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {


@Override @Override
Expand All @@ -124,26 +125,30 @@ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<Str
env.execute(); env.execute();


// return expected result // return expected result
return "a,100\n"; return "a,60\n";


} }


case 2: { case 2: {
// Global reduce


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();


DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);


DataSet<Tuple2<String, Integer>> result = input DataSet<Tuple2<String, Integer>> result = input.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {


@Override @Override
public Tuple2<String, Integer> reduce( public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1, Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception { Tuple2<String, Integer> value2) throws Exception {
value2.f1 += value1.f1; if (value1.f1 % 2 == 0) {
return value2; value1.f1 += value2.f1;
return value1;
} else {
value2.f1 += value1.f1;
return value2;
}
} }


}); });
Expand All @@ -152,14 +157,14 @@ public Tuple2<String, Integer> reduce(
env.execute(); env.execute();


// return expected result // return expected result
return "a,100\n"; return "a,60\n";


} }


case 3: { case 3: {
// Add items to list without copying


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();


DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);


Expand All @@ -183,18 +188,26 @@ public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<St
env.execute(); env.execute();


// return expected result // return expected result
return "a,4\n" + if (env.getConfig().isObjectReuseEnabled()) {
"a,4\n" + return "a,5\n" +
"a,5\n" + "a,4\n" +
"a,5\n" + "a,5\n" +
"a,5\n"; "a,4\n" +
"a,5\n";
} else {
return "a,1\n" +
"a,2\n" +
"a,3\n" +
"a,4\n" +
"a,5\n";
}


} }


case 4: { case 4: {
// Add items to list after copying


final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();


DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);


Expand All @@ -204,7 +217,7 @@ public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<St
public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception { public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
for (Tuple2<String, Integer> val : values) { for (Tuple2<String, Integer> val : values) {
list.add(val); list.add(val.copy());
} }


for (Tuple2<String, Integer> val : list) { for (Tuple2<String, Integer> val : list) {
Expand All @@ -218,10 +231,10 @@ public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<St
env.execute(); env.execute();


// return expected result // return expected result
return "a,4\n" + return "a,1\n" +
"a,2\n" +
"a,3\n" +
"a,4\n" + "a,4\n" +
"a,5\n" +
"a,5\n" +
"a,5\n"; "a,5\n";


} }
Expand Down

0 comments on commit c246ff2

Please sign in to comment.