Skip to content

Commit

Permalink
[FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss i…
Browse files Browse the repository at this point in the history
…n CompactingHashTable

Also a lot of code cleanups in CompactingHashTable
  • Loading branch information
StephanEwen committed Aug 5, 2015
1 parent 5226f0b commit 925ac1f
Show file tree
Hide file tree
Showing 12 changed files with 564 additions and 409 deletions.
Expand Up @@ -20,9 +20,6 @@

import java.io.Serializable;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;


public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
implements Serializable {
Expand Down
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;

Expand All @@ -38,23 +37,20 @@ public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
private final Collector<T> delegate;

private final CompactingHashTable<T> solutionSet;

private final T tmpHolder;

public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
this(solutionSet, serializer, null);
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
this(solutionSet, null);
}

public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
this.tmpHolder = serializer.createInstance();
}

@Override
public void collect(T record) {
try {
solutionSet.insertOrReplaceRecord(record, tmpHolder);
solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
Expand Down
Expand Up @@ -16,12 +16,10 @@
* limitations under the License.
*/


package org.apache.flink.runtime.iterative.io;

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;

Expand All @@ -40,23 +38,20 @@ public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
private final Collector<T> delegate;

private final CompactingHashTable<T> solutionSet;

private final T tmpHolder;

public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
this(solutionSet, serializer, null);
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
this(solutionSet, null);
}

public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
this.tmpHolder = serializer.createInstance();
}

@Override
public void collect(T record) {
try {
solutionSet.insertOrReplaceRecord(record, tmpHolder);
solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
Expand Down
Expand Up @@ -329,8 +329,7 @@ protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> del
if (ss instanceof CompactingHashTable) {
@SuppressWarnings("unchecked")
CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
TypeSerializer<OT> serializer = getOutputSerializer();
return new SolutionSetUpdateOutputCollector<OT>(solutionSet, serializer, delegate);
return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
}
else if (ss instanceof JoinHashMap) {
@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -61,15 +61,13 @@ public TypeComparator<T> getBuildSideComparator() {

public abstract void abort();

public abstract void buildTable(final MutableObjectIterator<T> input) throws IOException;

public abstract List<MemorySegment> getFreeMemory();

// ------------- Modifier -------------

public abstract void insert(T record) throws IOException;

public abstract void insertOrReplaceRecord(T record, T tempHolder) throws IOException;
public abstract void insertOrReplaceRecord(T record) throws IOException;

// ------------- Accessors -------------

Expand Down

0 comments on commit 925ac1f

Please sign in to comment.