In [28]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import numpy as np
import pandas as pd

In [2]:
# Enable Arrow-based columnar data transfer between Spark and pandas, so the
# createDataFrame(pdf) / toPandas() conversions below go through Arrow.
# NOTE(review): `spark` is never created in this notebook; presumably the
# kernel/environment provides a ready SparkSession — confirm.
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

In [30]:
# Only emit Spark log messages at ERROR level so warnings don't clutter cell output.
spark.sparkContext.setLogLevel("ERROR")

## Converting to/from Pandas

In [4]:
# Generate a pandas DataFrame of 100 x 3 uniform random floats in [0, 1).
# A seeded Generator makes the statistics below reproducible on every
# Restart & Run All (np.random.rand gave different numbers each run).
rng = np.random.default_rng(42)
pdf = pd.DataFrame(rng.random((100, 3)))

# Create a Spark DataFrame from the pandas DataFrame (via Arrow when
# 'spark.sql.execution.arrow.pyspark.enabled' is set).
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame (also via Arrow).
# select("*") was a no-op projection, so call toPandas() directly.
result_pdf = df.toPandas()

print("Pandas Dataframe Result Statistics: \n%s\n" % str(result_pdf.describe()))

  if is_categorical_dtype(series.dtype):
[Stage 0:>                                                        (0 + 10) / 10]

Pandas Dataframe Result Statistics: 
                0           1           2
count  100.000000  100.000000  100.000000
mean     0.552710    0.494376    0.546048
std      0.277863    0.311639    0.303372
min      0.024243    0.002103    0.004011
25%      0.287379    0.227527    0.287459
50%      0.621190    0.502328    0.593260
75%      0.745210    0.793043    0.808989
max      0.996406    0.989879    0.999916



                                                                                

## Pandas UDFs (Vectorized UDFs)

Pandas UDFs are user-defined functions that Spark executes using Apache Arrow to transfer data and pandas to operate on it, which enables vectorized operations.

Internally, PySpark executes a pandas UDF by splitting columns into batches, calling the function on each batch as a subset of the data, and then concatenating the results.

In [5]:
from pyspark.sql.functions import pandas_udf

In [6]:
@pandas_udf("col1 string, col2 string")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    """Return the struct batch ``s3`` with an added string field ``col2``.

    In this notebook the UDF is called with (long_col, string_col, struct_col):
    ``s1`` and ``s2`` arrive as pandas Series, and the struct column ``s3`` as a
    pandas DataFrame of its fields. ``col2`` is ``s1`` plus the length of each
    ``s2`` string. It is converted to ``str`` explicitly so that it matches the
    declared ``col2 string`` field, rather than relying on an implicit
    numeric-to-string cast during Arrow conversion.
    """
    total = s1 + s2.str.len()
    # astype(str) would turn missing values into the text 'nan'; keep them null.
    col2 = total.astype(str).where(total.notna())
    # assign() builds a new frame instead of mutating the incoming batch s3.
    return s3.assign(col2=col2)

In [7]:
# One row with a long, a string and a single-field struct column.
df = spark.createDataFrame(
    data=[[1, 'a string', ('a nested string',)]],
    schema="long_col long, string_col string, struct_col struct<col1:string>",
)

In [8]:
# Inspect the input schema: struct_col is a struct holding one string field (col1).
df.printSchema()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)



In [9]:
# The resulting column takes the struct return type declared in func's
# pandas_udf decorator (col1, col2).
df.select(func('long_col', 'string_col', 'struct_col')).printSchema()

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: string (nullable = true)



### Series to Series

In [10]:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

In [11]:
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    """Return the element-wise product of ``a`` and ``b`` (aligned on index)."""
    product = a.mul(b)
    return product

In [12]:
# Wrap the plain pandas function as a vectorized Spark UDF that returns longs.
multiply = pandas_udf(f=multiply_func, returnType=LongType())

In [14]:
# multiply_func is ordinary pandas code, so check it on local data before
# handing it to Spark: a pandas_udf function should run on plain pandas input.
x = pd.Series(data=[1, 2, 3])
print(multiply_func(x, x))

0    1
1    4
2    9
dtype: int64


In [15]:
# Turn the local Series x into a one-column frame named 'x', then into a Spark DataFrame.
df = spark.createDataFrame(x.to_frame(name='x'))

  if is_categorical_dtype(series.dtype):


In [16]:
# Spark splits column x into batches and calls multiply_func on each (x * x).
df.select(multiply(col("x"), col('x'))).show()

                                                                                

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



### Iterator of Series to Iterator of Series

In [18]:
from typing import Iterator

In [19]:
# A one-column pandas frame (x = 1, 2, 3), converted to a Spark DataFrame.
pdf = pd.DataFrame({'x': [1, 2, 3]})
df = spark.createDataFrame(pdf)

