Skip to content

Commit

Permalink
[FLINK-2724] [runtime] Fix object reuse in GroupReduceCombineDriver a…
Browse files Browse the repository at this point in the history
…nd ReduceCombineDriver
  • Loading branch information
StephanEwen committed Sep 21, 2015
1 parent 435ee4e commit f3809d8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 61 deletions.
Expand Up @@ -157,29 +157,29 @@ public void run() throws Exception {
final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
final TypeSerializer<IN> serializer = this.serializer;

IN value = serializer.createInstance();

while (running && (value = in.next(value)) != null) {

// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
if (objectReuseEnabled) {
IN value = serializer.createInstance();

while (running && (value = in.next(value)) != null) {
// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
}

// do the actual sorting, combining, and data writing
sortAndCombineAndRetryWrite(value);
}

// do the actual sorting, combining, and data writing
sortAndCombine();
this.sorter.reset();

// write the value again
if (!this.sorter.write(value)) {

++oversizedRecordCount;
LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
"Oversized record count: {}", oversizedRecordCount);

// simply forward the record. We need to pass it through the combine function to convert it
Iterable<IN> input = Collections.singleton(value);
this.combiner.combine(input, this.output);
}
else {
IN value;
while (running && (value = in.next()) != null) {
// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
}

// do the actual sorting, combining, and data writing
sortAndCombineAndRetryWrite(value);
}
}

Expand Down Expand Up @@ -214,6 +214,24 @@ private void sortAndCombine() throws Exception {
}
}
}

private void sortAndCombineAndRetryWrite(IN value) throws Exception {
sortAndCombine();
this.sorter.reset();

// write the value again
if (!this.sorter.write(value)) {

++oversizedRecordCount;
LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
"Oversized record count: {}", oversizedRecordCount);

// simply forward the record. We need to pass it through the combine function to convert it
Iterable<IN> input = Collections.singleton(value);
this.combiner.combine(input, this.output);
this.sorter.reset();
}
}

@Override
public void cleanup() throws Exception {
Expand Down
Expand Up @@ -143,22 +143,43 @@ public void run() throws Exception {
final MutableObjectIterator<T> in = this.taskContext.getInput(0);
final TypeSerializer<T> serializer = this.serializer;

T value = serializer.createInstance();
if (objectReuseEnabled) {
T value = serializer.createInstance();

while (running && (value = in.next(value)) != null) {

// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
while (running && (value = in.next(value)) != null) {

// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
}

// do the actual sorting, combining, and data writing
sortAndCombine();
this.sorter.reset();

// write the value again
if (!this.sorter.write(value)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
}
}

// do the actual sorting, combining, and data writing
sortAndCombine();
this.sorter.reset();

// write the value again
if (!this.sorter.write(value)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
}
else {
T value;
while (running && (value = in.next()) != null) {

// try writing to the sorter first
if (this.sorter.write(value)) {
continue;
}

// do the actual sorting, combining, and data writing
sortAndCombine();
this.sorter.reset();

// write the value again
if (!this.sorter.write(value)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
}
}
}

Expand All @@ -174,11 +195,8 @@ private void sortAndCombine() throws Exception {

final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;

final ReduceFunction<T> function = this.reducer;

final Collector<T> output = this.output;

final MutableObjectIterator<T> input = sorter.getIterator();

if (objectReuseEnabled) {
Expand Down Expand Up @@ -214,15 +232,15 @@ private void sortAndCombine() throws Exception {
}
}
} else {
T value = input.next(serializer.createInstance());
T value = input.next();

// iterate over key groups
while (this.running && value != null) {
comparator.setReference(value);
T res = value;

// iterate within a key group
while ((value = input.next(serializer.createInstance())) != null) {
while ((value = input.next()) != null) {
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);
Expand Down
Expand Up @@ -48,7 +48,7 @@
*
* Acts like a combiner with a custom output type OUT.
*
* Sorting and reducing of the elements is performed invididually for each partition without data exchange. This may
* Sorting and reducing of the elements is performed individually for each partition without data exchange. This may
* lead to a partial group reduce.
*
* @param <IN> The data type consumed
Expand All @@ -58,9 +58,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {

private static final Logger LOG = LoggerFactory.getLogger(GroupCombineChainedDriver.class);

/**
* Fix length records with a length below this threshold will be in-place sorted, if possible.
*/
/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -174,23 +172,16 @@ public void collect(IN record) {
if (this.sorter.write(record)) {
return;
}
} catch (IOException e) {
throw new ExceptionInChainedStubException(this.taskName, e);
}

// do the actual sorting
try {
// do the actual sorting
sortAndReduce();
} catch (Exception e) {
throw new ExceptionInChainedStubException(this.taskName, e);
}
this.sorter.reset();

try {
this.sorter.reset();

if (!this.sorter.write(record)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
}
} catch (IOException e) {
}
catch (Exception e) {
throw new ExceptionInChainedStubException(this.taskName, e);
}
}
Expand All @@ -215,9 +206,9 @@ private void sortAndReduce() throws Exception {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);
// run the reducer
final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);


final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(
sorter.getIterator(), this.serializer, this.groupingComparator);
// cache references on the stack
final GroupReduceFunction<IN, OUT> stub = this.reducer;
final Collector<OUT> output = this.outputCollector;
Expand All @@ -231,7 +222,8 @@ private void sortAndReduce() throws Exception {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);
// run the reducer
final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(
sorter.getIterator(), this.groupingComparator);


// cache references on the stack
Expand Down

0 comments on commit f3809d8

Please sign in to comment.