# Creamos la sesión de Spark

Como este notebook se ejecuta en un entorno de Anaconda que tiene todas las dependencias necesarias, podremos importar directamente la clase `SparkSession`, que, a continuación, instanciaremos en una variable llamada `spark`

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .appName("CursoSpark") \
            .getOrCreate()

spark

# Cargamos el fichero CSV en un DataFrame de Spark

A continuación cargamos el fichero CSV usando la variable `spark`, instanciada en el apartado anterior. A la hora de leer el fichero CSV usaremos dos opciones `header` = `True` y `inferSchema` = `True`, la primera para que las columnas del DataFrame tengan nombre y la segunda para que no todos los datos del CSV sean interpretados como string.

In [2]:
covid_df = spark.read \
                .option('header', True) \
                .option('inferSchema', True) \
                .csv('./covid.csv')

covid_df.printSchema()

root
 |-- dateRep: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- countriesAndTerritories: string (nullable = true)
 |-- geoId: string (nullable = true)
 |-- countryterritoryCode: string (nullable = true)
 |-- popData2018: integer (nullable = true)



Una vez cargado el fichero CSV, imprimimos las 3 primeras filas del DataFrame usando el metodo `show`.

In [3]:
covid_df.show(3)

+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|   dateRep|day|month|year|cases|deaths|countriesAndTerritories|geoId|countryterritoryCode|popData2018|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
|29/03/2020| 29|    3|2020|   15|     1|            Afghanistan|   AF|                 AFG|   37172386|
|28/03/2020| 28|    3|2020|   16|     1|            Afghanistan|   AF|                 AFG|   37172386|
|27/03/2020| 27|    3|2020|    0|     0|            Afghanistan|   AF|                 AFG|   37172386|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+
only showing top 3 rows



# Consultas sobre el DataFrame

En este apartado se realizarán consultas analíticas sobre el DataFrame usando la API de PySpark.

¿Cuántos casos de COVID había en España el 15 de marzo de 2020 (la fecha en la que comenzó el confinamiento)?

In [4]:
from pyspark.sql.functions import col

covid_df.where(col('dateRep') == '15/03/2020') \
        .where(col('countriesAndTerritories') == 'Spain') \
        .select(
            col('countriesAndTerritories').alias('Pais'),
            col('dateRep').alias('Fecha'),
            col('cases').alias('Casos')) \
        .show()

+-----+----------+-----+
| Pais|     Fecha|Casos|
+-----+----------+-----+
|Spain|15/03/2020| 1522|
+-----+----------+-----+



¿Cuántas muertes se registraron en España aquellos días en los que el número de casos es el mínimo(en España)? ¿En qué fechas ocurrió? 

La consulta en SQL correspondiente sería:
```sql
    SELECT Pais, Fecha, Muertes
    FROM Covid
    WHERE Casos = (SELECT MIN(Casos) FROM Covid WHERE Pais = 'Spain')
```

In [5]:
from pyspark.sql.functions import min

subquery = covid_df \
    .where(col('countriesAndTerritories') == 'Spain') \
    .select(min('cases').alias('min_cases'))

covid_df.join(subquery, col('cases') == col('min_cases')) \
        .where(col('countriesAndTerritories') == 'Spain') \
        .select(
            col('countriesAndTerritories').alias('Pais'),
            col('dateRep').alias('Fecha'),
            col('deaths').alias('Muertes')) \
        .show()

+-----+----------+-------+
| Pais|     Fecha|Muertes|
+-----+----------+-------+
|Spain|24/02/2020|      0|
|Spain|23/02/2020|      0|
|Spain|22/02/2020|      0|
|Spain|21/02/2020|      0|
|Spain|20/02/2020|      0|
|Spain|19/02/2020|      0|
|Spain|18/02/2020|      0|
|Spain|17/02/2020|      0|
|Spain|16/02/2020|      0|
|Spain|15/02/2020|      0|
|Spain|14/02/2020|      0|
|Spain|13/02/2020|      0|
|Spain|12/02/2020|      0|
|Spain|11/02/2020|      0|
|Spain|09/02/2020|      0|
|Spain|08/02/2020|      0|
|Spain|07/02/2020|      0|
|Spain|06/02/2020|      0|
|Spain|05/02/2020|      0|
|Spain|04/02/2020|      0|
+-----+----------+-------+
only showing top 20 rows



