[SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early

## What changes were proposed in this pull request?

Closes the UDF's output generator when the downstream consumer stops early (e.g. because of a `limit`), so that cleanup code in the generator's `except`/`finally` blocks runs.
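
For context: closing a Python generator raises `GeneratorExit` at the suspended `yield`, which runs the generator's `except`/`finally` blocks. A minimal plain-Python sketch of the mechanism (no Spark involved):

```python
# Closing a generator raises GeneratorExit at the suspended `yield`,
# so cleanup in `except`/`finally` runs even if iteration stops early.
def gen():
    try:
        for i in range(10):
            yield i
    except GeneratorExit:
        print("GeneratorExit raised")
        raise  # re-raise so close() completes normally
    finally:
        print("finally block reached")

g = gen()
print(next(g))  # 0
g.close()       # prints "GeneratorExit raised", then "finally block reached"
```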

### Manual verification on pandas iterator UDF and mapPartitions

The two configs below force one record per Arrow batch and a small UDF output buffer, so that `limit(2)` stops consuming output while the iterator UDF is still mid-iteration.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col
import time

# In the pyspark shell `spark` already exists; create it when running standalone.
spark = SparkSession.builder.getOrCreate()

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    try:
        for batch in it:
            yield batch + 100
            time.sleep(1.0)
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000001.tmp", "a").close()

df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

# You should see the log "Debug: exception raised: <class 'GeneratorExit'>"
# and the file "/tmp/000001.tmp" created.
df1.select(col('a'), fi1('a')).limit(2).collect()

def mapper(it):
    try:
        for batch in it:
            yield batch
    except BaseException as be:
        print("Debug: exception raised: " + str(type(be)))
        raise be
    finally:
        open("/tmp/000002.tmp", "a").close()

df2 = spark.range(10000000).repartition(1)

# You should see the log "Debug: exception raised: <class 'GeneratorExit'>"
# and the file "/tmp/000002.tmp" created.
df2.rdd.mapPartitions(mapper).take(2)

```
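
Without this patch, nothing closes the output generator when the consumer stops early, so such `finally` blocks may only run when the generator is eventually garbage-collected. A plain-Python sketch of the consumer-side pattern the worker now applies, with `itertools.islice` standing in for `limit`/`take` (illustration only, not Spark code):

```python
import itertools

def mapper(it):
    try:
        for x in it:
            yield x
    finally:
        print("mapper cleanup ran")

out = mapper(iter(range(10)))
first_two = list(itertools.islice(out, 2))  # consumer stops after 2 items
out.close()  # explicit close, analogous to what the patched worker does
print(first_two)  # [0, 1]
```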

## How was this patch tested?

Unit test added.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes #24986 from WeichenXu123/pandas_iter_udf_limit.

Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
WeichenXu123 authored and HyukjinKwon committed Jun 28, 2019
1 parent 410a898 commit 31e7c37
Showing 2 changed files with 43 additions and 1 deletion.
37 changes: 37 additions & 0 deletions python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -850,6 +850,43 @@ def test_close(batch_iter):
             with self.assertRaisesRegexp(Exception, "reached finally block"):
                 self.spark.range(1).select(test_close(col("id"))).collect()
 
+    def test_scalar_iter_udf_close_early(self):
+        tmp_dir = tempfile.mkdtemp()
+        try:
+            tmp_file = tmp_dir + '/reach_finally_block'
+
+            @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+            def test_close(batch_iter):
+                generator_exit_caught = False
+                try:
+                    for batch in batch_iter:
+                        yield batch
+                        time.sleep(1.0)  # avoid the function finishing too fast
+                except GeneratorExit as ge:
+                    generator_exit_caught = True
+                    raise ge
+                finally:
+                    assert generator_exit_caught, "Generator exit exception was not caught."
+                    open(tmp_file, 'a').close()
+
+            with QuietTest(self.sc):
+                with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1,
+                                    "spark.sql.pandas.udf.buffer.size": 4}):
+                    self.spark.range(10).repartition(1) \
+                        .select(test_close(col("id"))).limit(2).collect()
+                    # Wait here because the Python UDF worker takes some time to
+                    # detect that the JVM side closed the socket, which triggers
+                    # the `GeneratorExit`. The wait timeout is 10s.
+                    for i in range(100):
+                        time.sleep(0.1)
+                        if os.path.exists(tmp_file):
+                            break
+
+                    assert os.path.exists(tmp_file), "finally block not reached."
+
+        finally:
+            shutil.rmtree(tmp_dir)
+
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
7 changes: 6 additions & 1 deletion python/pyspark/worker.py
@@ -481,7 +481,12 @@ def main(infile, outfile):

         def process():
             iterator = deserializer.load_stream(infile)
-            serializer.dump_stream(func(split_index, iterator), outfile)
+            out_iter = func(split_index, iterator)
+            try:
+                serializer.dump_stream(out_iter, outfile)
+            finally:
+                if hasattr(out_iter, 'close'):
+                    out_iter.close()
 
         if profiler:
             profiler.profile(process)
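
The `hasattr(out_iter, 'close')` guard matters presumably because `func` need not return a generator: a plain iterator such as a `map` object or a list iterator has no `close()` method. A small plain-Python sketch of the distinction (illustration, not Spark code):

```python
# Generators expose close(); plain iterators do not, hence the guard.
def squares(it):
    for x in it:
        yield x * x

for out_iter in (squares(iter([1, 2, 3])), iter([1, 2, 3])):
    next(out_iter)  # consume one element, then stop early
    if hasattr(out_iter, 'close'):
        out_iter.close()  # only the generator takes this branch
    print(type(out_iter).__name__, hasattr(out_iter, 'close'))
```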
