
[SPARK-50881][PYTHON] Use cached schema where possible in connect dataframe.py #49749

Closed
garlandz-db wants to merge 1 commit into apache:master from garlandz-db:SPARK-50881

Conversation

@garlandz-db
Contributor

@garlandz-db garlandz-db commented Jan 31, 2025

What changes were proposed in this pull request?

  • The schema property returns a deepcopy every time so that callers cannot mutate the cached schema. However, this creates a performance degradation for internal use in dataframe.py. We make the following changes:
  1. columns returns a copy of the list of names, matching the classic implementation.
  2. All internal uses of the schema in dataframe.py now call the cached schema, avoiding a deepcopy (see the sketch below).
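
Roughly, a minimal sketch of the pattern under discussion (the _fetch_schema helper is hypothetical; exact names in the merged change may differ):

import copy
from typing import List, Optional
from pyspark.sql.types import StructType

class DataFrame:
    def __init__(self) -> None:
        self._cached_schema: Optional[StructType] = None

    @property
    def schema(self) -> StructType:
        # Public API: still returns a deepcopy so callers cannot mutate the cache.
        return copy.deepcopy(self._schema)

    @property
    def _schema(self) -> StructType:
        # Internal accessor: fetch the schema once, then reuse the cached StructType.
        if self._cached_schema is None:
            self._cached_schema = self._fetch_schema()  # hypothetical call to the server
        return self._cached_schema

    @property
    def columns(self) -> List[str]:
        # Builds a fresh list of names from the cached schema, so mutating the
        # returned list cannot affect the cache and no deepcopy is needed.
        return [field.name for field in self._schema.fields]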

Why are the changes needed?

  • The deepcopy does not scale well when these methods are called thousands of times, for example the columns method used by pivot.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • existing tests

Benchmarking shows the improvement: the profiled loop below completes in roughly a third of the time (about 8.9s vs 23.2s).

import cProfile, pstats
import copy
import numpy as np
import pandas as pd

# Profile one million reads of schema.names after the schema has been fetched once.
cProfile.run("""
x = pd.DataFrame(zip(np.random.rand(1000000), np.random.randint(1, 3000, 10000000), list(range(1000)) * 100000), columns=['x', 'y', 'z'])
df = spark.createDataFrame(x)
schema = df.schema
for i in range(1_000_000):
  [name for name in schema.names]
""")
# Load stats previously saved to the "profile_results" file and print the
# top 10% of entries by cumulative time.
p = pstats.Stats("profile_results")
p.sort_stats("cumtime").print_stats(.1)
         17000003 function calls in 8.886 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.931    0.931    8.886    8.886 <string>:1(<module>)
  1000000    0.391    0.000    0.391    0.000 <string>:3(<listcomp>)
  1000000    0.933    0.000    5.516    0.000 DatasetInfo.py:22(gather_imported_dataframes)
  1000000    0.948    0.000    6.669    0.000 DatasetInfo.py:75(_maybe_handle_dataframe_assignment)
  1000000    0.895    0.000    7.564    0.000 DatasetInfo.py:90(__setitem__)
  3000000    2.853    0.000    4.583    0.000 utils.py:54(retrieve_imported_type)
        1    0.000    0.000    8.886    8.886 {built-in method builtins.exec}
  3000000    0.667    0.000    0.667    0.000 {built-in method builtins.getattr}
  1000000    0.204    0.000    0.204    0.000 {built-in method builtins.isinstance}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
  3000000    0.473    0.000    0.473    0.000 {method 'get' of 'dict' objects}
  3000000    0.590    0.000    0.590    0.000 {method 'rsplit' of 'str' objects}


Thu Jan 16 20:13:47 2025    profile_results

         3 function calls in 0.000 seconds

vs

         55000003 function calls (50000003 primitive calls) in 23.181 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.987    0.987   23.181   23.181 <string>:1(<module>)
  1000000    1.060    0.000    5.750    0.000 DatasetInfo.py:22(gather_imported_dataframes)
  1000000    0.956    0.000    6.907    0.000 DatasetInfo.py:75(_maybe_handle_dataframe_assignment)
  1000000    0.930    0.000    7.837    0.000 DatasetInfo.py:90(__setitem__)
6000000/1000000    7.420    0.000   14.357    0.000 copy.py:128(deepcopy)
  5000000    0.494    0.000    0.494    0.000 copy.py:182(_deepcopy_atomic)
  1000000    2.734    0.000   11.015    0.000 copy.py:201(_deepcopy_list)
  1000000    0.951    0.000    1.160    0.000 copy.py:243(_keep_alive)
  3000000    2.946    0.000    4.690    0.000 utils.py:54(retrieve_imported_type)
        1    0.000    0.000   23.181   23.181 {built-in method builtins.exec}
  3000000    0.686    0.000    0.686    0.000 {built-in method builtins.getattr}
  9000000    0.976    0.000    0.976    0.000 {built-in method builtins.id}
  1000000    0.201    0.000    0.201    0.000 {built-in method builtins.isinstance}
  5000000    0.560    0.000    0.560    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 15000000    1.673    0.000    1.673    0.000 {method 'get' of 'dict' objects}
  3000000    0.607    0.000    0.607    0.000 {method 'rsplit' of 'str' objects}


Thu Jan 16 20:13:47 2025    profile_results

         3 function calls in 0.000 seconds
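
Note: as written, cProfile.run prints its stats to stdout, while pstats.Stats("profile_results") loads a previously saved profile file (hence the trailing "3 function calls in 0.000 seconds" lines). A minimal sketch of saving and reloading the profile in one go, assuming schema has already been created at the top level as in the setup above:

import cProfile, pstats

# Passing a filename as the second argument writes the stats to disk
# so pstats can load them afterwards.
cProfile.run("for i in range(1_000_000): [name for name in schema.names]", "profile_results")

# Load the saved stats and print the top 10% of entries by cumulative time.
pstats.Stats("profile_results").sort_stats("cumtime").print_stats(0.1)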

Was this patch authored or co-authored using generative AI tooling?

 @property
 def columns(self) -> List[str]:
-    return self.schema.names
+    return [field.name for field in self._original_schema.fields]
Contributor Author


Now it's the same as the classic dataframe.py implementation.
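
For reference, the classic pyspark/sql/dataframe.py property is essentially:

    @property
    def columns(self) -> List[str]:
        return [f.name for f in self.schema.fields]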

@HyukjinKwon HyukjinKwon changed the title [SPARK-50881] Use cached schema where possible in connect dataframe.py to [SPARK-50881][PYTHON] Use cached schema where possible in connect dataframe.py Feb 3, 2025
@HyukjinKwon
Member

cc @zhengruifeng

def _original_schema(self) -> StructType:
    if self._cached_schema:
        return self._cached_schema
    return self.schema
Contributor


I think it should be a property.
And since this is only for internal usage, I think we can avoid calling self.schema, which performs a deepcopy.

e.g.

    @property
    def _schema(self) -> StructType:
        if self._cached_schema is None:
            query = self._plan.to_proto(self._session.client)
            self._cached_schema = self._session.client.schema(query)
        return self._cached_schema

    @property
    def schema(self) -> StructType:
        return copy.deepcopy(self._schema)
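As a follow-on illustration (not part of the suggestion above), an internal caller could then read the cached schema through self._schema and skip the per-call deepcopy; dtypes is used here purely as an example:

    @property
    def dtypes(self) -> List[Tuple[str, str]]:
        # Reads the cached StructType via self._schema instead of self.schema,
        # so no deepcopy is performed on each call.
        return [(f.name, f.dataType.simpleString()) for f in self._schema.fields]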

@garlandz-db
Contributor Author

Failed tests don't seem relevant.

@zhengruifeng
Contributor

Failed tests don't seem relevant.

Please rebase this PR onto the latest master to make sure CI is green.

@garlandz-db garlandz-db force-pushed the SPARK-50881 branch 2 times, most recently from 63a88e3 to f338b47 Compare February 6, 2025 10:32
zhengruifeng pushed a commit that referenced this pull request Feb 10, 2025
…aframe.py

Closes #49749 from garlandz-db/SPARK-50881.

Authored-by: Garland Zhang <garland.zhang@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
(cherry picked from commit 9f86647)
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
@zhengruifeng
Contributor

merged to master/4.0

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…aframe.py

Closes apache#49749 from garlandz-db/SPARK-50881.

Authored-by: Garland Zhang <garland.zhang@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
(cherry picked from commit 7d560ba)
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
