NonReusingHashJoinIteratorITCase.java
@@ -54,7 +54,7 @@
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class NonReusingHashJoinIteratorITCase {
 
     private static final int MEMORY_SIZE = 16000000; // total memory
NonReusingReOpenableHashTableITCase.java
@@ -64,7 +64,6 @@
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-@SuppressWarnings("deprecation")
 public class NonReusingReOpenableHashTableITCase {
 
     private static final int PAGE_SIZE = 8 * 1024;
ReusingHashJoinIteratorITCase.java
@@ -62,7 +62,7 @@
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectIntPairData;
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ReusingHashJoinIteratorITCase {
 
     private static final int MEMORY_SIZE = 16000000; // total memory
ReusingReOpenableHashTableITCase.java
@@ -66,7 +66,6 @@
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-@SuppressWarnings("deprecation")
 public class ReusingReOpenableHashTableITCase {
 
     private static final int PAGE_SIZE = 8 * 1024;
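The javadoc in these two hunks describes what the re-openable hash table is for: the build side is materialized once (in memory, spilling to disk when needed) and is then probed again on every superstep of an iterative task, rather than being rebuilt. As a conceptual illustration of that access pattern only — all names below are hypothetical, and none of Flink's actual MutableHashTable machinery is used:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch only: build the hash side once, probe it across iterations.
public class ReOpenableJoinSketch<K, V> {

    private final Map<K, List<V>> buildSide = new HashMap<>();

    // Materialize the build side once; Flink's real table may spill partitions to disk.
    public void build(Iterable<V> buildInput, Function<V, K> keyExtractor) {
        for (V record : buildInput) {
            buildSide.computeIfAbsent(keyExtractor.apply(record), k -> new ArrayList<>()).add(record);
        }
    }

    // Each iteration re-opens a fresh probe side against the same build-side table.
    public List<V> probe(K key) {
        return buildSide.getOrDefault(key, Collections.<V>emptyList());
    }
}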
@@ -173,7 +173,7 @@ else if (result instanceof CancellationFailure) {
         }
     }
 
-    private void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
+    public static void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
             throws Exception {
 
         checkNotNull(jobManager);
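Making awaitRunning public static lets other test classes block until a submitted job has actually reached the RUNNING state before interacting with it. The method body is not shown in this view; the following is only a rough sketch of how such a helper can be written, assuming the RequestJobStatus/CurrentJobStatus actor messages from flink-runtime of this era and Guava's checkNotNull as used above — not the actual implementation from this change:

import java.util.concurrent.TimeoutException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

import static com.google.common.base.Preconditions.checkNotNull;

public final class AwaitRunningSketch {

    // Poll the JobManager actor for the job status until RUNNING or timeout.
    public static void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
            throws Exception {
        checkNotNull(jobManager);
        checkNotNull(jobId);
        checkNotNull(timeout);

        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft()) {
            Future<Object> response = jobManager.ask(
                    new JobManagerMessages.RequestJobStatus(jobId), deadline.timeLeft());
            Object result = Await.result(response, deadline.timeLeft());

            if (result instanceof JobManagerMessages.CurrentJobStatus &&
                    ((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.RUNNING) {
                return;
            }
            Thread.sleep(50);
        }
        throw new TimeoutException("Job " + jobId + " did not reach RUNNING within " + timeout);
    }

    private AwaitRunningSketch() {}
}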
AccumulatorIterativeITCase.java
@@ -19,124 +19,59 @@
 
 package org.apache.flink.test.accumulators;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-/**
- * To be finished !!! Didn't test with iterations yet;-(
- */
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class AccumulatorIterativeITCase extends RecordAPITestBase {
-
-    private static final String INPUT = "1\n" + "2\n" + "3\n";
-    private static final String EXPECTED = "6\n";
-
+public class AccumulatorIterativeITCase extends JavaProgramTestBase {
     private static final int NUM_ITERATIONS = 3;
     private static final int NUM_SUBTASKS = 1;
-
-    protected String dataPath;
-    protected String resultPath;
-
-    public AccumulatorIterativeITCase(Configuration config) {
-        super(config);
-    }
-
-    @Override
-    protected void preSubmit() throws Exception {
-        dataPath = createTempFile("datapoints.txt", INPUT);
-        resultPath = getTempFilePath("result");
-    }
+    private static final String ACC_NAME = "test";
 
     @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(EXPECTED, resultPath);
-
-        Integer res = getJobExecutionResult().getAccumulatorResult("test");
-        Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), res);
+    protected boolean skipCollectionExecution() {
+        return true;
     }
 
     @Override
-    protected Plan getTestJob() {
-        Plan plan = getTestPlanPlan(config.getInteger("IterationAllReducer#NoSubtasks", 1), dataPath, resultPath);
-        return plan;
-    }
-
-    @Parameters
-    public static Collection<Object[]> getConfigurations() {
-        Configuration config1 = new Configuration();
-        config1.setInteger("IterationAllReducer#NoSubtasks", NUM_SUBTASKS);
-        return toParameterList(config1);
-    }
-
-    static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
-        FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
+    protected void testProgram() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(NUM_SUBTASKS);
 
-        BulkIteration iteration = new BulkIteration("Loop");
-        iteration.setInput(initialInput);
-        iteration.setMaximumNumberOfIterations(NUM_ITERATIONS);
-
-        ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
-                .input(iteration.getPartialSolution())
-                .name("Compute sum (Reduce)")
-                .build();
+        IterativeDataSet<Integer> iteration = env.fromElements(1, 2, 3).iterate(NUM_ITERATIONS);
 
-        iteration.setNextPartialSolution(sumReduce);
-
-        @SuppressWarnings("unchecked")
-        FileDataSink finalResult = new FileDataSink(new CsvOutputFormat("\n", " ", StringValue.class), output, iteration, "Output");
-
-        Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-        plan.setDefaultParallelism(numSubTasks);
-        return plan;
+        iteration.closeWith(iteration.reduceGroup(new SumReducer())).output(new DiscardingOutputFormat<Integer>());
+
+        Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), (Integer) env.execute().getAccumulatorResult(ACC_NAME));
     }
 
-    static final class SumReducer extends ReduceFunction implements Serializable {
+    static final class SumReducer extends RichGroupReduceFunction<Integer, Integer> {
 
         private static final long serialVersionUID = 1L;
 
         private IntCounter testCounter = new IntCounter();
 
         @Override
-        public void reduce(Iterator<Record> records, Collector<Record> out) {
+        public void open(Configuration config) throws Exception {
+            getRuntimeContext().addAccumulator(ACC_NAME, this.testCounter);
+        }
+
+        @Override
+        public void reduce(Iterable<Integer> values, Collector<Integer> out) {
             // Compute the sum
             int sum = 0;
 
-            while (records.hasNext()) {
-                Record r = records.next();
-                Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue());
+            for (Integer value : values) {
                sum += value;
                testCounter.add(value);
            }
-            out.collect(new Record(new StringValue(Integer.toString(sum))));
-        }
-
-        @Override
-        public void close() throws Exception {
-            super.close();
-            getRuntimeContext().addAccumulator("test", this.testCounter);
+            out.collect(sum);
        }
    }
-
}
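The rewritten test shows the DataSet-API accumulator pattern end to end: a rich function registers an IntCounter under a name in open(), every parallel instance adds to it, and the merged result is read from the JobExecutionResult returned by env.execute(). A minimal standalone sketch of the same pattern follows; the class name and accumulator name are illustrative, not taken from this PR:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class AccumulatorSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
            .map(new RichMapFunction<Integer, Integer>() {
                private final IntCounter elementCount = new IntCounter();

                @Override
                public void open(Configuration parameters) {
                    // Register once per parallel task; the JobManager merges the partial counts.
                    getRuntimeContext().addAccumulator("element-count", elementCount);
                }

                @Override
                public Integer map(Integer value) {
                    elementCount.add(1);
                    return value;
                }
            })
            .output(new DiscardingOutputFormat<Integer>());

        JobExecutionResult result = env.execute();
        Integer count = result.getAccumulatorResult("element-count");
        System.out.println("elements seen: " + count); // 3
    }
}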