# PySpark Usage Guide for Pandas with Apache Arrow

In [2]:
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists; otherwise create a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [3]:
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers between Spark and pandas.
# NOTE(review): Spark 3.x documents this key as deprecated in favour of
# "spark.sql.execution.arrow.pyspark.enabled" — confirm against the Spark
# version this notebook is run on before renaming it.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a 100x3 pandas DataFrame of uniform random floats. A locally seeded
# generator keeps the data identical across "Restart & Run All" without
# touching NumPy's global random state.
pdf = pd.DataFrame(np.random.RandomState(42).rand(100, 3))

# Create a Spark DataFrame from the pandas DataFrame (uses Arrow when enabled).
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame (uses Arrow when enabled).
result_pdf = df.select("*").toPandas()



----
如上所示，在启用 `spark.sql.execution.arrow.enabled` 且已安装 PyArrow 的情况下，`createDataFrame` 和 `toPandas` 会使用 Arrow 来完成转换。

In [4]:
# Show the concrete class at each stage of the round trip: pandas -> Spark -> pandas.
tuple(type(frame) for frame in (pdf, df, result_pdf))

(pandas.core.frame.DataFrame,
 pyspark.sql.dataframe.DataFrame,
 pandas.core.frame.DataFrame)

## pandas UDFs

### scalar

用于将标量运算向量化：输入是一个或多个 `pandas.Series`，输出是长度相同的 `pandas.Series`。

In [5]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Plain Python function: `*` works on scalars and elementwise on pandas Series
def multiply_func(a, b):
    """Return the product ``a * b``."""
    product = a * b
    return product

# Wrap the plain function as a pandas (vectorized) UDF whose result type is long
multiply = pandas_udf(f=multiply_func, returnType=LongType())

# Check the plain function locally on a pandas Series first
x = pd.Series(data=[1, 2, 3])
print(multiply_func(x, x))

# Build a single-column ("x") Spark DataFrame from the Series
df = spark.createDataFrame(
    pd.DataFrame(data=x, columns=['x'])
)

# Apply the vectorized UDF to column "x" on the Spark side
df.select(
    multiply(col("x"), col("x"))
).show()

0    1
1    4
2    9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



In [6]:
# Same computation with built-in Column arithmetic instead of a pandas UDF;
# the result column is shown as a double (1.0, 4.0, 9.0) in the output.
df.withColumn(
    colName='x^2',
    col=col("x") ** 2,
).show()

+---+---+
|  x|x^2|
+---+---+
|  1|1.0|
|  2|4.0|
|  3|9.0|
+---+---+



## Grouped Map

面向的操作是 `groupBy().apply()`

+ 根据group来split数据
+ 对于每个group apply一个函数，输入和输出都是 `pandas.DataFrame`
+ 将输出combine

定义函数的时候需要:
+ 定义一个对每一个group 进行运算的函数
+ 一个 `StructType` 对象，或者一个字符串，来定义输出 DataFrame 的 schema 信息

**注意数据倾斜**

In [8]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Toy data: five (id, v) rows spread over two id groups
df = spark.createDataFrame(
    data=[
        (1, 1.0),
        (1, 2.0),
        (2, 3.0),
        (2, 5.0),
        (2, 10.0),
    ],
    schema=("id", "v"),
)

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    """Center ``v`` within one group.

    Takes a pandas DataFrame with a ``v`` column and returns a copy of it in
    which ``v`` is replaced by ``v`` minus the mean of ``v`` over that frame.
    Declared output schema: ``id long, v double``.
    """
    group_mean = pdf.v.mean()
    return pdf.assign(v=pdf.v - group_mean)

# Split by id, run subtract_mean on each group, and show the combined result
(
    df
    .groupby("id")
    .apply(subtract_mean)
    .show()
)

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



In [16]:
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def test(pdf):
    """Square every column of the pandas DataFrame passed in.

    NOTE(review): the squaring is applied to all columns, so the grouping key
    ``id`` is squared together with ``v`` (the recorded output shows id 2
    becoming 4). Confirm this is intended; restrict it to ``v`` otherwise.
    """
    def _square(column):
        return column ** 2

    return pdf.apply(_square)

# Run the squaring UDF once per id group and show the combined result
(
    df
    .groupby("id")
    .apply(test)
    .show()
)


+---+-----+
| id|    v|
+---+-----+
|  1|  1.0|
|  1|  4.0|
|  4|  9.0|
|  4| 25.0|
|  4|100.0|
+---+-----+



## Grouped Aggregate

groupBy().agg 和pyspark.sql.Window

将一个或多个pandas.Series转化为scalar.

Currently, only unbounded windows are supported with Grouped Aggregate Pandas UDFs.

In [12]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

# Rebuild the five-row (id, v) sample frame used by the aggregate examples below
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    schema=("id", "v"),
)

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    """Reduce ``v`` to a single value, its mean (declared return type: double)."""
    average = v.mean()
    return average

# One output row per id: the mean of v computed by the grouped-aggregate UDF
(
    df
    .groupby("id")
    .agg(mean_udf(df["v"]))
    .show()
)

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+



In [13]:
# Unbounded frame over each id partition (unboundedPreceding .. unboundedFollowing)
w = (
    Window
    .partitionBy("id")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Attach the partition-wide mean of v to every row as "mean_v"
df.withColumn(
    "mean_v",
    mean_udf(df["v"]).over(w),
).show()

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



## 支持的SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`, `ArrayType` of `TimestampType`, and nested `StructType`. `BinaryType` is supported only when the installed PyArrow version is equal to or higher than 0.10.0.

Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is different from a Pandas timestamp. It is recommended to use Pandas time series functionality when working with timestamps in `pandas_udf`s to get the best performance; see the pandas time series documentation for details.

In [23]:
from pyspark.sql.functions import current_timestamp
# Add a "timestamp" column filled by current_timestamp().
# NOTE(review): the value appears to be computed when an action runs — the
# show() and toPandas() outputs recorded in this notebook carry different times.
df = df.withColumn(colName='timestamp', col=current_timestamp())
df.show(n=2, truncate=False)

+---+---+-----------------------+
|id |v  |timestamp              |
+---+---+-----------------------+
|1  |1.0|2019-07-16 20:29:27.008|
|1  |2.0|2019-07-16 20:29:27.008|
+---+---+-----------------------+
only showing top 2 rows



In [25]:
# Convert the Spark DataFrame (including the timestamp column) to pandas
# and preview its first rows
pdf = df.toPandas()
pdf.head(n=5)



Unnamed: 0,id,v,timestamp
0,1,1.0,2019-07-16 20:29:35.680
1,1,2.0,2019-07-16 20:29:35.680
2,2,3.0,2019-07-16 20:29:35.680
3,2,5.0,2019-07-16 20:29:35.680
4,2,10.0,2019-07-16 20:29:35.680


In [29]:
# Inspect the pandas dtype of each column in the converted frame
pdf.dtypes

id                    int64
v                   float64
timestamp    datetime64[ns]
dtype: object