
[SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a query executor listener #21007

Closed
wants to merge 6 commits into apache:master from HyukjinKwon:SPARK-23942

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Apr 9, 2018

What changes were proposed in this pull request?

This PR proposes to add `collect` to a query executor as an action.

Seems `collect` / `collect` with Arrow are not recognised via `QueryExecutionListener` as an action. For example, if we have a custom listener as below:

```scala
package org.apache.spark.sql

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class TestQueryExecutionListener extends QueryExecutionListener with Logging {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    logError("Look at me! I'm 'onSuccess'")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```

and set `spark.sql.queryExecutionListeners` to `org.apache.spark.sql.TestQueryExecutionListener`.
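For reference, here is a minimal sketch of how such a listener could be registered from PySpark. The config key and listener class are the ones above; the app name and classpath handling are illustrative assumptions:

```python
from pyspark.sql import SparkSession

# Assumes TestQueryExecutionListener is compiled and on the driver classpath
# (e.g. via --driver-class-path). spark.sql.queryExecutionListeners is a
# static conf, so it must be set before the session is created.
spark = (
    SparkSession.builder
    .appName("query-execution-listener-demo")  # illustrative name
    .config("spark.sql.queryExecutionListeners",
            "org.apache.spark.sql.TestQueryExecutionListener")
    .getOrCreate())

# Any action should now be reported to the listener's onSuccess.
spark.sql("SELECT * FROM range(1)").show()
```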

Other operations on the PySpark or Scala side seem fine:

>>> sql("SELECT * FROM range(1)").show()
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
+---+
| id|
+---+
|  0|
+---+
scala> sql("SELECT * FROM range(1)").collect()
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])

but ..

**Before**

>>> sql("SELECT * FROM range(1)").collect()
[Row(id=0)]
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
   id
0   0

**After**

>>> sql("SELECT * FROM range(1)").collect()
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
[Row(id=0)]
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
   id
0   0

How was this patch tested?

I have manually tested as described above, and a unit test was added.

@SparkQA

SparkQA commented Apr 9, 2018

Test build #89053 has finished for PR 21007 at commit db1987f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

cc @cloud-fan and @viirya (from checking the history).


@felixcheung felixcheung left a comment


add test?

@felixcheung
Member

@HyukjinKwon
Member Author

HyukjinKwon commented Apr 10, 2018

Yup, will add. I was just hesitant because writing an actual test needs some complexity (as described in the PR description), whereas the fix itself is quite straightforward.

@felixcheung
Member

Got it, sorry I missed the last sentence. Maybe a JVM-only test?

@HyukjinKwon
Member Author

Will give the Python one a shot first to show how it looks. I have an incomplete one locally.


@viirya viirya left a comment


This fix looks good.

@@ -3189,10 +3189,10 @@ class Dataset[T] private[sql](

private[sql] def collectToPython(): Int = {
EvaluatePython.registerPicklers()
withNewExecutionId {
withAction("collect", queryExecution) { plan =>
Member


`collect` or `collectToPython`?

@@ -3312,10 +3313,15 @@ class Dataset[T] private[sql](

/** Convert to an RDD of ArrowPayload byte arrays */
private[sql] def toArrowPayload: RDD[ArrowPayload] = {
// This is only used in tests, for now.
Member


Should this comment be moved above `def toArrowPayload: RDD[ArrowPayload]`?

@HyukjinKwon
Member Author

Will address the comments together soon.

@@ -3062,6 +3062,73 @@ def test_sparksession_with_stopped_sparkcontext(self):
sc.stop()


class SQLTests3(unittest.TestCase):
Member Author


I manually tested this test under different conditions, to be sure:

Before:

```
test_query_execution_listener_on_collect (pyspark.sql.tests.SQLTests3) ... FAIL
test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.SQLTests3) ... FAIL

======================================================================
FAIL: test_query_execution_listener_on_collect (pyspark.sql.tests.SQLTests3)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 3105, in test_query_execution_listener_on_collect
    "The callback from the query execution listener should be called after 'collect'")
AssertionError: The callback from the query execution listener should be called after 'collect'

======================================================================
FAIL: test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.SQLTests3)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 3122, in test_query_execution_listener_on_collect_with_arrow
    "The callback from the query execution listener should be called after 'toPandas'")
AssertionError: The callback from the query execution listener should be called after 'toPandas'
```

After:

```
test_query_execution_listener_on_collect (pyspark.sql.tests.SQLTests3) ... ok
test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.SQLTests3) ... ok
```

Missing 'org.apache.spark.sql.TestQueryExecutionListener':

```
skipped "'org.apache.spark.sql.TestQueryExecutionListener' is not available. Skipping the related tests."
```

Missing Pandas:

```
test_query_execution_listener_on_collect (pyspark.sql.tests.SQLTests3) ... ok
test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.SQLTests3) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
```

Missing PyArrow:

```
test_query_execution_listener_on_collect (pyspark.sql.tests.SQLTests3) ... ok
test_query_execution_listener_on_collect_with_arrow (pyspark.sql.tests.SQLTests3) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
```
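For context, a rough sketch of what one of these tests might look like. The `isCalled()` / `clear()` hooks mirror the JVM-side test listener in this PR, but the access path through `spark._jvm` and the session setup are illustrative assumptions, not the actual code in `tests.py`:

```python
import unittest

from pyspark.sql import SparkSession


class QueryExecutionListenerSketch(unittest.TestCase):
    # Sketch only: assumes TestQueryExecutionListener is compiled, on the
    # driver classpath, and registered via spark.sql.queryExecutionListeners.

    @classmethod
    def setUpClass(cls):
        cls.spark = (
            SparkSession.builder
            .config("spark.sql.queryExecutionListeners",
                    "org.apache.spark.sql.TestQueryExecutionListener")
            .getOrCreate())

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_query_execution_listener_on_collect(self):
        # Hypothetical access path to the JVM-side test hooks (isCalled/clear).
        listener = self.spark._jvm.org.apache.spark.sql.TestQueryExecutionListener
        listener.clear()
        self.spark.sql("SELECT * FROM range(1)").collect()
        self.assertTrue(
            listener.isCalled(),
            "The callback from the query execution listener should be "
            "called after 'collect'")
```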

Member Author


BTW, I don't feel strongly about this test. Let me know if you think it's better to just take it out; I am fine either way.


@SparkQA

SparkQA commented Apr 10, 2018

Test build #89109 has finished for PR 21007 at commit e865c88.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89108 has finished for PR 21007 at commit edb5eea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 10, 2018

Test build #89127 has finished for PR 21007 at commit e865c88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@BryanCutler BryanCutler left a comment


Just a couple questions, but overall looks good

@@ -3062,6 +3062,73 @@ def test_sparksession_with_stopped_sparkcontext(self):
sc.stop()


class SQLTests3(unittest.TestCase):
Member


The test seems good to me, but would it be more appropriate to call it `TestQueryExecutionListener` instead?

not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
def test_query_execution_listener_on_collect_with_arrow(self):
# Here, it duplicates code in the ReusedSQLTestCase.sql_conf context manager.
Member


I think it would be fine to refactor sql_conf a little to use it here; it makes things much clearer.
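For reference, a minimal sketch of the save/set/restore pattern such a `sql_conf` context manager could use; the name and exact shape of the real helper in `pyspark.sql.tests` may differ:

```python
from contextlib import contextmanager


@contextmanager
def sql_conf(spark, pairs):
    """Temporarily set SQL confs, restoring the previous values afterwards.

    Sketch only: mirrors the pattern this comment refers to; not the
    actual helper from pyspark.sql.tests.
    """
    keys = list(pairs.keys())
    old_values = [spark.conf.get(key, None) for key in keys]  # None if unset
    try:
        for key, value in pairs.items():
            spark.conf.set(key, value)
        yield
    finally:
        for key, old in zip(keys, old_values):
            if old is None:
                spark.conf.unset(key)
            else:
                spark.conf.set(key, old)
```

Usage would then look like `with sql_conf(self.spark, {"spark.sql.execution.arrow.enabled": "true"}): ...`, which reads much clearer than inline save/restore.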

def isCalled(): Boolean = isOnSuccessCalled.get()

def clear(): Unit = isOnSuccessCalled.set(false)
}
Member


Does this need a newline at the end?

Member Author


Nope, it already has one. GitHub shows a warning and a mark in this UI if it doesn't, IIRC.

@@ -0,0 +1,45 @@
/*

Member Author


I think it's possible; I just took a look. However, mind if I keep a separate one as-is, specifically for the Python test? Maybe I am worrying too much, but I am a bit hesitant about having a dependency on a class in another suite.

Member


Yeah, I think that's fine. Thanks for putting a comment in the class explaining what it's for.

@HyukjinKwon
Member Author

Will address the comments soon.

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89163 has finished for PR 21007 at commit deacb17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SQLTestUtils(object):
  • class ReusedSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
  • class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89173 has finished for PR 21007 at commit deacb17.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SQLTestUtils(object):
  • class ReusedSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
  • class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):

@HyukjinKwon
Member Author

retest this please

import org.apache.spark.sql.util.QueryExecutionListener


class TestQueryExecutionListener extends QueryExecutionListener with Logging {
Member

@viirya viirya Apr 11, 2018


No need for `with Logging` now?

Member Author


Oops, true.

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89182 has finished for PR 21007 at commit deacb17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SQLTestUtils(object):
  • class ReusedSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
  • class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):

@SparkQA

SparkQA commented Apr 11, 2018

Test build #89193 has finished for PR 21007 at commit 1a52dbe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TestQueryExecutionListener extends QueryExecutionListener


@BryanCutler BryanCutler left a comment


LGTM


import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.internal.Logging
Member


We should get rid of this import too. :)

Member Author


D'oh.

"sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
"TestQueryExecutionListener.class")
if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)):
raise unittest.SkipTest(
Member

@viirya viirya Apr 12, 2018


I'm not sure about this part. In what case can we not find the class? When TestQueryExecutionListener.scala has been removed or moved? If that happens, should we just silently skip this test like this?

Member Author


Ah, nope. It's when we build with `sbt package`, per https://spark.apache.org/docs/latest/building-spark.html#building-with-sbt. In that case, test files are not actually compiled, and running the tests would hit some exceptions.

Member Author


I admit it's rare, but I believe this is more correct. In fact, there are a few test cases that actually take care of this.

Member Author


And, for:

> If it happens, should we just silently skip this test like this?

Yea, ideally we should warn explicitly in the console. The problem is our own testing script: we could make some changes to warn explicitly, but it seems that would need some duplicated changes.

There are some discussions / changes going on here - #20909

Member


Ok. I see. Makes sense.

Member Author


Thank you @viirya. I know this is a rather tricky one to judge which way is more correct. I will cc you when we actually discuss this further; I believe some people could think differently, and I might need to have more discussion. But for now, I feel sure about this.

@viirya
Member

viirya commented Apr 12, 2018

LGTM

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89267 has finished for PR 21007 at commit 7c1b3c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89279 has finished for PR 21007 at commit 7c1b3c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member

retest this please

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89290 has finished for PR 21007 at commit 7c1b3c6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89304 has finished for PR 21007 at commit 7c1b3c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

Merged to master.

Thanks for reviewing this @felixcheung, @viirya and @BryanCutler.

@asfgit asfgit closed this in ab7b961 Apr 13, 2018
HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Apr 13, 2018
[SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a query executor listener


Author: hyukjinkwon <gurwls223@apache.org>

Closes apache#21007 from HyukjinKwon/SPARK-23942.

(cherry picked from commit ab7b961)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
@@ -3189,10 +3189,10 @@ class Dataset[T] private[sql](

private[sql] def collectToPython(): Int = {
EvaluatePython.registerPicklers()
withNewExecutionId {
withAction("collectToPython", queryExecution) { plan =>
Member


These changes can cause behavior changes. Please submit a PR to document it.

@gatorsmile
Member

gatorsmile commented Apr 15, 2018

@HyukjinKwon @BryanCutler @viirya @felixcheung The first sentence of this PR really scares me. After reading the PR description, I found it is wrong. Since the PR description will be part of our change log, please be careful to ensure descriptions are right.

@HyukjinKwon
Member Author

What's wrong in the description and PR title, and what should be documented? Do you mean the first sentence, "This PR proposes to add collect to a query executor as an action.", is wrong because it doesn't mention PySpark specifically? I think the PR title and the rest of the wording explain what this PR changes.

It's already documented:

> A callback function that will be called when a query executed successfully.

Should we document that `collect` in PySpark is now recognised as a query execution action in a query execution listener?

@BryanCutler
Member

@gatorsmile I think the PR description here is great and very detailed; what exactly is wrong and scary?
