[SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect #41420

Closed
6 changes: 5 additions & 1 deletion python/pyspark/mlv2/base.py
@@ -134,7 +134,11 @@ def _output_columns(self) -> List[Tuple[str, str]]:
def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
"""
Return a transformation function that accepts an instance of `pd.Series` as input and
- returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+ returns transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+ If there's only one output column, the transformed result must be an
+ instance of `pd.Series`; if there are multiple output columns, the transformed result
+ must be an instance of `pd.DataFrame` with column names matching the output schema
+ returned by the `_output_columns` interface.
"""
raise NotImplementedError()

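As a hypothetical illustration of the contract documented above (not code from this PR), a transformer that declares a single output column would return a `pd.Series` from its transform function; with several output columns it would return a `pd.DataFrame` whose column names match `_output_columns`:

import pandas as pd
from typing import Any, Callable

class CelsiusToFahrenheit:
    # Hypothetical transformer used only to illustrate the documented contract.
    def _output_columns(self):
        # One output column, so the transform function below returns a pd.Series;
        # multiple output columns would require a pd.DataFrame with matching column names.
        return [("fahrenheit", "double")]

    def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
        def transform_fn(s: pd.Series) -> pd.Series:
            return s * 9.0 / 5.0 + 32.0

        return transform_fn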
16 changes: 7 additions & 9 deletions python/pyspark/mlv2/tests/test_feature.py
@@ -21,8 +21,6 @@
import numpy as np
import pandas as pd

- from pyspark.ml.functions import vector_to_array
- from pyspark.ml.linalg import Vectors
from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
from pyspark.sql import SparkSession

@@ -35,8 +33,8 @@ class FeatureTestsMixin:
def test_max_abs_scaler(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, 3.5, 1.5]),),
- (Vectors.dense([-3.0, -0.5, -2.5]),),
+ ([2.0, 3.5, 1.5],),
+ ([-3.0, -0.5, -2.5],),
],
schema=["features"],
)
@@ -49,7 +47,7 @@ def test_max_abs_scaler(self):

np.testing.assert_allclose(list(result.scaled_features), expected_result)

- local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+ local_df1 = df1.toPandas()
local_fit_model = scaler.fit(local_df1)
local_transform_result = local_fit_model.transform(local_df1)

@@ -62,9 +60,9 @@ def test_standard_scaler(self):
def test_standard_scaler(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, 3.5, 1.5]),),
- (Vectors.dense([-3.0, -0.5, -2.5]),),
- (Vectors.dense([1.0, -1.5, 0.5]),),
+ ([2.0, 3.5, 1.5],),
+ ([-3.0, -0.5, -2.5],),
+ ([1.0, -1.5, 0.5],),
],
schema=["features"],
)
@@ -81,7 +79,7 @@ def test_standard_scaler(self):

np.testing.assert_allclose(list(result.scaled_features), expected_result)

- local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+ local_df1 = df1.toPandas()
local_fit_model = scaler.fit(local_df1)
local_transform_result = local_fit_model.transform(local_df1)

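For reference, a minimal usage sketch of the pattern these updated tests exercise; the `inputCol`/`outputCol` constructor arguments are assumed here, since the constructor call sits outside this diff excerpt. Features are plain `array<double>` columns, and the same estimator accepts either a Spark DataFrame or a local pandas DataFrame:

from pyspark.sql import SparkSession
from pyspark.mlv2.feature import MaxAbsScaler

spark = SparkSession.builder.getOrCreate()

# Plain Python lists become array<double> columns; pyspark.ml.linalg.Vectors is no longer involved.
df = spark.createDataFrame(
    [([2.0, 3.5, 1.5],), ([-3.0, -0.5, -2.5],)],
    schema=["features"],
)

scaler = MaxAbsScaler(inputCol="features", outputCol="scaled_features")
model = scaler.fit(df)
print(model.transform(df).toPandas())

# The estimator also runs directly on a local pandas DataFrame, as in the test above.
local_df = df.toPandas()
local_model = scaler.fit(local_df)
print(local_model.transform(local_df))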
10 changes: 4 additions & 6 deletions python/pyspark/mlv2/tests/test_summarizer.py
@@ -21,8 +21,6 @@
import numpy as np
import pandas as pd

- from pyspark.ml.linalg import Vectors
- from pyspark.ml.functions import vector_to_array
from pyspark.mlv2.summarizer import summarize_dataframe
from pyspark.sql import SparkSession

@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
def test_summarize_dataframe(self):
df1 = self.spark.createDataFrame(
[
- (Vectors.dense([2.0, -1.5]),),
- (Vectors.dense([-3.0, 0.5]),),
- (Vectors.dense([1.0, 3.5]),),
+ ([2.0, -1.5],),
+ ([-3.0, 0.5],),
+ ([1.0, 3.5],),
],
schema=["features"],
)

- df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+ df1_local = df1.toPandas()

result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
result_local = summarize_dataframe(
19 changes: 0 additions & 19 deletions python/pyspark/mlv2/util.py
@@ -66,17 +66,6 @@ def aggregate_dataframe(
agg_state = local_agg_fn(dataframe)
return agg_state_to_result(agg_state)

- col_types = dict(dataframe.dtypes)
-
- for col_name in input_col_names:
-     col_type = col_types[col_name]
-     if col_type == "vector":
-         from pyspark.ml.functions import vector_to_array
-
-         # pandas UDF does not support vector type for now,
-         # we convert it into vector type
-         dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
dataframe = dataframe.select(*input_col_names)

def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -172,14 +161,6 @@ def transform_fn_pandas_udf(s: "pd.Series") -> "pd.Series":
return transform_fn(s)

input_col = col(input_col_name)
- input_col_type = dict(dataframe.dtypes)[input_col_name]
-
- if input_col_type == "vector":
-     from pyspark.ml.functions import vector_to_array
-
-     # pandas UDF does not support vector type for now,
-     # we convert it into vector type
-     input_col = vector_to_array(input_col)

result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))

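With the conversions above removed, `aggregate_dataframe` and the transform path no longer accept vector-typed columns; a caller that still holds `VectorUDT` data would convert it up front before using `pyspark.mlv2`. A hedged sketch of that caller-side conversion with the existing `vector_to_array` helper, assuming an environment where the vector input itself can still be created:

from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
from pyspark.mlv2.summarizer import summarize_dataframe
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# A legacy DataFrame whose "features" column is still VectorUDT.
vec_df = spark.createDataFrame(
    [(Vectors.dense([2.0, -1.5]),), (Vectors.dense([-3.0, 0.5]),)],
    schema=["features"],
)

# After this change the conversion is the caller's responsibility:
# turn the vector column into array<double> before handing it to pyspark.mlv2.
array_df = vec_df.withColumn("features", vector_to_array(col("features")))

summary = summarize_dataframe(array_df, "features", ["min", "max", "mean"])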