In [2]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("app") \
    .getOrCreate()
spark

# pandas-udf

> Pandas UDF (User-Defined Function) 是 PySpark 中的一个功能,它允许您在 Spark 中使用 Pandas 函数来进行数据处理和转换。这种功能可以提高代码的可读性和开发效率,因为 Pandas 提供了丰富的数据处理函数和方法。

**Pandas UDF 有以下几种使用方式:**

* 标量 Pandas UDF:
    - 接受单个输入列,并返回单个输出列。
    - 可以使用 `@pandas_udf(..., PandasUDFType.SCALAR)` 装饰器定义。
* Grouped Map Pandas UDF:
    - 接受一个 `Pandas DataFrame` 作为输入,并返回一个 `Pandas DataFrame`。
    - 可以使用 `@pandas_udf(..., PandasUDFType.GROUPED_MAP)` 装饰器定义。
* Grouped Aggregate Pandas UDF:
    - 接受一个 Pandas DataFrame 作为输入,并返回一个 Pandas Series。
    - 可以使用 `@pandas_udf(..., PandasUDFType.GROUPED_AGG)` 装饰器定义。

## 标量 Pandas UDF

下面是一个简单的示例,展示如何使用标量 Pandas UDF 计算列的平方:

In [3]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(returnType="double", functionType=PandasUDFType.SCALAR)
def square(x):
    return x ** 2

In [4]:
df = spark.createDataFrame([1, 2, 3, 4, 5], "int")
df.show()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+



In [5]:
df = df.withColumn("squared", square(df[0]))
df.show()

+-----+-------+
|value|squared|
+-----+-------+
|    1|    1.0|
|    2|    4.0|
|    3|    9.0|
|    4|   16.0|
|    5|   25.0|
+-----+-------+



这个示例演示了如何定义一个名为 square 的标量 Pandas UDF,它接受一个输入列并返回该列的平方值。然后,我们将这个 UDF 应用到一个 Spark DataFrame 上,创建了一个新的 squared 列。

## Grouped Map Pandas UDF

> Grouped Map Pandas UDF 是 Pandas UDF 中的一种重要类型,它允许您在 Spark 中对分组数据应用 Pandas 函数。这种 UDF **接受一个 Pandas DataFrame 作为输入,并返回一个 Pandas DataFrame。**

**以下是 Grouped Map Pandas UDF 的一些主要特点:**

- **分组处理**: Grouped Map Pandas UDF 能够对 Spark DataFrame 中的分组数据应用自定义的 Pandas 函数。这种方式可以利用 Pandas 丰富的数据处理能力,同时保持 Spark 的分布式计算优势。
- **灵活性**: 与标量 Pandas UDF 只能处理单个列不同,Grouped Map Pandas UDF 可以处理整个 Pandas DataFrame,从而提供更大的灵活性和表达能力。
- **返回 Pandas DataFrame**: Grouped Map Pandas UDF 的输出是一个 Pandas DataFrame,这使得后续的数据处理和转换更加方便。

下面是一个示例,演示如何使用 Grouped Map Pandas UDF 计算每个分组的平均值和标准差:

In [12]:
from pyspark.sql.types import StructField, StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

In [13]:
schema = StructType([
    StructField("mean", DoubleType(), True),
    StructField("std", DoubleType(), True), 
])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def calc_stats(pdf: pd.DataFrame):
    mean = pdf.mean()
    std = pdf.std()
    return pd.DataFrame({'mean': mean, 'std': std})

In [14]:
df = spark.createDataFrame([
    (1, 10), (1, 20), (1, 30),
    (2, 5), (2, 15), (2, 25)
], ["group", "value"])
df.show()

+-----+-----+
|group|value|
+-----+-----+
|    1|   10|
|    1|   20|
|    1|   30|
|    2|    5|
|    2|   15|
|    2|   25|
+-----+-----+



In [15]:
result = df.groupBy("group").apply(calc_stats)
result.show()



+----+----+
|mean| std|
+----+----+
| 1.0| 0.0|
|20.0|10.0|
| 2.0| 0.0|
|15.0|10.0|
+----+----+



在这个示例中:

* 我们定义了一个名为 calc_stats 的 Grouped Map Pandas UDF,它接受一个 Pandas DataFrame 作为输入,并返回一个包含 mean 和 std 列的 Pandas DataFrame。
* 我们创建了一个包含 group 和 value 列的 Spark DataFrame。
* 使用 `groupBy("group").apply(calc_stats)` 对数据进行分组并应用 calc_stats UDF。
* 最终结果是一个新的 Spark DataFrame,其中包含每个分组的平均值和标准差。

