Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests #1294

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,11 +22,11 @@
import java.util.HashMap;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
Expand All @@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun

private final double combine_frac;

private final ArrayList<Record> outList = new ArrayList<Record>();
private final ArrayList<Record> outList = new ArrayList<>();

@SuppressWarnings("unchecked")
private final RecordComparator comparator = new RecordComparator(
Expand All @@ -69,7 +69,7 @@ public void testSingleLevelMergeCombineTask() {
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);

final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();

try {
testDriver(testTask, MockCombiningReduceStub.class);
Expand All @@ -85,7 +85,7 @@ public void testSingleLevelMergeCombineTask() {

// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
Expand Down Expand Up @@ -123,7 +123,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);

final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();

try {
testDriver(testTask, MockCombiningReduceStub.class);
Expand All @@ -139,7 +139,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {

// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
Expand All @@ -166,7 +166,7 @@ public void testMultiLevelMergeCombineTask() throws Exception {
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------

@ReduceOperator.Combinable
@Combinable
public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -194,7 +194,7 @@ public void combine(Iterable<Record> records, Collector<Record> out) throws Exce
}
}

@ReduceOperator.Combinable
@Combinable
public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.types.Record;
import org.junit.Test;

@SuppressWarnings("deprecation")
public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {

private static final long CROSS_MEM = 1024 * 1024;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void testBlock1CrossTask()
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -95,7 +94,7 @@ public void testBlock2CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -123,7 +122,7 @@ public void testFailingBlockCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -153,7 +152,7 @@ public void testFailingBlockCrossTask2() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -184,7 +183,7 @@ public void testStream1CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -215,7 +214,7 @@ public void testStream2CrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -243,7 +242,7 @@ public void testFailingStreamCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -272,7 +271,7 @@ public void testFailingStreamCrossTask2() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockFailingCrossStub.class);
Expand Down Expand Up @@ -303,7 +302,7 @@ public void testStreamEmptyInnerCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -333,7 +332,7 @@ public void testStreamEmptyOuterCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -363,7 +362,7 @@ public void testBlockEmptyInnerCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand Down Expand Up @@ -393,7 +392,7 @@ public void testBlockEmptyOuterCrossTask() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

try {
testDriver(testTask, MockCrossStub.class);
Expand All @@ -420,7 +419,7 @@ public void testCancelBlockCrossTaskInit() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand Down Expand Up @@ -463,7 +462,7 @@ public void testCancelBlockCrossTaskCrossing() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand All @@ -485,7 +484,7 @@ public void run() {

try {
tct.join();
taskRunner.join();
taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}
Expand All @@ -506,7 +505,7 @@ public void testCancelStreamCrossTaskInit() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand Down Expand Up @@ -549,7 +548,7 @@ public void testCancelStreamCrossTaskCrossing() {
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);

final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();

final AtomicBoolean success = new AtomicBoolean(false);

Expand All @@ -571,15 +570,15 @@ public void run() {

try {
tct.join();
taskRunner.join();
taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}

Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
}

public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
public static final class MockCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;

@Override
Expand All @@ -588,7 +587,7 @@ public Record cross(Record record1, Record record2) throws Exception {
}
}

public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;

private int cnt = 0;
Expand Down