Skip to content

Commit

Permalink
[FLINK-1982] [record-api] Remove dependencies on Record API from flin…
Browse files Browse the repository at this point in the history
…k-runtime tests

Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest

This closes #1294
  • Loading branch information
fhueske committed Oct 23, 2015
1 parent 7ff071f commit 3c8a658
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 159 deletions.
Expand Up @@ -22,11 +22,11 @@
import java.util.HashMap;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
Expand All @@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun

private final double combine_frac;

private final ArrayList<Record> outList = new ArrayList<Record>();
private final ArrayList<Record> outList = new ArrayList<>();

@SuppressWarnings("unchecked")
private final RecordComparator comparator = new RecordComparator(
Expand All @@ -69,7 +69,7 @@ public void testSingleLevelMergeCombineTask() {
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);

final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();

try {
testDriver(testTask, MockCombiningReduceStub.class);
Expand All @@ -85,7 +85,7 @@ public void testSingleLevelMergeCombineTask() {

// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
Expand Down Expand Up @@ -123,7 +123,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);

final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();

try {
testDriver(testTask, MockCombiningReduceStub.class);
Expand All @@ -139,7 +139,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {

// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
Expand All @@ -166,7 +166,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------

@ReduceOperator.Combinable
@Combinable
public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -194,7 +194,7 @@ public void combine(Iterable<Record> records, Collector<Record> out) throws Exce
}
}

@ReduceOperator.Combinable
@Combinable
public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.types.Record;
import org.junit.Test;

@SuppressWarnings("deprecation")
public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {

private static final long CROSS_MEM = 1024 * 1024;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void testBlock1CrossTask()
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -95,7 +94,7 @@ public void testBlock2CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -123,7 +122,7 @@ public void testFailingBlockCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -153,7 +152,7 @@ public void testFailingBlockCrossTask2() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -184,7 +183,7 @@ public void testStream1CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -215,7 +214,7 @@ public void testStream2CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -243,7 +242,7 @@ public void testFailingStreamCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -272,7 +271,7 @@ public void testFailingStreamCrossTask2() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -303,7 +302,7 @@ public void testStreamEmptyInnerCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -333,7 +332,7 @@ public void testStreamEmptyOuterCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -363,7 +362,7 @@ public void testBlockEmptyInnerCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -393,7 +392,7 @@ public void testBlockEmptyOuterCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand All @@ -420,7 +419,7 @@ public void testCancelBlockCrossTaskInit() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand Down Expand Up @@ -463,7 +462,7 @@ public void testCancelBlockCrossTaskCrossing() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand All @@ -485,7 +484,7 @@ public void run() {

try {
tct.join();
taskRunner.join();
taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}
Expand All @@ -506,7 +505,7 @@ public void testCancelStreamCrossTaskInit() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand Down Expand Up @@ -549,7 +548,7 @@ public void testCancelStreamCrossTaskCrossing() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand All @@ -571,15 +570,15 @@ public void run() {

try {
tct.join();
taskRunner.join();
taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}

Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
}

public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
public static final class MockCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;

@Override
Expand All @@ -588,7 +587,7 @@ public Record cross(Record record1, Record record2) throws Exception {
}
}

public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;

private int cnt = 0;
Expand Down

0 comments on commit 3c8a658

Please sign in to comment.