Spark 正在逐步弃用旧版本的 Pandas UDF API,并推荐使用新的 applyInPandas API。applyInPandas 提供了与旧版 Pandas UDF 类似的功能,但更加灵活和易用。

下面是一个示例:

In [21]:
from pyspark.sql.functions import struct
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

In [23]:
def calc_stats(pdf):
    mean = pdf["value"].mean()
    std = pdf["value"].std()
    return pdf.assign(mean=mean, std=std)

In [18]:
df = spark.createDataFrame([
    (1, 10), (1, 20), (1, 30),
    (2, 5), (2, 15), (2, 25)
], ["group", "value"])

df.show()

+-----+-----+
|group|value|
+-----+-----+
|    1|   10|
|    1|   20|
|    1|   30|
|    2|    5|
|    2|   15|
|    2|   25|
+-----+-----+



In [24]:
result = df.groupBy("group").applyInPandas(
    calc_stats, schema=StructType([
    StructField("group", IntegerType()),
    StructField("value", IntegerType()),
    StructField("mean", DoubleType()),
    StructField("std", DoubleType())
]))
result.show()

+-----+-----+----+----+
|group|value|mean| std|
+-----+-----+----+----+
|    1|   10|20.0|10.0|
|    1|   20|20.0|10.0|
|    1|   30|20.0|10.0|
|    2|    5|15.0|10.0|
|    2|   15|15.0|10.0|
|    2|   25|15.0|10.0|
+-----+-----+----+----+



* 在这个更新的版本中:

* 我们定义了一个名为 calc_stats 的函数,它接受一个 Pandas DataFrame 作为输入,并返回一个新的 Pandas DataFrame,其中包含原始 DataFrame 以及计算的平均值和标准差。
* 我们使用 applyInPandas 方法将 calc_stats 函数应用到分组的 Spark DataFrame 上。
* 我们还定义了输出 Spark DataFrame 的架构(schema),包括原始的 group 和 value 列以及新的 mean 和 std 列。
* 通过使用 applyInPandas 方法,您可以避免旧版 Pandas UDF API 的警告,并且未来升级 Spark 时,您的代码也会更加稳定。

## Grouped Aggregate Pandas UDF

`Grouped Aggregate Pandas UDF` 是 PySpark 中一种特殊类型的用户自定义函数(UDF),它允许在 PySpark DataFrame 的分组上应用 Pandas 函数,并将结果聚合为一个新的 PySpark DataFrame。

与普通的 applyInPandas 不同,Grouped Aggregate Pandas UDF 更加专注于聚合计算,并提供了更高效的实现。

Grouped Aggregate Pandas UDF 的基本用法如下:

In [36]:
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def mean_udf(pdf):
    return pdf.mean()



In [32]:
df = spark.createDataFrame([
    (1, 10), (1, 20), (1, 30),
    (2, 5), (2, 15), (2, 25)
], ["group", "value"])
df.show()

+-----+-----+
|group|value|
+-----+-----+
|    1|   10|
|    1|   20|
|    1|   30|
|    2|    5|
|    2|   15|
|    2|   25|
+-----+-----+



In [39]:
df.groupBy("group").agg(mean_udf(F.col("value"))).show()

+-----+---------------+
|group|mean_udf(value)|
+-----+---------------+
|    1|           20.0|
|    2|           15.0|
+-----+---------------+



其中:

- `@pandas_udf("double", PandasUDFType.GROUPED_AGG)` 是一个装饰器,用于定义 `Grouped Aggregate Pandas UDF`
- `mean_udf` 是自定义的 `Pandas` 函数,它接收一个 `Pandas DataFrame pdf` 并返回平均值
- `original_df.groupBy("some_column").agg(mean_udf("value_column"))` 是将 mean_udf 应用到 PySpark DataFrame 的分组上

Grouped Aggregate Pandas UDF 的主要特点包括:

