# Conceptos Basicos de Pyspark

Este notebook se ha utilizado para dar una introducción de la sintaxis de __PySpark__.

__Author__:  
Carlos Sevilla  
c.sevilla.barcelo@gmail.com  

## Spark Session

La session de spark es el punto de entrada a la aplicación de Spark. Es obligatoria para usar el entorno de Spark. Con ella, podemos leer archivos csv, leer desde bases de datos, de sistemas streaming, etc. 

Una vez que hemos instalado pyspark, hay varias formas de lanzarlo.

__Forma 1:__  
La forma tradicional, la típica, abrir una consola de python3 y crear la spark session

Abrimos una consola de python3
```bash
user@computer: python3
```
Creamos una spark session 
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nombre_session_spark').getOrCreate()
```

__Forma 2:__  
Lanzamos en la consola:
```bash
pyspark
```
Esto te da una session de spark ya inicializada en la variable `spark`, con lo que podemos empezar a realizar operaciones desde la primera linea


__Para los notebooks, solo se puede usar la forma 1.__ Basicamente, porque jupyter notebook se lanza con el comando `jupyter notebook` 

__ToDo:__ Crea una sesion de spark

---

## Leer un csv

Una vez que tenemos nuestra `spark session`, podemos llamar a su clase `read`, la cual nos permite crear dataframe desde archivos. El metodo `.csv`, evidentemente, nos permite crear un Dataframe a partir de archivo/archivos `.csv` 

```python
df = spark.read.csv('ruta/a/mi/archivo')
```

Este lector tiene algunos parametros importantes:

* sep - Separador en el csv. Default = ','
* schema - Esquema con el tipo de los datos. Default = None
* inferSchema - Crea el schema de forma automatica. default = False
* header - Usa la primera linea como nombre de las columnas. Default = False

Podeis leer mas del lector de csv y sus parametros [aqui](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader.csv)

__ToDo:__ Leer `users.csv` de la carpeta `data`

---

## Operaciones 

### Ver el contenido del DataFrame

In [None]:
df.show()

### Resumen del DataFrame

In [None]:
df.describe().show()

### Columnas del DataFrame

In [None]:
df.columns

### Ver schema del DataFrame

In [None]:
df.printSchema()

---

## Crear Columnas

Para crear una columna, usamos el metodo `.withColumn()`, el cual recibe por parametros el __nombre de la columna__ como parametro 1, y la operacion/dato como parametro 2.

Un ejemplo: 
```python
df.withColumn('altura_cuadrada', df.altura_cuadrada * df.altura_cuadrada)
```
Esto devuelve el DataFrame `df` que teniamos, junto con esta columna nueva. No se guarda en el DataFrame de forma automática, por lo que para guardarlo, tenemos que guardar este dataframe que nos devuelve en una variable

```python
df = df.withColumn('altura_cuadrada', df.altura_cuadrada * df.altura_cuadrada)
```

Para modificar el contenido de una columna, también se usa el metodo `.withColumn()`. Como nombre de la columna, hay que pararle el nombre de la columna que queremos modificar.

__ToDo:__ Crea una columna nueva con el IMC de cada usuario

---

## Funciones de PySpark SQL

PySpark incluye en su API bastantes funciones. Muchas de ellas, son funciones que podemos encontrar en __SQL__, algunas otras son alias de otras funciones, o simplemente snippets para agilizar la programación

Puedes encontrarlas todas [aquí](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) 

In [3]:
import pyspark.sql.functions  as F

### select

Podemos seleccionar las columnas que queremos ver, o realizar operaciones que no se guarden en el DataFrame. Es el mismo concepto que el SELECT de __SQL__

In [None]:
df.select(['nombre','edad','IMC']).show()

In [None]:
df.select(df.edad < 30).show()

### filter
Es el metodo que nos permite filtrar un DataFrame en base a unas condiciones. `filter()` nos devuelve el DataFrame original filtrado.

In [None]:
df.filter(df.edad < 30).show()

In [None]:
df.filter((df.edad < 30) & (df.altura > 1.5)).show()

Si queremos guardar este DataFrame filtrado, basta con guardarlo en una variable

In [None]:
df_filtered = df.filter(df.edad < 30)

__ToDo:__ Prueba a hacer algún filtro

### groupBy

Nos permite agrupar el DataFrame en base a una o varias columnas. Necesitamos definir el comportamiento de los valores de las columnas agrupadas. Existen diferentes métodos:
- count()
- sum()
- min()
- max()
- mean()

También podemos definir una función agregada con `.agg()` para aplicar diferentes comportamientos sobre las agrupaciones. La función `.agg()`  recibe un diccionario como parametro.  
En este diccionario, la clave es el nómbre de la columna, y el valor es la operación a realizar con esa columna

In [None]:
df.groupBy('apellidos').count().show()

### join

tipos:
- inner
- left
- left_anti
- outer

```python
df.join(d2,['columnas','join'],'inner')
```

__ToDo:__ Carga el dataset `family_info.csv` y juntalo junto a dataframe original, sin perder información original del dataframe original.


### Max / Min / Mean

In [None]:
df.select(F.max(F.col('edad'))).show()
df.select(F.min(F.col('edad'))).show()
df.select(F.mean(F.col('edad'))).show()

### sqrt

In [None]:
df.select(F.sqrt(F.col('IMC'))).show()

### lit

Devuelve una columna con un valor estatico en todas las filas.   
Ejemplo: Meter el contenido de una variable en el DataFrame

In [None]:
df.withColumn('continente',lit('Westeros'))

### UDF
User Defined Functions.

Aplicamos funciones en Spark que hemos definido nosotros. 
Para ello, hay que usar la funcion `udf` de `pyspark.sql.functions`, que tiene la siguiente estructura:
```python
udf(lambda x: funcion_definida(x), tipo_datos())
```
`tipo_datos()` es el tipo de datos que devuelve esta funcion. Puede ser un String, un Double, una Lista, etc. Consulta los tipos [aquí](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types)

Ejemplo: 

```python
from pyspark.sql.types import StringType

