
[SPARK-24396] [SS] [PYSPARK] Add Structured Streaming ForeachWriter for python #21477

Closed
Wants to merge 8 commits

Conversation

tdas
Contributor

@tdas tdas commented May 31, 2018

What changes were proposed in this pull request?

This PR adds foreach for streaming queries in Python. Users will be able to specify their processing logic in two different ways.

  • As a function that takes a row as input.
  • As an object that has open, process, and close methods.

See the python docs in this PR for more details.
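For illustration, a minimal sketch of the function form is below (the rate source and the exact session setup are just assumptions for the example, not part of this PR):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreach-example").getOrCreate()

# A toy streaming DataFrame; the built-in "rate" source emits (timestamp, value) rows.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

def process_row(row):
    # Called once per row on the executors.
    print(row.value)

query = stream_df.writeStream.foreach(process_row).start()
# The query runs until stopped, e.g. query.stop().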

How was this patch tested?

Added java and python unit tests

@@ -389,19 +389,7 @@ def __hash__(self):
"python/pyspark/sql"
],
python_test_goals=[
"pyspark.sql.types",
Contributor Author
@tdas tdas May 31, 2018

This is only to speed up local testing. Will remove this.

@@ -296,6 +296,7 @@ def tearDown(self):
# tear down test_bucketed_write state
self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")

'''
Contributor Author
@tdas tdas May 31, 2018

This is only to speed up local testing. Will remove this.

@tdas tdas changed the title [SPARK-24396] [SS] [PYSPARK] Add Structured Streaming ForeachWriter for python [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Streaming ForeachWriter for python May 31, 2018
@SparkQA

SparkQA commented Jun 1, 2018

Test build #91371 has finished for PR 21477 at commit 701a455.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PythonForeachWriter(func: PythonFunction, schema: StructType)
  • class UnsafeRowBuffer(taskMemoryManager: TaskMemoryManager, tempDir: File, numFields: Int)
  • case class ForeachWriterProvider[T](
  • case class ForeachWriterFactory[T](
  • class ForeachDataWriter[T](

@tdas
Contributor Author

tdas commented Jun 1, 2018

jenkins retest this please.

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91385 has finished for PR 21477 at commit 0920260.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91386 has finished for PR 21477 at commit f40dff6.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91384 has finished for PR 21477 at commit 701a455.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PythonForeachWriter(func: PythonFunction, schema: StructType)
  • class UnsafeRowBuffer(taskMemoryManager: TaskMemoryManager, tempDir: File, numFields: Int)
  • case class ForeachWriterProvider[T](
  • case class ForeachWriterFactory[T](
  • class ForeachDataWriter[T](

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91388 has finished for PR 21477 at commit d1cd933.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Jun 4, 2018

@gatorsmile @zsxwing

@tdas
Contributor Author

tdas commented Jun 4, 2018

Member
@HyukjinKwon HyukjinKwon left a comment

Will take a closer look again soon.

@@ -30,6 +30,7 @@
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
from pyspark.sql.utils import StreamingQueryException
from abc import ABCMeta, abstractmethod
Member

@tdas, Seems not used.

Contributor Author

done.

"""
The provided object is a callable function that is supposed to be called on each row.
Construct a function that takes an iterator and calls the provided function on each row.
"""
Member

I believe we'd better turn this into comment form:

# The ..
# Construct ...

The provided object is not a callable function. Then it is expected to have a
'process(row)' method, and optional 'open(partition_id, epoch_id)' and
'close(error)' methods.
"""
Member

ditto

else:
open_exists = True

close_exists = False
Member

maybe

close_exists = hasattr(f, "close")
if close_exists:
    ...

if not callable(getattr(f, 'process')):
raise Exception("Attribute 'process' in provided object is not callable")

open_exists = False
Member

ditto for

if open_exists

import random
file = open(os.path.join(dir, str(random.randint(0, 100000))), 'w')
file.write("%s\n" % str(event))
file.close()
Member

Should it be put in a finally block too?


def _write_event(self, dir, event):
import random
file = open(os.path.join(dir, str(random.randint(0, 100000))), 'w')
Member

nit: file shadows the built-in function file.

Contributor

A with statement might be more convenient here, along with renaming file to f or fw or so. Please ignore this if there's a specific reason not to use a with statement.

Member

Why not use a uuid in the file name?
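For reference, a sketch of the test helper with both suggestions applied (a uuid-based file name plus a with statement); this is illustrative, not the code that was merged:

import os
import uuid

def _write_event(self, dir, event):
    # uuid4 avoids name collisions better than a random int, and the
    # with statement closes the file even if the write raises.
    with open(os.path.join(dir, str(uuid.uuid4())), 'w') as fw:
        fw.write("%s\n" % str(event))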

sq.processAllAvailable()
self.fail("bad writer should fail the query") # this is not expected
except StreamingQueryException as e:
# self.assertTrue("test error" in e.desc) # this is expected
Member

Eh, shall we uncomment this?

Contributor Author

This cannot be done because somehow e.desc does not have the original underlying error when it's a StreamingQueryException. I think this is a separate bug that needs to be fixed independent of this PR.

self.assertEqual(len(tester.process_events()), 0) # no row was processed
close_events = tester.close_events()
self.assertEqual(len(close_events), 1)
# self.assertTrue("test error" in e[0]['error'])
Member

ditto

}

def add(row: UnsafeRow): Unit = withLock {
assert(queue.add(row), s"Failed to add row to HybridRowQueue while sending data to Python" +
Member

The s interpolator seems not needed here.

@HyukjinKwon
Member

Let me cc @HeartSaVioR too since he's been looking through the SS code a bit.

@HeartSaVioR
Contributor

HeartSaVioR commented Jun 5, 2018

Thanks @HyukjinKwon for cc-ing me. I haven't covered the Python part of Structured Streaming, so it will take some time to go through the code. Hoping I can participate in reviewing in time.

Contributor
@HeartSaVioR HeartSaVioR left a comment

I have finished reviewing for the first time. For now I'm not familiar with how the Spark JVM processes collaborate with Python, so I'm unable to deep dive on that part.

processing one partition of the data generated in a distributed manner.

* This object must be serializable because each task will get a fresh
serialized-deserializedcopy of the provided object. Hence, it is strongly
Contributor

nit: deserialized copy (space)

Contributor Author

done


* This object must be serializable because each task will get a fresh
serialized-deserializedcopy of the provided object. Hence, it is strongly
recommended that any initialization for writing data (e.g. opening a
Contributor

any initialization for writing data (e.g. opening a connection or starting a transaction) be done open after the open(...) method has been called

"be done open" seems a bit odd. It would be better if we could polish the sentence.

Contributor Author

done

if not callable(getattr(f, 'process')):
raise Exception("Attribute 'process' in provided object is not callable")

open_exists = False
Contributor

We might be able to extract this code block into a function, since the logic for checking open and close is exactly the same.
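A sketch of what such an extracted helper could look like (the helper name _check_optional_method is made up for illustration):

def _check_optional_method(obj, name):
    # Returns True if `obj` has a callable attribute `name`; raises if the
    # attribute exists but is not callable. Mirrors the duplicated
    # open/close checks in the reviewed diff.
    if not hasattr(obj, name):
        return False
    if not callable(getattr(obj, name)):
        raise Exception("Attribute '%s' in provided object is not callable" % name)
    return True

# usage: open_exists = _check_optional_method(f, "open")
#        close_exists = _check_optional_method(f, "close")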

for x in iterator:
f.process(x)
except Exception as ex:
call_close_if_needed(ex)
Contributor

Putting this in a finally block sounds like a better pattern to me too, and then we may be able to inline call_close_if_needed.
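Roughly what is being suggested, as a sketch (the wrapper name and parameters are illustrative; the merged code may differ):

def _process_partition(f, iterator, partition_id, epoch_id,
                       open_exists, close_exists):
    # close() always runs in the finally block (when it exists) and receives
    # the error, if any; the error is re-raised so the task still fails.
    should_process = True
    if open_exists:
        should_process = f.open(partition_id, epoch_id)
    error = None
    try:
        if should_process:
            for x in iterator:
                f.process(x)
    except Exception as ex:
        error = ex
        raise
    finally:
        if close_exists:
            f.close(error)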



self.assertEqual(len(close_events), 1)
# self.assertTrue("test error" in e[0]['error'])

'''
Contributor

Leaving a marker here as well: "This is only to speed up local testing. Will remove this."

Contributor Author

Yes, I will after I finish adding all the tests.

* usually should do all the initialization (e.g. opening a connection or initiating a transaction)
* in the `open` method.
* The abstract class for writing custom logic to process data generated by a query.
* This is often used to write the output of a streaming query to arbitrary storage systems.
Contributor

Looks like the doc is duplicated between foreach() and ForeachWriter. I'm not sure how we can leave a reference in the Python doc instead of duplicating content, but even if the Python doc doesn't support some kind of cross-reference, some of the content seems OK to be placed in one place or the other, not both.

Member

I don't think there's an easy way to reference docs across languages. It's unfortunate, but usually we have duplicated the docs across the different language API docs ...

Contributor

Ah yes, my bad. I mistook this for the Python side.

object PythonForeachWriter {

/**
* A buffer that is designed for the sole purpose of buffering UnsafeRows in PythonForeahWriter.
Contributor

nit: PythonForeachWriter

Contributor Author

done.

some input data. That would require you to specify the processing logic in the next
way.

#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
Member

@tdas, wouldn't it be better to just have a ForeachWriter class?

Contributor Author

I discussed this with @marmbrus. If there were a ForeachWriter class in Python, then users would have to additionally import it. That's just another overhead that can be avoided by simply allowing any class with the appropriate methods. One less step for Python users.
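To illustrate the point, any plain class with the right methods works and nothing needs to be imported (the class below is purely illustrative):

class RowWriter(object):
    def open(self, partition_id, epoch_id):
        # e.g. open a connection here; return False to skip this partition
        self.rows = []
        return True

    def process(self, row):
        self.rows.append(row)

    def close(self, error):
        # commit or clean up; `error` is None if processing succeeded
        print("partition done, %d rows, error=%s" % (len(self.rows), error))

# query = df.writeStream.foreach(RowWriter()).start()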

This is often used to write the output of a streaming query to arbitrary storage systems.
The processing logic can be specified in two ways.

#. A **function** that takes a row as input.
Member

This variant seems specific to Python. I thought we should better match how we support this on the Scala side.

Contributor Author

This is a superset of what we support in Scala. Python users are more likely to use simple lambdas instead of defining classes. But they may also want to write transactional stuff in Python with open and close methods. Hence, providing both alternatives seems like a good idea.

Member

(Including the response to #21477 (comment)) I kind of agree that it's an okay idea, but I think we have usually provided consistent API support so far unless something is language specific, for example, context managers and decorators in Python, etc.

Just for clarification, does the Scala side support the function-only variant too?

Also, I know the attribute-checking way is more of a "Pythonic" way, but I am seeing that the documentation has already diverged between Scala and Python. It costs maintenance overhead on the other hand.

Member
@HyukjinKwon HyukjinKwon Jun 7, 2018

I mean, we could maybe consider the other ways, but wouldn't it be better to have the consistent support as the primary, and then see if the other ways are really requested by users? I think we could still incrementally add the attribute-checking way or the lambda (or function, to be more correct) way later (but we can't go in the opposite direction).

Contributor Author

Python APIs have always had slight divergences from the Scala/Java APIs in order to provide a better experience for Python users. For example, StreamingQuery.lastProgress returns an object of type StreamingQueryProgress in Java/Scala but returns a dict in Python, because Python users are more used to dealing with dicts, and Java/Scala users (typed languages) are more comfortable with structures. Similarly, in DataFrame.select, you can refer to columns in Scala using $"columnName", but in Python you can refer to them as df.columnName. If we strictly adhere to pure consistency, then we cannot make things convenient for users in different languages. And ultimately, convenience is what matters for the user experience. So it's okay to have a superset of supported types in Python compared to Java/Scala.

Personally, I think we should add the lambda variant to Scala as well. But that decision for Scala is independent of this PR, as there is enough justification for adding the lambda variant for Python.

Member
@HyukjinKwon HyukjinKwon Jun 8, 2018

I believe $"columnName" is more of a language-specific feature in Scala, and I think df.columnName is language specific to Python.

And ultimately convenience is what matters for the user experience.

The thing is, it sounded to me like we are kind of prejudging it. We can't easily revert once we go this way.

I think we should also add the lambda variant to Scala as well.

+1

I am okay with it, but I hope this won't usually be done next time.

@HyukjinKwon
Member

Seems fine so far to me otherwise

open_exists = True

close_exists = False
if hasattr(f, "close"):
Member

Also, to be thorough, we should check the argument specification too ... although that might be a bit too much. I felt like I had to at least mention it.

Contributor Author

It's all dynamically typed, right? I don't think there is a good way to check whether the function accepts the right argument types without actually calling it (please correct me if I am wrong). And this is a standard problem in Python that everybody using Python is used to (i.e. runtime exceptions when using incorrectly typed parameters). Also, no other operations that take lambdas check the types and counts. So I think we should just be consistent and let that be.

Member
@HyukjinKwon HyukjinKwon Jun 8, 2018

We can check the argspec at least, I believe. But yup, I am fine with it as is.
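For the record, a sketch of what an argspec check could look like; this was not added in the PR, and inspect.signature is a Python 3 simplification (Python 2 would need getargspec):

import inspect

def _accepts_n_args(fn, n):
    # True if `fn` can be called with exactly `n` positional arguments.
    try:
        inspect.signature(fn).bind(*([None] * n))
        return True
    except TypeError:
        return False

# e.g. fail early on the driver instead of at runtime on an executor:
# if open_exists and not _accepts_n_args(f.open, 2):
#     raise Exception("'open' should accept (partition_id, epoch_id)")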

Member
@zsxwing zsxwing left a comment

Overall looks pretty good. Left some minor comments.

mode, then this guarantee does not hold and therefore should not be used for
deduplication.

* The ``close()`` method (if exists) is will be called if `open()` method exists and
Member

nit: remove is

Contributor Author

done

* @return `true` if the corresponding partition and version id should be processed. `false`
* indicates the partition should be skipped.
*/
def open(partitionId: Long, version: Long): Boolean
def open(partitionId: Long, epochId: Long): Boolean
Member

Renaming a parameter breaks Scala source compatibility. I'm totally fine with changing this since it's not a stable API; just pointing this out.

Contributor Author
@tdas tdas Jun 7, 2018

Oh right... it probably breaks source compatibility. I don't think that is fine :( especially in the 2.x line, even though it is marked as experimental (it's been out there for over 2 years now; this is hardly experimental and we should not remove the experimental tag, but that's a different discussion).

Contributor Author

Okay, I checked, and there is no compatibility issue in this case. A source compatibility issue arises when code calls a method by param name (e.g. func(paramName=value)) and the param name changes. In this case, users are overriding the method (overriding does not care about the exact name, only the types) and Spark is calling the method internally (not by param name). So this is fine.



@SparkQA

SparkQA commented Jun 8, 2018

Test build #91539 has finished for PR 21477 at commit ecf3d88.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas tdas changed the title [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Streaming ForeachWriter for python [SPARK-24396] [SS] [PYSPARK] Add Structured Streaming ForeachWriter for python Jun 14, 2018
@zsxwing
Member

zsxwing commented Jun 14, 2018

LGTM

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91871 has finished for PR 21477 at commit d081110.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


if (count > 0) {
val row = queue.remove()
assert(row != null, s"HybridRowQueue.remove() returned null " +
Member

Looks like the s can be removed.

if should_process:
for x in iterator:
f.process(x)

Member

I usually write try-except-finally without a newline, though ...

Contributor

This looks strictly related to preference, and I prefer a newline. Are you referring to the Spark style guide or PEP 8 or so?

Member

I see that having no newline between them is more consistent with the current codebase. It would be better to keep it consistent.

Contributor

Oh, OK. I agree it might be better to try our best to keep it consistent, though the style guide doesn't define it explicitly.


self.assertEqual(len(tester.open_events()), 2)

self.assertEqual(len(tester.process_events()), 0) # no row was processed
Member

two spaces before #.

Contributor Author

Is that a PEP 8 rule or Spark style?

Member

PEP 8 calls for (at least) two spaces before inline comments (https://www.python.org/dev/peps/pep-0008/#inline-comments). So the previous style is totally fine, but I have been doing this with fewer spaces since I find there's no point in adding more spaces unless it looks prettier or follows a format.

(This was just a tiny nit to me. I am okay with such comments of mine simply being ignored.)

tester.write_close_event(error)

tester.run_streaming_query_on_writer(ForeachWriter(), 2)
self.assertEqual(len(tester.open_events()), 0) # no open events
Member

ditto for two spaces

@HyukjinKwon
Member

HyukjinKwon commented Jun 15, 2018

(Please ignore the nits. They are ignorable, but I left them while reading the code in case you want to address them.)

@tdas
Contributor Author

tdas commented Jun 15, 2018

@tdas
Contributor Author

tdas commented Jun 15, 2018

@JoshRosen

@HyukjinKwon
Member

LGTM too, just for clarification.

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91898 has finished for PR 21477 at commit 1ab612f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Jun 15, 2018

Thank you, everyone. I'm merging this.

@asfgit asfgit closed this in b5ccf0d Jun 15, 2018
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
… python

This PR adds `foreach` for streaming queries in Python. Users will be able to specify their processing logic in two different ways.
- As a function that takes a row as input.
- As an object that has methods `open`, `process`, and `close` methods.

See the python docs in this PR for more details.

Added java and python unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#21477 from tdas/SPARK-24396.

Ref: LIHADOOP-48531