In [23]:
@pandas_udf('long')
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Yield every incoming batch of values incremented by one."""
    yield from (batch + 1 for batch in iterator)

In [31]:
# plus_one consumes an iterator of pandas Series batches of column x.
df.select(plus_one(col('x'))).show()

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+





### Iterator of Multiple Series to Iterator of Series

In [32]:
from typing import Iterator, Tuple

In [33]:
# Same one-column frame as before (x = 1, 2, 3), rebuilt for this section.
pdf = pd.DataFrame({'x': [1, 2, 3]})
df = spark.createDataFrame(pdf)

In [47]:
@pandas_udf("long")
def multiply_two_cols(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    """Yield the element-wise product for each (a, b) pair of column batches."""
    for pair in iterator:
        left, right = pair
        yield left * right

In [48]:
# Columns are passed by name; each batch reaches the UDF as an (a, b) tuple of Series.
df.select(multiply_two_cols("x", "x")).show()

+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      4|
|                      9|
+-----------------------+





### Series to Scalar

In [57]:
from pyspark.sql import Window

In [51]:
# Two id groups: id 1 -> v in {1.0, 2.0}; id 2 -> v in {3.0, 5.0, 10.0}.
df = spark.createDataFrame(
    data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    schema=("id", "v"),
)

In [53]:
@pandas_udf("double")
def mean_udf(v : pd.Series) -> float:
    """Series-to-scalar UDF: reduce a batch/group of values to its mean."""
    average = v.mean()
    return average

In [54]:
# Without grouping, mean_udf reduces the entire v column to a single value.
df.select(mean_udf(col('v'))).show()

[Stage 7:>                                                        (0 + 10) / 10]

+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+



                                                                                

In [55]:
# Inside groupby().agg(), mean_udf produces one mean per id.
df.groupby("id").agg(mean_udf(df['v'])).show()

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+





In [59]:
# One window per id covering the whole partition, so an aggregate evaluated
# over it sees every row of the group.
w = (
    Window.partitionBy('id')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

In [61]:
# As a window function over w, mean_udf's per-id mean is attached to every row.
df.withColumn("mean_v", mean_udf(df['v']).over(w)).show()

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+





## Pandas Function APIs

Pandas Function APIs apply a native Python function directly to the whole DataFrame using pandas instances. Internally, they work similarly to Pandas UDFs: Arrow transfers the data and pandas operates on it, which enables vectorized operations.

### Grouped Map

Grouped map operations with pandas instances are supported by DataFrame.groupby().applyInPandas(), which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame. Each group is passed to the Python function as its own pandas.DataFrame.

This API implements the “split-apply-combine” pattern, which consists of three steps:

- Split the data into groups by using DataFrame.groupBy().
- Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
- Combine the results into a new PySpark DataFrame.

To use DataFrame.groupBy().applyInPandas(), the user needs to define the following:

- A Python function that defines the computation for each group.
- A StructType object or a string that defines the schema of the output PySpark DataFrame.

In [65]:
# Same id/v data as the Series-to-Scalar section, recreated for this section.
df = spark.createDataFrame(
    data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    schema=("id", "v"),
)

In [66]:
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    """Center column ``v`` of one group by subtracting the group's mean of ``v``.

    Returns a new frame; the input ``pdf`` is left unchanged.
    """
    centered = pdf['v'] - pdf['v'].mean()
    return pdf.assign(v=centered)

In [68]:
# subtract_mean is called once per id group; schema declares the output columns.
df.groupby('id').applyInPandas(subtract_mean, schema = "id long, v double").show()

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+





### Map

Map operations with pandas instances are supported by DataFrame.mapInPandas(), which maps an iterator of pandas.DataFrames representing the current PySpark DataFrame to another iterator of pandas.DataFrames and returns the result as a PySpark DataFrame. The function takes and outputs an iterator of pandas.DataFrame. Unlike some Pandas UDFs, it can return output of arbitrary length, although internally it works similarly to the Series to Series Pandas UDF.

In [71]:
from collections.abc import Iterable

In [72]:
# Two rows keyed by id with an age column.
df = spark.createDataFrame(data=[(1, 21), (2, 30)], schema=('id', 'age'))

In [73]:
def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    """Yield, for each incoming pandas batch, only the rows whose id equals 1."""
    for batch in iterator:
        keep = batch['id'] == 1
        yield batch[keep]

In [74]:
# mapInPandas passes batches of df to filter_func; the filter keeps the same
# columns, so the input schema (df.schema) is reused for the output.
df.mapInPandas(filter_func, schema = df.schema).show()



+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+



### Co-grouped Map

Co-grouped map operations with Pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas() which allows two PySpark DataFrames to be cogrouped by a common key and then a Python function applied to each cogroup. It consists of the following steps:

- Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
- Apply a function to each cogroup. The inputs of the function are two pandas.DataFrames (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
- Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), the user needs to define the following:

- A Python function that defines the computation for each cogroup.
- A StructType object or a string that defines the schema of the output PySpark DataFrame.

In [75]:
# Left side: v1 measurements for ids 1 and 2 on two dates.
df1 = spark.createDataFrame(
    data=[
        (20000101, 1, 1.0),
        (20000101, 2, 2.0),
        (20000102, 1, 3.0),
        (20000102, 2, 4.0),
    ],
    schema=("time", "id", "v1"),
)

# Right side: v2 labels for ids 1 and 2 on the first date only.
df2 = spark.createDataFrame(
    data=[(20000101, 1, "x"), (20000101, 2, "y")],
    schema=("time", "id", "v2"),
)

In [76]:
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Combine one cogroup's left and right frames with ``pandas.merge_ordered``.

    Uses pandas' default arguments (an ordered outer join on the columns the
    two frames share), so rows missing from ``right`` get NaN in its columns.
    """
    merged = pd.merge_ordered(left=left, right=right)
    return merged

In [77]:
# Cogroup df1 and df2 on id and combine each pair of groups with merge_ordered;
# the schema string declares the columns and types of the combined result.
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_ordered, schema = "time int, id int, v1 double, v2 string").show()

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+



