Commit 01c7433
[FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for oversized records.
StephanEwen committed Jul 13, 2015
1 parent 7271881 commit 01c7433
Showing 11 changed files with 1,199 additions and 215 deletions.
org/apache/flink/runtime/operators/GroupReduceCombineDriver.java

@@ -16,13 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,22 +29,28 @@
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
 import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
 import java.util.List;
 
 /**
  * Non-chained combine driver which is used for a CombineGroup transformation or a GroupReduce transformation where
  * the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
  * lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
  * In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
- * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type
- * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and
- * output type to be able to reduce the elements after the combine from <IN> to <OUT>.
+ * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type {@code IN}
+ * to any type of type {@code OUT}. In contrast, the RichGroupReduceFunction requires the combine method
+ * to have the same input and output type to be able to reduce the elements after the combine from
+ * {@code IN} to {@code OUT}.
  *
- * The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.
+ * <p>The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.</p>
  *
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
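Aside: to make the IN/OUT distinction in the Javadoc above concrete, here is a minimal sketch of a GroupCombineFunction whose output type differs from its input type, which RichGroupReduceFunction's combine method cannot express. It is illustrative only and not part of this commit; the class name WordPreAggregator is made up.

	import org.apache.flink.api.common.functions.GroupCombineFunction;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.util.Collector;

	// Illustrative only: pre-aggregates a group of words into a (word, count) pair.
	public class WordPreAggregator implements GroupCombineFunction<String, Tuple2<String, Integer>> {

		@Override
		public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) {
			String key = null;
			int count = 0;
			for (String word : words) {
				key = word;   // all records in one group carry the same key
				count++;
			}
			// IN = String, OUT = Tuple2<String, Integer>: the two types may differ.
			out.collect(new Tuple2<String, Integer>(key, count));
		}
	}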
@@ -67,8 +69,6 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	private GroupCombineFunction<IN, OUT> combiner;
 
 	private TypeSerializer<IN> serializer;
 
-	private TypeComparator<IN> sortingComparator;
-
 	private TypeComparator<IN> groupingComparator;
 
@@ -78,7 +78,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 	private Collector<OUT> output;
 
-	private long oversizedRecordCount = 0L;
+	private long oversizedRecordCount;
 
 	private volatile boolean running = true;
 
@@ -112,17 +112,18 @@ public int getNumberOfDriverComparators() {
 	@Override
 	public void prepare() throws Exception {
 		final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
-		if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
-			throw new Exception("Invalid strategy " + driverStrategy + " for " +
-					"group reduce combinder.");
+		if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
 		}
 
 		this.memManager = this.taskContext.getMemoryManager();
 		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
 
 		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.serializer = serializerFactory.getSerializer();
-		this.sortingComparator = this.taskContext.getDriverComparator(0);
+		final TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0);
+
 		this.groupingComparator = this.taskContext.getDriverComparator(1);
 		this.combiner = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();
@@ -131,12 +132,12 @@ public void prepare() throws Exception {
 				numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+		if (sortingComparator.supportsSerializationWithKeyNormalization() &&
 			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
 		}
 
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
@@ -171,10 +172,14 @@ public void run() throws Exception {
 
 				// write the value again
 				if (!this.sorter.write(value)) {
+
 					++oversizedRecordCount;
-					LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
-					// simply forward the record
-					this.output.collect((OUT)value);
+					LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
+							"Oversized record count: {}", oversizedRecordCount);
+
+					// simply forward the record. We need to pass it through the combine function to convert it
+					Iterable<IN> input = Collections.singleton(value);
+					this.combiner.combine(input, this.output);
 				}
 			}
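Aside: the change above replaces the old unchecked cast this.output.collect((OUT) value). The driver consumes IN but emits OUT, so when the two types differ the cast would push a wrongly typed record downstream; passing the oversized record through the combiner as a singleton group yields a correctly typed result. A self-contained sketch of the same pattern (the helper name forwardOversized is made up):

	import java.util.Collections;

	import org.apache.flink.api.common.functions.GroupCombineFunction;
	import org.apache.flink.util.Collector;

	public final class OversizedRecordFallback {

		// Illustrative only: emits one record that did not fit into the sort buffer.
		// Combining a singleton group converts IN to OUT even for a single record.
		static <IN, OUT> void forwardOversized(
				GroupCombineFunction<IN, OUT> combiner, IN value, Collector<OUT> out) throws Exception {
			combiner.combine(Collections.singleton(value), out);
		}
	}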


@@ -210,16 +215,25 @@ private void sortAndCombine() throws Exception {
 
 	@Override
 	public void cleanup() throws Exception {
-		if(this.sorter != null) {
+		if (this.sorter != null) {
 			this.memManager.release(this.sorter.dispose());
 		}
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
-		if(this.sorter != null) {
+		if (this.sorter != null) {
 			this.memManager.release(this.sorter.dispose());
 		}
 	}
+
+	/**
+	 * Gets the number of oversized records handled by this combiner.
+	 *
+	 * @return The number of oversized records handled by this combiner.
+	 */
+	public long getOversizedRecordCount() {
+		return oversizedRecordCount;
+	}
 }
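Aside: the new getOversizedRecordCount() accessor lets tests observe how many records took the fallback path. A hypothetical assertion helper, illustrative only; the commit's actual test coverage lives in the other changed files not shown on this page:

	import org.junit.Assert;

	public final class CombinerAssertions {

		// Illustrative only: verifies how many records took the oversized-record fallback.
		static void assertOversizedRecordCount(GroupReduceCombineDriver<?, ?> driver, long expected) {
			Assert.assertEquals("unexpected number of oversized records",
					expected, driver.getOversizedRecordCount());
		}
	}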
org/apache/flink/runtime/operators/CombineTaskExternalITCase.java

@@ -16,17 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -45,7 +46,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
 
 	public CombineTaskExternalITCase(ExecutionConfig config) {
 		super(config, COMBINE_MEM, 0);
@@ -161,4 +162,84 @@ public void testMultiLevelMergeCombineTask() throws Exception {
 
 		this.outList.clear();
 	}
+
+	// ------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+
+	@ReduceOperator.Combinable
+	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+
+		private final IntValue theInteger = new IntValue();
+
+		@Override
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.theInteger);
+
+				sum += this.theInteger.getValue();
+			}
+			this.theInteger.setValue(sum);
+			element.setField(1, this.theInteger);
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+			reduce(records, out);
+		}
+	}
+
+	@ReduceOperator.Combinable
+	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+
+		private int cnt = 0;
+
+		private final IntValue key = new IntValue();
+		private final IntValue value = new IntValue();
+		private final IntValue combineValue = new IntValue();
+
+		@Override
+		public void reduce(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.value);
+
+				sum += this.value.getValue();
+			}
+			element.getField(0, this.key);
+			this.value.setValue(sum - this.key.getValue());
+			element.setField(1, this.value);
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterable<Record> records, Collector<Record> out) {
+			Record element = null;
+			int sum = 0;
+
+			for (Record next : records) {
+				element = next;
+				element.getField(1, this.combineValue);
+
+				sum += this.combineValue.getValue();
+			}
+
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+
+			this.combineValue.setValue(sum);
+			element.setField(1, this.combineValue);
+			out.collect(element);
+		}
+	}
 }
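Aside: MockFailingCombiningReduceStub fails deterministically by counting combine calls and throwing ExpectedTestException once the count reaches 10. A minimal standalone demonstration of that pattern, outside any Flink test harness (illustrative only; assumes ExpectedTestException is an unchecked exception, as its use without a throws clause suggests, and that Record offers a two-value constructor):

	import java.util.ArrayList;
	import java.util.Collections;
	import java.util.List;

	import org.apache.flink.types.IntValue;
	import org.apache.flink.types.Record;
	import org.apache.flink.util.Collector;

	public final class FailingStubDemo {

		public static void main(String[] args) throws Exception {
			CombineTaskExternalITCase.MockFailingCombiningReduceStub stub =
					new CombineTaskExternalITCase.MockFailingCombiningReduceStub();

			final List<Record> collected = new ArrayList<Record>();
			Collector<Record> out = new Collector<Record>() {
				@Override
				public void collect(Record record) {
					collected.add(record);
				}
				@Override
				public void close() {}
			};

			// Calls 1-9 succeed; the tenth call trips the counter and throws
			// ExpectedTestException before collecting.
			for (int i = 1; i <= 10; i++) {
				Record record = new Record(new IntValue(i), new IntValue(i));
				stub.combine(Collections.singletonList(record), out);
			}
		}
	}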
