In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType

# Reuse the active SparkSession if there is one, otherwise create a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# Apache Arrow

In [2]:
import numpy as np
import pandas
import pyarrow

# Small pandas frame with a float column containing NaN, a string column and a
# boolean column, indexed by the labels 'a'..'c'. It is named `pandas_df` (not
# `df`) so it is not confused with the Spark DataFrames called `df` below.
pandas_df = pandas.DataFrame(
    {
        'one': [-1, np.nan, 2.5],
        'two': ['foo', 'bar', 'baz'],
        'three': [True, False, True],
    },
    index=list('abc'),
)

# Convert the pandas frame into an in-memory Arrow table.
table = pyarrow.Table.from_pandas(pandas_df)

In [3]:
from pyarrow import parquet

# Persist the Arrow table built in the previous cell as a local Parquet file.
parquet.write_table(table, where='example.parquet')

# Обычные UDF (regular Python UDFs)

In [4]:
# One-row Spark DataFrame used by the UDF examples below.
df = spark.createDataFrame(
    [(1, "John Doe", 21)],
    schema=("id", "name", "age"),
)
df.show()

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|John Doe| 21|
+---+--------+---+



In [5]:
# Plain (row-at-a-time) Python UDFs, defined once as a lambda and once with the
# decorator form. Without an explicit returnType, `udf` produces a StringType
# column, so the length would come back as a string. IntegerType is declared
# instead. Null names map to null rather than raising TypeError from len(None).
string_length = udf(lambda s: len(s) if s is not None else None, IntegerType())


@udf(returnType=IntegerType())
def string_length2(s):
    """Return the number of characters in ``s``, or None when ``s`` is null."""
    if s is None:
        return None
    return len(s)


df.select(
    string_length(col('name')).alias('string_length'),
    string_length2(col('name')).alias('string_length2'),
).show()

+-------------+--------------+
|string_length|string_length2|
+-------------+--------------+
|            8|             8|
+-------------+--------------+



# Pandas UDF

In [6]:
@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    """Vectorized upper-casing of a pandas Series of strings (one batch at a time)."""
    upper_cased = s.str.upper()
    return upper_cased

@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
    """Add 1 to every element of the incoming pandas Series."""
    return x.add(1)



In [7]:
# Apply both scalar Pandas UDFs to the one-row frame.
(
    df
    .select(to_upper("name"), add_one("age"))
    .show()
)

+--------------+------------+
|to_upper(name)|add_one(age)|
+--------------+------------+
|      JOHN DOE|          22|
+--------------+------------+



In [8]:
# Two groups keyed by `id`: id=1 has two values, id=2 has three.
df = spark.createDataFrame(
    data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    schema=("id", "value"),
)

