From 55fc39c96c746a0d39f8789e0afe6ab09a3920e3 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Fri, 19 Jun 2015 07:38:49 -0700
Subject: [PATCH 1/9] adding freqItemsFrame

---
 .../spark/sql/DataFrameStatFunctions.scala    | 48 +++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index edb9ed7bba56a..2621595cc9f0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.execution.stat._
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * :: Experimental ::
@@ -163,4 +166,49 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
   def freqItems(cols: Seq[String]): DataFrame = {
     FrequentItems.singlePassFreqItems(df, cols, 0.01)
   }
+
+  /**
+   * Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * Uses a default support of 1%.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting [[DataFrame]].
+   *
+   * @param cols the names of the columns to search frequent items in.
+   * @return A local DataFrame with the list of frequent items for each column.
+   *
+   * @since 1.5.0
+   */
+  def freqItemsFrame(cols: Seq[String]): DataFrame = {
+    // get the row
+    val fi = freqItems(cols).first
+    // get the schema
+    val fiSchema = fi.schema
+    val schemaNames = fiSchema.map(_.name)
+    val schemaTypes = fiSchema.map(_.dataType.asInstanceOf[ArrayType].elementType)
+    val newSchema = StructType(schemaNames.zip(schemaTypes)
+      .map(x => StructField(x._1, x._2)).toArray)
+    // get the sizes
+    val colSize = fi.toSeq.map(_.asInstanceOf[ArrayBuffer[Any]].size)
+    val rowSize = fi.size
+    // make a seq for the rows
+    var rowHolder = Seq[Row]()
+    for (i <- 0 to colSize.max - 1) {
+      val fillMap = colSize.map(_ > i)
+      var rowFiller = Seq[Any]()
+      for (j <- 0 to rowSize - 1) {
+        if (fillMap(j)) {
+          rowFiller = rowFiller :+ fi(j).asInstanceOf[ArrayBuffer[Any]](i)
+        }
+      }
+      rowHolder = rowHolder :+ Row.fromSeq(rowFiller)
+    }
+    val rowRDD = df.sqlContext.sparkContext.parallelize(rowHolder)
+    val fiDF = df.sqlContext.createDataFrame(rowRDD, newSchema)
+    fiDF
+  }
 }
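
Note on patch 1's approach: freqItems() returns a single Row holding one ArrayBuffer of frequent items per input column, so freqItemsFrame transposes that row of arrays into a tall DataFrame with one output row per rank. A minimal, Spark-free sketch of that transposition (the function name and sample data here are illustrative, not from the patch):

    // Transpose columns of frequent items into rows, padding with null where
    // a column has fewer items than the longest one. Patch 1 omits the null
    // padding, so ragged columns yield rows shorter than the schema; patch 2
    // adds the missing else-branch.
    def transposeWithPadding(columns: Seq[Seq[Any]]): Seq[Seq[Any]] = {
      val maxLen = columns.map(_.size).max
      (0 until maxLen).map { i =>
        columns.map(col => if (i < col.size) col(i) else null)
      }
    }

    transposeWithPadding(Seq(Seq(1, 7, 9), Seq("a")))
    // Vector(List(1, a), List(7, null), List(9, null))
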
From 4a173b800a703a9dfa37f4bada92db6d62813e09 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Fri, 19 Jun 2015 14:56:28 -0700
Subject: [PATCH 2/9] finished freqItemsFrame

---
 python/pyspark/sql/dataframe.py               | 29 ++++++
 .../spark/sql/DataFrameStatFunctions.scala    | 99 ++++++++++++++++++-
 .../apache/spark/sql/DataFrameStatSuite.scala | 13 +++
 3 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 152b87351db31..f2d45dba2f7b5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1152,6 +1152,30 @@ def crosstab(self, col1, col2):
             raise ValueError("col2 should be a string.")
         return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)
 
+    @since(1.5)
+    def freqItemsFrame(self, cols, support=None):
+        """
+        Finding frequent items for columns, possibly with false positives. Using the
+        frequent element count algorithm described in
+        "http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou".
+        :func:`DataFrame.freqItemsFrame` and :func:`DataFrameStatFunctions.freqItemsFrame` are
+        aliases.
+
+        .. note:: This function is meant for exploratory data analysis, as we make no \
+        guarantee about the backward compatibility of the schema of the resulting DataFrame.
+
+        :param cols: Names of the columns to calculate frequent items for as a list or tuple of
+            strings.
+        :param support: The frequency with which to consider an item 'frequent'. Default is 1%.
+            The support must be greater than 1e-4.
+        """
+        if isinstance(cols, tuple):
+            cols = list(cols)
+        if not isinstance(cols, list):
+            raise ValueError("cols must be a list or tuple of column names as strings.")
+        if not support:
+            support = 0.01
+        return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support), self.sql_ctx)
+
     @since(1.4)
     def freqItems(self, cols, support=None):
@@ -1322,6 +1346,11 @@ def freqItems(self, cols, support=None):
 
     freqItems.__doc__ = DataFrame.freqItems.__doc__
 
+    def freqItemsFrame(self, cols, support=None):
+        return self.df.freqItemsFrame(cols, support)
+
+    freqItemsFrame.__doc__ = DataFrame.freqItemsFrame.__doc__
+    
 
 def _test():
     import doctest
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 2621595cc9f0e..fe45403d7b7f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -171,19 +171,91 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    * Finding frequent items for columns, possibly with false positives. Using the
    * frequent element count algorithm described in
    * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * The `support` should be greater than 1e-4.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting [[DataFrame]].
+   *
+   * @param cols the names of the columns to search frequent items in.
+   * @param support The minimum frequency for an item to be considered `frequent`. Should be
+   *                greater than 1e-4.
+   * @return A local DataFrame with the frequent items for each column.
+   *
+   * @since 1.5.0
+   */
+  def freqItemsFrame(cols: Array[String], support: Double): DataFrame = {
+    // get the row
+    val fi = freqItems(cols, support).first
+    // get the schema
+    val fiSchema = fi.schema
+    val schemaNames = fiSchema.map(_.name)
+    val schemaTypes = fiSchema.map(_.dataType.asInstanceOf[ArrayType].elementType)
+    val newSchema = StructType(schemaNames.zip(schemaTypes)
+      .map(x => StructField(x._1, x._2)).toArray)
+    // get the sizes
+    val colSize = fi.toSeq.map(_.asInstanceOf[ArrayBuffer[Any]].size)
+    val rowSize = fi.size
+    // make a seq for the rows
+    var rowHolder = Seq[Row]()
+    for (i <- 0 to colSize.max - 1) {
+      val fillMap = colSize.map(_ > i)
+      var rowFiller = Seq[Any]()
+      for (j <- 0 to rowSize - 1) {
+        if (fillMap(j)) {
+          rowFiller = rowFiller :+ fi(j).asInstanceOf[ArrayBuffer[Any]](i)
+        }
+        else {
+          rowFiller = rowFiller :+ null
+        }
+      }
+      rowHolder = rowHolder :+ Row.fromSeq(rowFiller)
+    }
+    val rowRDD = df.sqlContext.sparkContext.parallelize(rowHolder)
+    val fiDF = df.sqlContext.createDataFrame(rowRDD, newSchema)
+    fiDF
+  }
+
+  /**
+   * Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * Uses a default support of 1%.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting [[DataFrame]].
+   *
+   * @param cols the names of the columns to search frequent items in.
+   * @return A local DataFrame with the frequent items for each column.
+   *
+   * @since 1.5.0
+   */
+  def freqItemsFrame(cols: Array[String]): DataFrame = {
+    freqItemsFrame(cols, 0.01)
+  }
+
+  /**
+   * (Scala-specific) Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
-   * Uses a default support of 1%.
+   * The `support` should be greater than 1e-4.
    *
    * This function is meant for exploratory data analysis, as we make no guarantee about the
    * backward compatibility of the schema of the resulting [[DataFrame]].
    *
    * @param cols the names of the columns to search frequent items in.
-   * @return A local DataFrame with the list of frequent items for each column.
+   * @param support The minimum frequency for an item to be considered `frequent`. Should be
+   *                greater than 1e-4.
+   * @return A local DataFrame with the frequent items for each column.
    *
    * @since 1.5.0
    */
-  def freqItemsFrame(cols: Seq[String]): DataFrame = {
+  def freqItemsFrame(cols: Seq[String], support: Double): DataFrame = {
     // get the row
-    val fi = freqItems(cols).first
+    val fi = freqItems(cols, support).first
     // get the schema
     val fiSchema = fi.schema
     val schemaNames = fiSchema.map(_.name)
     val schemaTypes = fiSchema.map(_.dataType.asInstanceOf[ArrayType].elementType)
@@ -204,6 +276,9 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
         if (fillMap(j)) {
           rowFiller = rowFiller :+ fi(j).asInstanceOf[ArrayBuffer[Any]](i)
         }
+        else {
+          rowFiller = rowFiller :+ null
+        }
       }
       rowHolder = rowHolder :+ Row.fromSeq(rowFiller)
     }
@@ -211,4 +286,22 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
     val fiDF = df.sqlContext.createDataFrame(rowRDD, newSchema)
     fiDF
   }
+
+  /**
+   * (Scala-specific) Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * Uses a default support of 1%.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting [[DataFrame]].
+   *
+   * @param cols the names of the columns to search frequent items in.
+   * @return A local DataFrame with the frequent items for each column.
+   *
+   * @since 1.5.0
+   */
+  def freqItemsFrame(cols: Seq[String]): DataFrame = {
+    freqItemsFrame(cols, 0.01)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 0d3ff899dad72..bc9a2f929c44c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -98,4 +98,17 @@ class DataFrameStatSuite extends SparkFunSuite {
     val items2 = singleColResults.collect().head
     items2.getSeq[Double](0) should contain (-1.0)
   }
+
+  test("Frequent Items Frame") {
+    val rows = Seq.tabulate(1000) { i =>
+      if (i % 3 == 0) (1, toLetter(1), -1.0) else (i, toLetter(i), i * -1.0)
+    }
+    val df = rows.toDF("numbers", "letters", "negDoubles")
+
+    val results = df.stat.freqItemsFrame(Array("numbers", "letters"), 0.1)
+    results.select("numbers_freqItems").map(_.get(0)).collect should contain (1)
+    results.select("letters_freqItems").map(_.get(0)).collect should contain (toLetter(1))
+    val singleColResults = df.stat.freqItemsFrame(Seq("negDoubles"), 0.1)
+    singleColResults.select("negDoubles_freqItems").map(_.get(0)).collect should contain (-1.0)
+  }
 }
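
Note on patch 2: the frame-shaped variant is now callable from Scala (Array or Seq overloads, with or without an explicit support) and from Python through the py4j wrapper. A hedged usage sketch against a build with this patch applied; the data, app name, and 0.1 support are illustrative, while the <column>_freqItems output names come from freqItems itself, as the test above asserts:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object FreqItemsFrameExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("fif-example").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Every third value is 1, so 1 is frequent at support 0.1.
        val df = sc.parallelize(1 to 1000)
          .map(i => if (i % 3 == 0) (1, "a") else (i, i.toString))
          .toDF("numbers", "letters")

        // One output row per frequent item, instead of freqItems' single row
        // of arrays; the shorter column is padded with null.
        val frame = df.stat.freqItemsFrame(Array("numbers", "letters"), 0.1)
        frame.show()  // columns: numbers_freqItems, letters_freqItems

        sc.stop()
      }
    }
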
From d0ce7d54ec5617793c1f507d20daefcf93977eb6 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sat, 20 Jun 2015 21:56:51 -0700
Subject: [PATCH 3/9] removed inline comments, fixed scalastyle

---
 .../org/apache/spark/sql/DataFrameStatFunctions.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index fe45403d7b7f6..e04976455c729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -184,20 +184,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    * @since 1.5.0
    */
   def freqItemsFrame(cols: Array[String], support: Double): DataFrame = {
-    // get the row
     val fi = freqItems(cols, support).first
-    // get the schema
     val fiSchema = fi.schema
     val schemaNames = fiSchema.map(_.name)
     val schemaTypes = fiSchema.map(_.dataType.asInstanceOf[ArrayType].elementType)
     val newSchema = StructType(schemaNames.zip(schemaTypes)
       .map(x => StructField(x._1, x._2)).toArray)
-    // get the sizes
     val colSize = fi.toSeq.map(_.asInstanceOf[ArrayBuffer[Any]].size)
     val rowSize = fi.size
-    // make a seq for the rows
     var rowHolder = Seq[Row]()
     for (i <- 0 to colSize.max - 1) {
       val fillMap = colSize.map(_ > i)
@@ -254,20 +250,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
    * @since 1.5.0
    */
   def freqItemsFrame(cols: Seq[String], support: Double): DataFrame = {
-    // get the row
     val fi = freqItems(cols, support).first
-    // get the schema
     val fiSchema = fi.schema
     val schemaNames = fiSchema.map(_.name)
     val schemaTypes = fiSchema.map(_.dataType.asInstanceOf[ArrayType].elementType)
     val newSchema = StructType(schemaNames.zip(schemaTypes)
       .map(x => StructField(x._1, x._2)).toArray)
-    // get the sizes
     val colSize = fi.toSeq.map(_.asInstanceOf[ArrayBuffer[Any]].size)
     val rowSize = fi.size
-    // make a seq for the rows
     var rowHolder = Seq[Row]()
     for (i <- 0 to colSize.max - 1) {
       val fillMap = colSize.map(_ > i)
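
Note on patch 3: even after this style pass, both overload bodies still build rows with mutable accumulators and nested index loops. Not part of the patch series, but the same row assembly can be expressed without mutation; fi below is the single Row returned by freqItems, exactly as in the patch:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.Row

    // Equivalent to the patch's nested loops: one Row per rank, with null in
    // place of missing items for the shorter columns.
    def buildRows(fi: Row): Seq[Row] = {
      val cols = fi.toSeq.map(_.asInstanceOf[ArrayBuffer[Any]])
      val maxLen = cols.map(_.size).max
      (0 until maxLen).map { i =>
        Row.fromSeq(cols.map(col => if (i < col.size) col(i) else null))
      }
    }
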
From 6f25872ee26ad12951734e58e54bb097e5003700 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sat, 20 Jun 2015 22:17:40 -0700
Subject: [PATCH 4/9] cleaned up python style

---
 python/pyspark/sql/dataframe.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f2d45dba2f7b5..61666c9526cdf 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1174,7 +1174,8 @@ def freqItemsFrame(self, cols, support=None):
             raise ValueError("cols must be a list or tuple of column names as strings.")
         if not support:
             support = 0.01
-        return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support), self.sql_ctx)
+        return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support), \
+            self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):
@@ -1350,7 +1351,7 @@ def freqItemsFrame(self, cols, support=None):
         return self.df.freqItemsFrame(cols, support)
 
     freqItemsFrame.__doc__ = DataFrame.freqItemsFrame.__doc__
-    
+
 
 def _test():
     import doctest

From ff676c9c2a9aadc2376b9b1d26a4211f4822cac7 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sat, 20 Jun 2015 22:31:55 -0700
Subject: [PATCH 5/9] cleaned up python style, part2

---
 python/pyspark/sql/dataframe.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 61666c9526cdf..e5053920f1eba 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1174,8 +1174,8 @@ def freqItemsFrame(self, cols, support=None):
             raise ValueError("cols must be a list or tuple of column names as strings.")
         if not support:
             support = 0.01
-        return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support), \
-            self.sql_ctx)
+        return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support),
+            self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):
@@ -1349,7 +1349,6 @@ def freqItems(self, cols, support=None):
 
     def freqItemsFrame(self, cols, support=None):
         return self.df.freqItemsFrame(cols, support)
-
     freqItemsFrame.__doc__ = DataFrame.freqItemsFrame.__doc__
From 7ce1248d461ac4a11a0982f6fff842232292d87c Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sun, 21 Jun 2015 08:00:33 -0700
Subject: [PATCH 6/9] cleaned up python style, part2

---
 python/pyspark/sql/dataframe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e5053920f1eba..6245e255dcff6 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1175,7 +1175,7 @@ def freqItemsFrame(self, cols, support=None):
         if not support:
             support = 0.01
         return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support),
-            self.sql_ctx)
+                self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):

From 8c239c759dca6ff9edfae8774fcb2fa4e0070b84 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sun, 21 Jun 2015 08:28:56 -0700
Subject: [PATCH 7/9] cleaned up python style, part2

---
 python/pyspark/sql/dataframe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 6245e255dcff6..0548334e6eeff 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1175,7 +1175,7 @@ def freqItemsFrame(self, cols, support=None):
         if not support:
             support = 0.01
         return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support),
-                self.sql_ctx)
+                        self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):
From 9b345cb6144568bc530fb7019f0086cb5c549563 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sun, 21 Jun 2015 08:37:21 -0700
Subject: [PATCH 8/9] adjusted continuation indentation in dataframe.py

---
 python/pyspark/sql/dataframe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0548334e6eeff..338587c494d8e 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1175,7 +1175,7 @@ def freqItemsFrame(self, cols, support=None):
         if not support:
             support = 0.01
         return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support),
-                        self.sql_ctx)
+                          self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):

From 80c841b1dfd91502a8f6acc1788ca48dfda4da32 Mon Sep 17 00:00:00 2001
From: dwmclary
Date: Sun, 21 Jun 2015 08:40:14 -0700
Subject: [PATCH 9/9] corrected indentation

---
 python/pyspark/sql/dataframe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 338587c494d8e..5678193bd05ae 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1175,7 +1175,7 @@ def freqItemsFrame(self, cols, support=None):
         if not support:
             support = 0.01
         return DataFrame(self._jdf.stat().freqItemsFrame(_to_seq(self._sc, cols), support),
-                          self.sql_ctx)
+                         self.sql_ctx)
 
     @since(1.4)
     def freqItems(self, cols, support=None):
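
Closing note: the counting itself lives in FrequentItems.singlePassFreqItems, which these patches call but do not modify. For readers chasing the Karp, Schenker, and Papadimitriou reference in the docs, a rough sketch of the underlying idea; this is a simplification, not Spark's implementation, and approxFreqItems is a name invented here:

    import scala.collection.mutable

    // Keep at most k = ceil(1 / support) counters. Any item whose frequency
    // exceeds `support` of the stream is guaranteed to survive, but items
    // below the threshold can survive too: the "possible false positives"
    // that the docs warn about.
    def approxFreqItems[T](items: Iterator[T], support: Double): Set[T] = {
      val k = math.ceil(1.0 / support).toInt
      val counts = mutable.Map.empty[T, Long]
      items.foreach { item =>
        if (counts.contains(item)) {
          counts(item) += 1
        } else if (counts.size < k) {
          counts(item) = 1L
        } else {
          // Table is full: decrement every counter and drop those at zero.
          counts.keys.toList.foreach { key =>
            counts(key) -= 1
            if (counts(key) == 0L) counts.remove(key)
          }
        }
      }
      counts.keySet.toSet
    }
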