[SPARK-7243][SQL] Contingency Tables for DataFrames #5842
Conversation
Test build #31585 has finished for PR 5842 at commit
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
  val distinctVals = df.select(countDistinct(col1), countDistinct(col2)).collect().head
This implementation triggers multiple jobs. I'm thinking about the following approach:
- Get distinct values from col2 and create a value-to-index map.
- Aggregate by col1. For each value in col1, generate a Row object and fill in counts.
- Assign the table schema.
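The shape of that approach can be sketched in plain Python (a toy illustration only, not the PR's Scala code; `crosstab_sketch` and all names in it are hypothetical):

```python
from collections import defaultdict

def crosstab_sketch(pairs):
    """pairs: iterable of (col1_value, col2_value) tuples."""
    pairs = list(pairs)
    # Step 1: distinct values of col2, mapped to column indices.
    col2_vals = sorted({c2 for _, c2 in pairs})
    col2_index = {v: i for i, v in enumerate(col2_vals)}
    # Step 2: aggregate by col1; one row of counts per distinct col1 value.
    rows = defaultdict(lambda: [0] * len(col2_vals))
    for c1, c2 in pairs:
        rows[c1][col2_index[c2]] += 1
    # Step 3: the "schema": a header naming the first column after both
    # input columns, followed by the distinct col2 values as column names.
    header = ["col1_col2"] + col2_vals
    return header, dict(rows)
```

The real implementation does the aggregation distributed and only builds Rows at the end; this sketch just shows the value-to-index pivot.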
Test build #31610 has finished for PR 5842 at commit
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
  val distinctCol2 = df.select(col2).distinct.orderBy(col2).collect()
it might be faster to collect and then sort, rather than sort and collect
The first implementation uses multiple passes. We need two passes any way, either on the original columns or on the pair counts. The latter may be better.
- val counts = select(col1, col2).rdd.countByValue().cache()
- Get distinct values from col2: counts.map(_._1._2).distinct().collect(). I'm not sure whether ordering by counts is useful here.
- GroupBy col1 in counts and create an RDD of Row. And then apply the schema.
Test build #31647 has finished for PR 5842 at commit
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
  val distinctCol2 = df.select(col2).distinct.collect().sortBy(_.get(0).toString)
btw - wouldn't a more efficient way to run this be to do groupBy(col1, col2).count(), and then pivot the table?
that way we only need one pass over the data.
That's what I did first. Xiangrui thought this would be more efficient.
mhmm I'm not sure if I agree. Doing it this way requires 2 passes, and also does not rely on the underlying execution engine. The physical execution will get faster over time, and we definitely want to take advantage of that.
I'm happy to implement it both ways. Check my first commit. If you both think that's ok, or a combination of both ideas is better, I'd be happy to implement it. I think my first implementation had two passes as well. I think you might need two passes to pivot properly. Maybe we can do something smarter. cc @mengxr
The first implementation uses multiple passes. We need two passes any way, either on the original columns or on the pair counts. The latter may be better.
- val counts = select(col1, col2).rdd.countByValue().cache().
- Get distinct values from col2: counts.map(_._1._2).distinct().collect(). I'm not sure whether ordering by counts is useful here.
- GroupBy col1 in counts and create an RDD of Row. And then apply the schema.
use dataframe's own group by and count; don't use the rdd one. df one will get a lot faster with tungsten over time.
@mengxr what did you mean that we need two passes? If I understand this correctly, it is simply
"select col1, col2, count(*) from table group by col1, col2"
and then pivot the result to put col2 as the column name? This is one pass.
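The single-pass idea can be sketched in plain Python (a toy illustration, not Spark code; `grouped_counts` and `pivot` are hypothetical names): one grouped count over the data, then a driver-side pivot of the (usually small) result that turns col2 values into column names.

```python
from collections import Counter

def grouped_counts(pairs):
    # One pass over the data: the analogue of
    # "select col1, col2, count(*) from table group by col1, col2".
    return Counter(pairs)

def pivot(counts):
    # Local pivot of the grouped result: rows keyed by col1,
    # one count column per distinct col2 value, missing pairs filled with 0.
    col2_vals = sorted({c2 for (_, c2) in counts})
    table = {
        c1: [counts.get((c1, c2), 0) for c2 in col2_vals]
        for c1 in sorted({c1 for (c1, _) in counts})
    }
    return table, col2_vals
```

The pivot touches only the grouped pairs, not the raw data, which is why the whole computation is a single pass over the table.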
Interesting, this comment only shows up in the diff. +1 on the single-pass approach. The driver could be the bottleneck, but we are not expecting large amounts of data for crosstab.
@brkyvz I had an offline discussion with Reynold. For the first version, let's implement the local version in a single pass, which should cover most of the use cases. The steps would be
Test build #31700 has finished for PR 5842 at commit
retest this please
Test build #31703 has finished for PR 5842 at commit
Test build #31704 has finished for PR 5842 at commit
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
  val counts = df.groupBy(col1, col2).agg(col(col1), col(col2), count("*")).collect()
maybe use take to avoid running out of memory.
do we know how much we are going to take though?
It doesn't matter. you can set a max number (maybe 1 million). If the dataset has less than that, it will just return the entire dataset (at a slightly higher cost to run multiple jobs).
Would taking 1e8 have such a high cost if there are, for example, only 100 in total? The reason I chose 1e8 was 1e4 * 1e4, basically the limit we put on the number of columns.
@brkyvz can you also update the PR description?
""" | ||
Computes a pair-wise frequency table of the given columns. Also known as a contingency | ||
table. The number of distinct values for each column should be less than 1e5. The first | ||
column of each row will be the distinct values of `col1` and the column names will be the |
Document the first column name. 1e5 -> 1e4.
Test build #31740 has finished for PR 5842 at commit
Test build #31733 has finished for PR 5842 at commit
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
In my previous comment, I meant that this tableName is not documented. Users need to know the name of the first column to operate on it.
minor: It would be good to check pandas' or R's naming for this column and follow one of them.
Pandas and R have the concept of row names, which we currently don't have. We have to use the first column as the "row names".
/** Generate a table of frequencies for the elements of two columns. */
private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
  val tableName = s"${col1}_$col2"
  val counts = df.groupBy(col1, col2).agg(col(col1), col(col2), count("*")).take(1e8.toInt)
Check the size of counts. If it is 1e8, throw a warning.
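The suggested guard is a common take-with-cap pattern; here is a plain-Python sketch of it (a hypothetical helper, not the PR's code; the real code uses Spark's take and a logged warning):

```python
import itertools
import warnings

MAX_ROWS = int(1e8)  # the cap discussed above; a follow-up lowers it

def take_with_warning(rows, limit=MAX_ROWS):
    """Collect up to `limit` rows; warn if the cap was hit,
    since the result may then be truncated."""
    taken = list(itertools.islice(rows, limit))
    if len(taken) == limit:
        warnings.warn(
            "crosstab reached the %d-row cap; the result may be incomplete" % limit
        )
    return taken
```

Note that a dataset with exactly `limit` rows also triggers the warning; that false positive is the price of not doing a separate count.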
@brkyvz can you submit a follow up pr to reduce 1e8 to 1e6? 1e8 is too large.
Test build #31769 has finished for PR 5842 at commit
retest this please
Test build #31774 has finished for PR 5842 at commit
Thanks. I'm merging this.
Computes a pair-wise frequency table of the given columns. Also known as cross-tabulation. cc mengxr rxin

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #5842 from brkyvz/df-cont and squashes the following commits:
a07c01e [Burak Yavuz] addressed comments v4.1
ae9e01d [Burak Yavuz] fix test
9106585 [Burak Yavuz] addressed comments v4.0
bced829 [Burak Yavuz] fix merge conflicts
a63ad00 [Burak Yavuz] addressed comments v3.0
a0cad97 [Burak Yavuz] addressed comments v3.0
6805df8 [Burak Yavuz] addressed comments and fixed test
939b7c4 [Burak Yavuz] lint python
7f098bc [Burak Yavuz] add crosstab pyTest
fd53b00 [Burak Yavuz] added python support for crosstab
27a5a81 [Burak Yavuz] implemented crosstab

(cherry picked from commit 8055411)
Signed-off-by: Reynold Xin <rxin@databricks.com>