def altera_apellido(x):
    longitud = len(x)
    mitad = int(longitud/2)
    parte_1 = x[:longitud]
    parte_2 = x[longitud:]
    nuevo_string = parte_2 + parte_1
    return nuevo_string


altera_apellido_udf = udf(lambda x: altera_apellido(x),StringType()

```

Una vez que tenemos la funcion definida, podemos usarla en spark.

```python
df.withColumn('apellido_alterado',altera_apellido_udf)

```

__ToDo:__ Prueba a crear una función y usarla en tu dataframe! 

### isNull 

En spark, existen simultaneamente el tipo `NaN` (not a number), `null` y `None`. Pese a que `null` es como un "alias" de `None`, `NaN` es tratado como un `Double`.

In [None]:
df.filter(df.edad.isNull()).show()

### Window

Uno de los puntos fuertes de Spark son las operaciones sober una ventana (Window) de valores. Sirve para realizar acciones simultaneas entre diferentes valores de una misma columna. 

Ejemplo: Queremos añadirle un campo id a los usuarios. Queremos un id unico para usuarios del mismo apellido.

Para ello, podemos usar `Window`, aplicando una ventana en el campo `apellido`. Esto hará que la operación que vayas a realizar, se haga por cada valor diferente en el campo `apellidos`.  
Para entendernos, lo que hace __Spark__ es coger el dataframe original y "crear" X dataframes mas pequeños, siendo X el __número de valores únicos__ en el dataframe. Si tenemos 4 valores únicos, va a crear 4 dataframes.  
En estos 4 dataframes, aplica la acción, y despues vuelve a unir estos 4 dataframes se vuelven a juntar en 1 solo como en dataframe original

Para crear una ventana, necesitamos indicar el campo por el que se va a particionar el dataframe. Luego, en el dataframe, en la accion que vamos a realizar añadimos el metodo `.over()`, el cual recibe por parametro la ventana que hemos creado. 

```python
from pyspark.sql import Window

ventana = Window.partitionBy('apellidos')

df = df.withColumn('id',F.row_number().over(ventana))
```

El metodo `row_number` es un metodo de spark que devuelve el numero de la fila en el dataframe

Estas ventanas, pueden particionar por mas de un campo, y pueden ser ordenados.

```python
ventana = Window.partitionBy('apellidos').orderBy('edad','peso')
```

__ToDo:__ Prueba a realizar una operación con una ventana!

---

## Operaciones sobre fechas

__Todas estas operaciones se aplican sobre toda la columna__

### dayofweek

Te devuelve el día de las fechas en la columna que recibe como parametro

```python
df.withColumn('dia_semana',F.dayofweek(F.col('init_date'))
```

### month

Devuelve el mes de las fechas en la columna que recibe como parametro

```python
df.withColumn('mes_batalla',F.month(F.col('init_date'))
```

### months_between
Devuelve el numero de meses entre 2 columna de fechas

```python
df.withColumn('duracion_batallas',F.months_between(F.col('init_date'),F.col('finish_date'))
```

### date_add

Recibe como parametro el número de días que quieres desplazar la fecha hacia el futuro, y devuelve la fecha ya desplazada

```python
df.withColumn('fin_del_luto',date_add(df.finish_date, 5)
```

## Otros

### Spark DataFrame -> Pandas DataFrame

In [None]:
df_p = df.toPandas()

## Practica Final

__ToDo:__
- Carga `battles.csv`
- Crea una columna que contenga el día de la semana (en texto) del día que empezó cada batalla
- Crea una columna con la diferencia de meses entre una batalla y otra (usar la funcion `lag` de `pyspark.sql.functions`
- Extrae del dataset la batalla mas larga

---