Skip to content

Commit

Permalink
[FLINK-2479] Refactor runtime.operators.* tests
Browse files Browse the repository at this point in the history
This closes apache#1160
  • Loading branch information
supermegaciaccount authored and cfmcgrady committed Oct 23, 2015
1 parent 2a76beb commit 87d1e72
Show file tree
Hide file tree
Showing 14 changed files with 1,374 additions and 1,450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.TestData.Key;
import org.apache.flink.runtime.operators.testutils.TestData.Value;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
import org.apache.flink.types.Record;
import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -102,19 +101,19 @@ public void afterTest() {
@Test
public void testWriteReadSmallRecords() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -125,18 +124,18 @@ public void testWriteReadSmallRecords() throws Exception
generator.reset();

// read and re-generate all records and compare them
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
readRec.read(inView);
serializer.deserialize(readRec, inView);

Key k1 = rec.getField(0, Key.class);
Value v1 = rec.getField(1, Value.class);
int k1 = rec.f0;
String v1 = rec.f1;

Key k2 = readRec.getField(0, Key.class);
Value v2 = readRec.getField(1, Value.class);
int k2 = readRec.f0;
String v2 = readRec.f1;

Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}

this.memoryManager.release(inView.close());
Expand All @@ -146,19 +145,20 @@ public void testWriteReadSmallRecords() throws Exception
@Test
public void testWriteAndReadLongRecords() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_LONG; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -169,15 +169,15 @@ public void testWriteAndReadLongRecords() throws Exception
generator.reset();

// read and re-generate all records and compare them
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_LONG; i++) {
generator.next(rec);
readRec.read(inView);
final Key k1 = rec.getField(0, Key.class);
final Value v1 = rec.getField(1, Value.class);
final Key k2 = readRec.getField(0, Key.class);
final Value v2 = readRec.getField(1, Value.class);
Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
serializer.deserialize(readRec, inView);
final int k1 = rec.f0;
final String v1 = rec.f1;
final int k2 = readRec.f0;
final String v2 = readRec.f1;
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}

this.memoryManager.release(inView.close());
Expand All @@ -187,19 +187,20 @@ public void testWriteAndReadLongRecords() throws Exception
@Test
public void testReadTooMany() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -211,15 +212,15 @@ public void testReadTooMany() throws Exception

// read and re-generate all records and compare them
try {
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
generator.next(rec);
readRec.read(inView);
final Key k1 = rec.getField(0, Key.class);
final Value v1 = rec.getField(1, Value.class);
final Key k2 = readRec.getField(0, Key.class);
final Value v2 = readRec.getField(1, Value.class);
Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
serializer.deserialize(readRec, inView);
final int k1 = rec.f0;
final String v1 = rec.f1;
final int k2 = readRec.f0;
final String v2 = readRec.f1;
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Expected an EOFException which did not occur.");
}
Expand All @@ -238,19 +239,20 @@ public void testReadTooMany() throws Exception
@Test
public void testReadWithoutKnownBlockCount() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -261,18 +263,18 @@ public void testReadWithoutKnownBlockCount() throws Exception
generator.reset();

// read and re-generate all records and cmpare them
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
readRec.read(inView);
serializer.deserialize(readRec, inView);

Key k1 = rec.getField(0, Key.class);
Value v1 = rec.getField(1, Value.class);
int k1 = rec.f0;
String v1 = rec.f1;

Key k2 = readRec.getField(0, Key.class);
Value v2 = readRec.getField(1, Value.class);
int k2 = readRec.f0;
String v2 = readRec.f1;

Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}

this.memoryManager.release(inView.close());
Expand All @@ -282,19 +284,20 @@ public void testReadWithoutKnownBlockCount() throws Exception
@Test
public void testWriteReadOneBufferOnly() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -305,18 +308,18 @@ public void testWriteReadOneBufferOnly() throws Exception
generator.reset();

// read and re-generate all records and compare them
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
readRec.read(inView);
serializer.deserialize(readRec, inView);

Key k1 = rec.getField(0, Key.class);
Value v1 = rec.getField(1, Value.class);
int k1 = rec.f0;
String v1 = rec.f1;

Key k2 = readRec.getField(0, Key.class);
Value v2 = readRec.getField(1, Value.class);
int k2 = readRec.f0;
String v2 = readRec.f1;

Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}

this.memoryManager.release(inView.close());
Expand All @@ -326,19 +329,20 @@ public void testWriteReadOneBufferOnly() throws Exception
@Test
public void testWriteReadNotAll() throws Exception
{
final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();

// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

// write a number of pairs
final Record rec = new Record();
final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
rec.write(outView);
serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());

Expand All @@ -349,18 +353,18 @@ public void testWriteReadNotAll() throws Exception
generator.reset();

// read and re-generate all records and compare them
final Record readRec = new Record();
final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
generator.next(rec);
readRec.read(inView);
serializer.deserialize(readRec, inView);

Key k1 = rec.getField(0, Key.class);
Value v1 = rec.getField(1, Value.class);
int k1 = rec.f0;
String v1 = rec.f1;

Key k2 = readRec.getField(0, Key.class);
Value v2 = readRec.getField(1, Value.class);
int k2 = readRec.f0;
String v2 = readRec.f1;

Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}

this.memoryManager.release(inView.close());
Expand Down

0 comments on commit 87d1e72

Please sign in to comment.