# PySpark - DataFrames and Spark SQL

Las aplicaciones de PySpark comienzan con la inicialización de una sesión de Spark, que es el punto de entrada de PySpark como se muestra a continuación.

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Un DataFrame de PySpark se puede crear a través de `pyspark.sql.SparkSession.createDataFrame`,  pasando una lista de listas, tuplas, diccionarios, `pyspark.sql.Rows`, un DataFrame de Pandas o un RDD. Para definir el esquema del DataFrame se le puede agregar como argumento a la función o dejar que PySpark lo suponga.

Si no especificamos el `schema`

In [6]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Si especificamos el `schema`

In [7]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Desde un DataFrame de Pandas

In [8]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Noten como si imprimimos los DataFrames solo nos indica el `schema` que tienen.

Si queremos ver la estrucutra del DataFrame utilizamos la funcion `show()`

Si queremos ver el `schema` también podemos usar la función `printSchema()`

Si queremos ver las columnas pordemos usar la función `.columns`

In [9]:
df.show()
df.printSchema()
df.columns

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



['a', 'b', 'c', 'd', 'e']

## Visualizando los datos

Para ver los primero registros podemos utilizar la función `.show()` dándole el número de renglones que queremos como argumento

In [10]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



Si queremos que el formato del DataFrame sea algo más similar a lo que estamos acostumbrados, utilizamos `spark.conf.set('spark.sql.repl.eagerEval.enabled', True)`

In [11]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


También podemos visulizar un renglón de forma vertical. Esto es útil cuando los registros son muy largos

In [12]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



Al igual que en Pandas, podemos obtener el summary del data frame con la función `.describe()`

In [13]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



## Accesando los datos

Noten lo que pasa si intentamos accesar a una columna como lo haríamos con Pandas

In [14]:
df.a

Column<'a'>

Nos regresa un objeto de la clase `Column`.

Para corregir esto utilizamos la función `.select()`.

In [15]:
df.select(df.a).show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+



Podemos agregar columnas de la siguiente manera

In [16]:
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



Podemos aplicar también filtros

In [17]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Aplicando funciones

Podemos definir funciones y aplicarlas a nuestros DataFrames. Noten que podemos utilizar funciones de Pandas.

In [18]:
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [19]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Agrupando datos

Vamos a utilizar un nuevo DataFrame

In [20]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
df.printSchema()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: long (nullable = true)
 |-- v2: long (nullable = true)



Vamos a agrupar por color y sacar el promedio de las demás columnas

In [21]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



También podemos agrupar utilizando funciones definidas localmente.

In [22]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



## Leer y escribir datos

En general, podemos escribir y leer múltiples formatos de archivo. El más sencillo y común es `.csv`. Similar a como se hace en Pandas, utilizamos la función `.read.csv()`.

También podemos escribir archivos con las función `.write.csv()`.

In [24]:
df.write.csv('food.csv', header=True, mode="overwrite")
spark.read.csv('food.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  7| 70|
|  red|carrot|  5| 50|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|banana|  1| 10|
|  red| grape|  8| 80|
+-----+------+---+---+



## Utilizando SQL

Como dijimos en la introducción, el motor de PySpark nos permite implementar funciones de SQL para hacer queries a las tablas. Esto lo hacer considerano los DataFrames como tablas de SQL y le realiza los queries correspondientes. 

Primero convertimos el DataFrame a tabla y luego le aplicamos el query deseado

In [25]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [26]:
spark.sql("SELECT color,count(*) FROM tableA GROUP BY color").show()

+-----+--------+
|color|count(1)|
+-----+--------+
|  red|       5|
| blue|       2|
|black|       1|
+-----+--------+



Podemos también utilizar funciones definidas dentro de los queries

In [27]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 64217)
Traceback (most recent call last):
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/Alvlopzam/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/U