diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py
index dc503db71c06f..63631eccf2f6a 100644
--- a/python/pyspark/mlv2/base.py
+++ b/python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ def _output_columns(self) -> List[Tuple[str, str]]:
     def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
         """
         Return a transformation function that accepts an instance of `pd.Series` as input and
-        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+        returns the transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+        If there is only one output column, the transformed result must be an
+        instance of `pd.Series`; if there are multiple output columns, the transformed
+        result must be an instance of `pd.DataFrame` with column names matching the
+        output schema returned by the `_output_columns` interface.
         """
         raise NotImplementedError()
 
diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py
index 8bc9d4c23070d..eed04217a6f4d 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,8 +21,6 @@
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.functions import vector_to_array
-from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession
 
@@ -35,8 +33,8 @@ class FeatureTestsMixin:
     def test_max_abs_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
             ],
             schema=["features"],
         )
@@ -49,7 +47,7 @@ def test_max_abs_scaler(self):
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
 
@@ -62,9 +60,9 @@ def test_max_abs_scaler(self):
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
-                (Vectors.dense([1.0, -1.5, 0.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
+                ([1.0, -1.5, 0.5],),
             ],
             schema=["features"],
         )
@@ -81,7 +79,7 @@ def test_standard_scaler(self):
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
 
diff --git a/python/pyspark/mlv2/tests/test_summarizer.py b/python/pyspark/mlv2/tests/test_summarizer.py
index e78510b8ff49f..927ef0bdd5e5c 100644
--- a/python/pyspark/mlv2/tests/test_summarizer.py
+++ b/python/pyspark/mlv2/tests/test_summarizer.py
@@ -21,8 +21,6 @@
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.linalg import Vectors
-from pyspark.ml.functions import vector_to_array
 from pyspark.mlv2.summarizer import summarize_dataframe
 from pyspark.sql import SparkSession
 
@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
     def test_summarize_dataframe(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, -1.5]),),
-                (Vectors.dense([-3.0, 0.5]),),
-                (Vectors.dense([1.0, 3.5]),),
+                ([2.0, -1.5],),
+                ([-3.0, 0.5],),
+                ([1.0, 3.5],),
             ],
             schema=["features"],
         )
 
-        df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+        df1_local = df1.toPandas()
 
         result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
         result_local = summarize_dataframe(
diff --git a/python/pyspark/mlv2/util.py b/python/pyspark/mlv2/util.py
index 9aebb3fa9a3fa..a5c2dd6e3b68f 100644
--- a/python/pyspark/mlv2/util.py
+++ b/python/pyspark/mlv2/util.py
@@ -66,17 +66,6 @@ def aggregate_dataframe(
         agg_state = local_agg_fn(dataframe)
         return agg_state_to_result(agg_state)
 
-    col_types = dict(dataframe.dtypes)
-
-    for col_name in input_col_names:
-        col_type = col_types[col_name]
-        if col_type == "vector":
-            from pyspark.ml.functions import vector_to_array
-
-            # pandas UDF does not support vector type for now,
-            # we convert it into vector type
-            dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
     dataframe = dataframe.select(*input_col_names)
 
     def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -172,14 +161,6 @@ def transform_fn_pandas_udf(s: "pd.Series") -> "pd.Series":
             return transform_fn(s)
 
     input_col = col(input_col_name)
-    input_col_type = dict(dataframe.dtypes)[input_col_name]
-
-    if input_col_type == "vector":
-        from pyspark.ml.functions import vector_to_array
-
-        # pandas UDF does not support vector type for now,
-        # we convert it into vector type
-        input_col = vector_to_array(input_col)
 
     result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))