[SPARK-43516][ML][FOLLOW-UP] Drop vector type support in Distributed ML for spark connect

### What changes were proposed in this pull request?

Drop vector type support in Distributed ML for Spark Connect.

### Why are the changes needed?

Distributed ML is designed to support fitting / transforming over either a Spark DataFrame or a local pandas DataFrame.
pandas DataFrames currently have no vector type comparable to `spark.ml.linalg.Vector`, and the vector type offers little advantage beyond saving space for sparse feature datasets.

To keep the interface consistent, we decided that the initial version does not support the vector type.
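
As an illustration, here is a minimal sketch of the intended usage after this change. It assumes an active `SparkSession` bound to `spark`, and that `MaxAbsScaler` in `pyspark.mlv2.feature` accepts `inputCol` / `outputCol` keyword arguments like the classic `pyspark.ml` estimators; only the module path, class name, and the plain-array feature column are taken from this PR's tests.

```python
from pyspark.mlv2.feature import MaxAbsScaler

# Feature columns are plain array columns, not spark.ml.linalg.Vector.
df = spark.createDataFrame(
    [([2.0, 3.5, 1.5],), ([-3.0, -0.5, -2.5],)],
    schema=["features"],
)
scaler = MaxAbsScaler(inputCol="features", outputCol="scaled_features")

# The same estimator can be fit on the Spark DataFrame or on a local
# pandas DataFrame whose cells hold lists / numpy arrays.
spark_model = scaler.fit(df)
local_model = scaler.fit(df.toPandas())
```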

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #41420 from WeichenXu123/mlv2-drop-vector-type-support.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
WeichenXu123 committed Jun 2, 2023
1 parent 085dfeb commit c3b6270
Showing 4 changed files with 16 additions and 35 deletions.
python/pyspark/mlv2/base.py (6 changes: 5 additions & 1 deletion)
@@ -134,7 +134,11 @@ def _output_columns(self) -> List[Tuple[str, str]]:
     def _get_transform_fn(self) -> Callable[["pd.Series"], Any]:
         """
         Return a transformation function that accepts an instance of `pd.Series` as input and
-        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`
+        returns transformed result as an instance of `pd.Series` or `pd.DataFrame`.
+        If there's only one output column, the transformed result must be an
+        instance of `pd.Series`, if there are multiple output columns, the transformed result
+        must be an instance of `pd.DataFrame` with column names matching output schema
+        returned by `_output_columns` interface.
         """
         raise NotImplementedError()

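The extended docstring above describes a contract that is easiest to see in code. The following is a rough sketch of a hypothetical single-output transformer that honours it; apart from the `_get_transform_fn` name and the `pd.Series` / `pd.DataFrame` rule quoted from the docstring, the class and everything in it are assumptions, not code from this repository.

```python
from typing import Any, Callable

import numpy as np
import pandas as pd


class CenteringTransformer:  # in the real API this would subclass the mlv2 transformer base
    """Toy transformer that subtracts a fixed offset from each feature row."""

    def __init__(self, offset: np.ndarray) -> None:
        self.offset = np.asarray(offset)

    def _get_transform_fn(self) -> Callable[[pd.Series], Any]:
        offset = self.offset

        def transform_fn(series: pd.Series) -> pd.Series:
            # Single output column, so a pd.Series is returned. With multiple
            # output columns this would instead be a pd.DataFrame whose column
            # names match the schema reported by _output_columns.
            return series.apply(lambda row: np.asarray(row) - offset)

        return transform_fn
```
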
python/pyspark/mlv2/tests/test_feature.py (16 changes: 7 additions & 9 deletions)
@@ -21,8 +21,6 @@
 import numpy as np
 import pandas as pd

-from pyspark.ml.functions import vector_to_array
-from pyspark.ml.linalg import Vectors
 from pyspark.mlv2.feature import MaxAbsScaler, StandardScaler
 from pyspark.sql import SparkSession

@@ -35,8 +33,8 @@ class FeatureTestsMixin:
     def test_max_abs_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
             ],
             schema=["features"],
         )
@@ -49,7 +47,7 @@ def test_max_abs_scaler(self):

         np.testing.assert_allclose(list(result.scaled_features), expected_result)

-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)

@@ -62,9 +60,9 @@ def test_max_abs_scaler(self):
     def test_standard_scaler(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, 3.5, 1.5]),),
-                (Vectors.dense([-3.0, -0.5, -2.5]),),
-                (Vectors.dense([1.0, -1.5, 0.5]),),
+                ([2.0, 3.5, 1.5],),
+                ([-3.0, -0.5, -2.5],),
+                ([1.0, -1.5, 0.5],),
             ],
             schema=["features"],
         )
@@ -81,7 +79,7 @@ def test_standard_scaler(self):

         np.testing.assert_allclose(list(result.scaled_features), expected_result)

-        local_df1 = df1.withColumn("features", vector_to_array("features")).toPandas()
+        local_df1 = df1.toPandas()
         local_fit_model = scaler.fit(local_df1)
         local_transform_result = local_fit_model.transform(local_df1)

python/pyspark/mlv2/tests/test_summarizer.py (10 changes: 4 additions & 6 deletions)
@@ -21,8 +21,6 @@
 import numpy as np
 import pandas as pd

-from pyspark.ml.linalg import Vectors
-from pyspark.ml.functions import vector_to_array
 from pyspark.mlv2.summarizer import summarize_dataframe
 from pyspark.sql import SparkSession

@@ -35,14 +33,14 @@ class SummarizerTestsMixin:
     def test_summarize_dataframe(self):
         df1 = self.spark.createDataFrame(
             [
-                (Vectors.dense([2.0, -1.5]),),
-                (Vectors.dense([-3.0, 0.5]),),
-                (Vectors.dense([1.0, 3.5]),),
+                ([2.0, -1.5],),
+                ([-3.0, 0.5],),
+                ([1.0, 3.5],),
             ],
             schema=["features"],
         )

-        df1_local = df1.withColumn("features", vector_to_array("features")).toPandas()
+        df1_local = df1.toPandas()

         result = summarize_dataframe(df1, "features", ["min", "max", "sum", "mean", "std"])
         result_local = summarize_dataframe(
python/pyspark/mlv2/util.py (19 changes: 0 additions & 19 deletions)
@@ -66,17 +66,6 @@ def aggregate_dataframe(
         agg_state = local_agg_fn(dataframe)
         return agg_state_to_result(agg_state)

-    col_types = dict(dataframe.dtypes)
-
-    for col_name in input_col_names:
-        col_type = col_types[col_name]
-        if col_type == "vector":
-            from pyspark.ml.functions import vector_to_array
-
-            # pandas UDF does not support vector type for now,
-            # we convert it into vector type
-            dataframe = dataframe.withColumn(col_name, vector_to_array(col(col_name)))
-
     dataframe = dataframe.select(*input_col_names)

     def compute_state(iterator: Iterable["pd.DataFrame"]) -> Iterable["pd.DataFrame"]:
@@ -172,14 +161,6 @@ def transform_fn_pandas_udf(s: "pd.Series") -> "pd.Series":
         return transform_fn(s)

     input_col = col(input_col_name)
-    input_col_type = dict(dataframe.dtypes)[input_col_name]
-
-    if input_col_type == "vector":
-        from pyspark.ml.functions import vector_to_array
-
-        # pandas UDF does not support vector type for now,
-        # we convert it into vector type
-        input_col = vector_to_array(input_col)

     result_spark_df = dataframe.withColumn(output_col_name, transform_fn_pandas_udf(input_col))

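Since the automatic conversion was removed from `util.py`, callers that still produce `spark.ml.linalg.Vector` columns would have to convert them to array columns themselves before passing data to the mlv2 APIs. Below is a sketch of that manual step, mirroring the conversion the deleted code performed and assuming an active `spark` session.

```python
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors

vec_df = spark.createDataFrame(
    [(Vectors.dense([2.0, -1.5]),), (Vectors.dense([-3.0, 0.5]),)],
    schema=["features"],
)

# Convert the vector column to an array column up front; the resulting
# DataFrame can then be used with pyspark.mlv2 estimators and summarizers.
array_df = vec_df.withColumn("features", vector_to_array("features"))
```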
