Skip to content

[SPARK-57860][ML][PYTHON] Add HasIntermediateStorageLevel shared param and apply to KMeans#56949

Closed
maoli67660 wants to merge 1 commit into
apache:masterfrom
maoli67660:SPARK-57860
Closed

[SPARK-57860][ML][PYTHON] Add HasIntermediateStorageLevel shared param and apply to KMeans#56949
maoli67660 wants to merge 1 commit into
apache:masterfrom
maoli67660:SPARK-57860

Conversation

@maoli67660

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This is the first sub-task of SPARK-47103, which aims to make the default storage level of MLlib's intermediate datasets configurable.

This PR:

  1. Adds a new shared param HasIntermediateStorageLevel (default "MEMORY_AND_DISK", cannot be "NONE") by extending the code generators on both sides:
    • Scala: SharedParamsCodeGen.scala -> regenerated sharedParams.scala
    • Python: _shared_params_code_gen.py -> regenerated shared.py
  2. Applies it to KMeans (Scala and PySpark): mixes in the trait, adds setIntermediateStorageLevel, and uses $(intermediateStorageLevel) at the persist call site instead of the hardcoded StorageLevel.MEMORY_AND_DISK.
  3. Adds test coverage in KMeansSuite.

The design follows the suggestion from @zhengruifeng on the earlier PR #45182 (a per-estimator param via a shared HasIntermediateStorageLevel trait, consistent with ALS's existing intermediateStorageLevel), rather than the global SQL config explored there. The remaining estimators are tracked as sibling sub-tasks under SPARK-47103.

Why are the changes needed?

MLlib persists intermediate datasets internally during training (e.g. blockified instances), with the storage level hardcoded to MEMORY_AND_DISK. These datasets are created inside the algorithm and are not the user's input DataFrame, so users currently have no way to change their storage level -- unlike the input, which they can already cache themselves.

Making this configurable (e.g. DISK_ONLY) improves resilience to executor loss: since SPARK-27677, the External Shuffle Service can serve disk-persisted cached blocks, so disk-based intermediate storage survives executor failures. ALS already exposes exactly this via intermediateStorageLevel; this PR starts extending the same capability to the rest of MLlib.

Does this PR introduce any user-facing change?

Yes. KMeans gains a new expert param intermediateStorageLevel and a setIntermediateStorageLevel setter.

The default is "MEMORY_AND_DISK", so behavior is unchanged unless the user sets it.

Before (no way to change intermediate storage level):

kmeans = KMeans(k=3)          # intermediate data always MEMORY_AND_DISK

After:

kmeans = KMeans(k=3).setIntermediateStorageLevel("DISK_ONLY")

How was this patch tested?

  • Extended KMeansSuite to assert the new param's default value, that it can be set, and that invalid values ("NONE" and non-existent levels) are rejected. KMeansSuite passes (15/15).
  • PySpark param parity is covered by the existing pyspark.ml.tests.test_param.test_java_params, which checks that Python params match their Scala counterparts.
  • dev/mima (mllib mimaReportBinaryIssues) reports no binary compatibility problems; no MimaExcludes entries were needed.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

…ply to KMeans

Introduce a `HasIntermediateStorageLevel` shared param (generated into both
`sharedParams.scala` and PySpark `shared.py`) and apply it to KMeans as the
reference implementation for SPARK-47103. This lets users control the
StorageLevel of the intermediate datasets MLlib persists internally during
training, which is currently hardcoded to MEMORY_AND_DISK. The default is
unchanged, so existing behavior is preserved.

