From 408cb77839d469c4738c506f8a3dfa856e46a7d0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 15:00:56 -0700 Subject: [PATCH 01/10] initial commit make progress make progress2 finished cov implementation. Waiting for freqItems to be merged trying to debug added cov --- python/pyspark/sql/dataframe.py | 42 +++++++++- python/pyspark/sql/tests.py | 5 ++ .../spark/sql/DataFrameStatFunctions.scala | 12 ++- .../sql/execution/stat/StatFunctions.scala | 81 +++++++++++++++++++ .../apache/spark/sql/JavaDataFrameSuite.java | 7 ++ .../apache/spark/sql/DataFrameStatSuite.scala | 20 ++++- 6 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5908ebc990a56..0a3869792db2a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -34,7 +34,8 @@ from pyspark.sql.types import _create_cls, _parse_datatype_json_string -__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions"] +__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions", + "DataFrameStatFunctions"] class DataFrame(object): @@ -93,6 +94,12 @@ def na(self): """ return DataFrameNaFunctions(self) + @property + def stat(self): + """Returns a :class:`DataFrameStatFunctions` for statistic functions. + """ + return DataFrameStatFunctions(self) + @ignore_unicode_prefix def toJSON(self, use_unicode=True): """Converts a :class:`DataFrame` into a :class:`RDD` of string. @@ -868,6 +875,17 @@ def fillna(self, value, subset=None): return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx) + def cov(self, col1, col2): + """ + Calculate the covariance for the given columns, specified by their names. + alias for ``stat.cov()``. + + :param col1: The name of the first column + :param col2: The name of the second column + :return: the covariance of the columns + """ + return self.stat.cov(col1, col2) + @ignore_unicode_prefix def withColumn(self, colName, col): """Returns a new :class:`DataFrame` by adding a column. @@ -1311,6 +1329,28 @@ def fill(self, value, subset=None): fill.__doc__ = DataFrame.fillna.__doc__ +class DataFrameStatFunctions(object): + """Functionality for statistic functions with :class:`DataFrame`. + """ + + def __init__(self, df): + self.df = df + + def cov(self, col1, col2): + """ + Calculate the covariance for the given columns, specified by their names. 
+ + :param col1: The name of the first column + :param col2: The name of the second column + :return: the covariance of the columns + """ + if not isinstance(col1, str): + raise ValueError("col1 should be a string.") + if not isinstance(col2, str): + raise ValueError("col2 should be a string.") + return self.df._jdf.stat().cov(col1, col2) + + def _test(): import doctest from pyspark.context import SparkContext diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2ffd18ebd7c89..05aad62f0764b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -387,6 +387,11 @@ def test_aggregator(self): self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0]) self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0]) + def test_cov(self): + df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() + cov = df.stat.cov("a", "b") + self.assertTrue(abs(cov - 16.5) < 1e-6) + def test_math_functions(self): df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() from pyspark.sql import mathfunctions as functions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 42e5cbc05e1e0..a5aa7f51786af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.stat.FrequentItems +import org.apache.spark.sql.execution.stat._ /** * :: Experimental :: @@ -65,4 +65,14 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { def freqItems(cols: List[String]): DataFrame = { FrequentItems.singlePassFreqItems(df, cols, 0.01) } + + /** + * Calculate the covariance of two numerical columns of a DataFrame. + * @param col1 the name of the first column + * @param col2 the name of the second column + * @return the covariance of the two columns. + */ + def cov(col1: String, col2: String): Double = { + StatFunctions.calculateCov(df, Seq(col1, col2)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala new file mode 100644 index 0000000000000..08bd297cb61ab --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.stat + +import org.apache.spark.sql.types.NumericType +import org.apache.spark.sql.{Column, DataFrame} + +private[sql] object StatFunctions { + + /** Helper class to simplify tracking and merging counts. */ + private class CovarianceCounter extends Serializable { + var xAvg = 0.0 + var yAvg = 0.0 + var Ck = 0.0 + var count = 0 + // add an example to the calculation + def add(x: Number, y: Number): this.type = { + val oldX = xAvg + val otherX = x.doubleValue() + val otherY = y.doubleValue() + count += 1 + xAvg += (otherX - xAvg) / count + yAvg += (otherY - yAvg) / count + println(oldX) + Ck += (otherY - yAvg) * (otherX - oldX) + this + } + // merge counters from other partitions + def merge(other: CovarianceCounter): this.type = { + val totalCount = count + other.count + Ck += other.Ck + + (xAvg - other.xAvg) * (yAvg - other.yAvg) * (count * other.count) / totalCount + xAvg = (xAvg * count + other.xAvg * other.count) / totalCount + yAvg = (yAvg * count + other.yAvg * other.count) / totalCount + count = totalCount + this + } + // return the covariance for the observed examples + def cov: Double = Ck / count + } + + /** + * Calculate the covariance of two numerical columns of a DataFrame. + * @param df The DataFrame + * @param cols the column names + * @return the covariance of the two columns. + */ + private[sql] def calculateCov(df: DataFrame, cols: Seq[String]): Double = { + require(cols.length == 2, "Currently cov supports calculating the covariance " + + "between two columns.") + cols.map(name => (name, df.schema.fields.find(_.name == name))).foreach { case (name, data) => + require(data.nonEmpty, s"Couldn't find column with name $name") + require(data.get.dataType.isInstanceOf[NumericType], "Covariance calculation for columns " + + s"with dataType ${data.get.dataType} not supported.") + } + val counts = df.select(cols.map(Column(_)):_*).rdd.aggregate(new CovarianceCounter)( + seqOp = (counter, row) => { + counter.add(row.getAs[Number](0), row.getAs[Number](1)) + }, + combOp = (baseCounter, other) => { + baseCounter.merge(other) + }) + counts.cov + } + +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 966d879e1fc9f..35debd2397409 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -183,4 +183,11 @@ public void testFrequentItems() { DataFrame results = df.stat().freqItems(cols, 0.2); Assert.assertTrue(results.collect()[0].getSeq(0).contains(1)); } + + @Test + public void testCovariance() { + DataFrame df = context.table("testData2"); + Double result = df.stat().cov("a", "b"); + Assert.assertTrue(Math.abs(result) < 1e-6); + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index bb1d29c71d23b..9a2c3b54e267a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -25,10 +25,11 @@ import org.apache.spark.sql.test.TestSQLContext.implicits._ class DataFrameStatSuite extends FunSuite { + import TestData._ val sqlCtx = TestSQLContext - + def toLetter(i: Int): String = (i + 97).toChar.toString + test("Frequent Items") { - def toLetter(i: Int): String = (i + 96).toChar.toString val rows = Array.tabulate(1000) 
{ i => if (i % 3 == 0) (1, toLetter(1), -1.0) else (i, toLetter(i), i * -1.0) } @@ -44,4 +45,19 @@ class DataFrameStatSuite extends FunSuite { items2.getSeq[Double](0) should contain (-1.0) } + + test("covariance") { + val rows = Array.tabulate(10)(i => (i, 2.0 * i, toLetter(i))) + val df = sqlCtx.sparkContext.parallelize(rows).toDF("singles", "doubles", "letters") + df.show() + + val results = df.stat.cov("singles", "doubles") + println(results) + assert(math.abs(results - 16.5) < 1e-6) + intercept[IllegalArgumentException] { + df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes + } + val decimalRes = decimalData.stat.cov("a", "b") + assert(math.abs(decimalRes) < 1e-6) + } } From 7dc6dbca2fa7f990c30bb605727025fa1a34c7a0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 21:14:13 -0700 Subject: [PATCH 02/10] reorder imports --- .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 08bd297cb61ab..90582d6d4b98a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.stat -import org.apache.spark.sql.types.NumericType import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.types.NumericType private[sql] object StatFunctions { From a7115f1baa8d62a239d6643092cc986186f45088 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 21:24:57 -0700 Subject: [PATCH 03/10] fix python style --- python/pyspark/sql/dataframe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0a3869792db2a..b2a8c4cab22be 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -34,7 +34,7 @@ from pyspark.sql.types import _create_cls, _parse_datatype_json_string -__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions", +__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD", "DataFrameNaFunctions", "DataFrameStatFunctions"] @@ -877,9 +877,9 @@ def fillna(self, value, subset=None): def cov(self, col1, col2): """ - Calculate the covariance for the given columns, specified by their names. + Calculate the covariance for the given columns, specified by their names. alias for ``stat.cov()``. 
- + :param col1: The name of the first column :param col2: The name of the second column :return: the covariance of the columns From e3b0b857c860d2d69baf4cd103070663e851ecf2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 22:58:47 -0700 Subject: [PATCH 04/10] addressed comments v0.1 --- python/pyspark/sql/__init__.py | 5 +++-- python/pyspark/sql/dataframe.py | 15 ++++----------- .../sql/execution/stat/StatFunctions.scala | 19 +++++++++---------- .../apache/spark/sql/DataFrameStatSuite.scala | 2 -- 4 files changed, 16 insertions(+), 25 deletions(-) diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 6d54b9e49ed10..379358d6017da 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -53,8 +53,9 @@ from pyspark.sql.types import Row from pyspark.sql.context import SQLContext, HiveContext -from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions +from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions __all__ = [ - 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', 'DataFrameNaFunctions' + 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', + 'DataFrameNaFunctions', 'DataFrameStatFunctions' ] diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index b2a8c4cab22be..6b8b112b4d230 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -877,12 +877,11 @@ def fillna(self, value, subset=None): def cov(self, col1, col2): """ - Calculate the covariance for the given columns, specified by their names. - alias for ``stat.cov()``. + Calculate the covariance for the given columns, specified by their names as a double value. + :func:`DataFrame.cov` and :func:`DataFrameStatFunctions.cov` are aliases. :param col1: The name of the first column :param col2: The name of the second column - :return: the covariance of the columns """ return self.stat.cov(col1, col2) @@ -1337,19 +1336,13 @@ def __init__(self, df): self.df = df def cov(self, col1, col2): - """ - Calculate the covariance for the given columns, specified by their names. 
- - :param col1: The name of the first column - :param col2: The name of the second column - :return: the covariance of the columns - """ if not isinstance(col1, str): raise ValueError("col1 should be a string.") if not isinstance(col2, str): raise ValueError("col2 should be a string.") return self.df._jdf.stat().cov(col1, col2) - + + cov.__doc__ = DataFrame.cov.__doc__ def _test(): import doctest diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 90582d6d4b98a..99da155239beb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.execution.stat +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.{Column, DataFrame} -import org.apache.spark.sql.types.NumericType +import org.apache.spark.sql.types.{DoubleType, NumericType} private[sql] object StatFunctions { @@ -29,15 +30,12 @@ private[sql] object StatFunctions { var Ck = 0.0 var count = 0 // add an example to the calculation - def add(x: Number, y: Number): this.type = { + def add(x: Double, y: Double): this.type = { val oldX = xAvg - val otherX = x.doubleValue() - val otherY = y.doubleValue() count += 1 - xAvg += (otherX - xAvg) / count - yAvg += (otherY - yAvg) / count - println(oldX) - Ck += (otherY - yAvg) * (otherX - oldX) + xAvg += (x - xAvg) / count + yAvg += (y - yAvg) / count + Ck += (y - yAvg) * (x - oldX) this } // merge counters from other partitions @@ -68,9 +66,10 @@ private[sql] object StatFunctions { require(data.get.dataType.isInstanceOf[NumericType], "Covariance calculation for columns " + s"with dataType ${data.get.dataType} not supported.") } - val counts = df.select(cols.map(Column(_)):_*).rdd.aggregate(new CovarianceCounter)( + val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType))) + val counts = df.select(columns:_*).rdd.aggregate(new CovarianceCounter)( seqOp = (counter, row) => { - counter.add(row.getAs[Number](0), row.getAs[Number](1)) + counter.add(row.getDouble(0), row.getDouble(1)) }, combOp = (baseCounter, other) => { baseCounter.merge(other) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 9a2c3b54e267a..6f3ba616748b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -49,10 +49,8 @@ class DataFrameStatSuite extends FunSuite { test("covariance") { val rows = Array.tabulate(10)(i => (i, 2.0 * i, toLetter(i))) val df = sqlCtx.sparkContext.parallelize(rows).toDF("singles", "doubles", "letters") - df.show() val results = df.stat.cov("singles", "doubles") - println(results) assert(math.abs(results - 16.5) < 1e-6) intercept[IllegalArgumentException] { df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes From aa2ad29bc67bca7e68aa2a87bfc67e02d671b6cd Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 23:10:12 -0700 Subject: [PATCH 05/10] fix pyStyle2 --- python/pyspark/sql/dataframe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 6b8b112b4d230..3ed2a6dd8879f 100644 --- a/python/pyspark/sql/dataframe.py +++ 
b/python/pyspark/sql/dataframe.py @@ -1341,9 +1341,10 @@ def cov(self, col1, col2): if not isinstance(col2, str): raise ValueError("col2 should be a string.") return self.df._jdf.stat().cov(col1, col2) - + cov.__doc__ = DataFrame.cov.__doc__ + def _test(): import doctest from pyspark.context import SparkContext From 8456eca957aca1771e08fc40e19db82f8cda01a4 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 30 Apr 2015 23:11:17 -0700 Subject: [PATCH 06/10] fix pyStyle3 --- python/pyspark/sql/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 379358d6017da..b60b991dd4d8b 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -53,7 +53,8 @@ from pyspark.sql.types import Row from pyspark.sql.context import SQLContext, HiveContext -from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions +from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions +from pyspark.sql.dataframe import DataFrameStatFunctions __all__ = [ 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', From 0c6a759afbdad5d78765422c50e0d811f5107ad2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 00:21:05 -0700 Subject: [PATCH 07/10] addressed math comments --- .../apache/spark/sql/execution/stat/StatFunctions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 99da155239beb..7b4fc948871c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -28,7 +28,7 @@ private[sql] object StatFunctions { var xAvg = 0.0 var yAvg = 0.0 var Ck = 0.0 - var count = 0 + var count = 0L // add an example to the calculation def add(x: Double, y: Double): this.type = { val oldX = xAvg @@ -38,11 +38,12 @@ private[sql] object StatFunctions { Ck += (y - yAvg) * (x - oldX) this } - // merge counters from other partitions + // merge counters from other partitions. 
Formula can be found at: + // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance def merge(other: CovarianceCounter): this.type = { val totalCount = count + other.count Ck += other.Ck + - (xAvg - other.xAvg) * (yAvg - other.yAvg) * (count * other.count) / totalCount + (xAvg - other.xAvg) * (yAvg - other.yAvg) * count / totalCount * other.count xAvg = (xAvg * count + other.xAvg * other.count) / totalCount yAvg = (yAvg * count + other.yAvg * other.count) / totalCount count = totalCount @@ -76,5 +77,4 @@ private[sql] object StatFunctions { }) counts.cov } - } From 51e39b80b68d3ab0e8b092abc40a57652f2953ec Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 00:32:16 -0700 Subject: [PATCH 08/10] moved implementation --- python/pyspark/sql/dataframe.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3ed2a6dd8879f..35af578b236fe 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -883,7 +883,11 @@ def cov(self, col1, col2): :param col1: The name of the first column :param col2: The name of the second column """ - return self.stat.cov(col1, col2) + if not isinstance(col1, str): + raise ValueError("col1 should be a string.") + if not isinstance(col2, str): + raise ValueError("col2 should be a string.") + return self.df._jdf.stat().cov(col1, col2) @ignore_unicode_prefix def withColumn(self, colName, col): @@ -1336,11 +1340,7 @@ def __init__(self, df): self.df = df def cov(self, col1, col2): - if not isinstance(col1, str): - raise ValueError("col1 should be a string.") - if not isinstance(col2, str): - raise ValueError("col2 should be a string.") - return self.df._jdf.stat().cov(col1, col2) + return self.stat.cov(col1, col2) cov.__doc__ = DataFrame.cov.__doc__ From f2e862bc0bb96a110702c04779741e3bc2c0d982 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 07:36:32 -0700 Subject: [PATCH 09/10] fixed failed test --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 35af578b236fe..daf0b270452c6 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -887,7 +887,7 @@ def cov(self, col1, col2): raise ValueError("col1 should be a string.") if not isinstance(col2, str): raise ValueError("col2 should be a string.") - return self.df._jdf.stat().cov(col1, col2) + return self._jdf.stat().cov(col1, col2) @ignore_unicode_prefix def withColumn(self, colName, col): @@ -1340,7 +1340,7 @@ def __init__(self, df): self.df = df def cov(self, col1, col2): - return self.stat.cov(col1, col2) + return self.df.cov(col1, col2) cov.__doc__ = DataFrame.cov.__doc__ From cb18046a5cee431e08bf5682b1645507ee7f08a7 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 1 May 2015 09:07:05 -0700 Subject: [PATCH 10/10] changed to sample covariance --- python/pyspark/sql/dataframe.py | 4 ++-- python/pyspark/sql/tests.py | 2 +- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 2 +- .../org/apache/spark/sql/execution/stat/StatFunctions.scala | 4 ++-- .../test/scala/org/apache/spark/sql/DataFrameStatSuite.scala | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index daf0b270452c6..1f08c2df9305b 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -877,8 +877,8 @@ def 
fillna(self, value, subset=None): def cov(self, col1, col2): """ - Calculate the covariance for the given columns, specified by their names as a double value. - :func:`DataFrame.cov` and :func:`DataFrameStatFunctions.cov` are aliases. + Calculate the sample covariance for the given columns, specified by their names, as a + double value. :func:`DataFrame.cov` and :func:`DataFrameStatFunctions.cov` are aliases. :param col1: The name of the first column :param col2: The name of the second column diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 21304f55fdd1a..44c8b6a1aac13 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -390,7 +390,7 @@ def test_aggregator(self): def test_cov(self): df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() cov = df.stat.cov("a", "b") - self.assertTrue(abs(cov - 16.5) < 1e-6) + self.assertTrue(abs(cov - 55.0 / 3) < 1e-6) def test_math_functions(self): df = self.sc.parallelize([Row(a=i, b=2 * i) for i in range(10)]).toDF() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index a5aa7f51786af..23652aeb7c7bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -67,7 +67,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { } /** - * Calculate the covariance of two numerical columns of a DataFrame. + * Calculate the sample covariance of two numerical columns of a DataFrame. * @param col1 the name of the first column * @param col2 the name of the second column * @return the covariance of the two columns. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 7b4fc948871c8..d4a94c24d9866 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -49,8 +49,8 @@ private[sql] object StatFunctions { count = totalCount this } - // return the covariance for the observed examples - def cov: Double = Ck / count + // return the sample covariance for the observed examples + def cov: Double = Ck / (count - 1) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 6f3ba616748b2..4f5a2ff696789 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -51,7 +51,7 @@ class DataFrameStatSuite extends FunSuite { val df = sqlCtx.sparkContext.parallelize(rows).toDF("singles", "doubles", "letters") val results = df.stat.cov("singles", "doubles") - assert(math.abs(results - 16.5) < 1e-6) + assert(math.abs(results - 55.0 / 3) < 1e-6) intercept[IllegalArgumentException] { df.stat.cov("singles", "letters") // doesn't accept non-numerical dataTypes }
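
A closing note on the algorithm, since the series only hints at it through the Wikipedia link in patch 07: CovarianceCounter is the standard single-pass, mergeable covariance estimator. Each partition keeps running means (xAvg, yAvg) and a running co-moment (Ck); merge combines two partial results with a cross term that accounts for the difference between the partitions' means. Patch 07 also reorders that cross term as count / totalCount * other.count so the Long product count * other.count is never formed and cannot overflow, and patch 10 switches the divisor to count - 1 for the sample covariance. The sketch below is illustrative only -- CovSketch and its member names are invented for the example, not part of the patches -- and it checks the merged result against the 55.0 / 3 value asserted in the final tests.

    // Illustrative sketch only -- not part of the patch series. It mirrors the
    // final CovarianceCounter (after patches 07 and 10) outside Spark, to show
    // why the merge step lets covariance be computed in one pass per partition.
    object CovSketch {

      class Counter extends Serializable {
        var xAvg = 0.0   // running mean of x
        var yAvg = 0.0   // running mean of y
        var Ck = 0.0     // running co-moment: sum of (x - xAvg)(y - yAvg)
        var count = 0L

        // online (Welford-style) update: oldX is the mean *before* this point,
        // yAvg the mean *after*, which keeps Ck numerically stable
        def add(x: Double, y: Double): this.type = {
          val oldX = xAvg
          count += 1
          xAvg += (x - xAvg) / count
          yAvg += (y - yAvg) / count
          Ck += (y - yAvg) * (x - oldX)
          this
        }

        // pairwise merge: the cross term corrects for the two partial results
        // having different means; dividing by totalCount before multiplying by
        // other.count avoids overflowing a Long, as in patch 07
        def merge(other: Counter): this.type = {
          val totalCount = count + other.count
          Ck += other.Ck +
            (xAvg - other.xAvg) * (yAvg - other.yAvg) * count / totalCount * other.count
          xAvg = (xAvg * count + other.xAvg * other.count) / totalCount
          yAvg = (yAvg * count + other.yAvg * other.count) / totalCount
          count = totalCount
          this
        }

        // sample covariance, hence the count - 1 divisor introduced in patch 10
        def cov: Double = Ck / (count - 1)
      }

      def main(args: Array[String]): Unit = {
        // same data as the tests: b = 2 * a for a = 0..9, so cov = 2 * var(a)
        val data = (0 until 10).map(i => (i.toDouble, 2.0 * i))
        val (p1, p2) = data.splitAt(4) // simulate two partitions
        val c1 = p1.foldLeft(new Counter)((c, xy) => c.add(xy._1, xy._2))
        val c2 = p2.foldLeft(new Counter)((c, xy) => c.add(xy._1, xy._2))
        println(c1.merge(c2).cov) // prints 18.333..., i.e. 55.0 / 3
      }
    }

Running the sketch prints 18.3333..., matching the expectation in the updated tests: for a = 0..9 with b = 2a, the sample variance of a is 82.5 / 9, so cov(a, b) = 2 * 82.5 / 9 = 55 / 3.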