<a href="https://colab.research.google.com/github/dspauloplima/PySpark/blob/main/Apache_Arrow_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

# Apach Arrow

É um formato de dados colunar em memória que usa o spark para transferir dados entre a JVM e os processos Python. Interessante para quem trabalha com Pandas/Numpy.

Requer algumas mudanças de confuguração ou código para ter todas as vantagens e assegurar compatibilidade.


Pandas == 0.23.2 or higher<br>
PyArrow == 1.0.0 or higher

Arrow está disponível como uma otimização ao converter um Spark DataFrame para um Pandas DataFrame usando a call ``DataFrame.toPandas()`` ou o inverso ``SparkSession.createDataFrame()``.

Precisamos configurar o Spark Arrow para otimizar as calls.

In [11]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# SparkSession
spark = SparkSession.builder.getOrCreate()

In [8]:
# configurar o Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enable", True)

In [9]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



A transformação é a mesma mesmo quando o Arrow está desativado. Porém é otimizada quando ativado.

Nem todos os tipos de dados são suportados. Isso pode levar a um erro durante a conversão. Então o Spark irá retornar o processo e converter sem o Arrow.

In [12]:
# Pandas DF
pandas_df = pd.DataFrame(np.random.rand(100,3))

# Spark DF
df = spark.createDataFrame(pandas_df)

# converter Spark DF para Pandas DF usando Arrow
arrow_pdf = df.select("*").toPandas()

In [16]:
arrow_pdf

Unnamed: 0,0,1,2
0,0.708371,0.538061,0.079359
1,0.311344,0.981666,0.180534
2,0.134101,0.830222,0.736917
3,0.698400,0.580080,0.775096
4,0.674968,0.250627,0.403149
...,...,...,...
95,0.611099,0.823814,0.407274
96,0.631503,0.767443,0.700638
97,0.371547,0.252529,0.583755
98,0.141851,0.683160,0.443891


## Pandas UDF (Vectorized User-Defined-Function)

Funções definidas pelo usuário que são executados no spark utilizando o Arrow. Assim podemos trabalhar com pandas, o que permite realizar operações vetorizadas.

Usa o ``pandas_udf()`` para embrulhar a função.

In [17]:
@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()

root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)



In [18]:
df.select(func("long_col", "string_col", "struct_col")).printSchema()

root
 |-- func(long_col, string_col, struct_col): struct (nullable = true)
 |    |-- col1: string (nullable = true)
 |    |-- col2: long (nullable = true)



In [19]:
df.show()

+--------+----------+-----------------+
|long_col|string_col|       struct_col|
+--------+----------+-----------------+
|       1|  a string|{a nested string}|
+--------+----------+-----------------+



In [27]:
from pyspark.sql.types import LongType

# Declara a função
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

# Cria UDF
multiply = pandas_udf(multiply_func, returnType=LongType())

# Pandas Series
x = pd.Series([1, 2, 3])

# Aplica a Função
print("Função no Pandas DF")
print(multiply_func(x, x))

# Cria Spark DF
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Aplica a função no Spark DF com o UDF
print("\nFunção no Spark DF")
df.select(multiply(df["x"], df["x"])).show()

Função no Pandas DF
0    1
1    4
2    9
dtype: int64

Função no Spark DF
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



### Iterador -> Iterador

In [30]:
from typing import Iterator

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Função
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

# Aplica Função
df.select(plus_one("x")).show()

+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+



### Iterator Multipla Série -> Iterator Múltipla Série

In [31]:
from typing import Iterator, Tuple

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Função
@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

# Aplica função
df.select(multiply_two_cols("x", "x")).show()

+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      4|
|                      9|
+-----------------------+



### Series -> Scalar

In [34]:
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Função
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

# Spark DF
df.show()

# Aplica Função
df.select(mean_udf(df['v'])).show()

# Aplica função com agregação
df.groupby("id").agg(mean_udf(df['v'])).show()


w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



## Funções Pandas

### Grouped Map

``df.groupby().applyInPandas()``: implementa "split-apply-combine".

Requer uma função python e o tipo de dado para o schema.

In [36]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf é um Pandas DF
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.show()

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



### Map

``df.mapInPandas()``: mapeia um iterador do pandas df para outro iterador do pandas df e retorna um spark df.

In [37]:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()

+---+---+
| id|age|
+---+---+
|  1| 21|
+---+---+



### Co-Grouped Map

``df.groupby().cogroup().applyInPandas()``: permite o agrupamento entre dois sparks df através de uma chave comum. A função Python é aplicada a cada grupo.

Requer uma função que define a computação pra cada grupo e os tipos de dados do schema.

In [39]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.show()

df2.show()

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()

+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

