Skip to content

Commit

Permalink
[FLINK-3340] [runtime] Fix object juggling in drivers
Browse files Browse the repository at this point in the history
A second attempt at object swapping in reduce drivers.

This closes #1626
  • Loading branch information
greghogan authored and rmetzger committed Feb 26, 2016
1 parent ee76fc4 commit fa8b001
Show file tree
Hide file tree
Showing 6 changed files with 484 additions and 41 deletions.
15 changes: 15 additions & 0 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Expand Up @@ -192,6 +192,21 @@ public ChecksumHashCode clone() {
return new ChecksumHashCode(count, checksum);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ChecksumHashCode) {
ChecksumHashCode other = (ChecksumHashCode) obj;
return this.count == other.count && this.checksum == other.checksum;
} else {
return false;
}
}

@Override
public int hashCode() {
return (int) (this.count + this.hashCode());
}

@Override
public String toString() {
return "ChecksumHashCode " + this.checksum + ", count " + this.count;
Expand Down
Expand Up @@ -108,25 +108,32 @@ public void run() throws Exception {
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;

T val1;
if ((val1 = input.next()) == null) {
return;
}

if (objectReuseEnabled) {
T val1 = serializer.createInstance();
// We only need two objects. The first reference stores results and is
// eventually collected. New values are read into the second.
T val2 = serializer.createInstance();

if ((val1 = input.next(val1)) == null) {
return;
}
T value = val1;

T val2 = serializer.createInstance();
while (running && (val2 = input.next(val2)) != null) {
val1 = stub.reduce(val1, val2);
value = stub.reduce(value, val2);

// we must never read into the object returned
// by the user, so swap the reuse objects,
if (value == val2) {
T tmp = val1;
val1 = val2;
val2 = tmp;
}
}

this.taskContext.getOutputCollector().collect(val1);
this.taskContext.getOutputCollector().collect(value);
} else {
T val1;
if ((val1 = input.next()) == null) {
return;
}

T val2;
while (running && (val2 = input.next()) != null) {
val1 = stub.reduce(val1, val2);
Expand Down
Expand Up @@ -200,26 +200,29 @@ private void sortAndCombine() throws Exception {
final MutableObjectIterator<T> input = sorter.getIterator();

if (objectReuseEnabled) {
// We only need two objects. The user function is expected to return
// the first input as the result. The output value is also expected
// to have the same key fields as the input elements.
// We only need two objects. The first reference stores results and is
// eventually collected. New values are read into the second.
//
// The output value must have the same key fields as the input values.

T reuse1 = serializer.createInstance();
T reuse1 = input.next();
T reuse2 = serializer.createInstance();

T value = input.next(reuse1);
T value = reuse1;

// iterate over key groups
while (this.running && value != null) {
comparator.setReference(value);
T res = value;

// iterate within a key group
while ((value = input.next(reuse2)) != null) {
if (comparator.equalToReference(value)) {
while ((reuse2 = input.next(reuse2)) != null) {
if (comparator.equalToReference(reuse2)) {
// same group, reduce
res = function.reduce(res, value);
if (res == reuse2) {
value = function.reduce(value, reuse2);

// we must never read into the object returned
// by the user, so swap the reuse objects
if (value == reuse2) {
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;
Expand All @@ -230,11 +233,14 @@ private void sortAndCombine() throws Exception {
}
}

output.collect(res);
output.collect(value);

if (value != null) {
value = serializer.copy(value, reuse1);
}
// swap the value from the new key group into the first object
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;

value = reuse1;
}
} else {
T value = input.next();
Expand Down
Expand Up @@ -116,26 +116,29 @@ public void run() throws Exception {
final Collector<T> output = this.taskContext.getOutputCollector();

if (objectReuseEnabled) {
// We only need two objects. The user function is expected to return
// the first input as the result. The output value is also expected
// to have the same key fields as the input elements.
// We only need two objects. The first reference stores results and is
// eventually collected. New values are read into the second.
//
// The output value must have the same key fields as the input values.

T reuse1 = serializer.createInstance();
T reuse1 = input.next();
T reuse2 = serializer.createInstance();

T value = input.next(reuse1);
T value = reuse1;

// iterate over key groups
while (this.running && value != null) {
comparator.setReference(value);
T res = value;

// iterate within a key group
while ((value = input.next(reuse2)) != null) {
if (comparator.equalToReference(value)) {
while ((reuse2 = input.next(reuse2)) != null) {
if (comparator.equalToReference(reuse2)) {
// same group, reduce
res = function.reduce(res, value);
if (res == reuse2) {
value = function.reduce(value, reuse2);

// we must never read into the object returned
// by the user, so swap the reuse objects
if (value == reuse2) {
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;
Expand All @@ -146,11 +149,14 @@ public void run() throws Exception {
}
}

output.collect(res);
output.collect(value);

if (value != null) {
value = serializer.copy(value, reuse1);
}
// swap the value from the new key group into the first object
T tmp = reuse1;
reuse1 = reuse2;
reuse2 = tmp;

value = reuse1;
}
} else {
T value = input.next();
Expand Down
Expand Up @@ -44,7 +44,7 @@ protected void testProgram() throws Exception {
.reduce(new Reducer())
.collect();

Assert.assertEquals(result.get(0), Long.valueOf(3025));
Assert.assertEquals(Long.valueOf(3025), result.get(0));
}

public static class Mapper extends RichMapFunction<Long, Long> {
Expand Down

0 comments on commit fa8b001

Please sign in to comment.