Follows the per-estimator param approach suggested by @zhengruifeng on apache#45182,
consistent with ALS's existing `intermediateStorageLevel` param.
* Trait for shared param intermediateStorageLevel (default: "MEMORY_AND_DISK"). This trait may be changed or
* removed between minor versions.
*/
trait HasIntermediateStorageLevel extends Params {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we also make ALS also extend this new trait?
this can be done in a separate PR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #56979 (SPARK-57910). Thanks for the suggestion!

@zhengruifeng zhengruifeng left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks pretty good, thanks

@zhengruifeng zhengruifeng changed the title [SPARK-57860][ML] Add HasIntermediateStorageLevel shared param and apply to KMeans [SPARK-57860][ML][PYTHON] Add HasIntermediateStorageLevel shared param and apply to KMeans Jul 2, 2026
@zhengruifeng

Copy link
Copy Markdown
Contributor

this failed org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite should be unrelated

zhengruifeng pushed a commit that referenced this pull request Jul 2, 2026
…m and apply to KMeans

### What changes were proposed in this pull request?

This is the first sub-task of [SPARK-47103](https://issues.apache.org/jira/browse/SPARK-47103), which aims to make the default storage level of MLlib's intermediate datasets configurable.

This PR:
1. Adds a new shared param `HasIntermediateStorageLevel` (default `"MEMORY_AND_DISK"`, cannot be `"NONE"`) by extending the code generators on both sides:
   - Scala: `SharedParamsCodeGen.scala` -> regenerated `sharedParams.scala`
   - Python: `_shared_params_code_gen.py` -> regenerated `shared.py`
2. Applies it to `KMeans` (Scala and PySpark): mixes in the trait, adds `setIntermediateStorageLevel`, and uses `$(intermediateStorageLevel)` at the `persist` call site instead of the hardcoded `StorageLevel.MEMORY_AND_DISK`.
3. Adds test coverage in `KMeansSuite`.

The design follows the suggestion from zhengruifeng on the earlier PR #45182 (a per-estimator param via a shared `HasIntermediateStorageLevel` trait, consistent with ALS's existing `intermediateStorageLevel`), rather than the global SQL config explored there. The remaining estimators are tracked as sibling sub-tasks under SPARK-47103.

### Why are the changes needed?

MLlib persists *intermediate* datasets internally during training (e.g. blockified instances), with the storage level hardcoded to `MEMORY_AND_DISK`. These datasets are created inside the algorithm and are not the user's input `DataFrame`, so users currently have **no way** to change their storage level -- unlike the input, which they can already cache themselves.

Making this configurable (e.g. `DISK_ONLY`) improves resilience to executor loss: since SPARK-27677, the External Shuffle Service can serve disk-persisted cached blocks, so disk-based intermediate storage survives executor failures. `ALS` already exposes exactly this via `intermediateStorageLevel`; this PR starts extending the same capability to the rest of MLlib.

### Does this PR introduce _any_ user-facing change?

Yes. `KMeans` gains a new expert param `intermediateStorageLevel` and a `setIntermediateStorageLevel` setter.

The default is `"MEMORY_AND_DISK"`, so **behavior is unchanged unless the user sets it**.

Before (no way to change intermediate storage level):
```python
kmeans = KMeans(k=3)          # intermediate data always MEMORY_AND_DISK
```

After:
```python
kmeans = KMeans(k=3).setIntermediateStorageLevel("DISK_ONLY")
```

### How was this patch tested?

- Extended `KMeansSuite` to assert the new param's default value, that it can be set, and that invalid values (`"NONE"` and non-existent levels) are rejected. `KMeansSuite` passes (15/15).
- PySpark param parity is covered by the existing `pyspark.ml.tests.test_param.test_java_params`, which checks that Python params match their Scala counterparts.
- `dev/mima` (mllib `mimaReportBinaryIssues`) reports no binary compatibility problems; no `MimaExcludes` entries were needed.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

Closes #56949 from maoli67660/SPARK-57860.

Lead-authored-by: Mao Li <63109264+maoli67660@users.noreply.github.com>
Co-authored-by: Mao Li <maoli@roku.com>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
(cherry picked from commit 6313ea6)
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
@zhengruifeng

Copy link
Copy Markdown
Contributor

Merge Summary:

Posted by merge_spark_pr.py

@zhengruifeng

Copy link
Copy Markdown
Contributor

thanks, merged into master/branch-4.x

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants