From 65f54df532829994a8be240a27b9138d01a186b5 Mon Sep 17 00:00:00 2001 From: Todd Lisonbee Date: Mon, 4 Apr 2016 22:51:12 -0700 Subject: [PATCH 1/3] [FLINK-3664] Create DataSetUtils method to easily summarize a DataSet of Tuples --- .../java/summarize/BooleanColumnSummary.java | 68 +++++++ .../api/java/summarize/ColumnSummary.java | 60 ++++++ .../java/summarize/NumericColumnSummary.java | 174 +++++++++++++++++ .../java/summarize/ObjectColumnSummary.java | 58 ++++++ .../java/summarize/StringColumnSummary.java | 92 +++++++++ .../summarize/aggregation/Aggregator.java | 44 +++++ .../aggregation/BooleanSummaryAggregator.java | 56 ++++++ .../summarize/aggregation/CompensatedSum.java | 89 +++++++++ .../aggregation/DoubleSummaryAggregator.java | 118 ++++++++++++ .../aggregation/FloatSummaryAggregator.java | 120 ++++++++++++ .../aggregation/IntegerSummaryAggregator.java | 120 ++++++++++++ .../aggregation/LongSummaryAggregator.java | 120 ++++++++++++ .../aggregation/NumericSummaryAggregator.java | 165 ++++++++++++++++ .../aggregation/ObjectSummaryAggregator.java | 51 +++++ .../aggregation/ShortSummaryAggregator.java | 133 +++++++++++++ .../aggregation/StringSummaryAggregator.java | 93 +++++++++ .../aggregation/SummaryAggregatorFactory.java | 103 ++++++++++ .../aggregation/TupleSummaryAggregator.java | 72 +++++++ .../aggregation/ValueSummaryAggregator.java | 177 ++++++++++++++++++ .../flink/api/java/utils/DataSetUtils.java | 50 +++++ .../aggregation/AggregateCombineHarness.java | 111 +++++++++++ .../BooleanSummaryAggregatorTest.java | 86 +++++++++ .../BooleanValueSummaryAggregatorTest.java | 52 +++++ .../aggregation/CompensatedSumTest.java | 78 ++++++++ .../DoubleSummaryAggregatorTest.java | 176 +++++++++++++++++ .../FloatSummaryAggregatorTest.java | 166 ++++++++++++++++ .../FloatValueSummaryAggregatorTest.java | 56 ++++++ .../IntegerSummaryAggregatorTest.java | 117 ++++++++++++ .../IntegerValueSummaryAggregatorTest.java | 61 ++++++ .../LongSummaryAggregatorTest.java | 114 
+++++++++++ .../LongValueSummaryAggregatorTest.java | 65 +++++++ .../ShortSummaryAggregatorTest.java | 126 +++++++++++++ .../ShortValueSummaryAggregatorTest.java | 63 +++++++ .../StringSummaryAggregatorTest.java | 92 +++++++++ .../StringValueSummaryAggregatorTest.java | 64 +++++++ .../SummaryAggregatorFactoryTest.java | 55 ++++++ .../flink/test/util/DataSetUtilsITCase.java | 79 ++++++++ 37 files changed, 3524 insertions(+) create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/BooleanColumnSummary.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/ColumnSummary.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/NumericColumnSummary.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/StringColumnSummary.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/Aggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/NumericSummaryAggregator.java create mode 100644 
flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ObjectSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactory.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/TupleSummaryAggregator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ValueSummaryAggregator.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/AggregateCombineHarness.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java create mode 100644 
flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/BooleanColumnSummary.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/BooleanColumnSummary.java new file mode 100644 index 0000000000000..944808f23059f --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/BooleanColumnSummary.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.summarize;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Immutable summary for a column of booleans: the number of true, false, and null values.
+ */
+@PublicEvolving
+public class BooleanColumnSummary extends ColumnSummary {
+
+    private final long trueCount;
+    private final long falseCount;
+    private final long nullCount;
+
+    public BooleanColumnSummary(long trueCount, long falseCount, long nullCount) {
+        this.trueCount = trueCount;
+        this.falseCount = falseCount;
+        this.nullCount = nullCount;
+    }
+
+    public long getTrueCount() {
+        return trueCount;
+    }
+
+    public long getFalseCount() {
+        return falseCount;
+    }
+
+    /**
+     * The number of non-null values in this column, i.e. the true count plus the false count.
+     */
+    @Override
+    public long getNonNullCount() {
+        return trueCount + falseCount;
+    }
+
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    @Override
+    public String toString() {
+        return "BooleanColumnSummary{" +
+            "totalCount=" + getTotalCount() +
+            ", trueCount=" + trueCount +
+            ", falseCount=" + falseCount +
+            ", nullCount=" + nullCount +
+            '}';
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/ColumnSummary.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/ColumnSummary.java
new file mode 100644
index 0000000000000..b73d60cba463b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/ColumnSummary.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Base class for per-column summaries; subclasses supply the null and non-null counts.
+ */
+@PublicEvolving
+public abstract class ColumnSummary {
+
+    /**
+     * Total number of rows in this column: null count plus non-null count.
+     */
+    public long getTotalCount() {
+        return getNonNullCount() + getNullCount();
+    }
+
+    /**
+     * Number of values in this column that are not null.
+     */
+    public abstract long getNonNullCount();
+
+    /**
+     * Number of values in this column that are null.
+     */
+    public abstract long getNullCount();
+
+    /**
+     * Whether at least one value in this column is null.
+     */
+    public boolean containsNull() {
+        return getNullCount() >= 1L;
+    }
+
+    /**
+     * Whether at least one value in this column is non-null.
+     */
+    public boolean containsNonNull() {
+        return getNonNullCount() >= 1L;
+    }
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/NumericColumnSummary.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/NumericColumnSummary.java
new file mode 100644
index 0000000000000..fd5342647e0b6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/NumericColumnSummary.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Generic Column Summary for Numeric Types.
+ *
+ * Some values are considered "missing" where "missing" is defined as null, NaN, or Infinity.
+ * These values are ignored in some calculations like mean, variance, and standardDeviation.
+ *
+ * Uses the Kahan summation algorithm to avoid numeric instability when computing variance.
+ * The algorithm is described in: "Scalable and Numerically Stable Descriptive Statistics in SystemML",
+ * Tian et al, International Conference on Data Engineering 2012.
+ *
+ * @param <T> the numeric type e.g. Integer, Double
+ */
+@PublicEvolving
+public class NumericColumnSummary<T> extends ColumnSummary implements java.io.Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long nonMissingCount; // count of elements that are NOT null, NaN, or Infinite
+    private final long nullCount;
+    private final long nanCount; // always zero for types like Short, Integer, Long
+    private final long infinityCount; // always zero for types like Short, Integer, Long
+
+    private final T min;
+    private final T max;
+    private final T sum;
+
+    private final Double mean;
+    private final Double variance;
+    private final Double standardDeviation;
+
+    public NumericColumnSummary(long nonMissingCount, long nullCount, long nanCount, long infinityCount, T min, T max, T sum, Double mean, Double variance, Double standardDeviation) {
+        this.nonMissingCount = nonMissingCount;
+        this.nullCount = nullCount;
+        this.nanCount = nanCount;
+        this.infinityCount = infinityCount;
+        this.min = min;
+        this.max = max;
+        this.sum = sum;
+        this.mean = mean;
+        this.variance = variance;
+        this.standardDeviation = standardDeviation;
+    }
+
+    /**
+     * The number of "missing" values where "missing" is defined as null, NaN, or Infinity.
+     *
+     * These values are ignored in some calculations like mean, variance, and standardDeviation.
+     */
+    public long getMissingCount() {
+        return nullCount + nanCount + infinityCount;
+    }
+
+    /**
+     * The number of values that are not null, NaN, or Infinity.
+     */
+    public long getNonMissingCount() {
+        return nonMissingCount;
+    }
+
+    /**
+     * The number of non-null values in this column; NaN and Infinity count as non-null.
+     */
+    @Override
+    public long getNonNullCount() {
+        return nonMissingCount + nanCount + infinityCount;
+    }
+
+    @Override
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    /**
+     * Number of values that are NaN.
+     *
+     * (always zero for types like Short, Integer, Long)
+     */
+    public long getNanCount() {
+        return nanCount;
+    }
+
+    /**
+     * Number of values that are positive or negative infinity.
+     *
+     * (always zero for types like Short, Integer, Long)
+     */
+    public long getInfinityCount() {
+        return infinityCount;
+    }
+
+    public T getMin() {
+        return min;
+    }
+
+    public T getMax() {
+        return max;
+    }
+
+    public T getSum() {
+        return sum;
+    }
+
+    /**
+     * Null, NaN, and Infinite values are ignored in this calculation.
+     *
+     * @see <a href="https://en.wikipedia.org/wiki/Mean">Arithmetic Mean</a>
+     */
+    public Double getMean() {
+        return mean;
+    }
+
+    /**
+     * Variance is a measure of how far a set of numbers are spread out.
+     *
+     * Null, NaN, and Infinite values are ignored in this calculation.
+     *
+     * @see <a href="https://en.wikipedia.org/wiki/Variance">Variance</a>
+     */
+    public Double getVariance() {
+        return variance;
+    }
+
+    /**
+     * Standard Deviation is a measure of variation in a set of numbers. It is the square root of the variance.
+     *
+     * Null, NaN, and Infinite values are ignored in this calculation.
+     *
+     * @see <a href="https://en.wikipedia.org/wiki/Standard_deviation">Standard Deviation</a>
+     */
+    public Double getStandardDeviation() {
+        return standardDeviation;
+    }
+
+    @Override
+    public String toString() {
+        return "NumericColumnSummary{" +
+            "totalCount=" + getTotalCount() +
+            ", nullCount=" + nullCount +
+            ", nonNullCount=" + getNonNullCount() +
+            ", missingCount=" + getMissingCount() +
+            ", nonMissingCount=" + nonMissingCount +
+            ", nanCount=" + nanCount +
+            ", infinityCount=" + infinityCount +
+            ", min=" + min +
+            ", max=" + max +
+            ", sum=" + sum +
+            ", mean=" + mean +
+            ", variance=" + variance +
+            ", standardDeviation=" + standardDeviation +
+            '}';
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java
new file mode 100644
index 0000000000000..42d9ae3c89757
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/ObjectColumnSummary.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Summary for a column of generic Objects (this is a fallback for unsupported types).
+ */
+@PublicEvolving
+public class ObjectColumnSummary extends ColumnSummary {
+
+    private final long notNullCount;
+    private final long nullCount;
+
+    public ObjectColumnSummary(long notNullCount, long nullCount) {
+        this.notNullCount = notNullCount;
+        this.nullCount = nullCount;
+    }
+
+    /**
+     * The number of non-null values in this column, as passed to the constructor.
+     */
+    @Override
+    public long getNonNullCount() {
+        return notNullCount; // was a hard-coded 0, which also broke getTotalCount() and containsNonNull()
+    }
+
+    @Override
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    @Override
+    public String toString() {
+        return "ObjectColumnSummary{" +
+            "totalCount=" + getTotalCount() +
+            ", notNullCount=" + notNullCount +
+            ", nullCount=" + nullCount +
+            '}';
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/StringColumnSummary.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/StringColumnSummary.java
new file mode 100644
index 0000000000000..98840daf7108f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/StringColumnSummary.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Immutable summary for a column of Strings: null/non-null/empty counts and length statistics.
+ */
+@PublicEvolving
+public class StringColumnSummary extends ColumnSummary {
+
+    private final long nonNullCount;
+    private final long nullCount;
+    private final long emptyCount;
+    private final Integer minLength;
+    private final Integer maxLength;
+    private final Double meanLength;
+
+    public StringColumnSummary(long nonNullCount, long nullCount, long emptyCount, Integer minLength, Integer maxLength, Double meanLength) {
+        this.nonNullCount = nonNullCount;
+        this.nullCount = nullCount;
+        this.emptyCount = emptyCount;
+        this.minLength = minLength;
+        this.maxLength = maxLength;
+        this.meanLength = meanLength;
+    }
+
+    @Override
+    public long getNonNullCount() {
+        return nonNullCount;
+    }
+
+    @Override
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    /**
+     * Number of empty strings e.g. java.lang.String.isEmpty()
+     */
+    public long getEmptyCount() {
+        return emptyCount;
+    }
+
+    /**
+     * Shortest String length (boxed; the value is whatever was passed to the constructor)
+     */
+    public Integer getMinLength() {
+        return minLength;
+    }
+
+    /**
+     * Longest String length (boxed; the value is whatever was passed to the constructor)
+     */
+    public Integer getMaxLength() {
+        return maxLength;
+    }
+
+    public Double getMeanLength() {
+        return meanLength;
+    }
+
+    @Override
+    public String toString() {
+        return "StringColumnSummary{" +
+            "totalCount=" + getTotalCount() +
+            ", nonNullCount=" + nonNullCount +
+            ", nullCount=" + nullCount +
+            ", emptyCount=" + emptyCount +
+            ", minLength=" + minLength +
+            ", maxLength=" + maxLength +
+            ", meanLength=" + meanLength +
+            '}';
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/Aggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/Aggregator.java
new file mode 100644
index 0000000000000..2ece022dd6021
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/Aggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Generic interface for aggregation: feed values in, merge partial aggregations, read the result.
+ *
+ * @param <T> the type to be aggregated
+ * @param <R> the result type of the aggregation
+ */
+@Internal
+public interface Aggregator<T, R> extends java.io.Serializable {
+
+    /** Add a value to the current aggregation */
+    void aggregate(T value);
+
+    /**
+     * Combine two aggregations of the same type.
+     *
+     * (Implementations will need to do an unchecked cast).
+     */
+    void combine(Aggregator<T, R> otherSameType);
+
+    /** Provide the final result of the aggregation */
+    R result();
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregator.java
new file mode 100644
index 0000000000000..bc3022e24f0db
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.summarize.BooleanColumnSummary;
+
+@Internal
+public class BooleanSummaryAggregator implements Aggregator<Boolean, BooleanColumnSummary> {
+
+    private long trueCount = 0L;
+    private long falseCount = 0L;
+    private long nullCount = 0L;
+
+    @Override
+    public void aggregate(Boolean value) {
+        if (value == null) { // check null first: unboxing a null Boolean below would throw
+            nullCount++;
+        }
+        else if (value) {
+            trueCount++;
+        }
+        else {
+            falseCount++;
+        }
+    }
+
+    @Override
+    public void combine(Aggregator<Boolean, BooleanColumnSummary> otherSameType) {
+        BooleanSummaryAggregator other = (BooleanSummaryAggregator) otherSameType;
+        trueCount += other.trueCount;
+        falseCount += other.falseCount;
+        nullCount += other.nullCount;
+    }
+
+    @Override
+    public BooleanColumnSummary result() {
+        return new BooleanColumnSummary(trueCount, falseCount, nullCount);
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java
new file mode 100644
index 0000000000000..5693b384bf551
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Immutable running sum computed with the Kahan summation algorithm.
+ *
+ * The Kahan summation algorithm (also known as compensated summation) reduces the numerical errors that
+ * occur when adding a sequence of finite precision floating point numbers. Numerical errors arise due to
+ * truncation and rounding. These errors can lead to numerical instability.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/Kahan_summation_algorithm">Kahan Summation Algorithm</a>
+ */
+@Internal
+public class CompensatedSum implements java.io.Serializable {
+
+    private static final double NO_CORRECTION = 0.0;
+    public static final CompensatedSum ZERO = new CompensatedSum(0.0, NO_CORRECTION);
+
+    private static final long serialVersionUID = 1L;
+
+    private final double value;
+    private final double delta;
+
+    /**
+     * Creates a sum from an explicit value and correction term.
+     * @param value the sum
+     * @param delta correction term
+     */
+    public CompensatedSum(double value, double delta) {
+        this.value = value;
+        this.delta = delta;
+    }
+
+    public double value() {
+        return value;
+    }
+
+    /**
+     * The correction term carried alongside the sum
+     */
+    public double delta() {
+        return delta;
+    }
+
+    /**
+     * Returns a new sum with the given value and correction term added; this instance is unchanged
+     */
+    public CompensatedSum add(double value, double delta) {
+        return add(new CompensatedSum(value, delta));
+    }
+
+    /**
+     * Returns a new sum with the given value added, using no correction term for the addend
+     */
+    public CompensatedSum add(double value) {
+        return add(value, NO_CORRECTION);
+    }
+
+    /**
+     * Returns a new sum of this and the other sum, with the correction term recomputed to reduce numeric error
+     */
+    public CompensatedSum add(CompensatedSum other) {
+        double addend = other.value() + (delta + other.delta());
+        double total = value + addend;
+        double residual = addend - (total - value);
+        return new CompensatedSum(total, residual);
+    }
+
+}
+
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregator.java
new file mode 100644
index 0000000000000..d89496b57e6cc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Aggregator that can handle Double types; supplies the min, max, and sum aggregators for doubles.
+ */
+@Internal
+public class DoubleSummaryAggregator extends NumericSummaryAggregator<Double> {
+
+    // Nested classes are only "public static" for Kryo serialization, otherwise they'd be private
+
+    public static class MinDoubleAggregator implements Aggregator<Double, Double> {
+
+        private double min = Double.MAX_VALUE;
+
+        @Override
+        public void aggregate(Double value) {
+            min = Math.min(min, value);
+        }
+
+        @Override
+        public void combine(Aggregator<Double, Double> other) {
+            min = Math.min(min, ((MinDoubleAggregator) other).min);
+        }
+
+        @Override
+        public Double result() {
+            return min;
+        }
+    }
+
+    public static class MaxDoubleAggregator implements Aggregator<Double, Double> {
+
+        private double max = Double.NEGATIVE_INFINITY; // not Double.MIN_VALUE: that is the smallest POSITIVE double
+
+        @Override
+        public void aggregate(Double value) {
+            max = Math.max(max, value);
+        }
+
+        @Override
+        public void combine(Aggregator<Double, Double> other) {
+            max = Math.max(max, ((MaxDoubleAggregator) other).max);
+        }
+
+        @Override
+        public Double result() {
+            return max;
+        }
+    }
+
+    public static class SumDoubleAggregator implements Aggregator<Double, Double> {
+
+        private CompensatedSum sum = ZERO;
+
+        @Override
+        public void aggregate(Double value) {
+            sum = sum.add(value);
+        }
+
+        @Override
+        public void combine(Aggregator<Double, Double> other) {
+            sum = sum.add(((SumDoubleAggregator) other).sum);
+        }
+
+        @Override
+        public Double result() {
+            return sum.value();
+        }
+    }
+
+    @Override
+    protected Aggregator<Double, Double> initMin() {
+        return new MinDoubleAggregator();
+    }
+
+    @Override
+    protected Aggregator<Double, Double> initMax() {
+        return new MaxDoubleAggregator();
+    }
+
+    @Override
+    protected Aggregator<Double, Double> initSum() {
+        return new SumDoubleAggregator();
+    }
+
+    @Override
+    protected boolean isNan(Double number) {
+        return number.isNaN();
+    }
+
+    @Override
+    protected boolean isInfinite(Double number) {
+        return number.isInfinite();
+    }
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
new file mode 100644
index 0000000000000..745ca416bd833
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+/**
+ * Summary aggregator for {@code Float} columns: supplies the Float-specific min, max and sum aggregators plus the NaN/Infinity checks required by {@link NumericSummaryAggregator}.
+ */
+@Internal
+public class FloatSummaryAggregator extends NumericSummaryAggregator<Float> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Nested classes are only "public static" for Kryo serialization, otherwise they'd be private
+
+	public static class MinFloatAggregator implements Aggregator<Float,Float> { // running minimum
+
+		private float min = Float.MAX_VALUE; // largest finite float, so Math.min() yields the first aggregated value
+
+		@Override
+		public void aggregate(Float value) {
+			min = Math.min(min, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Float,Float> other) {
+			min = Math.min(min,((MinFloatAggregator)other).min);
+		}
+
+		@Override
+		public Float result() {
+			return min;
+		}
+	}
+
+	public static class MaxFloatAggregator implements Aggregator<Float,Float> { // running maximum
+
+		private float max = -Float.MAX_VALUE; // most negative finite float; Float.MIN_VALUE is the smallest POSITIVE float and made all-negative input report a wrong max
+
+		@Override
+		public void aggregate(Float value) {
+			max = Math.max(max, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Float,Float> other) {
+			max = Math.max(max, ((MaxFloatAggregator) other).max);
+		}
+
+		@Override
+		public Float result() {
+			return max;
+		}
+	}
+
+	public static class SumFloatAggregator implements Aggregator<Float,Float> { // running sum kept in a CompensatedSum
+
+		private CompensatedSum sum = ZERO;
+
+		@Override
+		public void aggregate(Float value) {
+			sum = sum.add(value);
+		}
+
+		@Override
+		public void combine(Aggregator<Float,Float> other) {
+			sum = sum.add(((SumFloatAggregator)other).sum);
+		}
+
+		@Override
+		public Float result() {
+			// overflow will go to infinity
+			return Double.valueOf(sum.value()).floatValue(); // valueOf instead of the deprecated Double constructor
+		}
+	}
+
+	@Override
+	protected Aggregator<Float,Float> initMin() {
+		return new MinFloatAggregator();
+	}
+
+	@Override
+	protected Aggregator<Float,Float> initMax() {
+		return new MaxFloatAggregator();
+	}
+
+	@Override
+	protected Aggregator<Float,Float> initSum() {
+		return new SumFloatAggregator();
+	}
+
+	@Override
+	protected boolean isNan(Float number) {
+		return number.isNaN();
+	}
+
+	@Override
+	protected boolean isInfinite(Float number) {
+		return number.isInfinite();
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregator.java
new file mode 100644
index 0000000000000..a443019d10fc1
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Summary aggregator for {@code Integer} columns: supplies the Integer-specific min, max and sum aggregators required by {@link NumericSummaryAggregator}.
+ */
+@Internal
+public class IntegerSummaryAggregator extends NumericSummaryAggregator<Integer> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Nested classes are only "public static" for Kryo serialization, otherwise they'd be private
+
+	public static class MinIntegerAggregator implements Aggregator<Integer,Integer> { // running minimum
+
+		private int min = Integer.MAX_VALUE;
+
+		@Override
+		public void aggregate(Integer value) {
+			min = Math.min(min, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Integer,Integer> other) {
+			min = Math.min(min,((MinIntegerAggregator)other).min);
+		}
+
+		@Override
+		public Integer result() {
+			return min;
+		}
+	}
+
+	public static class MaxIntegerAggregator implements Aggregator<Integer,Integer> { // running maximum
+
+		private int max = Integer.MIN_VALUE;
+
+		@Override
+		public void aggregate(Integer value) {
+			max = Math.max(max, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Integer,Integer> other) {
+			max = Math.max(max, ((MaxIntegerAggregator) other).max);
+		}
+
+		@Override
+		public Integer result() {
+			return max;
+		}
+	}
+
+	public static class SumIntegerAggregator implements Aggregator<Integer,Integer> { // running sum
+
+		private int sum = 0; // plain int arithmetic: wraps around on overflow
+
+		@Override
+		public void aggregate(Integer value) {
+			sum += value;
+		}
+
+		@Override
+		public void combine(Aggregator<Integer,Integer> other) {
+			sum += ((SumIntegerAggregator)other).sum;
+		}
+
+		@Override
+		public Integer result() {
+			return sum;
+		}
+	}
+
+	@Override
+	protected Aggregator<Integer,Integer> initMin() {
+		return new MinIntegerAggregator();
+	}
+
+	@Override
+	protected Aggregator<Integer,Integer> initMax() {
+		return new MaxIntegerAggregator();
+	}
+
+	@Override
+	protected Aggregator<Integer,Integer> initSum() {
+		return new SumIntegerAggregator();
+	}
+
+	@Override
+	protected boolean isNan(Integer number) {
+		// NaN never applies here because only types like Float and Double have NaN
+		return false;
+	}
+
+	@Override
+	protected boolean isInfinite(Integer number) {
+		// Infinity never applies here because only types like Float and Double have Infinity
+		return false;
+	}
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregator.java
new file mode 100644
index 0000000000000..5cd314ebd0e6b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Summary aggregator for {@code Long} columns: supplies the Long-specific min, max and sum aggregators required by {@link NumericSummaryAggregator}.
+ */
+@Internal
+public class LongSummaryAggregator extends NumericSummaryAggregator<Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Nested classes are only "public static" for Kryo serialization, otherwise they'd be private
+
+	public static class MinLongAggregator implements Aggregator<Long,Long> { // running minimum
+
+		private long min = Long.MAX_VALUE;
+
+		@Override
+		public void aggregate(Long value) {
+			min = Math.min(min, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Long,Long> other) {
+			min = Math.min(min,((MinLongAggregator)other).min);
+		}
+
+		@Override
+		public Long result() {
+			return min;
+		}
+	}
+
+	public static class MaxLongAggregator implements Aggregator<Long,Long> { // running maximum
+
+		private long max = Long.MIN_VALUE;
+
+		@Override
+		public void aggregate(Long value) {
+			max = Math.max(max, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Long,Long> other) {
+			max = Math.max(max, ((MaxLongAggregator) other).max);
+		}
+
+		@Override
+		public Long result() {
+			return max;
+		}
+	}
+
+	public static class SumLongAggregator implements Aggregator<Long,Long> { // running sum
+
+		private long sum = 0; // plain long arithmetic: wraps around on overflow
+
+		@Override
+		public void aggregate(Long value) {
+			sum += value;
+		}
+
+		@Override
+		public void combine(Aggregator<Long,Long> other) {
+			sum += ((SumLongAggregator)other).sum;
+		}
+
+		@Override
+		public Long result() {
+			return sum;
+		}
+	}
+
+	@Override
+	protected Aggregator<Long,Long> initMin() {
+		return new MinLongAggregator();
+	}
+
+	@Override
+	protected Aggregator<Long,Long> initMax() {
+		return new MaxLongAggregator();
+	}
+
+	@Override
+	protected Aggregator<Long,Long> initSum() {
+		return new SumLongAggregator();
+	}
+
+	@Override
+	protected boolean isNan(Long number) {
+		// NaN never applies here because only types like Float and Double have NaN
+		return false;
+	}
+
+	@Override
+	protected boolean isInfinite(Long number) {
+		// Infinity never applies here because only types like Float and Double have Infinity
+		return false;
+	}
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/NumericSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/NumericSummaryAggregator.java
new file mode 100644
index 0000000000000..a0b1ed0d2d8a1
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/NumericSummaryAggregator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+/**
+ * Generic aggregator for all numeric types creates a summary of a column of numbers.
+ *
+ * Uses the Kahan summation algorithm to avoid numeric instability when computing variance.
+ * The algorithm is described in: "Scalable and Numerically Stable Descriptive Statistics in SystemML",
+ * Tian et al, International Conference on Data Engineering 2012
+ *
+ * Implementation that couldn't be generic for all numbers was pushed to subclasses.
+ * For example, there isn't a generic way to calculate min, max, sum, isNan, isInfinite
+ * for all numeric types so subclasses must implement these.
+ *
+ * @param <T> numeric type to aggregate and create a summary, e.g. Integer, Double
+ */
+@Internal
+public abstract class NumericSummaryAggregator<T extends Number> implements Aggregator<T,NumericColumnSummary<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private long nonMissingCount = 0L; // count of elements that are NOT null, NaN, or Infinite
+	private long nullCount = 0L;
+	private long nanCount = 0L;
+	private long infinityCount = 0L;
+
+	// these fields are initialized by type specific subclasses
+	private Aggregator<T,T> min = initMin();
+	private Aggregator<T,T> max = initMax();
+	private Aggregator<T,T> sum = initSum();
+
+	private CompensatedSum mean = ZERO; // running mean of the non-missing values
+	/**
+	 * Sum of squares of differences from the current mean (used to calculate variance).
+	 *
+	 * The algorithm is described in: "Scalable and Numerically Stable Descriptive Statistics in SystemML",
+	 * Tian et al, International Conference on Data Engineering 2012
+	 */
+	private CompensatedSum m2 = ZERO;
+
+	/**
+	 * Add a value to the current aggregation: null, NaN and infinite values are only counted, every other value updates min, max, sum, mean and m2.
+	 */
+	@Override
+	public void aggregate(T value) {
+
+		if (value == null) {
+			nullCount++;
+		}
+		else if (isNan(value)) {
+			nanCount++;
+		}
+		else if (isInfinite(value)) {
+			infinityCount++;
+		}
+		else {
+			nonMissingCount++;
+
+			min.aggregate(value);
+			max.aggregate(value);
+			sum.aggregate(value);
+
+			double doubleValue = value.doubleValue();
+			double delta = doubleValue - mean.value(); // distance from the mean BEFORE this value is folded in
+			mean = mean.add(delta / nonMissingCount);
+			m2 = m2.add(delta * (doubleValue - mean.value())); // second factor uses the UPDATED mean
+		}
+	}
+
+	/**
+	 * Merge another aggregator of the same type into this one: the missing-value counts are added, and min, max, sum, mean and m2 are merged when both sides hold non-missing values.
+	 */
+	@Override
+	public void combine(Aggregator<T,NumericColumnSummary<T>> otherSameType) {
+		NumericSummaryAggregator<T> other = (NumericSummaryAggregator<T>) otherSameType;
+
+		nullCount += other.nullCount;
+		nanCount += other.nanCount;
+		infinityCount += other.infinityCount;
+
+		if (nonMissingCount == 0) { // nothing aggregated on this side yet: take over the other side's state (by reference)
+			nonMissingCount = other.nonMissingCount;
+
+			min = other.min;
+			max = other.max;
+
+			sum = other.sum;
+			mean = other.mean;
+			m2 = other.m2;
+		}
+		else if (other.nonMissingCount != 0) { // both sides hold values: merge pairwise
+			long combinedCount = nonMissingCount + other.nonMissingCount;
+
+			min.combine(other.min);
+			max.combine(other.max);
+
+			sum.combine(other.sum);
+
+			double deltaMean = other.mean.value() - mean.value();
+			mean = mean.add(deltaMean * other.nonMissingCount / combinedCount);
+			m2 = m2.add(other.m2).add(deltaMean * deltaMean * nonMissingCount * other.nonMissingCount / combinedCount);
+
+			nonMissingCount = combinedCount;
+		}
+	}
+
+	@Override
+	public NumericColumnSummary<T> result() {
+
+		Double variance = null; // stays null unless at least two non-missing values were seen
+		if (nonMissingCount > 1) {
+			variance = m2.value() / (nonMissingCount - 1);
+		}
+
+		return new NumericColumnSummary<T>(
+			nonMissingCount,
+			nullCount,
+			nanCount,
+			infinityCount,
+			// if nonMissingCount was zero some fields should be undefined
+			nonMissingCount == 0 ? null : min.result(),
+			nonMissingCount == 0 ? null : max.result(),
+			nonMissingCount == 0 ? null : sum.result(),
+			nonMissingCount == 0 ? null : mean.value(),
+			variance,
+			variance == null ? null : Math.sqrt(variance) // standard deviation
+		);
+	}
+
+	// there isn't a generic way to calculate min, max, sum, isNan, isInfinite for all numeric types
+	// so subclasses must implement these
+
+	protected abstract Aggregator<T,T> initMin();
+
+	protected abstract Aggregator<T,T> initMax();
+
+	protected abstract Aggregator<T,T> initSum();
+
+	protected abstract boolean isNan(T number);
+
+	protected abstract boolean isInfinite(T number);
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ObjectSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ObjectSummaryAggregator.java
new file mode 100644
index 0000000000000..532d91f524bf4
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ObjectSummaryAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.summarize.ObjectColumnSummary;
+
+@Internal
+public class ObjectSummaryAggregator implements Aggregator<Object,ObjectColumnSummary> { // counts null vs non-null values only
+
+	private long nonNullCount;
+	private long nullCount;
+
+	@Override
+	public void aggregate(Object value) {
+		if (value == null) {
+			nullCount++;
+		}
+		else {
+			nonNullCount++;
+		}
+	}
+
+	@Override
+	public void combine(Aggregator<Object,ObjectColumnSummary> otherSameType) {
+		ObjectSummaryAggregator other = (ObjectSummaryAggregator) otherSameType;
+		nonNullCount += other.nonNullCount;
+		nullCount += other.nullCount;
+	}
+
+	@Override
+	public ObjectColumnSummary result() {
+		return new ObjectColumnSummary(nonNullCount, nullCount);
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregator.java
new file mode 100644
index 0000000000000..a2a395d6efc09
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Summary aggregator for {@code Short} columns: supplies the Short-specific min, max and sum aggregators required by {@link NumericSummaryAggregator}.
+ */
+@Internal
+public class ShortSummaryAggregator extends NumericSummaryAggregator<Short> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Like Math.min() except for shorts; returns {@code a} when both are equal. Neither argument may be null (they are unboxed for the comparison).
+	 */
+	public static Short min(Short a, Short b) {
+		return a <= b ? a : b;
+	}
+
+	/**
+	 * Like Math.max() except for shorts; returns {@code a} when both are equal. Neither argument may be null (they are unboxed for the comparison).
+	 */
+	public static Short max(Short a, Short b) {
+		return a >= b ? a : b;
+	}
+
+	// Nested classes are only "public static" for Kryo serialization, otherwise they'd be private
+
+	public static class MinShortAggregator implements Aggregator<Short,Short> { // running minimum
+
+		private short min = Short.MAX_VALUE;
+
+		@Override
+		public void aggregate(Short value) {
+			min = min(min, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Short,Short> other) {
+			min = min(min,((MinShortAggregator)other).min);
+		}
+
+		@Override
+		public Short result() {
+			return min;
+		}
+	}
+
+	public static class MaxShortAggregator implements Aggregator<Short,Short> { // running maximum
+
+		private short max = Short.MIN_VALUE;
+
+		@Override
+		public void aggregate(Short value) {
+			max = max(max, value);
+		}
+
+		@Override
+		public void combine(Aggregator<Short,Short> other) {
+			max = max(max, ((MaxShortAggregator) other).max);
+		}
+
+		@Override
+		public Short result() {
+			return max;
+		}
+	}
+
+	public static class SumShortAggregator implements Aggregator<Short,Short> { // running sum
+
+		private short sum = 0; // "+=" narrows back to short, so the sum wraps around on overflow
+
+		@Override
+		public void aggregate(Short value) {
+			sum += value;
+		}
+
+		@Override
+		public void combine(Aggregator<Short,Short> other) {
+			sum += ((SumShortAggregator)other).sum;
+		}
+
+		@Override
+		public Short result() {
+			return sum;
+		}
+	}
+
+	@Override
+	protected Aggregator<Short,Short> initMin() {
+		return new MinShortAggregator();
+	}
+
+	@Override
+	protected Aggregator<Short,Short> initMax() {
+		return new MaxShortAggregator();
+	}
+
+	@Override
+	protected Aggregator<Short,Short> initSum() {
+		return new SumShortAggregator();
+	}
+
+	@Override
+	protected boolean isNan(Short number) {
+		// NaN never applies here because only types like Float and Double have NaN
+		return false;
+	}
+
+	@Override
+	protected boolean isInfinite(Short number) {
+		// Infinity never applies here because only types like Float and Double have Infinity
+		return false;
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregator.java
new file mode 100644
index 0000000000000..fa7d7216df5b2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+
+@Internal
+public class StringSummaryAggregator implements Aggregator<String,StringColumnSummary> { // null/empty counts plus min, max and mean string length
+
+	private long nonNullCount = 0L;
+	private long nullCount = 0L;
+	private long emptyCount = 0L;
+	private int minStringLength = Integer.MAX_VALUE; // placeholder; result() reports null while nonNullCount is zero
+	private int maxStringLength = -1; // placeholder; result() reports null while nonNullCount is zero
+	private CompensatedSum meanLength = CompensatedSum.ZERO;
+
+	@Override
+	public void aggregate(String value) {
+		if (value == null) {
+			nullCount++;
+		}
+		else {
+			nonNullCount++;
+
+			if (value.isEmpty()) {
+				emptyCount++;
+			}
+
+			int length = value.length();
+
+			minStringLength = Math.min(minStringLength, length);
+			maxStringLength = Math.max(maxStringLength, length);
+
+			double delta = length - meanLength.value();
+			meanLength = meanLength.add(delta / nonNullCount); // incremental mean update
+		}
+	}
+
+	@Override
+	public void combine(Aggregator<String,StringColumnSummary> otherSameType) {
+		StringSummaryAggregator other = (StringSummaryAggregator) otherSameType;
+
+		nullCount += other.nullCount;
+
+		minStringLength = Math.min(minStringLength, other.minStringLength);
+		maxStringLength = Math.max(maxStringLength, other.maxStringLength);
+
+		if (nonNullCount == 0) { // nothing aggregated on this side yet: take over the other side's state
+			nonNullCount = other.nonNullCount;
+			emptyCount = other.emptyCount;
+			meanLength = other.meanLength;
+
+		}
+		else if (other.nonNullCount != 0) { // both sides hold values: merge the means weighted by count
+			long combinedCount = nonNullCount + other.nonNullCount;
+
+			emptyCount += other.emptyCount;
+
+			double deltaMean = other.meanLength.value() - meanLength.value();
+			meanLength = meanLength.add(deltaMean * other.nonNullCount / combinedCount);
+			nonNullCount = combinedCount;
+		}
+	}
+
+	@Override
+	public StringColumnSummary result() {
+		return new StringColumnSummary(
+			nonNullCount,
+			nullCount,
+			emptyCount,
+			nonNullCount == 0L ? null : minStringLength,
+			nonNullCount == 0L ? null : maxStringLength,
+			nonNullCount == 0L ? null : meanLength.value()
+		);
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactory.java
new file mode 100644
index 0000000000000..26e88b21364d9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Factory for creating Summary Aggregators, either one per tuple field or one for a single field class.
+ */
+@Internal
+public class SummaryAggregatorFactory {
+
+	@SuppressWarnings("unchecked")
+	public static <R extends Tuple> TupleSummaryAggregator<R> create(TupleTypeInfoBase<?> inType) {
+		Aggregator[] columnAggregators = new Aggregator[inType.getArity()]; // one aggregator per tuple position
+		for (int field = 0; field < inType.getArity(); field++) {
+			Class clazz = inType.getTypeAt(field).getTypeClass();
+			columnAggregators[field] = SummaryAggregatorFactory.create(clazz);
+		}
+		return new TupleSummaryAggregator<>(columnAggregators);
+	}
+
+	/**
+	 * Create a SummaryAggregator for the supplied type; types without a dedicated aggregator get an ObjectSummaryAggregator.
+	 * @param <T> the type to aggregate
+	 * @param <R> the result type of the aggregation
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T,R> Aggregator<T,R> create(Class<T> type) {
+		if (type == Long.class) {
+			return (Aggregator<T,R>) new LongSummaryAggregator();
+		}
+		else if (type == LongValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.LongValueSummaryAggregator();
+		}
+		else if (type == Integer.class) {
+			return (Aggregator<T,R>) new IntegerSummaryAggregator();
+		}
+		else if (type == IntValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.IntegerValueSummaryAggregator();
+		}
+		else if (type == Double.class) {
+			return (Aggregator<T,R>) new DoubleSummaryAggregator();
+		}
+		else if (type == DoubleValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.DoubleValueSummaryAggregator();
+		}
+		else if (type == Float.class) {
+			return (Aggregator<T,R>) new FloatSummaryAggregator();
+		}
+		else if (type == FloatValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.FloatValueSummaryAggregator();
+		}
+		else if (type == Short.class) {
+			return (Aggregator<T,R>) new ShortSummaryAggregator();
+		}
+		else if (type == ShortValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.ShortValueSummaryAggregator();
+		}
+		else if (type == Boolean.class) {
+			return (Aggregator<T,R>) new BooleanSummaryAggregator();
+		}
+		else if (type == BooleanValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.BooleanValueSummaryAggregator();
+		}
+		else if (type == String.class) {
+			return (Aggregator<T,R>) new StringSummaryAggregator();
+		}
+		else if (type == StringValue.class) {
+			return (Aggregator<T,R>) new ValueSummaryAggregator.StringValueSummaryAggregator();
+		}
+		else {
+			// rather than error for unsupported types do something very generic
+			return (Aggregator<T,R>) new ObjectSummaryAggregator();
+		}
+	}
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/TupleSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/TupleSummaryAggregator.java
new file mode 100644
index 0000000000000..a75e582fc570b
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/TupleSummaryAggregator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Aggregate tuples using an array of aggregators, one for each "column" or position within the Tuple; the result is a Tuple holding each column's result at the same position.
+ */
+@Internal
+public class TupleSummaryAggregator<R extends Tuple> implements Aggregator<Tuple,R> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Aggregator[] columnAggregators; // index i aggregates tuple field i
+
+	public TupleSummaryAggregator(Aggregator[] columnAggregators) {
+		this.columnAggregators = columnAggregators;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void aggregate(Tuple value) {
+		for (int i = 0; i < columnAggregators.length; i++) {
+			columnAggregators[i].aggregate(value.getField(i));
+		}
+
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void combine(Aggregator<Tuple,R> other) {
+		TupleSummaryAggregator<R> tupleSummaryAggregator = (TupleSummaryAggregator<R>) other;
+		for (int i = 0; i < columnAggregators.length; i++) {
+			columnAggregators[i].combine(tupleSummaryAggregator.columnAggregators[i]);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public R result() {
+		try {
+			Class<? extends Tuple> tupleClass = Tuple.getTupleClass(columnAggregators.length); // Tuple class matching the column count
+			R tuple = (R) tupleClass.newInstance();
+			for (int i = 0; i < columnAggregators.length; i++) {
+				tuple.setField(columnAggregators[i].result(), i);
+			}
+			return tuple;
+		}
+		catch (InstantiationException | IllegalAccessException e) {
+			throw new RuntimeException("Unexpected error instantiating Tuple class for aggregation results", e);
+
+		}
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ValueSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ValueSummaryAggregator.java
new file mode 100644
index 0000000000000..4e55dd033c568
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/ValueSummaryAggregator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.summarize.aggregation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.summarize.BooleanColumnSummary;
+import org.apache.flink.api.java.summarize.NumericColumnSummary;
+import org.apache.flink.api.java.summarize.StringColumnSummary;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+/**
+ * This is a generic Aggregator for Value types like StringValue, DoubleValue, etc.
+ * This class makes it easy to re-use the implementation of another aggregator.
+ *
+ * @param <VT> the "Value Type" to aggregate, e.g. DoubleValue, StringValue
+ * @param <PT> the "Primitive Type" that "Value Type" can be naturally converted to, e.g. DoubleValue converts to Double
+ * @param <R> the result type of the aggregation, e.g. NumericColumnSummary
+ * @param <A> the underlying primitive Aggregator that does the actual work, e.g. DoubleSummaryAggregator
+ */
+@Internal
+public abstract class ValueSummaryAggregator<VT extends Value,PT,R,A extends Aggregator<PT,R>> implements Aggregator<VT,R> {
+
+	private A aggregator = initPrimitiveAggregator(); // delegate that does the real aggregation on unwrapped values
+
+	@Override
+	public void aggregate(VT value) {
+		if (value != null) {
+			aggregator.aggregate(getValue(value));
+		}
+		else {
+			aggregator.aggregate(null); // pass null through to the delegate unchanged
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void combine(Aggregator<VT,R> otherSameType) {
+		ValueSummaryAggregator<VT,PT,R,A> other = (ValueSummaryAggregator<VT,PT,R,A>) otherSameType;
+		aggregator.combine(other.aggregator);
+	}
+
+	@Override
+	public R result() {
+		return aggregator.result();
+	}
+
+	/**
+	 * Initialize an aggregator that can be used for the underlying primitive in the Value type.
+	 *
+	 * E.g. DoubleValues can easily be converted to Double and could use an underlying DoubleSummaryAggregator
+	 */
+	protected abstract A initPrimitiveAggregator();
+
+	/**
+	 * Get the value out of a value type.
+	 */
+	protected abstract PT getValue(VT value);
+
+
+	// -----------------------------------------------------------------------------
+	//  Implementations below
+	// -----------------------------------------------------------------------------
+
+	public static class ShortValueSummaryAggregator extends ValueSummaryAggregator<ShortValue,Short,NumericColumnSummary<Short>,ShortSummaryAggregator> {
+
+		@Override
+		protected ShortSummaryAggregator initPrimitiveAggregator() {
+			return new ShortSummaryAggregator();
+		}
+
+		@Override
+		protected Short getValue(ShortValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class IntegerValueSummaryAggregator extends ValueSummaryAggregator<IntValue,Integer,NumericColumnSummary<Integer>,IntegerSummaryAggregator> {
+
+		@Override
+		protected IntegerSummaryAggregator initPrimitiveAggregator() {
+			return new IntegerSummaryAggregator();
+		}
+
+		@Override
+		protected Integer getValue(IntValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class LongValueSummaryAggregator extends ValueSummaryAggregator<LongValue,Long,NumericColumnSummary<Long>,LongSummaryAggregator> {
+
+		@Override
+		protected LongSummaryAggregator initPrimitiveAggregator() {
+			return new LongSummaryAggregator();
+		}
+
+		@Override
+		protected Long getValue(LongValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class FloatValueSummaryAggregator extends ValueSummaryAggregator<FloatValue,Float,NumericColumnSummary<Float>,FloatSummaryAggregator> {
+
+		@Override
+		protected FloatSummaryAggregator initPrimitiveAggregator() {
+			return new FloatSummaryAggregator();
+		}
+
+		@Override
+		protected Float getValue(FloatValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class DoubleValueSummaryAggregator extends ValueSummaryAggregator<DoubleValue,Double,NumericColumnSummary<Double>,DoubleSummaryAggregator> {
+
+		@Override
+		protected DoubleSummaryAggregator initPrimitiveAggregator() {
+			return new DoubleSummaryAggregator();
+		}
+
+		@Override
+		protected Double getValue(DoubleValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class BooleanValueSummaryAggregator extends ValueSummaryAggregator<BooleanValue,Boolean,BooleanColumnSummary,BooleanSummaryAggregator> {
+
+		@Override
+		protected BooleanSummaryAggregator initPrimitiveAggregator() {
+			return new BooleanSummaryAggregator();
+		}
+
+		@Override
+		protected Boolean getValue(BooleanValue value) {
+			return value.getValue();
+		}
+	}
+
+	public static class StringValueSummaryAggregator extends ValueSummaryAggregator<StringValue,String,StringColumnSummary,StringSummaryAggregator> {
+
+		@Override
+		protected StringSummaryAggregator initPrimitiveAggregator() {
+			return new StringSummaryAggregator();
+		}
+
+		@Override
+		protected String getValue(StringValue value) {
+			return value.getValue();
+		}
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 61a71aa32a75b..3f5ea1660c378 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -23,6 +23,8 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
@@ -36,8 +38,12 @@
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.PartitionOperator;
+import org.apache.flink.api.java.summarize.aggregation.SummaryAggregatorFactory;
+import org.apache.flink.api.java.summarize.aggregation.TupleSummaryAggregator;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import
org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; @@ -283,6 +289,50 @@ public static > PartitionOperator partitionByRange return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.SelectorFunctionKeys<>(input.clean(keyExtractor), input.getType(), keyType), distribution, Utils.getCallLocationName()); } + // -------------------------------------------------------------------------------------------- + // Summarize + // -------------------------------------------------------------------------------------------- + + + /** + * Summarize a DataSet of Tuples by collecting single pass statistics for all columns + * + * Example usage: + *
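+	 * <pre>{@code
+	 * DataSet<Tuple3<Double, String, Boolean>> input = // [...]
+	 * Tuple3<NumericColumnSummary<Double>, StringColumnSummary, BooleanColumnSummary> summary = DataSetUtils.summarize(input)
+	 *
+	 * summary.f0.getStandardDeviation()
+	 * summary.f1.getMaxLength()
+	 * }
+	 * </pre>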
+ * @return the summary as a Tuple the same width as input rows + */ + public static R summarize(DataSet input) throws Exception { + if( !input.getType().isTupleType()) { + throw new IllegalArgumentException("summarize() is only implemented for DataSet's of Tuples"); + } + final TupleTypeInfoBase inType = (TupleTypeInfoBase) input.getType(); + DataSet> result = input.mapPartition(new MapPartitionFunction>() { + @Override + public void mapPartition(Iterable values, Collector> out) throws Exception { + TupleSummaryAggregator aggregator = SummaryAggregatorFactory.create(inType); + for (Tuple value: values) { + aggregator.aggregate(value); + } + out.collect(aggregator); + } + }).reduce(new ReduceFunction>() { + @Override + public TupleSummaryAggregator reduce(TupleSummaryAggregator agg1, TupleSummaryAggregator agg2) throws Exception { + agg1.combine(agg2); + return agg1; + } + }); + return result.collect().get(0).result(); + } + // -------------------------------------------------------------------------------------------- // Checksum // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/AggregateCombineHarness.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/AggregateCombineHarness.java new file mode 100644 index 0000000000000..abb36c3b6dfb5 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/AggregateCombineHarness.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import java.lang.reflect.ParameterizedType; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * This harness uses multiple aggregators and variously aggregates and combines against + * a list of values while calling a compareResults() method. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways but can help uncover various kinds of bugs that can show + * up in aggregators. + * + * @param <T> the type to aggregate + * @param <R> the type of the results of the aggregation + * @param <A>
the aggregator to use + */ +public abstract class AggregateCombineHarness> { + + /** + * Compare results from different runs of aggregate/combine to make sure they are the same. + * + * Subclasses should cause an Assertion failure or throw an Exception if the results aren't + * equal or at least close enough. + */ + protected abstract void compareResults(R result1, R result2); + + /** + * Variously aggregate and combine against a list of values, comparing results each time. + */ + @SafeVarargs + public final R summarize(T... values) { + if (values.length == 0 ) { + // when there is nothing to aggregate just combine two empty aggregators and get the result. + A agg1 = initAggregator(); + agg1.combine(initAggregator()); + return agg1.result(); + } + else { + R previousResult = null; + R result = null; + + // Shuffling the values might cause test instability but only in the + // case that there are underlying bugs that need to be fixed + List list = Arrays.asList(values); + Collections.shuffle(list); + + for (int i = 0; i < values.length; i++ ) { + + // Two aggregators are used so that combine() can be tested also. + // It shouldn't matter which aggregator is used because they are combined at the end so + // we're looping through all points of the data and making sure it doesn't make a difference. 
+ + A aggregator1 = initAggregator(); + A aggregator2 = initAggregator(); + + for (int j = 0; j < i; j++) { + aggregator1.aggregate(list.get(j)); + } + for (int j = i; j < values.length; j++){ + aggregator2.aggregate(list.get(j)); + } + + aggregator1.combine(aggregator2); + + previousResult = result; + result = aggregator1.result(); + + if (previousResult != null) { + // validate that variously aggregating then combining doesn't give different results + compareResults(result, previousResult); + } + } + return result; + } + } + + @SuppressWarnings("unchecked") + private A initAggregator() { + try { + // Instantiate a generic type + // http://stackoverflow.com/questions/75175/create-instance-of-generic-type-in-java + return (A) ((Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[2]).newInstance(); + } + catch (Exception e) { + throw new RuntimeException("Could not initialize aggregator", e); + } + + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregatorTest.java new file mode 100644 index 0000000000000..43406430fcb44 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanSummaryAggregatorTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.BooleanColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class BooleanSummaryAggregatorTest { + + @Test + public void testMixedGroup() { + BooleanColumnSummary summary = summarize(true, false, null, true, true, true, false, null, true, false, true); + Assert.assertEquals(11, summary.getTotalCount()); + Assert.assertEquals(2, summary.getNullCount()); + Assert.assertEquals(9, summary.getNonNullCount()); + Assert.assertEquals(6, summary.getTrueCount()); + Assert.assertEquals(3, summary.getFalseCount()); + } + + @Test + public void testAllNullBooleans() { + BooleanColumnSummary summary = summarize(null, null, null, null); + Assert.assertEquals(4, summary.getTotalCount()); + Assert.assertEquals(4, summary.getNullCount()); + Assert.assertEquals(0, summary.getNonNullCount()); + Assert.assertEquals(0, summary.getTrueCount()); + Assert.assertEquals(0, summary.getFalseCount()); + } + + @Test + public void testAllTrue() { + BooleanColumnSummary summary = summarize(true, true, true, true, true, true); + Assert.assertEquals(6, summary.getTotalCount()); + Assert.assertEquals(0, summary.getNullCount()); + Assert.assertEquals(6, summary.getNonNullCount()); + Assert.assertEquals(6, summary.getTrueCount()); + Assert.assertEquals(0, summary.getFalseCount()); + } + + @Test + public void testAllFalse() { + BooleanColumnSummary summary = summarize(false, false, false); + Assert.assertEquals(3, summary.getTotalCount()); + 
Assert.assertEquals(0, summary.getNullCount()); + Assert.assertEquals(3, summary.getNonNullCount()); + Assert.assertEquals(0, summary.getTrueCount()); + Assert.assertEquals(3, summary.getFalseCount()); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected BooleanColumnSummary summarize(Boolean... values) { + return new AggregateCombineHarness() { + @Override + protected void compareResults(BooleanColumnSummary result1, BooleanColumnSummary result2) { + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount()); + Assert.assertEquals(result1.getTrueCount(), result2.getTrueCount()); + Assert.assertEquals(result1.getFalseCount(), result2.getFalseCount()); + } + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..b5896813a79cb --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/BooleanValueSummaryAggregatorTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.BooleanColumnSummary; +import org.apache.flink.types.BooleanValue; +import org.junit.Assert; + +public class BooleanValueSummaryAggregatorTest extends BooleanSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected BooleanColumnSummary summarize(Boolean... values) { + + BooleanValue[] booleanValues = new BooleanValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + booleanValues[i] = new BooleanValue(values[i]); + } + } + + return new AggregateCombineHarness() { + @Override + protected void compareResults(BooleanColumnSummary result1, BooleanColumnSummary result2) { + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount()); + Assert.assertEquals(result1.getTrueCount(), result2.getTrueCount()); + Assert.assertEquals(result1.getFalseCount(), result2.getFalseCount()); + } + }.summarize(booleanValues); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java new file mode 100644 index 0000000000000..50361239aaa18 --- /dev/null +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSumTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.junit.Assert; +import org.junit.Test; + +public class CompensatedSumTest { + + /** + * When adding a series of numbers the order of the numbers should not impact the results. + * + * This test shows that a naive summation comes up with a different result than Kahan + * Summation when you start with either a smaller or larger number in some cases and + * helps prove our Kahan Summation is working. 
+ */ + @Test + public void testAdd1() throws Exception { + final CompensatedSum smallSum = new CompensatedSum(0.001, 0.0); + final CompensatedSum largeSum = new CompensatedSum(1000, 0.0); + + CompensatedSum compensatedResult1 = smallSum; + CompensatedSum compensatedResult2 = largeSum; + double naiveResult1 = smallSum.value(); + double naiveResult2 = largeSum.value(); + + for(int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(smallSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += smallSum.value(); + naiveResult2 += smallSum.value(); + } + + compensatedResult1 = compensatedResult1.add(largeSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += largeSum.value(); + naiveResult2 += smallSum.value(); + + // Kahan summation gave the same result no matter what order we added + Assert.assertEquals(1000.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(1000.011, compensatedResult2.value(), 0.0); + + // naive addition gave a small floating point error + Assert.assertEquals(1000.011, naiveResult1, 0.0); + Assert.assertEquals(1000.0109999999997, naiveResult2, 0.0); + + Assert.assertEquals(compensatedResult1.value(), compensatedResult2.value(), 0.0); + Assert.assertEquals(naiveResult1, naiveResult2, 0.0001); + Assert.assertNotEquals(naiveResult1, naiveResult2, 0.0); + } + + @Test + public void testDelta() throws Exception { + CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0); + for(int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(0.001); + } + Assert.assertEquals(0.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(new Double("8.673617379884035E-19"), compensatedResult1.delta(), 0.0); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java new file mode 100644 index 
0000000000000..64df46d1a22d9 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class DoubleSummaryAggregatorTest { + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetXValues() throws Exception { + + final Double[] q1x = { 10.0, 8.0, 13.0, 9.0, 11.0, 14.0, 6.0, 4.0, 12.0, 7.0, 5.0 }; + final Double[] q4x = { 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 8.0, 19.0, 8.0, 8.0, 8.0 }; + + NumericColumnSummary q1 = summarize(q1x); + NumericColumnSummary q4 = summarize(q4x); + + Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0); + Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0); + + Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d); + Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d); + + double stddev = Math.sqrt(11.0); + Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d); + Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d); + } + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetYValues() throws Exception { + final Double[] q1y = { 8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68 }; + final Double[] q2y = { 9.14, 8.14, 8.74, 8.77, 9.26, 8.1, 6.13, 3.1, 9.13, 7.26, 4.74 }; + final Double[] q3y = { 7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73 }; + final Double[] q4y = { 6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.5, 5.56, 7.91, 6.89 }; + + NumericColumnSummary q1 = summarize(q1y); + NumericColumnSummary q2 = summarize(q2y); + NumericColumnSummary q3 = summarize(q3y); + NumericColumnSummary q4 = summarize(q4y); + + // the y values are have less precisely matching means and variances + + Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001); + + Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01); + } + + @Test + public void testIsNan() throws Exception { + DoubleSummaryAggregator ag = new DoubleSummaryAggregator(); + Assert.assertFalse(ag.isNan(-1.0)); + Assert.assertFalse(ag.isNan(0.0)); + Assert.assertFalse(ag.isNan(23.0)); + Assert.assertFalse(ag.isNan(Double.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Double.MIN_VALUE)); + Assert.assertTrue(ag.isNan(Double.NaN)); + } + + @Test + public void testIsInfinite() throws Exception { + DoubleSummaryAggregator ag = new DoubleSummaryAggregator(); + Assert.assertFalse(ag.isInfinite(-1.0)); + Assert.assertFalse(ag.isInfinite(0.0)); + Assert.assertFalse(ag.isInfinite(23.0)); + Assert.assertFalse(ag.isInfinite(Double.MAX_VALUE)); + 
Assert.assertFalse(ag.isInfinite(Double.MIN_VALUE)); + Assert.assertTrue(ag.isInfinite(Double.POSITIVE_INFINITY)); + Assert.assertTrue(ag.isInfinite(Double.NEGATIVE_INFINITY)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0.0, 100.0).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0.0, 0.0, 100.0).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0.0, 0.0, 100.0, 100.0).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0.0, 100.0, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100.0, summarize(0.0, 100.0).getSum().doubleValue(), 0.0); + Assert.assertEquals(15, summarize(1.0, 2.0, 3.0, 4.0, 5.0).getSum().doubleValue(), 0.0); + Assert.assertEquals(0, summarize(-100.0, 0.0, 100.0, null).getSum().doubleValue(), 0.0); + Assert.assertEquals(90, summarize(-10.0, 100.0, null).getSum().doubleValue(), 0.0); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001.0, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMax().doubleValue(), 0.0); + Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0); + Assert.assertEquals(11.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMax().doubleValue(), 0.0); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000.0, 0.0, 1.0, 50.0, 999.0, 1001.0).getMin().doubleValue(), 0.0); + Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, 10.0, 2.0, 3.0, 5.0, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0); + Assert.assertEquals(-2.0, summarize(1.0, 8.0, 7.0, 6.0, 9.0, null, 10.0, 2.0, 3.0, 5.0, null, 0.0, 11.0, -2.0, 3.0).getMin().doubleValue(), 0.0); + 
Assert.assertNull(summarize().getMin()); + } + + @Test + public void testCounts() throws Exception { + NumericColumnSummary summary = summarize(Double.NaN, 1.0, null, 123.0, -44.00001, Double.POSITIVE_INFINITY, 55.0, Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY, null, Double.NaN); + Assert.assertEquals(11, summary.getTotalCount()); + Assert.assertEquals(2, summary.getNullCount()); + Assert.assertEquals(9, summary.getNonNullCount()); + Assert.assertEquals(7, summary.getMissingCount()); + Assert.assertEquals(4, summary.getNonMissingCount()); + Assert.assertEquals(2, summary.getNanCount()); + Assert.assertEquals(3, summary.getInfinityCount()); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + private static NumericColumnSummary summarize(Double... values) { + return new AggregateCombineHarness,DoubleSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java new file mode 100644 index 0000000000000..c761fc2f8b623 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregatorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class FloatSummaryAggregatorTest { + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetXValues() throws Exception { + + final Float[] q1x = { 10.0f, 8.0f, 13.0f, 9.0f, 11.0f, 14.0f, 6.0f, 4.0f, 12.0f, 7.0f, 5.0f }; + final Float[] q4x = { 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 8.0f, 19.0f, 8.0f, 8.0f, 8.0f }; + + NumericColumnSummary q1 = summarize(q1x); + NumericColumnSummary q4 = summarize(q4x); + + Assert.assertEquals(9.0, q1.getMean().doubleValue(), 0.0f); + Assert.assertEquals(9.0, q4.getMean().doubleValue(), 0.0f); + + Assert.assertEquals(11.0, q1.getVariance().doubleValue(), 1e-10d); + Assert.assertEquals(11.0, q4.getVariance().doubleValue(), 1e-10d); + + double stddev = Math.sqrt(11.0f); + Assert.assertEquals(stddev, q1.getStandardDeviation().doubleValue(), 1e-10d); + Assert.assertEquals(stddev, q4.getStandardDeviation().doubleValue(), 1e-10d); + } + + /** + * Use some values from Anscombe's Quartet for testing. + * + * There was no particular reason to use these except they have known means and variance. 
+ * + * https://en.wikipedia.org/wiki/Anscombe%27s_quartet + */ + @Test + public void testAnscomesQuartetYValues() throws Exception { + final Float[] q1y = { 8.04f, 6.95f, 7.58f, 8.81f, 8.33f, 9.96f, 7.24f, 4.26f, 10.84f, 4.82f, 5.68f }; + final Float[] q2y = { 9.14f, 8.14f, 8.74f, 8.77f, 9.26f, 8.1f, 6.13f, 3.1f, 9.13f, 7.26f, 4.74f }; + final Float[] q3y = { 7.46f, 6.77f, 12.74f, 7.11f, 7.81f, 8.84f, 6.08f, 5.39f, 8.15f, 6.42f, 5.73f }; + final Float[] q4y = { 6.58f, 5.76f, 7.71f, 8.84f, 8.47f, 7.04f, 5.25f, 12.5f, 5.56f, 7.91f, 6.89f }; + + NumericColumnSummary q1 = summarize(q1y); + NumericColumnSummary q2 = summarize(q2y); + NumericColumnSummary q3 = summarize(q3y); + NumericColumnSummary q4 = summarize(q4y); + + // the y values are have less precisely matching means and variances + + Assert.assertEquals(7.5, q1.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q2.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q3.getMean().doubleValue(), 0.001); + Assert.assertEquals(7.5, q4.getMean().doubleValue(), 0.001); + + Assert.assertEquals(4.12, q1.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q2.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q3.getVariance().doubleValue(), 0.01); + Assert.assertEquals(4.12, q4.getVariance().doubleValue(), 0.01); + } + + @Test + public void testIsNan() throws Exception { + FloatSummaryAggregator ag = new FloatSummaryAggregator(); + Assert.assertFalse(ag.isNan(-1.0f)); + Assert.assertFalse(ag.isNan(0.0f)); + Assert.assertFalse(ag.isNan(23.0f)); + Assert.assertFalse(ag.isNan(Float.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Float.MIN_VALUE)); + Assert.assertTrue(ag.isNan(Float.NaN)); + } + + @Test + public void testIsInfinite() throws Exception { + FloatSummaryAggregator ag = new FloatSummaryAggregator(); + Assert.assertFalse(ag.isInfinite(-1.0f)); + Assert.assertFalse(ag.isInfinite(0.0f)); + Assert.assertFalse(ag.isInfinite(23.0f)); + 
Assert.assertFalse(ag.isInfinite(Float.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Float.MIN_VALUE)); + Assert.assertTrue(ag.isInfinite(Float.POSITIVE_INFINITY)); + Assert.assertTrue(ag.isInfinite(Float.NEGATIVE_INFINITY)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0.0f, 100.0f).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0.0f, 0.0f, 100.0f).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0.0f, 0.0f, 100.0f, 100.0f).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0.0f, 100.0f, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100.0, summarize(0.0f, 100.0f).getSum().floatValue(), 0.0f); + Assert.assertEquals(15, summarize(1.0f, 2.0f, 3.0f, 4.0f, 5.0f).getSum().floatValue(), 0.0f); + Assert.assertEquals(0, summarize(-100.0f, 0.0f, 100.0f, null).getSum().floatValue(), 0.0f); + Assert.assertEquals(90, summarize(-10.0f, 100.0f, null).getSum().floatValue(), 0.0f); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001.0f, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMax().floatValue(), 0.0f); + Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f); + Assert.assertEquals(11.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMax().floatValue(), 0.0f); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000.0f, 0.0f, 1.0f, 50.0f, 999.0f, 1001.0f).getMin().floatValue(), 0.0f); + Assert.assertEquals(-2.0f, summarize(1.0f, 8.0f, 7.0f, 6.0f, 9.0f, 10.0f, 2.0f, 3.0f, 5.0f, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f); + Assert.assertEquals(-2.0f, summarize(1.0f, 
8.0f, 7.0f, 6.0f, 9.0f, null, 10.0f, 2.0f, 3.0f, 5.0f, null, 0.0f, 11.0f, -2.0f, 3.0f).getMin().floatValue(), 0.0f); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating + * and combining a bunch of different ways. + */ + protected NumericColumnSummary summarize(Float... values) { + + return new AggregateCombineHarness,FloatSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..ff87946694ce4 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/FloatValueSummaryAggregatorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.FloatValue; +import org.junit.Assert; + +public class FloatValueSummaryAggregatorTest extends FloatSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating + * and combining a bunch of different ways. + */ + @Override + protected NumericColumnSummary summarize(Float... values) { + + FloatValue[] floatValues = new FloatValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + floatValues[i] = new FloatValue(values[i]); + } + } + + return new AggregateCombineHarness,ValueSummaryAggregator.FloatValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0f); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0f); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-10d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-10d); + } + + }.summarize(floatValues); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java 
new file mode 100644 index 0000000000000..110d2cc486091 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerSummaryAggregatorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class IntegerSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + IntegerSummaryAggregator ag = new IntegerSummaryAggregator(); + // always false for Integer + Assert.assertFalse(ag.isNan(-1)); + Assert.assertFalse(ag.isNan(0)); + Assert.assertFalse(ag.isNan(23)); + Assert.assertFalse(ag.isNan(Integer.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Integer.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + IntegerSummaryAggregator ag = new IntegerSummaryAggregator(); + // always false for Integer + Assert.assertFalse(ag.isInfinite(-1)); + Assert.assertFalse(ag.isInfinite(0)); + Assert.assertFalse(ag.isInfinite(23)); + Assert.assertFalse(ag.isInfinite(Integer.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Integer.MIN_VALUE)); + Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100, summarize(0, 100).getSum().intValue()); + Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().intValue()); + Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().intValue()); + Assert.assertEquals(90, summarize(-10, 100, null).getSum().intValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().intValue()); + 
Assert.assertEquals(0, summarize(Integer.MIN_VALUE, -1000, 0).getMax().intValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().intValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().intValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().intValue()); + Assert.assertEquals(Integer.MIN_VALUE, summarize(Integer.MIN_VALUE, -1000, 0).getMin().intValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().intValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().intValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary summarize(Integer... values) { + + return new AggregateCombineHarness,IntegerSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().intValue(),result2.getMin().intValue()); + Assert.assertEquals(result1.getMax().intValue(), result2.getMax().intValue()); + Assert.assertEquals(result1.getSum().intValue(),result2.getSum().intValue()); + 
Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..6ac5485467fec --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/IntegerValueSummaryAggregatorTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.IntValue; +import org.junit.Assert; + +public class IntegerValueSummaryAggregatorTest extends IntegerSummaryAggregatorTest { + + @Override + protected NumericColumnSummary summarize(Integer... 
values) { + + IntValue[] intValues = new IntValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + intValues[i] = new IntValue(values[i]); + } + } + + return new AggregateCombineHarness,ValueSummaryAggregator.IntegerValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().intValue(),result2.getMin().intValue()); + Assert.assertEquals(result1.getMax().intValue(), result2.getMax().intValue()); + Assert.assertEquals(result1.getSum().intValue(),result2.getSum().intValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(intValues); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java new file mode 100644 index 0000000000000..19056576e5fa6 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongSummaryAggregatorTest.java @@ 
-0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + +public class LongSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + LongSummaryAggregator ag = new LongSummaryAggregator(); + // always false for Long + Assert.assertFalse(ag.isNan(-1L)); + Assert.assertFalse(ag.isNan(0L)); + Assert.assertFalse(ag.isNan(23L)); + Assert.assertFalse(ag.isNan(Long.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Long.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + LongSummaryAggregator ag = new LongSummaryAggregator(); + // always false for Long + Assert.assertFalse(ag.isInfinite(-1L)); + Assert.assertFalse(ag.isInfinite(0L)); + Assert.assertFalse(ag.isInfinite(23L)); + Assert.assertFalse(ag.isInfinite(Long.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Long.MIN_VALUE)); + Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0L, 100L).getMean(), 0.0); + 
Assert.assertEquals(33.333333, summarize(0L, 0L, 100L).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0L, 0L, 100L, 100L).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0L, 100L, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100L, summarize(0L, 100L).getSum().longValue()); + Assert.assertEquals(15L, summarize(1L, 2L, 3L, 4L, 5L).getSum().longValue()); + Assert.assertEquals(0L, summarize(-100L, 0L, 100L, null).getSum().longValue()); + Assert.assertEquals(90L, summarize(-10L, 100L, null).getSum().longValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMax().longValue()); + Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMax().longValue()); + Assert.assertEquals(11L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMax().longValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000L, summarize(-1000L, 0L, 1L, 50L, 999L, 1001L).getMin().longValue()); + Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, 10L, 2L, 3L, 5L, 0L, 11L, -2L, 3L).getMin().longValue()); + Assert.assertEquals(-2L, summarize(1L, 8L, 7L, 6L, 9L, null, 10L, 2L, 3L, 5L, null, 0L, 11L, -2L, 3L).getMin().longValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary summarize(Long... 
values) { + return new AggregateCombineHarness,LongSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(), result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().longValue(),result2.getMin().longValue()); + Assert.assertEquals(result1.getMax().longValue(), result2.getMax().longValue()); + Assert.assertEquals(result1.getSum().longValue(),result2.getSum().longValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..eecda69e3f076 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/LongValueSummaryAggregatorTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.LongValue; +import org.junit.Assert; + +public class LongValueSummaryAggregatorTest extends LongSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values + */ + @Override + protected NumericColumnSummary summarize(Long... 
values) { + + LongValue[] longValues = new LongValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + longValues[i] = new LongValue(values[i]); + } + } + + return new AggregateCombineHarness,ValueSummaryAggregator.LongValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(), result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().longValue(),result2.getMin().longValue()); + Assert.assertEquals(result1.getMax().longValue(), result2.getMax().longValue()); + Assert.assertEquals(result1.getSum().longValue(),result2.getSum().longValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(longValues); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java new file mode 100644 index 0000000000000..ebbf627521bd8 --- /dev/null +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortSummaryAggregatorTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class ShortSummaryAggregatorTest { + + @Test + public void testIsNan() throws Exception { + ShortSummaryAggregator ag = new ShortSummaryAggregator(); + // always false for Short + Assert.assertFalse(ag.isNan((short) -1)); + Assert.assertFalse(ag.isNan((short) 0)); + Assert.assertFalse(ag.isNan((short) 23)); + Assert.assertFalse(ag.isNan(Short.MAX_VALUE)); + Assert.assertFalse(ag.isNan(Short.MIN_VALUE)); + Assert.assertFalse(ag.isNan(null)); + } + + @Test + public void testIsInfinite() throws Exception { + ShortSummaryAggregator ag = new ShortSummaryAggregator(); + // always false for Short + Assert.assertFalse(ag.isInfinite((short) -1)); + Assert.assertFalse(ag.isInfinite((short) 0)); + Assert.assertFalse(ag.isInfinite((short) 23)); + Assert.assertFalse(ag.isInfinite(Short.MAX_VALUE)); + Assert.assertFalse(ag.isInfinite(Short.MIN_VALUE)); + 
Assert.assertFalse(ag.isInfinite(null)); + } + + @Test + public void testMean() throws Exception { + Assert.assertEquals(50.0, summarize(0, 100).getMean(), 0.0); + Assert.assertEquals(33.333333, summarize(0, 0, 100).getMean(), 0.00001); + Assert.assertEquals(50.0, summarize(0, 0, 100, 100).getMean(), 0.0); + Assert.assertEquals(50.0, summarize(0, 100, null).getMean(), 0.0); + Assert.assertNull(summarize().getMean()); + } + + @Test + public void testSum() throws Exception { + Assert.assertEquals(100, summarize(0, 100).getSum().shortValue()); + Assert.assertEquals(15, summarize(1, 2, 3, 4, 5).getSum().shortValue()); + Assert.assertEquals(0, summarize(-100, 0, 100, null).getSum().shortValue()); + Assert.assertEquals(90, summarize(-10, 100, null).getSum().shortValue()); + Assert.assertNull(summarize().getSum()); + } + + @Test + public void testMax() throws Exception { + Assert.assertEquals(1001, summarize(-1000, 0, 1, 50, 999, 1001).getMax().shortValue()); + Assert.assertEquals(0, summarize((int)Short.MIN_VALUE, -1000, 0).getMax().shortValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMax().shortValue()); + Assert.assertEquals(11, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMax().shortValue()); + Assert.assertNull(summarize().getMax()); + } + + @Test + public void testMin() throws Exception { + Assert.assertEquals(-1000, summarize(-1000, 0, 1, 50, 999, 1001).getMin().shortValue()); + Assert.assertEquals(Short.MIN_VALUE, summarize((int)Short.MIN_VALUE, -1000, 0).getMin().shortValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, 10, 2, 3, 5, 0, 11, -2, 3).getMin().shortValue()); + Assert.assertEquals(-2, summarize(1, 8, 7, 6, 9, null, 10, 2, 3, 5, null, 0, 11, -2, 3).getMin().shortValue()); + Assert.assertNull(summarize().getMin()); + } + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary summarize(Integer... 
values) { + + // cast everything to short here + Short[] shortValues = new Short[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + shortValues[i] = values[i].shortValue(); + } + } + + return new AggregateCombineHarness,ShortSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(),result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().shortValue(),result2.getMin().shortValue()); + Assert.assertEquals(result1.getMax().shortValue(), result2.getMax().shortValue()); + Assert.assertEquals(result1.getSum().shortValue(),result2.getSum().shortValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(shortValues); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..8a8e7aaed6d77 --- /dev/null +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/ShortValueSummaryAggregatorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.ShortValue; +import org.junit.Assert; + +public class ShortValueSummaryAggregatorTest extends ShortSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values + */ + protected NumericColumnSummary summarize(Integer... 
values) { + + ShortValue[] shortValues = new ShortValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + shortValues[i] = new ShortValue(values[i].shortValue()); + } + } + + return new AggregateCombineHarness,ValueSummaryAggregator.ShortValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + + Assert.assertEquals(result1.getTotalCount(), result2.getTotalCount()); + Assert.assertEquals(result1.getNullCount(),result2.getNullCount()); + Assert.assertEquals(result1.getMissingCount(),result2.getMissingCount()); + Assert.assertEquals(result1.getNonMissingCount(),result2.getNonMissingCount()); + Assert.assertEquals(result1.getInfinityCount(),result2.getInfinityCount()); + Assert.assertEquals(result1.getNanCount(),result2.getNanCount()); + + Assert.assertEquals(result1.containsNull(),result2.containsNull()); + Assert.assertEquals(result1.containsNonNull(),result2.containsNonNull()); + + Assert.assertEquals(result1.getMin().shortValue(),result2.getMin().shortValue()); + Assert.assertEquals(result1.getMax().shortValue(), result2.getMax().shortValue()); + Assert.assertEquals(result1.getSum().shortValue(),result2.getSum().shortValue()); + Assert.assertEquals(result1.getMean().doubleValue(), result2.getMean().doubleValue(), 1e-12d); + Assert.assertEquals(result1.getVariance().doubleValue(),result2.getVariance().doubleValue(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation().doubleValue(),result2.getStandardDeviation().doubleValue(), 1e-12d); + } + }.summarize(shortValues); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java new file mode 100644 index 0000000000000..02fc1259dfd99 --- /dev/null +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringSummaryAggregatorTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.junit.Assert; +import org.junit.Test; + + +public class StringSummaryAggregatorTest { + + @Test + public void testMixedGroup() { + StringColumnSummary summary = summarize("abc", "", null, " ", "defghi", "foo", null, null, "", " "); + Assert.assertEquals(10, summary.getTotalCount()); + Assert.assertEquals(3, summary.getNullCount()); + Assert.assertEquals(7, summary.getNonNullCount()); + Assert.assertEquals(2, summary.getEmptyCount()); + Assert.assertEquals(0, summary.getMinLength().intValue()); + Assert.assertEquals(6, summary.getMaxLength().intValue()); + Assert.assertEquals(2.142857, summary.getMeanLength().doubleValue(), 0.001); + } + + @Test + public void testAllNullStrings() { + StringColumnSummary summary = summarize(null, null, null, null); + Assert.assertEquals(4, summary.getTotalCount()); + Assert.assertEquals(4, summary.getNullCount()); + Assert.assertEquals(0, summary.getNonNullCount()); + 
Assert.assertEquals(0, summary.getEmptyCount()); + Assert.assertNull(summary.getMinLength()); + Assert.assertNull(summary.getMaxLength()); + Assert.assertNull(summary.getMeanLength()); + } + + @Test + public void testAllWithValues() { + StringColumnSummary summary = summarize("cat", "hat", "dog", "frog"); + Assert.assertEquals(4, summary.getTotalCount()); + Assert.assertEquals(0, summary.getNullCount()); + Assert.assertEquals(4, summary.getNonNullCount()); + Assert.assertEquals(0, summary.getEmptyCount()); + Assert.assertEquals(3, summary.getMinLength().intValue()); + Assert.assertEquals(4, summary.getMaxLength().intValue()); + Assert.assertEquals(3.25, summary.getMeanLength().doubleValue(), 0.0); + } + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected StringColumnSummary summarize(String... values) { + + return new AggregateCombineHarness(){ + + @Override + protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) { + Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount()); + Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength()); + Assert.assertEquals(result1.getMinLength(), result2.getMinLength()); + if (result1.getMeanLength() == null) { + Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength()); + } + else { + Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d); + } + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount()); + } + + }.summarize(values); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java new file 
mode 100644 index 0000000000000..19bfd52c466ce --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/StringValueSummaryAggregatorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.apache.flink.types.StringValue; +import org.junit.Assert; + +public class StringValueSummaryAggregatorTest extends StringSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + @Override + protected StringColumnSummary summarize(String... 
values) { + + StringValue[] stringValues = new StringValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + stringValues[i] = new StringValue(values[i]); + } + } + + return new AggregateCombineHarness(){ + + @Override + protected void compareResults(StringColumnSummary result1, StringColumnSummary result2) { + Assert.assertEquals(result1.getEmptyCount(), result2.getEmptyCount()); + Assert.assertEquals(result1.getMaxLength(), result2.getMaxLength()); + Assert.assertEquals(result1.getMinLength(), result2.getMinLength()); + if (result1.getMeanLength() == null) { + Assert.assertEquals(result1.getMeanLength(), result2.getMeanLength()); + } + else { + Assert.assertEquals(result1.getMeanLength().doubleValue(), result2.getMeanLength().doubleValue(), 1e-5d); + } + + Assert.assertEquals(result1.getNullCount(), result2.getNullCount()); + Assert.assertEquals(result1.getNonNullCount(), result2.getNonNullCount()); + } + + }.summarize(stringValues); + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java new file mode 100644 index 0000000000000..8134a90485eb7 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/SummaryAggregatorFactoryTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.types.*; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + + +public class SummaryAggregatorFactoryTest { + + @Test + public void testCreate() throws Exception { + // supported primitive types + Assert.assertEquals(StringSummaryAggregator.class, SummaryAggregatorFactory.create(String.class).getClass()); + Assert.assertEquals(ShortSummaryAggregator.class, SummaryAggregatorFactory.create(Short.class).getClass()); + Assert.assertEquals(IntegerSummaryAggregator.class, SummaryAggregatorFactory.create(Integer.class).getClass()); + Assert.assertEquals(LongSummaryAggregator.class, SummaryAggregatorFactory.create(Long.class).getClass()); + Assert.assertEquals(FloatSummaryAggregator.class, SummaryAggregatorFactory.create(Float.class).getClass()); + Assert.assertEquals(DoubleSummaryAggregator.class, SummaryAggregatorFactory.create(Double.class).getClass()); + Assert.assertEquals(BooleanSummaryAggregator.class, SummaryAggregatorFactory.create(Boolean.class).getClass()); + + // supported value types + Assert.assertEquals(ValueSummaryAggregator.StringValueSummaryAggregator.class, SummaryAggregatorFactory.create(StringValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.ShortValueSummaryAggregator.class, SummaryAggregatorFactory.create(ShortValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.IntegerValueSummaryAggregator.class, SummaryAggregatorFactory.create(IntValue.class).getClass()); + 
Assert.assertEquals(ValueSummaryAggregator.LongValueSummaryAggregator.class, SummaryAggregatorFactory.create(LongValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.FloatValueSummaryAggregator.class, SummaryAggregatorFactory.create(FloatValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.DoubleValueSummaryAggregator.class, SummaryAggregatorFactory.create(DoubleValue.class).getClass()); + Assert.assertEquals(ValueSummaryAggregator.BooleanValueSummaryAggregator.class, SummaryAggregatorFactory.create(BooleanValue.class).getClass()); + + // some not well supported types - these fallback to ObjectSummaryAggregator + Assert.assertEquals(ObjectSummaryAggregator.class, SummaryAggregatorFactory.create(Object.class).getClass()); + Assert.assertEquals(ObjectSummaryAggregator.class, SummaryAggregatorFactory.create(List.class).getClass()); + } + +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java index 4ccc6e24ba006..86d0ea3440b83 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java @@ -24,14 +24,21 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.summarize.BooleanColumnSummary; +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.api.java.summarize.StringColumnSummary; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.types.DoubleValue; import org.junit.Assert; import org.junit.Test; import 
org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -94,4 +101,76 @@ public void testIntegerDataSetChecksumHashCode() throws Exception { Assert.assertEquals(checksum.getCount(), 15); Assert.assertEquals(checksum.getChecksum(), 55); } + + @Test + public void testSummarize() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + List> data = new ArrayList<>(); + data.add(new Tuple8<>((short)1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0))); + data.add(new Tuple8<>((short)6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0))); + data.add(new Tuple8<>((short)7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0))); + data.add(new Tuple8<>((short)8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0))); + + Collections.shuffle(data); + + DataSet> ds = env.fromCollection(data); + + // call method under test + Tuple results = DataSetUtils.summarize(ds); + + Assert.assertEquals(8, results.getArity()); + + NumericColumnSummary col0Summary = results.getField(0); + Assert.assertEquals(8, col0Summary.getNonMissingCount()); + Assert.assertEquals(1, col0Summary.getMin().shortValue()); + Assert.assertEquals(10, col0Summary.getMax().shortValue()); + Assert.assertEquals(5.375, col0Summary.getMean().doubleValue(), 0.0); + + NumericColumnSummary col1Summary = results.getField(1); + Assert.assertEquals(1, col1Summary.getMin().intValue()); 
+ Assert.assertEquals(10, col1Summary.getMax().intValue()); + Assert.assertEquals(5.375, col1Summary.getMean().doubleValue(), 0.0); + + NumericColumnSummary col2Summary = results.getField(2); + Assert.assertEquals(-100L, col2Summary.getMin().longValue()); + Assert.assertEquals(10000L, col2Summary.getMax().longValue()); + + NumericColumnSummary col3Summary = results.getField(3); + Assert.assertEquals(8, col3Summary.getTotalCount()); + Assert.assertEquals(0.001000, col3Summary.getMin().doubleValue(), 0.0000001); + Assert.assertEquals(0.89999999, col3Summary.getMax().doubleValue(), 0.0000001); + Assert.assertEquals(0.2376249988883501, col3Summary.getMean().doubleValue(), 0.000000000001); + Assert.assertEquals(0.0768965488108089, col3Summary.getVariance().doubleValue(), 0.00000001); + Assert.assertEquals(0.27730226975415995, col3Summary.getStandardDeviation().doubleValue(), 0.000000000001); + + NumericColumnSummary col4Summary = results.getField(4); + Assert.assertEquals(6, col4Summary.getNonMissingCount()); + Assert.assertEquals(2, col4Summary.getMissingCount()); + Assert.assertEquals(0.0000000000023, col4Summary.getMin().doubleValue(), 0.0); + Assert.assertEquals(79.5, col4Summary.getMax().doubleValue(), 0.000000000001); + + StringColumnSummary col5Summary = results.getField(5); + Assert.assertEquals(8, col5Summary.getTotalCount()); + Assert.assertEquals(0, col5Summary.getNullCount()); + Assert.assertEquals(8, col5Summary.getNonNullCount()); + Assert.assertEquals(2, col5Summary.getEmptyCount()); + Assert.assertEquals(0, col5Summary.getMinLength().intValue()); + Assert.assertEquals(16, col5Summary.getMaxLength().intValue()); + Assert.assertEquals(5.0, col5Summary.getMeanLength().doubleValue(), 0.0001); + + BooleanColumnSummary col6Summary = results.getField(6); + Assert.assertEquals(8, col6Summary.getTotalCount()); + Assert.assertEquals(2, col6Summary.getFalseCount()); + Assert.assertEquals(6, col6Summary.getTrueCount()); + Assert.assertEquals(0, 
col6Summary.getNullCount()); + + NumericColumnSummary col7Summary = results.getField(7); + Assert.assertEquals(100.0, col7Summary.getMax().doubleValue(), 0.00001); + Assert.assertEquals(50.0, col7Summary.getMin().doubleValue(), 0.00001); + } } From 6f5d7216372aea682da49266d9742df8c5e886e5 Mon Sep 17 00:00:00 2001 From: Todd Lisonbee Date: Mon, 4 Apr 2016 23:40:10 -0700 Subject: [PATCH 2/3] [FLINK-3664] Adding DoubleValueSummaryAggregatorTest.java --- .../DoubleSummaryAggregatorTest.java | 2 +- .../DoubleValueSummaryAggregatorTest.java | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java index 64df46d1a22d9..08fbe787c22b8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleSummaryAggregatorTest.java @@ -158,7 +158,7 @@ public void testCounts() throws Exception { * This method breaks the rule of "testing only one thing" by aggregating and combining * a bunch of different ways. */ - private static NumericColumnSummary summarize(Double... values) { + protected NumericColumnSummary summarize(Double... 
values) { return new AggregateCombineHarness,DoubleSummaryAggregator>() { @Override diff --git a/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java new file mode 100644 index 0000000000000..f482fe2c010fd --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/summarize/aggregation/DoubleValueSummaryAggregatorTest.java @@ -0,0 +1,37 @@ +package org.apache.flink.api.java.summarize.aggregation; + +import org.apache.flink.api.java.summarize.NumericColumnSummary; +import org.apache.flink.types.DoubleValue; +import org.junit.Assert; + +public class DoubleValueSummaryAggregatorTest extends DoubleSummaryAggregatorTest { + + /** + * Helper method for summarizing a list of values. + * + * This method breaks the rule of "testing only one thing" by aggregating and combining + * a bunch of different ways. + */ + protected NumericColumnSummary summarize(Double... 
values) { + + DoubleValue[] doubleValues = new DoubleValue[values.length]; + for(int i = 0; i < values.length; i++) { + if (values[i] != null) { + doubleValues[i] = new DoubleValue(values[i]); + } + } + + return new AggregateCombineHarness,ValueSummaryAggregator.DoubleValueSummaryAggregator>() { + + @Override + protected void compareResults(NumericColumnSummary result1, NumericColumnSummary result2) { + Assert.assertEquals(result1.getMin(), result2.getMin(), 0.0); + Assert.assertEquals(result1.getMax(), result2.getMax(), 0.0); + Assert.assertEquals(result1.getMean(), result2.getMean(), 1e-12d); + Assert.assertEquals(result1.getVariance(), result2.getVariance(), 1e-9d); + Assert.assertEquals(result1.getStandardDeviation(), result2.getStandardDeviation(), 1e-12d); + } + + }.summarize(doubleValues); + } +} From 916daf081552128ccf79e48413c8a9bdce13f7c7 Mon Sep 17 00:00:00 2001 From: Todd Lisonbee Date: Tue, 5 Apr 2016 00:55:24 -0700 Subject: [PATCH 3/3] [FLINK-3664] adding javadoc --- .../flink/api/java/summarize/aggregation/CompensatedSum.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java index 5693b384bf551..0a90f6db0adb9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/CompensatedSum.java @@ -41,7 +41,7 @@ public class CompensatedSum implements java.io.Serializable { private final double delta; /** - * + * Used to calculate sums using the Kahan summation algorithm * @param value the sum * @param delta correction term */ @@ -50,6 +50,9 @@ public CompensatedSum(double value, double delta) { this.delta = delta; } + /** + * The value of the sum + */ public double value() { return value; }