- 高效的实现: Grouped Aggregate Pandas UDF 比普通的 applyInPandas 更加高效,因为它利用了 PySpark 的分组聚合机制,减少了数据传输和计算开销。
- 支持多种聚合函数: 你可以定义各种聚合函数,如 sum、count、std 等,并应用到 PySpark DataFrame 的分组上。
- 与其他 PySpark 函数兼容: Grouped Aggregate Pandas UDF 可以与 PySpark 的其他聚合函数(agg、groupBy等)一起使用,增强了灵活性。
- 保留 PySpark 的优势: 使用 Grouped Aggregate Pandas UDF 之后,结果仍然是 PySpark DataFrame,你可以继续使用 PySpark 的其他功能。
    
下面是另一个具体的例子:

In [48]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def pandas_mean(v):
    return v.sum()

df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()

+--------------+
|pandas_mean(v)|
+--------------+
|          21.0|
+--------------+

+---+--------------+
| id|pandas_mean(v)|
+---+--------------+
|  1|           3.0|
|  2|          18.0|
+---+--------------+

+----------------------------------------------------------------------------------------------+
|pandas_mean(v) OVER (PARTITION BY id ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)|
+----------------------------------------------------------------------------------------------+
|                                                                                           3.0|
|                                                                                           3.0|
|                                                                                          18.0|
|                                                                                          18.0|
|                                                                                          18.0|
+----------

