From e5c827b0cf4cae546f1d1ad435262ef8d339ba71 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Fri, 5 Dec 2025 10:29:18 +0800
Subject: [PATCH] test

---
 python/pyspark/ml/connect/feature.py          | 75 ++++++++++---------
 .../tests/connect/test_legacy_mode_feature.py | 13 ----
 2 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/python/pyspark/ml/connect/feature.py b/python/pyspark/ml/connect/feature.py
index 42b470246d50..b3832db4668c 100644
--- a/python/pyspark/ml/connect/feature.py
+++ b/python/pyspark/ml/connect/feature.py
@@ -17,7 +17,8 @@
 
 import numpy as np
 import pandas as pd
-import pickle
+import pyarrow as pa
+
 from typing import Any, Union, List, Tuple, Callable, Dict, Optional
 
 from pyspark import keyword_only
@@ -118,27 +119,29 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import MaxAbsScaler as sk_MaxAbsScaler
-
-        sk_model = sk_MaxAbsScaler()
-        sk_model.scale_ = self.scale_values
-        sk_model.max_abs_ = self.max_abs_values
-        sk_model.n_features_in_ = len(self.max_abs_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.max_abs_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "max_abs", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.max_abs_values = sk_model.max_abs_
-        self.scale_values = sk_model.scale_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.max_abs_values = np.array(table.column("max_abs")[0].as_py())
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
 
 
 class StandardScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
@@ -233,26 +236,28 @@ def map_value(x: "np.ndarray") -> "np.ndarray":
         return transform_fn
 
     def _get_core_model_filename(self) -> str:
-        return self.__class__.__name__ + ".sklearn.pkl"
+        return self.__class__.__name__ + ".arrow.parquet"
 
     def _save_core_model(self, path: str) -> None:
-        from sklearn.preprocessing import StandardScaler as sk_StandardScaler
-
-        sk_model = sk_StandardScaler(with_mean=True, with_std=True)
-        sk_model.scale_ = self.scale_values
-        sk_model.var_ = self.std_values * self.std_values  # type: ignore[operator]
-        sk_model.mean_ = self.mean_values
-        sk_model.n_features_in_ = len(self.std_values)  # type: ignore[arg-type]
-        sk_model.n_samples_seen_ = self.n_samples_seen
-
-        with open(path, "wb") as fp:
-            pickle.dump(sk_model, fp)
+        import pyarrow.parquet as pq
+
+        table = pa.Table.from_arrays(
+            [
+                pa.array([self.scale_values], pa.list_(pa.float64())),
+                pa.array([self.mean_values], pa.list_(pa.float64())),
+                pa.array([self.std_values], pa.list_(pa.float64())),
+                pa.array([self.n_samples_seen], pa.int64()),
+            ],
+            names=["scale", "mean", "std", "n_samples"],
+        )
+        pq.write_table(table, path)
 
     def _load_core_model(self, path: str) -> None:
-        with open(path, "rb") as fp:
-            sk_model = pickle.load(fp)
+        import pyarrow.parquet as pq
+
+        table = pq.read_table(path)
 
-        self.std_values = np.sqrt(sk_model.var_)
-        self.scale_values = sk_model.scale_
-        self.mean_values = sk_model.mean_
-        self.n_samples_seen = sk_model.n_samples_seen_
+        self.scale_values = np.array(table.column("scale")[0].as_py())
+        self.mean_values = np.array(table.column("mean")[0].as_py())
+        self.std_values = np.array(table.column("std")[0].as_py())
+        self.n_samples_seen = table.column("n_samples")[0].as_py()
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 3aac4a0e0972..328effef233b 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -17,7 +17,6 @@
 #
 
 import os
-import pickle
 import numpy as np
 import tempfile
 import unittest
@@ -75,12 +74,6 @@ def test_max_abs_scaler(self):
         np.testing.assert_allclose(model.max_abs_values, loaded_model.max_abs_values)
         assert model.n_samples_seen == loaded_model.n_samples_seen
 
-        # Test loading core model as scikit-learn model
-        with open(os.path.join(model_path, "MaxAbsScalerModel.sklearn.pkl"), "rb") as f:
-            sk_model = pickle.load(f)
-        sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-        np.testing.assert_allclose(sk_result, expected_result)
-
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
@@ -129,12 +122,6 @@ def test_standard_scaler(self):
         np.testing.assert_allclose(model.scale_values, loaded_model.scale_values)
         assert model.n_samples_seen == loaded_model.n_samples_seen
 
-        # Test loading core model as scikit-learn model
-        with open(os.path.join(model_path, "StandardScalerModel.sklearn.pkl"), "rb") as f:
-            sk_model = pickle.load(f)
-        sk_result = sk_model.transform(np.stack(list(local_df1.features)))
-        np.testing.assert_allclose(sk_result, expected_result)
-
 
 class FeatureTests(FeatureTestsMixin, unittest.TestCase):
     def setUp(self) -> None:
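
Reviewer note: below is a minimal standalone sketch of the Parquet round trip this patch
adopts, runnable outside Spark. The sample values, the temporary path, and the file name
are illustrative assumptions, not part of the patch; the column names and the one-row
table layout mirror _save_core_model/_load_core_model above. Each model is persisted as
a single-row Arrow table whose vector-valued attributes become list<double> columns.

    # Sketch only: round-trips MaxAbsScaler-style state through a one-row Parquet file.
    import os
    import tempfile

    import numpy as np
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Illustrative values; the real ones come from the fitted model.
    scale_values = np.array([0.5, 0.25, 1.0])
    max_abs_values = np.array([2.0, 4.0, 1.0])
    n_samples_seen = 3

    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "MaxAbsScalerModel.arrow.parquet")

        # Save: vector attributes are wrapped in a one-element list so each
        # becomes a single list<double> cell in a one-row table.
        table = pa.Table.from_arrays(
            [
                pa.array([scale_values], pa.list_(pa.float64())),
                pa.array([max_abs_values], pa.list_(pa.float64())),
                pa.array([n_samples_seen], pa.int64()),
            ],
            names=["scale", "max_abs", "n_samples"],
        )
        pq.write_table(table, path)

        # Load: pull the single row back out as numpy arrays and a plain int.
        loaded = pq.read_table(path)
        np.testing.assert_allclose(np.array(loaded.column("scale")[0].as_py()), scale_values)
        np.testing.assert_allclose(np.array(loaded.column("max_abs")[0].as_py()), max_abs_values)
        assert loaded.column("n_samples")[0].as_py() == n_samples_seen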