From 49fd2294801d1dde6fcc43cc7573bcf5928a8a50 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 28 Jan 2017 20:46:46 +0800 Subject: [PATCH 1/2] Add `approx_percentile` --- .../org/apache/spark/sql/functions.scala | 187 ++++++++++++++++++ .../spark/sql/DataFrameAggregateSuite.scala | 59 ++++++ 2 files changed, 246 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index c86ae5be9ef62..7c95002cb4729 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -616,6 +616,193 @@ object functions { */ def min(columnName: String): Column = min(Column(columnName)) + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentiles of a column at the given + * percentages. A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param col numeric column to compute approximate percentile values on + * @param percentages an array of double values representing the percentages requested; each + * percentage value must be between 0.0 and 1.0 + * @param accuracy approximation accuracy at the cost of memory; higher value yields better + * accuracy, the default value is DEFAULT_PERCENTILE_ACCURACY + * + * @return the approximate percentile array of column `col` at the given percentage array + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(col: Column, percentages: Array[Double], accuracy: Int): Column = + withAggregateFunction { + new ApproximatePercentile( + col.expr, CreateArray(percentages.map(Literal(_))), Literal(accuracy)) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentiles of a column at the given + * percentages. 
A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param columnName numeric column to compute approximate percentile values on + * @param percentages an array of double values representing the percentages requested; each + * percentage value must be between 0.0 and 1.0 + * @param accuracy approximation accuracy at the cost of memory; higher value yields better + * accuracy, the default value is DEFAULT_PERCENTILE_ACCURACY + * + * @return the approximate percentile array of column `columnName` at the given percentage array + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(columnName: String, percentages: Array[Double], accuracy: Int): Column = { + approx_percentile(Column(columnName), percentages, accuracy) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentiles of a column at the given + * percentages. A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param col numeric column to compute approximate percentile values on + * @param percentages an array of double values representing the percentages requested; each + * percentage value must be between 0.0 and 1.0 + * + * @return the approximate percentile array of column `col` at the given percentage array + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(col: Column, percentages: Array[Double]): Column = withAggregateFunction { + new ApproximatePercentile(col.expr, CreateArray(percentages.map(v => Literal(v)))) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentiles of a column at the given + * percentages. 
A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param columnName numeric column to compute approximate percentile values on + * @param percentages an array of double values representing the percentages requested; each + * percentage value must be between 0.0 and 1.0 + * + * @return the approximate percentile array of column `columnName` at the given percentage array + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(columnName: String, percentages: Array[Double]): Column = { + approx_percentile(Column(columnName), percentages) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentile of a column at the given + * percentage. A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param col numeric column to compute approximate percentile value on + * @param percentage a double value representing the percentage requested; the percentage value + * must be between 0.0 and 1.0 + * @param accuracy approximation accuracy at the cost of memory; higher value yields better + * accuracy, the default value is DEFAULT_PERCENTILE_ACCURACY + * + * @return the approximate percentile of column `col` at the given percentage + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(col: Column, percentage: Double, accuracy: Int): Column = + withAggregateFunction { + new ApproximatePercentile(col.expr, Literal(percentage), Literal(accuracy)) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentile of a column at the given + * percentage. A percentile is a watermark value below which a given percentage of the column + * values fall. 
For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param columnName numeric column to compute approximate percentile value on + * @param percentage a double value representing the percentage requested; the percentage value + * must be between 0.0 and 1.0 + * @param accuracy approximation accuracy at the cost of memory; higher value yields better + * accuracy, the default value is DEFAULT_PERCENTILE_ACCURACY + * + * @return the approximate percentile of column `columnName` at the given percentage + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(columnName: String, percentage: Double, accuracy: Int): Column = { + approx_percentile(Column(columnName), percentage, accuracy) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentile of a column at the given + * percentage. A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. + * + * @param col numeric column to compute approximate percentile value on + * @param percentage a double value representing the percentage requested; the percentage value + * must be between 0.0 and 1.0 + * + * @return the approximate percentile of column `col` at the given percentage + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(col: Column, percentage: Double): Column = withAggregateFunction { + new ApproximatePercentile(col.expr, Literal(percentage)) + } + + /** + * :: Experimental :: + * + * Aggregate function: returns the approximate percentile of a column at the given + * percentage. A percentile is a watermark value below which a given percentage of the column + * values fall. For example, the percentile of column `col` at percentage 50% is the median of + * column `col`. 
+ * + * @param columnName numeric column to compute approximate percentile value on + * @param percentage a double value representing the percentage requested; the percentage value + * must be between 0.0 and 1.0 + * + * @return the approximate percentile of column `columnName` at the given percentage + * + * @group agg_funcs + * @since 2.2.0 + */ + @Experimental + def approx_percentile(columnName: String, percentage: Double): Column = { + approx_percentile(Column(columnName), percentage) + } + /** * Aggregate function: returns the skewness of the values in a group. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index e7079120bb7df..993fe07e06f12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -361,6 +361,65 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { Row(0, null)) } + test("approx_percentile functions") { + val df = (1 to 100).toDF("num") + + // with accuracy specified + checkAnswer( + df.select( + approx_percentile($"num", 0.1d, accuracy = 10), + approx_percentile("num", 0.2d, accuracy = 10), + approx_percentile($"num", Array(0.3d, 0.4d, 0.5d), accuracy = 10), + approx_percentile("num", Array(0.6d, 0.7d, 0.8d), accuracy = 10) + ), + Seq(Row(1.0, 12.0, Seq(23.0, 33.0, 41.0), Seq(55.0, 62.0, 73.0))) // results are stable + ) + + // with default accuracy + checkAnswer( + df.select( + approx_percentile($"num", 0.1d), + approx_percentile("num", 0.2d), + approx_percentile($"num", Array(0.3d, 0.4d, 0.5d)), + approx_percentile("num", Array(0.6d, 0.7d, 0.8d)) + ), + Seq(Row(10.0, 20.0, Seq(30.0, 40.0, 50.0), Seq(60.0, 70.0, 80.0))) // results are stable + ) + } + + test("approx_percentile functions with empty inputs") { + val df = (1 to 100).toDF("num").where($"num" > 200) + + checkAnswer( + 
df.select(approx_percentile($"num", Array(0.5d))), + Seq(Row(null)) + ) + + checkAnswer( + df.select(approx_percentile($"num", 0.5d)), + Seq(Row(null)) + ) + } + + test("approx_percentile functions with malformed params") { + val df = (1 to 100).toDF("num") + + var e = intercept[AnalysisException] { + df.select(approx_percentile($"num", Array(-1.0, 0.0, 0.5))) + } + assert(e.getMessage().contains("must be between 0.0 and 1.0")) + + e = intercept[AnalysisException] { + df.select(approx_percentile($"num", -1.0)) + } + assert(e.getMessage().contains("must be between 0.0 and 1.0")) + + e = intercept[AnalysisException] { + df.select(approx_percentile($"num", 1.0, -100)) + } + assert(e.getMessage().contains("must be a positive integer literal")) + } + test("stddev") { val testData2ADev = math.sqrt(4.0 / 5.0) checkAnswer( From b6b93871230866c5d9aff2427b8bf83d1cb45661 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 29 Jan 2017 10:29:57 +0800 Subject: [PATCH 2/2] Mark as `@InterfaceStability.Evolving` --- .../src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 7c95002cb4729..0f7af5641ad39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -636,6 +636,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(col: Column, percentages: Array[Double], accuracy: Int): Column = withAggregateFunction { new ApproximatePercentile( @@ -662,6 +663,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(columnName: String, percentages: Array[Double], accuracy: Int): Column = { approx_percentile(Column(columnName), percentages, accuracy) } @@ -684,6 +686,7 @@ object functions { * @since 2.2.0 */ 
@Experimental + @InterfaceStability.Evolving def approx_percentile(col: Column, percentages: Array[Double]): Column = withAggregateFunction { new ApproximatePercentile(col.expr, CreateArray(percentages.map(v => Literal(v)))) } @@ -706,6 +709,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(columnName: String, percentages: Array[Double]): Column = { approx_percentile(Column(columnName), percentages) } @@ -730,6 +734,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(col: Column, percentage: Double, accuracy: Int): Column = withAggregateFunction { new ApproximatePercentile(col.expr, Literal(percentage), Literal(accuracy)) @@ -755,6 +760,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(columnName: String, percentage: Double, accuracy: Int): Column = { approx_percentile(Column(columnName), percentage, accuracy) } @@ -777,6 +783,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(col: Column, percentage: Double): Column = withAggregateFunction { new ApproximatePercentile(col.expr, Literal(percentage)) } @@ -799,6 +806,7 @@ object functions { * @since 2.2.0 */ @Experimental + @InterfaceStability.Evolving def approx_percentile(columnName: String, percentage: Double): Column = { approx_percentile(Column(columnName), percentage) }