Skip to content

Commit

Permalink
[FLINK-6009] [java api] Deprecate DataSetUtils#checksumHashCode
Browse files Browse the repository at this point in the history
This is likely only used by Gelly and we have a more featureful
implementation allowing for multiple outputs and setting the job name.
Deprecation will allow this to be removed in Flink 2.0.

This closes #3516
  • Loading branch information
greghogan authored and StephanEwen committed Mar 16, 2017
1 parent 980d072 commit 40a156e
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 46 deletions.
Expand Up @@ -344,7 +344,9 @@ public TupleSummaryAggregator<R> reduce(TupleSummaryAggregator<R> agg1, TupleSum
* as well as the checksum (sum over element hashes). * as well as the checksum (sum over element hashes).
* *
* @return A ChecksumHashCode that represents the count and checksum of elements in the data set. * @return A ChecksumHashCode that represents the count and checksum of elements in the data set.
* @deprecated replaced with {@code org.apache.flink.graph.asm.dataset.ChecksumHashCode} in Gelly
*/ */
@Deprecated
public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception { public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
final String id = new AbstractID().toString(); final String id = new AbstractID().toString();


Expand Down
Expand Up @@ -25,19 +25,22 @@
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.types.IntValue; import org.apache.flink.types.IntValue;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.Serializable; import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Random; import java.util.Random;


import static org.hamcrest.Matchers.is;

/* /*
* These programs demonstrate the effects of user defined functions which modify input objects or return locally created * These programs demonstrate the effects of user defined functions which modify input objects or return locally created
* objects that are retained and reused on future calls. The programs do not retain and later modify input objects. * objects that are retained and reused on future calls. The programs do not retain and later modify input objects.
Expand All @@ -47,15 +50,17 @@ public class OverwriteObjects {
public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class); public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);


// DataSets are created with this number of elements // DataSets are created with this number of elements
private static final int NUMBER_OF_ELEMENTS = 3 * 1000 * 1000; private static final int NUMBER_OF_ELEMENTS = 3_000_000;


// DataSet values are randomly generated over this range // DataSet values are randomly generated over this range
private static final int KEY_RANGE = 1 * 1000 * 1000; private static final int KEY_RANGE = 1_000_000;


private static final int MAX_PARALLELISM = 4; private static final int MAX_PARALLELISM = 4;


private static final long RANDOM_SEED = new Random().nextLong(); private static final long RANDOM_SEED = new Random().nextLong();


private static final Tuple2Comparator<IntValue, IntValue> comparator = new Tuple2Comparator<>();

public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
new OverwriteObjects().run(); new OverwriteObjects().run();
} }
Expand Down Expand Up @@ -116,17 +121,23 @@ public void testGroupedReduce(ExecutionEnvironment env) throws Exception {


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


ChecksumHashCode enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) List<Tuple2<IntValue, IntValue>> enabledResult = getDataSet(env)
.groupBy(0) .groupBy(0)
.reduce(new OverwriteObjectsReduce(true))); .reduce(new OverwriteObjectsReduce(true))
.collect();

Collections.sort(enabledResult, comparator);


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


ChecksumHashCode disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) List<Tuple2<IntValue, IntValue>> disabledResult = getDataSet(env)
.groupBy(0) .groupBy(0)
.reduce(new OverwriteObjectsReduce(true))); .reduce(new OverwriteObjectsReduce(true))
.collect();


Assert.assertEquals(disabledChecksum, enabledChecksum); Collections.sort(disabledResult, comparator);

Assert.assertThat(disabledResult, is(enabledResult));
} }


private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> { private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
Expand Down Expand Up @@ -154,31 +165,37 @@ public void testJoin(ExecutionEnvironment env) throws Exception {
continue; continue;
} }


ChecksumHashCode enabledChecksum; List<Tuple2<IntValue, IntValue>> enabledResult;


ChecksumHashCode disabledChecksum; List<Tuple2<IntValue, IntValue>> disabledResult;


// Inner join // Inner join


LOG.info("Testing inner join with JoinHint = {}", joinHint); LOG.info("Testing inner join with JoinHint = {}", joinHint);


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) enabledResult = getDataSet(env)
.join(getDataSet(env), joinHint) .join(getDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) disabledResult = getDataSet(env)
.join(getDataSet(env), joinHint) .join(getDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();


Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum); Collections.sort(disabledResult, comparator);

Assert.assertEquals("JoinHint=" + joinHint, disabledResult, enabledResult);


// Left outer join // Left outer join


Expand All @@ -187,21 +204,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) enabledResult = getDataSet(env)
.leftOuterJoin(getFilteredDataSet(env), joinHint) .leftOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) disabledResult = getDataSet(env)
.leftOuterJoin(getFilteredDataSet(env), joinHint) .leftOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);


Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum); Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
} }


// Right outer join // Right outer join
Expand All @@ -211,21 +234,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) enabledResult = getDataSet(env)
.rightOuterJoin(getFilteredDataSet(env), joinHint) .rightOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) disabledResult = getDataSet(env)
.rightOuterJoin(getFilteredDataSet(env), joinHint) .rightOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);


Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum); Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
} }


// Full outer join // Full outer join
Expand All @@ -235,21 +264,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) enabledResult = getDataSet(env)
.fullOuterJoin(getFilteredDataSet(env), joinHint) .fullOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env) disabledResult = getDataSet(env)
.fullOuterJoin(getFilteredDataSet(env), joinHint) .fullOuterJoin(getFilteredDataSet(env), joinHint)
.where(0) .where(0)
.equalTo(0) .equalTo(0)
.with(new OverwriteObjectsJoin())); .with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);


Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum); Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
} }
} }
} }
Expand Down Expand Up @@ -279,32 +314,37 @@ public void testCross(ExecutionEnvironment env) throws Exception {


env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();


ChecksumHashCode enabledChecksumWithHuge = DataSetUtils.checksumHashCode(small List<Tuple2<IntValue, IntValue>> enabledResultWithHuge = small
.crossWithHuge(large) .crossWithHuge(large)
.with(new OverwriteObjectsCross())); .with(new OverwriteObjectsCross())
.collect();


ChecksumHashCode enabledChecksumWithTiny = DataSetUtils.checksumHashCode(small List<Tuple2<IntValue, IntValue>> enabledResultWithTiny = small
.crossWithTiny(large) .crossWithTiny(large)
.with(new OverwriteObjectsCross())); .with(new OverwriteObjectsCross())
.collect();


Assert.assertEquals(enabledChecksumWithHuge, enabledChecksumWithTiny); Assert.assertThat(enabledResultWithHuge, is(enabledResultWithTiny));


// test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled // test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled


env.getConfig().disableObjectReuse(); env.getConfig().disableObjectReuse();


ChecksumHashCode disabledChecksumWithHuge = DataSetUtils.checksumHashCode(small List<Tuple2<IntValue, IntValue>> disabledResultWithHuge = small
.crossWithHuge(large) .crossWithHuge(large)
.with(new OverwriteObjectsCross())); .with(new OverwriteObjectsCross())
.collect();


ChecksumHashCode disabledChecksumWithTiny = DataSetUtils.checksumHashCode(small List<Tuple2<IntValue, IntValue>> disabledResultWithTiny = small
.crossWithTiny(large) .crossWithTiny(large)
.with(new OverwriteObjectsCross())); .with(new OverwriteObjectsCross())
.collect();


Assert.assertEquals(disabledChecksumWithHuge, disabledChecksumWithTiny); Assert.assertThat(disabledResultWithHuge, is(disabledResultWithTiny));


// verify that checksums match between object reuse enabled and disabled // verify match between object reuse enabled and disabled
Assert.assertEquals(enabledChecksumWithHuge, disabledChecksumWithHuge); Assert.assertThat(disabledResultWithHuge, is(enabledResultWithHuge));
Assert.assertThat(disabledResultWithTiny, is(enabledResultWithTiny));
} }


private class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> { private class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
Expand Down Expand Up @@ -338,8 +378,7 @@ public boolean filter(Tuple2<IntValue, IntValue> value) throws Exception {
}); });
} }


private static final class TupleIntValueIntValueIterator implements Iterator<Tuple2<IntValue, IntValue>>, Serializable { private static class TupleIntValueIntValueIterator implements Iterator<Tuple2<IntValue, IntValue>>, Serializable {

private int numElements; private int numElements;
private final int keyRange; private final int keyRange;
private Tuple2<IntValue, IntValue> ret = new Tuple2<>(new IntValue(), new IntValue()); private Tuple2<IntValue, IntValue> ret = new Tuple2<>(new IntValue(), new IntValue());
Expand Down Expand Up @@ -370,9 +409,23 @@ public void remove() {
} }
} }


private static class Tuple2Comparator<T0 extends Comparable<T0>, T1 extends Comparable<T1>>
implements Comparator<Tuple2<T0, T1>> {
@Override
public int compare(Tuple2<T0, T1> o1, Tuple2<T0, T1> o2) {
int cmp = o1.f0.compareTo(o2.f0);

if (cmp != 0) {
return cmp;
}

return o1.f1.compareTo(o2.f1);
}
}

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


private static final class Scrambler implements Serializable { private static class Scrambler implements Serializable {
private Tuple2<IntValue, IntValue> d = new Tuple2<>(new IntValue(), new IntValue()); private Tuple2<IntValue, IntValue> d = new Tuple2<>(new IntValue(), new IntValue());


private final boolean keyed; private final boolean keyed;
Expand Down

0 comments on commit 40a156e

Please sign in to comment.