From c190f59c02728c92aa9b5b87dd95e50f1286598f Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 1 Jun 2023 12:33:41 +0800
Subject: [PATCH 1/3] init

Signed-off-by: Weichen Xu
---
 python/pyspark/mlv2/base.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/mlv2/base.py b/python/pyspark/mlv2/base.py
index dc503db71c06f..63631eccf2f6a 100644
--- a/python/pyspark/mlv2/base.py
+++ b/python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ def _output_columns(self) -> List[Tuple[str, str]]:
     def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
         """
         Return a transformation function that accepts an instance of `pd.Series` as input and
-        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+        returns the transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+        If there is only one output column, the transformed result must be an
+        instance of `pd.Series`; if there are multiple output columns, the transformed
+        result must be an instance of `pd.DataFrame` with column names matching the
+        output schema returned by the `_output_columns` interface.
         """
         raise NotImplementedError()

From cba0d9da6cd238a1717bd9bac5b308a5a379b358 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 1 Jun 2023 16:48:26 +0800
Subject: [PATCH 2/3] init

Signed-off-by: Weichen Xu
---
 python/pyspark/mlv2/tests/test_feature.py    | 15 +++++++--------
 python/pyspark/mlv2/tests/test_summarizer.py | 10 ++++------
 python/pyspark/mlv2/util.py                  | 16 ----------------
 3 files changed, 11 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py
index 8bc9d4c23070d..b91b455e1be9d 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,7 +21,6 @@
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.functions import vector_to_array
 from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession
@@ -35,8 +34,8 @@ class FeatureTestsMixin:
     def test_max_abs_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
             ],
             schema=["features"],
         )
@@ -49,7 +48,7 @@ def test_max_abs_scaler(self):
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
 
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
@@ -62,9 +61,9 @@ def test_standard_scaler(self):
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
-                (Vectors.dense([1.0, -1.5, 0.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
+                ([1.0, -1.5, 0.5],),
             ],
             schema=["features"],
         )
@@ -81,7 +80,7 @@ def test_standard_scaler(self):
 
         np.testing.assert_allclose(list(result.scaled_features), expected_result)
 
-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
 
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)
diff --git a/python/pyspark/mlv2/tests/test_summarizer.py b/python/pyspark/mlv2/tests/test_summarizer.py
index e78510b8ff49f..927ef0bdd5e5c 100644
--- a/python/pyspark/mlv2/tests/test_summarizer.py
+++ b/python/pyspark/mlv2/tests/test_summarizer.py
@@ -21,8 +21,6 @@
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.linalg import Vectors
-from pyspark.ml.functions import vector_to_array
 from pyspark.mlv2.summarizer import summarize_dataframe
 from pyspark.sql import SparkSession
 
@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
     def test_summarize_dataframe(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, -1.5]),),
-                (Vectors.dense([-3.0, 0.5]),),
-                (Vectors.dense([1.0, 3.5]),),
+                ([2.0, -1.5],),
+                ([-3.0, 0.5],),
+                ([1.0, 3.5],),
             ],
             schema=["features"],
         )
 
-        df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+        df1_local = df1.toPandas()
 
         result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
         result_local = summarize_dataframe(
diff --git a/python/pyspark/mlv2/util.py b/python/pyspark/mlv2/util.py
index 9aebb3fa9a3fa..e8118184945a3 100644
--- a/python/pyspark/mlv2/util.py
+++ b/python/pyspark/mlv2/util.py
@@ -68,15 +68,6 @@ def aggregate_dataframe(
 
     col_types = dict(dataframe.dtypes)
 
-    for col_name in input_col_names:
-        col_type = col_types[col_name]
-        if col_type == "vector":
-            from pyspark.ml.functions import vector_to_array
-
-            # pandas UDF does not support vector type for now,
-            # we convert it into vector type
-            dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
     dataframe = dataframe.select(*input_col_names)
 
     def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -174,13 +165,6 @@ def transform_fn_pandas_udf(s: "pd.Series") -> "pd.Series":
     input_col = col(input_col_name)
     input_col_type = dict(dataframe.dtypes)[input_col_name]
 
-    if input_col_type == "vector":
-        from pyspark.ml.functions import vector_to_array
-
-        # pandas UDF does not support vector type for now,
-        # we convert it into vector type
-        input_col = vector_to_array(input_col)
-
     result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))
 
     if len(output_cols) > 1:

From fef4cd215b251242b93f051d2188910f150f91e4 Mon Sep 17 00:00:00 2001
From: Weichen Xu
Date: Thu, 1 Jun 2023 21:19:31 +0800
Subject: [PATCH 3/3] fix format

Signed-off-by: Weichen Xu
---
 python/pyspark/mlv2/tests/test_feature.py | 1 -
 python/pyspark/mlv2/util.py               | 3 ---
 2 files changed, 4 deletions(-)

diff --git a/python/pyspark/mlv2/tests/test_feature.py b/python/pyspark/mlv2/tests/test_feature.py
index b91b455e1be9d..eed04217a6f4d 100644
--- a/python/pyspark/mlv2/tests/test_feature.py
+++ b/python/pyspark/mlv2/tests/test_feature.py
@@ -21,7 +21,6 @@
 import numpy as np
 import pandas as pd
 
-from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession
 
diff --git a/python/pyspark/mlv2/util.py b/python/pyspark/mlv2/util.py
index e8118184945a3..a5c2dd6e3b68f 100644
--- a/python/pyspark/mlv2/util.py
+++ b/python/pyspark/mlv2/util.py
@@ -66,8 +66,6 @@ def aggregate_dataframe(
         agg_state = local_agg_fn(dataframe)
         return agg_state_to_result(agg_state)
 
-    col_types = dict(dataframe.dtypes)
-
     dataframe = dataframe.select(*input_col_names)
 
     def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -163,7 +161,6 @@ def transform_fn_pandas_udf(s: "pd.Series") -> "pd.Series":
         return transform_fn(s)
 
     input_col = col(input_col_name)
-    input_col_type = dict(dataframe.dtypes)[input_col_name]
 
     result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))
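
Note for reviewers: the sketch below is illustration only, not part of the
patches. It shows one way a subclass could satisfy the `_get_transform_fn`
contract documented in PATCH 1/3, operating on a plain array column as
PATCH 2/3 now assumes (pandas UDFs do not support the Spark ML vector type).
The `pyspark.mlv2.base.Transformer` import path, the `DoubleFeatures` class
name, and the "array<double>" type string are assumptions; only the
`_output_columns` and `_get_transform_fn` signatures come from the patch.

    from typing import Any, Callable, List, Tuple

    import numpy as np
    import pandas as pd

    from pyspark.mlv2.base import Transformer  # assumed base class location


    class DoubleFeatures(Transformer):
        """Toy transformer that doubles every element of an array column."""

        def _output_columns(self) -> List[Tuple[str, str]]:
            # Exactly one output column, so per the PATCH 1/3 docstring the
            # transform fn below must return a pd.Series. With multiple
            # output columns it would instead return a pd.DataFrame whose
            # column names match this schema.
            return [("scaled_features", "array<double>")]

        def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
            def transform_fn(s: pd.Series) -> pd.Series:
                # Each element of `s` arrives as a plain list / numpy array
                # rather than an ML vector, which is why PATCH 2/3 switches
                # the tests and util.py to plain array columns.
                return s.apply(lambda v: np.asarray(v) * 2.0)

            return transform_fn

With one output column, util.py's transform path binds the returned
pd.Series directly to the single output column name; the pd.DataFrame case
is what the multi-column branch (`if len(output_cols) > 1:`) handles.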