[SPARK-48044][PYTHON][CONNECT] Cache DataFrame.isStreaming #46281

Closed


@zhengruifeng zhengruifeng commented Apr 29, 2024

What changes were proposed in this pull request?

Cache DataFrame.isStreaming
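
As a rough illustration of the idea (not the code in this PR), the sketch below caches a flag that would otherwise need a remote round trip on every read. `FakeClient`, `CachedFrame`, and `analyze_is_streaming` are hypothetical names invented for this example, and it assumes the answer cannot change for a given frame.

```python
from typing import Optional


class FakeClient:
    """Hypothetical stand-in for a remote client; it only counts round trips."""

    def __init__(self) -> None:
        self.rpc_count = 0

    def analyze_is_streaming(self) -> bool:
        # Each call represents one (simulated) RPC to the server.
        self.rpc_count += 1
        return False


class CachedFrame:
    """Hypothetical frame that remembers the answer after the first lookup."""

    def __init__(self, client: FakeClient) -> None:
        self._client = client
        self._cached_is_streaming: Optional[bool] = None

    @property
    def isStreaming(self) -> bool:
        # Only the first read reaches the client; later reads reuse the value.
        if self._cached_is_streaming is None:
            self._cached_is_streaming = self._client.analyze_is_streaming()
        return self._cached_is_streaming


client = FakeClient()
frame = CachedFrame(client)

# Read the property repeatedly, as repeated object construction might.
results = [frame.isStreaming for _ in range(24)]
print(client.rpc_count)
```

Storing `None` as the "not yet fetched" marker keeps a cached `False` distinguishable from an empty cache, which matters here because `False` is the common answer.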

Why are the changes needed?

In pandas API on Spark (PS), DataFrame.isStreaming is checked in the construction of every InternalFrame:

assert not spark_frame.isStreaming, "pandas-on-Spark does not support Structured Streaming."

This can cause performance issues, since many InternalFrame instances are built even for simple logic, such as:

import cProfile, pstats
import pyspark.pandas as ps

df1 = ps.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]}, columns=['lkey', 'value'])

cProfile.run("df1['value2'] = df1['value'] + 123 + 456", "/tmp/profile_results")

pstats.Stats("/tmp/profile_results").sort_stats("cumtime").print_stats(.1)

before:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.120    0.120 {built-in method builtins.exec}
        1    0.000    0.000    0.120    0.120 <string>:1(<module>)
       27    0.000    0.000    0.113    0.004 /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client/core.py:1160(_analyze)
       27    0.000    0.000    0.112    0.004 /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_311/lib/python3.11/site-packages/grpc/_channel.py:1161(__call__)
       27    0.000    0.000    0.112    0.004 /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_311/lib/python3.11/site-packages/grpc/_channel.py:1124(_blocking)
       27    0.110    0.004    0.110    0.004 {method 'next_event' of 'grpc._cython.cygrpc.SegregatedCall' objects}
        2    0.000    0.000    0.108    0.054 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/base.py:318(__add__)
        2    0.000    0.000    0.078    0.039 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/data_type_ops/num_ops.py:79(add)
        2    0.000    0.000    0.074    0.037 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/base.py:210(wrapper)
       24    0.000    0.000    0.064    0.003 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/internal.py:1435(copy)
       24    0.000    0.000    0.063    0.003 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/internal.py:535(__init__)

after:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.082    0.082 {built-in method builtins.exec}
        1    0.000    0.000    0.082    0.082 <string>:1(<module>)
        2    0.000    0.000    0.081    0.041 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/base.py:318(__add__)
        4    0.000    0.000    0.078    0.019 /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client/core.py:1160(_analyze)
        4    0.000    0.000    0.077    0.019 /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_311/lib/python3.11/site-packages/grpc/_channel.py:1161(__call__)
        4    0.000    0.000    0.077    0.019 /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_311/lib/python3.11/site-packages/grpc/_channel.py:1124(_blocking)
        4    0.077    0.019    0.077    0.019 {method 'next_event' of 'grpc._cython.cygrpc.SegregatedCall' objects}
       26    0.000    0.000    0.058    0.002 /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py:1783(schema)
        3    0.000    0.000    0.058    0.019 /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client/core.py:1031(schema)
        2    0.000    0.000    0.057    0.029 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/data_type_ops/num_ops.py:79(add)
        2    0.000    0.000    0.057    0.028 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/base.py:210(wrapper)
       24    0.000    0.000    0.026    0.001 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/internal.py:1435(copy)
       21    0.000    0.000    0.025    0.001 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/internal.py:1419(select_column)
       19    0.000    0.000    0.025    0.001 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/series.py:438(_internal)
       24    0.000    0.000    0.025    0.001 /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/internal.py:535(__init__)

This example makes 24 InternalFrame.__init__ invocations. With the cache, the number of RPCs drops from 27 to 4.
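
Call counts like these can also be read from a profile programmatically instead of by eye. The sketch below is a self-contained illustration with a hypothetical `fake_analyze` function standing in for the RPC-issuing call; it relies on the `stats` mapping of `pstats.Stats`, whose keys are `(filename, lineno, funcname)` and whose values start with the primitive and total call counts.

```python
import cProfile
import pstats


def fake_analyze() -> bool:
    """Hypothetical stand-in for a function that would issue one RPC."""
    return False


def workload() -> None:
    # Simulate code that triggers the call many times.
    for _ in range(27):
        fake_analyze()


def count_calls(stats: pstats.Stats, func_name: str) -> int:
    """Sum the total call count of every profiled function with this name."""
    total = 0
    for (_filename, _lineno, name), entry in stats.stats.items():
        if name == func_name:
            total += entry[1]  # entry = (primitive calls, total calls, tottime, cumtime, callers)
    return total


profiler = cProfile.Profile()
profiler.runcall(workload)
stats = pstats.Stats(profiler)

analyze_calls = count_calls(stats, "fake_analyze")
print(analyze_calls)
```

Matching on the function name alone is a simplification; a real check would probably also filter on the file name to avoid counting unrelated functions that share the name.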

Does this PR introduce any user-facing change?

no

How was this patch tested?

ci

Was this patch authored or co-authored using generative AI tooling?

no


@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

@dongjoon-hyun

Merged to master. Thank you, @zhengruifeng.

@zhengruifeng zhengruifeng deleted the cache_is_streaming_x branch April 29, 2024 23:46
@zhengruifeng

Thanks, @dongjoon-hyun, for the review.

JacobZheng0927 pushed a commit to JacobZheng0927/spark that referenced this pull request May 11, 2024
Closes apache#46281 from zhengruifeng/cache_is_streaming_x.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>