[SPARK-9298][SQL] Add pearson correlation aggregation function #8587

Closed
wants to merge 14 commits into from

viirya
Member

@viirya viirya commented Sep 3, 2015

JIRA: https://issues.apache.org/jira/browse/SPARK-9298

This patch adds a Pearson correlation aggregation function based on AggregateExpression2.

@SparkQA

SparkQA commented Sep 3, 2015

Test build #41976 has finished for PR 8587 at commit cb34a95.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(left: Expression, right: Expression) extends AggregateFunction2
    • case class Corr(

@rxin
Contributor

rxin commented Sep 5, 2015

Is it possible to do this with AlgebraicAggregate so it can be codegened?

@viirya
Member Author

viirya commented Sep 5, 2015

It seems feasible. I will update this later.

Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@viirya
Member Author

viirya commented Sep 6, 2015

@rxin I've updated this patch to use AlgebraicAggregate as you suggested. Please take a look when you are available. Thanks.

@SparkQA

SparkQA commented Sep 6, 2015

Test build #42076 has finished for PR 8587 at commit 0dd6320.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(left: Expression, right: Expression) extends AlgebraicAggregate
    • case class Corr(

@mengxr
Contributor

mengxr commented Oct 20, 2015

@viirya We compared the performance of the declarative approach vs. the imperative one in https://issues.apache.org/jira/browse/SPARK-10953. The declarative approach is much slower because of extra code and the lack of common sub-expression elimination. If you still have the original code, could you revert this PR to your first commit? Also cc @rxin for this performance issue.

@viirya
Member Author

viirya commented Oct 21, 2015

@mengxr I have the original code. I will revert to it as soon as I am back, within the next two days.

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44103 has finished for PR 8587 at commit 1505cd2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44106 has finished for PR 8587 at commit d10afbe.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class First(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate
    • case class Last(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate
    • case class Corr(
    • case class First(
    • case class FirstFunction(
    • case class Last(
    • case class LastFunction(
    • case class Corr(

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44108 has finished for PR 8587 at commit cc1657b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(

@viirya
Member Author

viirya commented Oct 22, 2015

retest this please.

@SparkQA

SparkQA commented Oct 22, 2015

Test build #44154 has finished for PR 8587 at commit cc1657b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClassEncoder[T](
    • case class First(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate
    • case class Last(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate
    • case class Corr(
    • case class First(
    • case class FirstFunction(
    • case class Last(
    • case class LastFunction(
    • case class Corr(
    • case class CreateRow(children: Seq[Expression]) extends Expression

@viirya
Member Author

viirya commented Oct 22, 2015

ping @mengxr

@viirya
Member Author

viirya commented Oct 26, 2015

ping @mengxr any other comments?

@@ -524,6 +525,116 @@ case class Sum(child: Expression) extends DeclarativeAggregate {
override val evaluateExpression = Cast(currentSum, resultType)
}

case class Corr(
Contributor

  • Please add ScalaDoc and document the behavior for null and NaN values.
  • Provide a link to the Wikipedia page that contains the update formula.
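
For readers who want to see what such an update formula can look like, here is a minimal plain-Scala sketch of the single-pass co-moment update commonly given for covariance, plus the pairwise merge needed to combine partial aggregates. It is an illustration under assumptions, not the code in this patch: the names CorrBuffer, update, merge, result, and CorrSketch are hypothetical, and null handling is left out entirely.

```scala
// Hypothetical aggregation buffer for a single-pass Pearson correlation.
// n: row count; xAvg/yAvg: running means; ck: co-moment sum of
// (x - xAvg) * (y - yAvg); xMk/yMk: sums of squared deviations.
final case class CorrBuffer(
    n: Long = 0L,
    xAvg: Double = 0.0,
    yAvg: Double = 0.0,
    ck: Double = 0.0,
    xMk: Double = 0.0,
    yMk: Double = 0.0) {

  // Fold one (x, y) pair into the buffer.
  def update(x: Double, y: Double): CorrBuffer = {
    val n1 = n + 1
    val dx = x - xAvg              // deviation from the old mean of x
    val dy = y - yAvg              // deviation from the old mean of y
    val newXAvg = xAvg + dx / n1
    val newYAvg = yAvg + dy / n1
    CorrBuffer(
      n1, newXAvg, newYAvg,
      ck + dx * (y - newYAvg),     // old-mean deviation times new-mean deviation
      xMk + dx * (x - newXAvg),
      yMk + dy * (y - newYAvg))
  }

  // Combine two partial buffers, e.g. from different partitions.
  def merge(o: CorrBuffer): CorrBuffer = {
    if (o.n == 0L) this
    else if (n == 0L) o
    else {
      val total = n + o.n
      val dx = o.xAvg - xAvg
      val dy = o.yAvg - yAvg
      val w = n.toDouble * o.n / total
      CorrBuffer(
        total,
        xAvg + dx * o.n / total,
        yAvg + dy * o.n / total,
        ck + o.ck + dx * dy * w,
        xMk + o.xMk + dx * dx * w,
        yMk + o.yMk + dy * dy * w)
    }
  }

  // 0.0 / 0.0 is NaN, so an empty buffer evaluates to NaN.
  def result: Double = ck / math.sqrt(xMk * yMk)
}

object CorrSketch {
  // Convenience wrapper: correlation of a local sequence of pairs.
  def corr(pairs: Seq[(Double, Double)]): Double =
    pairs.foldLeft(CorrBuffer()) { case (b, (x, y)) => b.update(x, y) }.result
}
```

In this sketch an empty input evaluates to NaN, and any NaN in the input propagates to the result; whether that matches the documented behavior of Corr is exactly what the ScalaDoc requested above should state.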

@mengxr
Contributor

mengxr commented Oct 27, 2015

LGTM except for the inline comments. Ping @yhuai @rxin for another pass on the SQL side.

@SparkQA

SparkQA commented Oct 27, 2015

Test build #44418 has finished for PR 8587 at commit 02562f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(

@yhuai
Contributor

yhuai commented Oct 27, 2015

Can you add this function to FunctionRegistry? Otherwise, we cannot use it in SQL.

override def nullable: Boolean = false
override def dataType: DoubleType.type = DoubleType
override def toString: String = s"CORRELATION($left, $right)"
}
Contributor

What will the error message be if we call this function when spark.sql.useAggregate2=false? It would be good to provide a meaningful error message.

val df3 = Seq.tabulate(0)(i => (1.0 * i, 2.0 * i)).toDF("a", "b")
val corr4 = df3.groupBy().agg(corr("a", "b")).collect()(0).getDouble(0)
assert(corr4.isNaN)
}
Contributor

What will happen if the data types of the input parameters are not double?

Member Author

I will add ImplicitCastInputTypes to case class Corr, so that inputs of other NumericType types are automatically cast to double.
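
The effect of widening every numeric input to double before aggregating can be imitated outside Spark. The sketch below is only an analogy in plain Scala, using the standard library's Numeric type class; it does not use or reproduce Catalyst's ImplicitCastInputTypes, and the object name CorrAnyNumeric and its corr method are hypothetical.

```scala
object CorrAnyNumeric {
  // Two-pass Pearson correlation over pairs of arbitrary numeric types.
  // Each value is widened to Double first, mirroring the idea of casting
  // non-double numeric inputs before the aggregate sees them.
  def corr[A, B](pairs: Seq[(A, B)])(
      implicit na: Numeric[A], nb: Numeric[B]): Double = {
    val xs = pairs.map(p => na.toDouble(p._1))
    val ys = pairs.map(p => nb.toDouble(p._2))
    val n = pairs.size
    val xAvg = xs.sum / n   // 0.0 / 0 is NaN for an empty input
    val yAvg = ys.sum / n
    val ck = xs.zip(ys).map { case (x, y) => (x - xAvg) * (y - yAvg) }.sum
    val xMk = xs.map(x => (x - xAvg) * (x - xAvg)).sum
    val yMk = ys.map(y => (y - yAvg) * (y - yAvg)).sum
    ck / math.sqrt(xMk * yMk)
  }
}
```

With this shape, a call such as CorrAnyNumeric.corr(Seq((1, 2L), (2, 4L), (3, 6L))) accepts Int and Long columns without the caller converting anything by hand.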

@SparkQA

SparkQA commented Oct 29, 2015

Test build #44616 has finished for PR 8587 at commit 2f7b864.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

*
*/
case class Corr(left: Expression, right: Expression)
extends BinaryExpression with AggregateExpression with ImplicitCastInputTypes {
Contributor

This is just a placeholder, right? Can we change it to mix in AggregateExpression1 and then throw an exception (UnsupportedOperatorException) in the newInstance method?

Member Author

But how do we check spark.sql.useAggregate2=false in this expression? Catalyst expressions seem to be independent of SQLConf. In the newInstance method, we can't refer to a conf object.

Sorry. I think I know what you meant.

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44645 has finished for PR 8587 at commit 3b731e2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44659 has finished for PR 8587 at commit 4f8c381.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44671 has finished for PR 8587 at commit 7dcf689.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@viirya
Member Author

viirya commented Oct 30, 2015

Failure caused by: 0.6633880657639323 != 0.6633880657639322

@viirya
Member Author

viirya commented Oct 30, 2015

@yhuai What do you think? Should we modify HiveComparisonTest to allow this kind of error?
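
For context on what allowing this kind of error could mean, the following plain-Scala sketch compares two doubles with a relative tolerance instead of exact equality. It is not part of HiveComparisonTest; the object ApproxCompare, the method approxEqual, and the default tolerance of 1e-12 are assumptions chosen for illustration.

```scala
object ApproxCompare {
  // True when a and b are equal or differ by at most relTol relative to the
  // larger magnitude. NaN never matches; infinities only match themselves.
  def approxEqual(a: Double, b: Double, relTol: Double = 1e-12): Boolean = {
    if (a == b) true
    else if (a.isNaN || b.isNaN) false
    else if (a.isInfinite || b.isInfinite) false
    else math.abs(a - b) <= relTol * math.max(math.abs(a), math.abs(b))
  }
}
```

Applied to the two values from the failure above, ApproxCompare.approxEqual(0.6633880657639323, 0.6633880657639322) holds even though the values are not equal under ==, since they differ only in the last printed digit.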

@yhuai
Contributor

yhuai commented Oct 30, 2015

If that is the answer generated by Hive, we should not change it (at least for now). What we can do is this: if we believe our answer is valid (please double-check it), we can put that test in the blacklist (with comments explaining why we do not run it with HiveComparisonTest). Then we add a query test using the queries from that file.

…lacklist and add these tests to AggregationQuerySuite.
@viirya
Member Author

viirya commented Oct 30, 2015

@yhuai OK. I've put the test in the blacklist and also added the corresponding test to AggregationQuerySuite.

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44690 has finished for PR 8587 at commit 2de76b4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@viirya
Member Author

viirya commented Oct 31, 2015

retest this please.

@viirya
Member Author

viirya commented Oct 31, 2015

Oh, that failure is going to be fixed in #9387.

@SparkQA

SparkQA commented Oct 31, 2015

Test build #44719 has finished for PR 8587 at commit 2de76b4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@viirya
Member Author

viirya commented Oct 31, 2015

retest this please.

@SparkQA

SparkQA commented Oct 31, 2015

Test build #44726 has finished for PR 8587 at commit 2de76b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Corr(
    • case class Corr(left: Expression, right: Expression)

@viirya
Member Author

viirya commented Nov 2, 2015

ping @yhuai any more comments?

@yhuai
Contributor

yhuai commented Nov 2, 2015

Thanks! Merging to master.

@asfgit asfgit closed this in 3e770a6 Nov 2, 2015
@viirya
Member Author

viirya commented Nov 2, 2015

@yhuai Thank you!

@viirya viirya deleted the corr_aggregation branch December 27, 2023 18:18
6 participants