<a href="https://colab.research.google.com/github/AzulBarr/introduccion-a-las-bases-de-datos/blob/main/5_2_Spark_y_SQL_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark + SQL

PySpark is the Python interface to Apache Spark. It is mainly used to work with large volumes of data and to build processing pipelines.

However, you do not need big data to benefit from PySpark. Spark SQL is a convenient tool for efficient data analysis. Pandas can become slow on larger datasets, and cleaning and transforming data often takes a lot of code, whereas the same operations in SQL usually need fewer lines and are more expressive. Let's get started!

More information:
http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark on Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys

import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("5.1 Spark y SQL") \
       .getOrCreate()

spark

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connecting to r2u.stat.illinois[0m                                                                               Hit:2 https://cli.github.com/packages stable InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connected to r2u.stat.illinois.[0m                                                                               Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
[33m0% [Waiting for headers] [Connected to r2u.stat.illinois.edu (192.17.190.167)] [0m                                                                               Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connected to r2u.stat.illinois.edu (192.17.190.167)] [0m                                                                               Hit:5 https://developer.download.n

In [None]:
spark

# 2. Reading the data

We use the public COVID-19 dataset published by Our World in Data (OWID).

In [None]:
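import requests

# Download the OWID COVID-19 CSV and save it locally
path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
req = requests.get(path)
req.raise_for_status()  # stop here if the download failed

csv_file_name = 'owid-covid-data.csv'
with open(csv_file_name, 'wb') as csv_file:
    csv_file.write(req.content)

# Load the CSV into a Spark DataFrame: the first row is the header and column types are inferred
# ('/content/' is the working directory on Colab)
df = spark.read.csv('/content/'+csv_file_name, header=True, inferSchema=True)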

# 3. PySpark DataFrames

In [None]:
# Inspect the DataFrame schema
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [None]:
# Convert the date column to DateType (inferSchema already parsed it as date, so this is a no-op here)
df.select(F.to_date(df.date).alias('date'))

DataFrame[date: date]

In [None]:
# Summary statistics
df.describe().show()


In [None]:
# Filter the DataFrame:
# rows for Argentina, sorted by date in descending order.
df.filter(df.location == "Argentina").orderBy(F.desc("date")).show()


In [None]:
# Group by location and sum the new cases as the aggregate function.
df.groupBy("location").sum("new_cases").orderBy(F.desc("sum(new_cases)")).show(truncate=False)

+-----------------------------+--------------+
|location                     |sum(new_cases)|
+-----------------------------+--------------+
|World                        |775935057     |
|High-income countries        |429044052     |
|Asia                         |301564180     |
|Europe                       |252916868     |
|Upper-middle-income countries|251756125     |
|European Union (27)          |185822587     |
|North America                |124492698     |
|United States                |103436829     |
|China                        |99373219      |
|Lower-middle-income countries|92019711      |
|South America                |68811012      |
|India                        |45041748      |
|France                       |38997490      |
|Germany                      |38437756      |
|Brazil                       |37511921      |
|South Korea                  |34571873      |
|Japan                        |33803572      |
|Italy                        |26781078      |
|United Kingd

# 4. Spark SQL

The SQL module is an accessible way to interact with the data while still using Spark. There is little new to learn, since it uses essentially the same SQL syntax you are probably already familiar with.

In [None]:
# Register the DataFrame as a table
df.createOrReplaceTempView("covid_data") # temporary view, visible only in this Spark session
# df.write.saveAsTable("covid_data") # option: persist it as a table
# df.write.mode("overwrite").saveAsTable("covid_data") # save as a table, overwriting it if it exists

In [None]:

df2 = spark.sql("SELECT * from covid_data")
df2.printSchema()


root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [None]:
df2.show()


In [None]:
groupDF = spark.sql("SELECT location, count(*) from covid_data group by location order by count(*)")
groupDF.show()

+-----------------+--------+
|         location|count(1)|
+-----------------+--------+
|   Western Sahara|       1|
|  Northern Cyprus|     691|
|            Macao|     795|
|            Wales|    1198|
|         Scotland|    1305|
|           Taiwan|    1348|
|          England|    1359|
| Northern Ireland|    1372|
|        Hong Kong|    1654|
|         Dominica|    1674|
|   American Samoa|    1674|
|          Algeria|    1674|
|         Anguilla|    1674|
|Equatorial Guinea|    1674|
|           Guyana|    1674|
|          Eritrea|    1674|
|           Jersey|    1674|
|         Djibouti|    1674|
|           Angola|    1674|
|             Iraq|    1674|
+-----------------+--------+
only showing top 20 rows



### N: Get the total confirmed COVID-19 cases per country up to the most recent available date

In [None]:
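# Note: total_cases is a cumulative count, so SUM adds up every daily snapshot;
# MAX(total_cases) would give the highest (normally the latest) total per location instead.
query = """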
SELECT location AS pais, sum(total_cases) AS total_casos_confirmados
FROM covid_data
GROUP BY location
ORDER BY total_casos_confirmados DESC
"""

result = spark.sql(query)
result.show()

+--------------------+-----------------------+
|                pais|total_casos_confirmados|
+--------------------+-----------------------+
|               World|           715697182101|
|High-income count...|           393562796886|
|                Asia|           252167317226|
|              Europe|           236756684151|
|Upper-middle-inco...|           218037426784|
| European Union (27)|           171330559623|
|       North America|           127073670231|
|       United States|           105914483457|
|Lower-middle-inco...|           100873065026|
|       South America|            73484570403|
|               China|            59789650705|
|               India|            52079485202|
|              Brazil|            40274470410|
|              France|            35276866268|
|             Germany|            33383624547|
|         South Korea|            25657640953|
|      United Kingdom|            24768373218|
|               Japan|            24572158686|
|            
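
The totals above are far larger than the `sum(new_cases)` figures from section 3, because `total_cases` is already a running total. The sketch below illustrates the difference between `SUM` and `MAX` on a cumulative column. It uses Python's built-in `sqlite3` and a made-up three-column table as a stand-in for `covid_data`, purely so it runs without Spark; the locations `A` and `B` and their numbers are hypothetical. The SQL text itself is plain enough that it should carry over to `spark.sql(...)`.

```python
import sqlite3

# Hypothetical toy data: total_cases is cumulative, one row per location and day.
rows = [
    ("A", "2022-01-01", 10),
    ("A", "2022-01-02", 15),
    ("A", "2022-01-03", 18),
    ("B", "2022-01-01", 5),
    ("B", "2022-01-02", 7),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE covid_data (location TEXT, date TEXT, total_cases INTEGER)")
conn.executemany("INSERT INTO covid_data VALUES (?, ?, ?)", rows)


def totals(agg):
    """Return {location: agg(total_cases)}; agg is 'SUM' or 'MAX'."""
    query = f"""
        SELECT location AS pais, {agg}(total_cases) AS total_casos_confirmados
        FROM covid_data
        GROUP BY location
        ORDER BY total_casos_confirmados DESC
    """
    return dict(conn.execute(query).fetchall())


summed = totals("SUM")  # adds every daily snapshot of the running total
latest = totals("MAX")  # keeps the largest running total per location

print(summed)
print(latest)
```

`MAX` only equals the most recent value when the cumulative series never decreases; if the data contains downward corrections, selecting the row with the latest date is the safer choice.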

### O: Query the number of deaths on a specific date (2022-01-01)

In [None]:
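# Note: this sums total_deaths over every row for that date, including aggregate
# rows such as World and the continents, so deaths are counted more than once.
query = """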
SELECT date, sum(total_deaths) AS num_muertes
FROM covid_data
WHERE date = '2022-01-01'
GROUP BY date
"""

result = spark.sql(query)
result.show()

+----------+-----------+
|      date|num_muertes|
+----------+-----------+
|2022-01-01|   22694365|
+----------+-----------+
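
Because the table mixes country rows with aggregate rows (`World`, continents, income groups), a plain `SUM` over one date counts the same deaths several times. The sketch below shows one possible way to restrict the sum to countries. It assumes that aggregate rows have a NULL `continent`, which should be checked against the real data (for example with `SELECT DISTINCT location FROM covid_data WHERE continent IS NULL`). It runs on `sqlite3` with invented numbers rather than on Spark.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE covid_data "
    "(location TEXT, continent TEXT, date TEXT, total_deaths INTEGER)"
)
# Hypothetical rows: aggregates carry continent = NULL (assumption), countries do not.
conn.executemany(
    "INSERT INTO covid_data VALUES (?, ?, ?, ?)",
    [
        ("World", None, "2022-01-01", 300),
        ("Europe", None, "2022-01-01", 100),
        ("South America", None, "2022-01-01", 200),
        ("France", "Europe", "2022-01-01", 100),
        ("Brazil", "South America", "2022-01-01", 200),
    ],
)

# Summing every row for the date counts each death up to three times.
all_rows = conn.execute(
    "SELECT SUM(total_deaths) FROM covid_data WHERE date = '2022-01-01'"
).fetchone()[0]

# Keeping only rows with a continent leaves the individual countries.
countries_only = conn.execute(
    """
    SELECT SUM(total_deaths) AS num_muertes
    FROM covid_data
    WHERE date = '2022-01-01' AND continent IS NOT NULL
    """
).fetchone()[0]

print(all_rows, countries_only)
```

An alternative under the same caveat is to read the single `World` row for that date instead of summing countries.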



### P: Get the daily evolution of cases in a specific country ("Argentina").

In [None]:
query = """
SELECT location AS pais, date, new_cases
FROM covid_data
WHERE iso_code = 'ARG'
ORDER BY date DESC
"""

result = spark.sql(query)
result.show(10)

+---------+----------+---------+
|     pais|      date|new_cases|
+---------+----------+---------+
|Argentina|2024-08-04|       54|
|Argentina|2024-08-03|        0|
|Argentina|2024-08-02|        0|
|Argentina|2024-08-01|        0|
|Argentina|2024-07-31|        0|
|Argentina|2024-07-30|        0|
|Argentina|2024-07-29|        0|
|Argentina|2024-07-28|       85|
|Argentina|2024-07-27|        0|
|Argentina|2024-07-26|        0|
+---------+----------+---------+
only showing top 10 rows



### Q: Calculate the percentage of the population vaccinated per country.

In [None]:
query = """
SELECT location AS pais, max(people_vaccinated_per_hundred) AS porcentaje_personas_vacunadas
FROM covid_data
WHERE people_vaccinated_per_hundred IS NOT NULL
GROUP BY location
ORDER BY porcentaje_personas_vacunadas
"""

result = spark.sql(query)
result.show()

+--------------------+-----------------------------+
|                pais|porcentaje_personas_vacunadas|
+--------------------+-----------------------------+
|             Burundi|                         0.29|
|               Yemen|                         3.12|
|    Papua New Guinea|                         3.77|
|               Haiti|                          4.5|
|          Madagascar|                         9.15|
|               Congo|                        11.65|
|               Gabon|                        13.03|
|            Cameroon|                        13.45|
|               Syria|                         14.9|
|             Senegal|                         15.5|
|   Equatorial Guinea|                        16.13|
|Democratic Republ...|                        17.22|
|             Algeria|                        17.46|
|                Mali|                        19.27|
|               Niger|                        23.84|
|             Namibia|                        

### R: Calculate the total cases and deaths in the latest available month

In [83]:
query = """
SELECT location AS pais, SUM(new_cases) AS total_casos_mes, SUM(new_deaths) AS total_muertes_mes
FROM covid_data
WHERE new_cases IS NOT NULL
  AND new_deaths IS NOT NULL
  AND year(date) = (SELECT max(year(date)) FROM covid_data)
  AND month(date) = (SELECT max(month(date))
                     FROM covid_data
                     WHERE year(date) = (SELECT max(year(date)) FROM covid_data))
GROUP BY location
ORDER BY total_casos_mes DESC, total_muertes_mes DESC
"""

result = spark.sql(query)
result.show()

+--------------------+---------------+-----------------+
|                pais|total_casos_mes|total_muertes_mes|
+--------------------+---------------+-----------------+
|               World|          47169|              815|
|              Europe|          39047|              162|
|High-income count...|          32293|              786|
| European Union (27)|          25642|              150|
|Upper-middle-inco...|          14277|               29|
|              Russia|           7777|               10|
|               Italy|           6350|                6|
|              Greece|           5818|               40|
|             Romania|           4633|                6|
|                Asia|           4515|               17|
|      United Kingdom|           3400|                0|
|              Poland|           2463|                4|
|               China|           2087|               15|
|             Oceania|           1809|               16|
|         New Zealand|         
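
Filtering on the month number alone is not enough to isolate the latest month, because the same month number also occurs in earlier years. The sketch below compares a month-only filter with a year-and-month filter on a few invented rows. It uses `sqlite3` and its `strftime` function as a stand-in; in Spark SQL the corresponding pieces are the `year()` and `month()` functions used in this notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE covid_data (location TEXT, date TEXT, new_cases INTEGER)")
# Hypothetical rows spanning two years.
conn.executemany(
    "INSERT INTO covid_data VALUES (?, ?, ?)",
    [
        ("A", "2023-08-15", 1000),  # same calendar month, previous year
        ("A", "2023-12-01", 500),   # later month number, previous year
        ("A", "2024-07-20", 30),    # previous month, latest year
        ("A", "2024-08-01", 4),     # latest month
        ("A", "2024-08-04", 6),     # latest month
    ],
)

# Month-only filter: also picks up August and December of 2023.
month_only = conn.execute("""
    SELECT SUM(new_cases) FROM covid_data
    WHERE CAST(strftime('%m', date) AS INTEGER) >= (
        SELECT MAX(CAST(strftime('%m', date) AS INTEGER)) FROM covid_data
        WHERE strftime('%Y', date) = (SELECT MAX(strftime('%Y', date)) FROM covid_data)
    )
""").fetchone()[0]

# Year-and-month filter: keeps only rows in the month of the latest date.
year_and_month = conn.execute("""
    SELECT SUM(new_cases) FROM covid_data
    WHERE strftime('%Y-%m', date) = (SELECT strftime('%Y-%m', MAX(date)) FROM covid_data)
""").fetchone()[0]

print(month_only, year_and_month)
```

Comparing a combined year-month key against the key of the latest date avoids the nested subqueries and cannot drift into earlier years.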

In [80]:
# Variant without the year filter: month(date) >= latest month also matches
# those months in earlier years, so these totals span several years.
query = """
SELECT location AS pais, SUM(new_cases) AS total_casos_mes, SUM(new_deaths) AS total_muertes_mes
FROM covid_data
WHERE new_cases IS NOT NULL
  AND new_deaths IS NOT NULL
  AND month(date) >= (SELECT max(month(date))
                      FROM covid_data
                      WHERE year(date) >= (SELECT max(year(date)) FROM covid_data))
GROUP BY location
ORDER BY total_casos_mes DESC, total_muertes_mes DESC
"""

result = spark.sql(query)
result.show()

+--------------------+---------------+-----------------+
|                pais|total_casos_mes|total_muertes_mes|
+--------------------+---------------+-----------------+
|               World|      286384260|          2755962|
|High-income count...|      144786063|          1144648|
|                Asia|      129195953|           695961|
|Upper-middle-inco...|      110061730|          1070265|
|              Europe|       82537515|           869525|
|               China|       56874773|            15846|
| European Union (27)|       55104825|           493174|
|       North America|       48399506|           692619|
|       United States|       41124984|           454738|
|Lower-middle-inco...|       30524440|           523054|
|       South America|       18020343|           379309|
|               Japan|       16762669|            28594|
|               India|       12892421|           180471|
|         South Korea|       11035994|            11249|
|             Germany|       11