## Práctica con parquet
El objetivo de esta libreta es aprender los conceptos básicos de parquets, dataframes y RDDS sobre Databricks usando Pyspark y Scala.

# Nivel básico

Estructuras de datos: 
- RDDs (Resilient Distributed Dataset): son una colección de datos tolerantes a fallos y que trabajan de forma paralela.
  * Es la abstracción de datos más básica en Spark.
  * Están particionados sobre los nodos del clúster.
  * Son inmutables: cuando transformamos un RDD realmente estamos creado uno nuevo.
  * Distribuye los datos en modo lectura.

- Dataframe: es un conjunto de datos organizado en columnas.
  * Trabajan de forma paralela.
  
- Parquet: es un almacenamiento basado en columnas
  * Los datos se almacenan en columnas en lugar de filas.
  * Se selecciona solo las columnas necesarias para la consulta.
  * Parquet es formato binario, garantizando la serialización efectiva y deserialización de los datos.
  * Los metadatos se almacenan después de escribir los datos en el pie de página capturando las estadísticas resumidas de los fragmentos de la columna.
  
  <img src="https://miro.medium.com/v2/resize:fit:4800/format:webp/1*X9wL6xlATJsAru7N5u-IEA.png"  width="40%" height="40%">
  
  <img src="https://1.bp.blogspot.com/-EBTMgn05zVU/XjpzbFcS2pI/AAAAAAAAA_8/8E_IHuumv6cTgQnnwQwEbNwouMvyz331gCLcBGAsYHQ/s1600/file_struct.JPG"  width="30%" height="30%">

In [0]:
# Modo terminal (magic funcion)
%pwd
%ls

[0m[01;34mazure[0m/  [01;34meventlogs[0m/  [01;32mhadoop_accessed_config.lst[0m*  [01;34mmetastore_db[0m/
[01;34mconf[0m/   [01;34mganglia[0m/    [01;34mlogs[0m/                        [01;32mpreload_class.lst[0m*


#### 1) Crear una estructura de datos y guardarla como parquet usando Scala

In [0]:
%scala 
case class fifa20(nombre: String, club: String)
val df_scala = Seq(new fifa20("L. Messi","FC Barcelona"), 
                   new fifa20("Cristiano Ronaldo", "Juventus"), 
                   new fifa20("Neymar Jr", "Paris Saint-Germain")).toDF

display(df_scala) //revisando el contenido del dataframe creado

nombre,club
L. Messi,FC Barcelona
Cristiano Ronaldo,Juventus
Neymar Jr,Paris Saint-Germain


In [0]:
%scala 
df_scala.write.mode("overwrite").parquet("/tmp/FIFA_Parquet") //guardamos el df a disco como parquet

#### 2) Carga de un archivo parquet con python

In [0]:
df_python = spark.read.parquet("/tmp/FIFA_Parquet")
print(type(df_python))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
df_python.show() #visualizando el contenido del dataframe

+-----------------+-------------------+
|           nombre|               club|
+-----------------+-------------------+
|        Neymar Jr|Paris Saint-Germain|
|Cristiano Ronaldo|           Juventus|
|         L. Messi|       FC Barcelona|
+-----------------+-------------------+



In [0]:
#renombrar una columna
df_python.withColumnRenamed('nombre','jugador').show() #esto no afectará al dataframe original

+-----------------+-------------------+
|          jugador|               club|
+-----------------+-------------------+
|        Neymar Jr|Paris Saint-Germain|
|Cristiano Ronaldo|           Juventus|
|         L. Messi|       FC Barcelona|
+-----------------+-------------------+



In [0]:
#Para poder revisar la organización de nuestro DataFrame y el data type de cada campo podemos usarlo siguiente
df_python.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- club: string (nullable = true)



#### 3) Carga de un archivo parquet con SQL

In [0]:
%sql 
CREATE TABLE tabla_fifa
USING parquet
OPTIONS (path "/tmp/FIFA_Parquet")

In [0]:
%sql 
SELECT * FROM tabla_fifa 

nombre,club
Neymar Jr,Paris Saint-Germain
Cristiano Ronaldo,Juventus
L. Messi,FC Barcelona


In [0]:
consulta = '''
  SELECT nombre AS jugador 
  FROM tabla_fifa
  where club = "Juventus"
  '''
spark.sql(consulta).show()

+-----------------+
|          jugador|
+-----------------+
|Cristiano Ronaldo|
+-----------------+



# Nivel intermedio

#### ¿Qué es PySpark?
PySpark es una herramienta que nos deja usar Spark encima de Python. Permitiendonos combinar el proceso de datos distribuidos de Spark con la simplicidad de Python para el análisis de conjuntos masivos de datos (big data).

In [0]:
#Carga de la biblioteca
import pyspark
# Carga funciones extra
from pyspark.sql.functions import * 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('primeros_pasos').getOrCreate()

In [0]:
%scala
//Carga de un archivo parquet
val clickstream = sqlContext.read.parquet("/datasets/wiki-clickstream/part-00000-tid-3267700592912152934-af798f24-a539-4dbb-8419-1799463dc63a-664-1-c000.snappy.parquet")

In [0]:
df_stream = spark.read.parquet("/datasets/wiki-clickstream/part-00004-tid-3267700592912152934-af798f24-a539-4dbb-8419-1799463dc63a-668-1-c000.snappy.parquet")
print(type(df_stream))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
df_stream.show(100)

+--------+--------+---+--------------------+--------------------+-------+
| prev_id| curr_id|  n|          prev_title|          curr_title|   type|
+--------+--------+---+--------------------+--------------------+-------+
|    null|30132352|306|        other-google|So_Beautiful_or_S...|  other|
|    null|30132352|168|     other-wikipedia|So_Beautiful_or_S...|  other|
| 2048277|30132352| 15|Still_Crazy_After...|So_Beautiful_or_S...|   link|
| 4980756|30132352| 43|Surprise_(Paul_Si...|So_Beautiful_or_S...|   link|
|23617892|30132352|240|The_Essential_Pau...|So_Beautiful_or_S...|   link|
|    null|30132352| 68|         other-empty|So_Beautiful_or_S...|  other|
|  147605|30132352| 11|  Paul_Simon_(album)|So_Beautiful_or_S...|   link|
|    null|30132352| 11|         other-yahoo|So_Beautiful_or_S...|  other|
|15580374|30132352| 11|           Main_Page|So_Beautiful_or_S...|  other|
|18422066|30132352|299|Paul_Simon_discog...|So_Beautiful_or_S...|   link|
|   50745|30132352|732|          Paul_