- [New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0](https://www.databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html)
- [sql pyspark pandas with arrow](https://spark.apache.org/docs/2.4.0/sql-pyspark-pandas-with-arrow.html)
- [sql pyspark pandas with arrow](https://spark.apache.org/docs/2.4.0/sql-pyspark-pandas-with-arrow.html)


- [sql pyspark pandas with arrow](https://spark.apache.org/docs/2.4.0/sql-pyspark-pandas-with-arrow.html)
- [dineshdharme](https://gist.github.com/dineshdharme/d0247100dd0b29034e5bc46bc883504d)

创建一个pandas用户定义的函数（也称为向量化的用户定义函数）。

Pandas UDF是由Spark使用Arrow执行的用户定义函数， data和Pandas来处理数据，这允许矢量化操作。一个Pandas UDF 使用pandas_udf作为装饰器或包装函数来定义，而没有 需要额外的配置。Pandas UDF的行为与常规PySpark函数相同 API一般。

## 为了使用此API，通常导入以下内容：

In [3]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *

> 从Spark 3.0和Python 3.6+，Python类型提示 检测功能类型如下：

In [4]:
@pandas_udf(IntegerType())
def slen(s: pd.Series) -> pd.Series:
    return s.str.len()

> 在Spark 3.0之前，pandas UDF使用functionType来决定执行类型，如下所示：

In [5]:
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import IntegerType
@pandas_udf(IntegerType(), PandasUDFType.SCALAR)
def slen(s):
    return s.str.len()



最好为pandas UDF指定类型提示，而不是指定pandas UDF 类型通过functionType，这将在未来版本中弃用。

请注意，类型提示在所有情况下都应该使用pandas.Series，但有一个变体 当输入类型提示为 或输出列为pyspark.sql.types.StructType。以下示例显示 一个Pandas UDF，它接受长列、字符串列和结构列，并输出一个结构 柱它要求函数指定pandas.Series和 pandas.DataFrame如下：

In [11]:
@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

In [12]:
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)



In [13]:
df.select(func("long_col", "string_col", "struct_col")).printSchema()

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)



In [14]:
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()

+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+



In [15]:
@pandas_udf("first string, last string")
def split_expand(s: pd.Series) -> pd.DataFrame:
    return s.str.split(expand=True)

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(split_expand("name")).show()

+------------------+
|split_expand(name)|
+------------------+
|       {John, Doe}|
+------------------+



In [16]:
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+



In [39]:
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
import numpy as np

In [38]:
def calculate_cost(vectors):
    vectors = np.array(vectors)
    center = np.mean(vectors, axis=0)
    distances = np.linalg.norm(vectors - center,axis=1) ** 2
    return float(np.sum(distances) / len(vectors))

In [43]:
calculate_cost(np.random.random((10, 1024)))

76.69126354384197

In [44]:
@F.pandas_udf(returnType=FloatType(), functionType=PandasUDFType.GROUPED_AGG)
def calculate_cost(vectors):
    import numpy as np
    vectors = np.array(vectors.tolist())
    center = np.mean(vectors, axis=0)
    distances = np.linalg.norm(vectors - center,axis=1) ** 2
    return float(np.sum(distances) / len(vectors))



In [45]:
df_clusters_=spark.createDataFrame([
    (1, [1.0,2.0,3.0]),
    (1, [4.0,5.0,6.0]),
    (1, [7.0,8.0,9.0]),
    (2, [1.0,2.0,3.0]),
    (2, [4.0,5.0,6.0]),
    (2, [7.0,8.0,19.0]),
], ["k","vector"])

In [47]:
data = np.random.random((10, 1024))

In [52]:
def cosine_similarity(
        vector1: np.array,
        vector2: np.array,
) -> np.array:
    """

    :param vector1:
    :param vector2:
    :return:
    """
    dot_product = np.dot(vector1, vector2)
    norm_vector1 = np.linalg.norm(vector1, axis=1, keepdims=True)
    norm_vector2 = np.linalg.norm(vector2, axis=0, keepdims=True)
    similarity = dot_product / (norm_vector1 * norm_vector2)
    return similarity

In [60]:
center = np.mean(data, axis=0, keepdims=True)

In [66]:
vectors = data

In [72]:
((vectors[:, np.newaxis, :] - vectors[np.newaxis, :, :]) ** 2).shape

(10, 10, 1024)

In [74]:
np.sum((vectors[:, np.newaxis, :] - vectors[np.newaxis, :, :]) ** 2, axis=-1).shape

(10, 10)

In [70]:
vectors[np.newaxis, :, :].shape

(1, 10, 1024)

In [77]:
distances

array([[ 0.        , 13.27363914, 13.28169283, 13.08598339, 12.63868528,
        12.87749279, 13.11084234, 12.9612079 , 13.22347233, 12.94993754],
       [13.27363914,  0.        , 13.17356742, 13.05944711, 13.51668028,
        13.58946275, 13.44044147, 13.52471714, 13.39231052, 13.06377062],
       [13.28169283, 13.17356742,  0.        , 12.9471896 , 12.69430728,
        13.19872245, 13.37827878, 13.08627054, 13.35102986, 12.80680542],
       [13.08598339, 13.05944711, 12.9471896 ,  0.        , 12.9950665 ,
        13.19778614, 12.57414198, 12.98743466, 12.93069745, 12.9850471 ],
       [12.63868528, 13.51668028, 12.69430728, 12.9950665 ,  0.        ,
        12.96493448, 13.35321711, 13.02346397, 13.24807683, 12.90623346],
       [12.87749279, 13.58946275, 13.19872245, 13.19778614, 12.96493448,
         0.        , 13.34908547, 13.35354952, 13.48634552, 12.65401174],
       [13.11084234, 13.44044147, 13.37827878, 12.57414198, 13.35321711,
        13.34908547,  0.        , 13.04400699

In [84]:
np.max(distances)

13.589462751942932

In [98]:
distances[1][5]

13.589462751942932

In [94]:
distances[2]

array([13.28169283, 13.17356742,  0.        , 12.9471896 , 12.69430728,
       13.19872245, 13.37827878, 13.08627054, 13.35102986, 12.80680542])

In [85]:
np.argmax?

(1, 5)

In [93]:
np.argmax(distances, axis=-1)

array([2, 5, 6, 5, 1, 1, 1, 1, 5, 1], dtype=int64)

In [88]:
np.argmax(distances, axis=0)

array([2, 5, 6, 5, 1, 1, 1, 1, 5, 1], dtype=int64)

In [61]:
data.shape, center.shape

((10, 1024), (1, 1024))

In [65]:
cosine_similarity(data, center.T).flatten()

array([0.88242862, 0.87616786, 0.88299337, 0.88012671, 0.88252907,
       0.87912349, 0.87579674, 0.88558641, 0.87809565, 0.88416553])

In [64]:
np.argmax(cosine_similarity(data, center.T).flatten())

7

In [31]:
df_clusters_.show()

+---+----------------+
|  k|          vector|
+---+----------------+
|  1| [1.0, 2.0, 3.0]|
|  1| [4.0, 5.0, 6.0]|
|  1| [7.0, 8.0, 9.0]|
|  2| [1.0, 2.0, 3.0]|
|  2| [4.0, 5.0, 6.0]|
|  2|[7.0, 8.0, 19.0]|
+---+----------------+



In [46]:
df_clusters_=df_clusters_.groupBy("k").agg(calculate_cost(F.col("vector")))
df_clusters_.show()

+---+----------------------+
|  k|calculate_cost(vector)|
+---+----------------------+
|  1|                  18.0|
|  2|              60.22222|
+---+----------------------+



-----