
[SPARK-46687][PYTHON][CONNECT] Basic support of SparkSession-based memory profiler #44775

Closed
wants to merge 5 commits

Conversation

xinrong-meng (Member) commented Jan 18, 2024

What changes were proposed in this pull request?

Basic support of SparkSession-based memory profiler in both Spark Connect and non-Spark-Connect.

Why are the changes needed?

We need to make the memory profiler SparkSession-based to support memory profiling in Spark Connect.

Does this PR introduce any user-facing change?

Yes, the SparkSession-based memory profiler is available.

An example is shown below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")

@udf("string")
def f(x):
    # Return a value on even partitions only, to vary memory behavior.
    if TaskContext.get().partitionId() % 2 == 0:
        return str(x)
    else:
        return None

spark.range(10).select(f(col("id"))).show()

spark.showMemoryProfiles()

which shows the following profile result:

============================================================
Profile of UDF<id=2>
============================================================
Filename: /var/folders/h_/60n1p_5s7751jx1st4_sk0780000gp/T/ipykernel_72839/2848225169.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     7    113.2 MiB    113.2 MiB          10   @udf("string")
     8                                         def f(x):
     9    114.4 MiB      1.3 MiB          10     if TaskContext.get().partitionId() % 2 == 0:
    10     31.8 MiB      0.1 MiB           4       return str(x)
    11                                           else:
    12     82.8 MiB      0.1 MiB           6       return None

How was this patch tested?

New and existing unit tests:

  • pyspark.tests.test_memory_profiler
  • pyspark.sql.tests.connect.test_parity_memory_profiler

Plus manual tests in a Jupyter notebook.
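
For reference, a quick way to run these modules locally (a sketch assuming an installed PySpark development environment; Spark's python/run-tests script is the canonical entry point in CI):

# Load and run the two new test modules with the standard unittest runner.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromNames([
    "pyspark.tests.test_memory_profiler",
    "pyspark.sql.tests.connect.test_parity_memory_profiler",
])
unittest.TextTestRunner(verbosity=2).run(suite)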

Was this patch authored or co-authored using generative AI tooling?

No.

@xinrong-meng xinrong-meng changed the title [WIP] Basic support of SparkSession-based memory profiler [SPARK-46687][PYTHON] Basic support of SparkSession-based memory profiler Jan 19, 2024
@xinrong-meng xinrong-meng changed the title [SPARK-46687][PYTHON] Basic support of SparkSession-based memory profiler [SPARK-46687][PYTHON][CONNECT] Basic support of SparkSession-based memory profiler Jan 19, 2024
@xinrong-meng xinrong-meng marked this pull request as ready for review January 22, 2024 21:33
xinrong-meng (Member, Author) commented:

The failure in https://github.com/xinrong-meng/spark/actions/runs/7648782322/job/20842144027 is unrelated to the PR changes; I will rebase on master.
@ueshin would you please review when you are free?

measures = self[code]
if not measures:
    continue  # skip if no measurement
linenos = range(min(measures), max(measures) + 1)
A reviewer (Member) commented on this snippet:
We may want to delay generating the full linenos until showing the results, to reduce the intermediate data?

xinrong-meng (Member, Author) replied:
Good idea! Updated.
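
For illustration only, a minimal sketch of the lazy approach (the class and method names here are hypothetical, not the actual patch): only observed lines are stored while profiling, and the contiguous line range is materialized when the profile is rendered.

# Hypothetical sketch, not the actual PySpark change: keep only raw
# per-line measurements during profiling; generate the full line range
# lazily at display time to reduce intermediate data.
class MemoryCodeMap(dict):
    def add_measurement(self, code, lineno, mem_usage):
        # Record only lines that were actually measured.
        self.setdefault(code, {})[lineno] = mem_usage

    def show(self):
        for code, measures in self.items():
            if not measures:
                continue  # skip if no measurement
            # Materialize the contiguous range only when rendering.
            for lineno in range(min(measures), max(measures) + 1):
                print(lineno, measures.get(lineno, ""))

This keeps the intermediate profiling data proportional to the number of measured lines rather than to the full source span.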

xinrong-meng (Member, Author) shared the updated profile output:
============================================================
Profile of UDF<id=2>
============================================================
Filename: /var/folders/h_/60n1p_5s7751jx1st4_sk0780000gp/T/ipykernel_69451/109011680.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     8    147.7 MiB    147.7 MiB          20   @udf("string")
     9                                         def a(x):
    10    149.6 MiB      1.8 MiB          20     if TaskContext.get().partitionId() % 2 == 0:
    11     59.9 MiB      0.1 MiB           8       return str(x)
    12                                           else:
    13     89.9 MiB      0.1 MiB          12       return None

Tested on Jupyter.

ueshin (Member) left a review comment:

LGTM, pending tests.

ueshin (Member) commented Jan 29, 2024

Thanks! Merging to master.

@ueshin ueshin closed this in 528ac8b Jan 29, 2024
xinrong-meng (Member, Author) replied:

Thank you @ueshin !

HyukjinKwon added a commit that referenced this pull request on Feb 15, 2024: …s when codecov enabled

### What changes were proposed in this pull request?

This is a follow-up of #44775 that skips the tests when codecov is enabled. They currently fail (https://github.com/apache/spark/actions/runs/7709423681/job/21010676103), and the coverage report is broken.

### Why are the changes needed?

To recover the test coverage report.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45112 from HyukjinKwon/SPARK-46687-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
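
For context, such a skip might look like the following sketch (hypothetical; the COVERAGE_PROCESS_START environment variable and the exact condition used in #45112 are assumptions, not the actual patch):

# Hypothetical sketch: skip memory profiler tests when running under
# code coverage, where trace-based tooling can conflict with the profiler.
import os
import unittest

@unittest.skipIf(
    "COVERAGE_PROCESS_START" in os.environ,
    "Flaky when code coverage is enabled",
)
class MemoryProfilerTests(unittest.TestCase):
    def test_show_memory_profiles(self):
        ...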