In [9]:
@pandas_udf("id long, value double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    """Standardize the `value` column within a single group.

    Receives the rows of one group as a pandas DataFrame and returns it with
    `value` replaced by (value - group mean) / group std, using pandas'
    default Series.std.
    """
    values = pdf["value"]
    centered = values - values.mean()
    return pdf.assign(value=centered / values.std())

In [10]:
# Run the GROUPED_MAP UDF once per `id` group and show the combined result.
df.groupBy("id").apply(normalize).show()



+---+-------------------+
| id|              value|
+---+-------------------+
|  1|-0.7071067811865475|
|  1| 0.7071067811865475|
|  2|-0.8320502943378437|
|  2|-0.2773500981126146|
|  2| 1.1094003924504583|
+---+-------------------+



In [11]:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    """Reduce a pandas Series to a single scalar: its mean."""
    average = v.mean()
    return average

In [12]:
from pyspark.sql import Window

# Window spanning the whole `id` partition, so every row sees its group's mean.
w = (
    Window
    .partitionBy('id')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df.withColumn(
    'mean_v',
    mean_udf(df['value']).over(w),
).show()

+---+-----+------+
| id|value|mean_v|
+---+-----+------+
|  1|  1.0|   1.5|
|  1|  2.0|   1.5|
|  2|  3.0|   6.0|
|  2|  5.0|   6.0|
|  2| 10.0|   6.0|
+---+-----+------+



# Spark 3 Pandas UDFs: Python type hints vs. PandasUDFType

In [13]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [14]:
# Pre-type-hint style: the UDF kind is passed explicitly via PandasUDFType.
@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    """Add 1 to each element of the incoming pandas Series."""
    return v.add(1)


spark.range(10).select(pandas_plus_one("id")).show()

+-------------------+
|pandas_plus_one(id)|
+-------------------+
|                  1|
|                  2|
|                  3|
|                  4|
|                  5|
|                  6|
|                  7|
|                  8|
|                  9|
|                 10|
+-------------------+



In [15]:
# Type-hint style: no PandasUDFType is passed; the Series -> Series hints
# describe the UDF. This intentionally re-defines `pandas_plus_one` from the
# previous cell so both styles can be compared on the same query.
@pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s.add(1)


spark.range(10).select(pandas_plus_one("id")).show()

+-------------------+
|pandas_plus_one(id)|
+-------------------+
|                  1|
|                  2|
|                  3|
|                  4|
|                  5|
|                  6|
|                  7|
|                  8|
|                  9|
|                 10|
+-------------------+



In [16]:
@pandas_udf('long', PandasUDFType.SCALAR_ITER)  # New in 3.0
def multiply_two(iterator):
    """Multiply two columns batch by batch.

    `iterator` yields one (a, b) pair per batch; one product is yielded back
    for each pair.
    """
    for a, b in iterator:
        yield a * b


spark.range(10).select(multiply_two("id", "id")).show()

+--------------------+
|multiply_two(id, id)|
+--------------------+
|                   0|
|                   1|
|                   4|
|                   9|
|                  16|
|                  25|
|                  36|
|                  49|
|                  64|
|                  81|
+--------------------+



In [17]:
from typing import Iterator, Tuple       

# Type-hint version of the SCALAR_ITER UDF above: an iterator of Series pairs
# in, an iterator of Series out. Re-defines `multiply_two` on purpose.
@pandas_udf("long")
def multiply_two(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b


spark.range(10).select(multiply_two("id", "id")).show()

+--------------------+
|multiply_two(id, id)|
+--------------------+
|                   0|
|                   1|
|                   4|
|                   9|
|                  16|
|                  25|
|                  36|
|                  49|
|                  64|
|                  81|
+--------------------+



In [18]:
# Left side: observations `v1` per (time, id); right side: labels `v2`.
df1 = spark.createDataFrame(
    data=[(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],
    schema=("time", "id", "v1"),
)
df2 = spark.createDataFrame(
    data=[(1201, 1, "x"), (1201, 2, "y")],
    schema=("time", "id", "v2"),
)

def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """As-of join one cogroup of rows on ``time``, matching by ``id``.

    Each left row is paired with the most recent right row (merge_asof's
    default backward direction) that has the same ``id``.

    ``pd.merge_asof`` requires both inputs to be sorted by the ``on`` key and
    raises ValueError otherwise. Spark does not guarantee row order inside a
    cogroup, so both sides are sorted by ``time`` before merging.
    """
    return pd.merge_asof(
        left.sort_values("time"),
        right.sort_values("time"),
        on="time",
        by="id",
    )

# Cogroup both frames by `id` and run `asof_join` on each pair of groups.
(
    df1.groupby("id")
    .cogroup(df2.groupby("id"))
    .applyInPandas(asof_join, schema="time int, id int, v1 double, v2 string")
    .show()
)

+----+---+---+---+
|time| id| v1| v2|
+----+---+---+---+
|1201|  1|1.0|  x|
|1202|  1|3.0|  x|
|1201|  2|2.0|  y|
|1202|  2|4.0|  y|
+----+---+---+---+