Obtener el máximo número de casos de COVID que tuvo cada país, ordenados de manera descendente por el número de casos. 

In [6]:
covid_df.groupBy('countriesAndTerritories') \
        .max() \
        .select(
            col('countriesAndTerritories').alias('Pais'),
            col('max(cases)').alias('maxCasos')) \
        .orderBy('maxCasos', ascending=False) \
        .show()

+--------------------+--------+
|                Pais|maxCasos|
+--------------------+--------+
|United_States_of_...|   19979|
|               China|   15141|
|               Spain|    8578|
|               Italy|    6557|
|             Germany|    6294|
|              France|    4611|
|                Iran|    3076|
|      United_Kingdom|    2885|
|              Turkey|    2069|
|             Belgium|    1850|
|              Canada|    1426|
|         Switzerland|    1390|
|         Netherlands|    1172|
|             Austria|    1141|
|         South_Korea|     909|
|            Portugal|     902|
|           Australia|     611|
|              Israel|     584|
|              Brazil|     502|
|              Norway|     425|
+--------------------+--------+
only showing top 20 rows



# Consultas usando sentencias SQL

Primero debemos de crear una vista temporal a la que llamaremos `Covid`. Las consultas SQL se realizarán sobre esta vista.

In [7]:
covid_df.createOrReplaceTempView('Covid')

¿En qué fecha España registro el mayor número de casos? ¿Cuántos casos tuvo?

In [8]:
spark.sql("""
            SELECT countriesAndTerritories AS Pais, dateRep AS Fecha, cases AS Casos
            FROM Covid
            WHERE cases = (SELECT MAX(cases) FROM Covid WHERE countriesAndTerritories = 'Spain')
          """).show()

+-----+----------+-----+
| Pais|     Fecha|Casos|
+-----+----------+-----+
|Spain|27/03/2020| 8578|
+-----+----------+-----+



¿Hubo algún país que nunca registrara ningún caso?

In [9]:
spark.sql("""
            SELECT countriesAndTerritories AS Pais
            FROM Covid
            GROUP BY countriesAndTerritories
            HAVING SUM(cases) = 0
          """).show()

+----+
|Pais|
+----+
+----+



¿Qué país registro el mayor número de muertes? ¿Cuántas fueron? ¿En qué fecha fue? 

In [10]:
spark.sql("""
            SELECT countriesAndTerritories AS Pais, dateRep AS Fecha, deaths AS Muertes
            FROM Covid
            WHERE deaths = (SELECT MAX(deaths) FROM Covid)
          """).show()

+-----+----------+-------+
| Pais|     Fecha|Muertes|
+-----+----------+-------+
|Italy|28/03/2020|    971|
+-----+----------+-------+



# Convertimos el DataFrame en un archivo Parquet

**IMPORTANTE:** En el curso se utilizaba una máquina virtual que contenía todas las librerías necesarias. Este repositorio ha sido adaptado para que el notebook se pueda ejecutar en un entorno de Anaconda, es por ello que no se podrá ejecutar código que requiera de las funciones de Hadoop, como por ejemplo convertir un DataFrame a un archivo parquet, aun así he dejado el código correspondiente, pero comentado.

En esta sección convertiremos el DataFrame en un archivo Parquet que se guardará en la ruta parquet/ de la máquina virtual.

In [11]:
'''
covid_df.write \
        .format('parquet') \
        .mode('overwrite') \
        .save('file:////home/training/parquet/')
'''

"\ncovid_df.write         .format('parquet')         .mode('overwrite')         .save('file:////home/training/parquet/')\n"

# Cerramos la sesión de Spark

Finalmente, cerramos la sesión de Spark con el método `stop` de la variable `spark`.

In [12]:
spark.stop()