<a href="https://colab.research.google.com/github/Gabonoid/etl-pyspark-basico/blob/main/etl_articulo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **ETL**

Empezaremos creando nuestros directorios donde se va almacenar la mayoria de nuestros archivos.
Tendremos la siguiente estructura:
- `/content/pdf/`
- `/content/zip/`
- `/content/csv/`

In [None]:
import os


base_path = "/content/"
directorios = ["pdf", "zip", "csv"]

for directorio in directorios:
    ruta_directorio = os.path.join(base_path, directorio)
    os.makedirs(ruta_directorio, exist_ok=True)
    print(f"Directorio '{ruta_directorio}' creado exitosamente.")


Directorio '/content/pdf' creado exitosamente.
Directorio '/content/zip' creado exitosamente.
Directorio '/content/csv' creado exitosamente.


## **Extract**

Empezaremos por descargar los PDF donde estan todos los links de descarga

In [None]:
import requests


def descargar_url(url, destino):
    # Extrae el nombre del archivo de la URL
    nombre_archivo = os.path.basename(url)

    # Realiza la solicitud GET al enlace
    response = requests.get(url)

    # Verifica si la solicitud fue exitosa (código de estado 200)
    if response.status_code == 200:
        # Abre el archivo en modo escritura binaria ('wb')
        with open(destino + nombre_archivo, 'wb') as file:
            # Escribe el contenido del archivo en el archivo local
            file.write(response.content)
        print(f"Archivo {nombre_archivo} descargado correctamente.")
    else:
        print(
            f"Error al descargar el archivo desde {url}. \
            Código de estado: {response.status_code}")

In [None]:
pdf_url = ['https://www.gob.mx/cms/uploads/attachment/file/753710/Cierre_Datos_abiertos_hist_ricos_2020.pdf',
           'https://www.gob.mx/cms/uploads/attachment/file/753711/Cierre_Datos_abiertos_historicos_2021.pdf',
           'https://www.gob.mx/cms/uploads/attachment/file/830686/cierre_covid19__2022.pdf',
           'https://www.gob.mx/cms/uploads/attachment/file/891414/datos_abiertos_historicos_2023.pdf',
           'https://www.gob.mx/cms/uploads/attachment/file/914275/datos_abiertos_historicos_2024.pdf']

for pdf in pdf_url:
    descargar_url(pdf, '/content/pdf/')

Archivo Cierre_Datos_abiertos_hist_ricos_2020.pdf descargado correctamente.
Archivo Cierre_Datos_abiertos_historicos_2021.pdf descargado correctamente.
Archivo cierre_covid19__2022.pdf descargado correctamente.
Archivo datos_abiertos_historicos_2023.pdf descargado correctamente.
Archivo datos_abiertos_historicos_2024.pdf descargado correctamente.


In [None]:
!pip install PyPDF2

Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/232.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━[0m [32m122.9/232.6 kB[0m [31m3.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1


Como primer paso dentro del **Extract** es obtener todos los links de descargas de nuestros CSV. Estos links de descarga se encuentran dentro de PDF, un PDF para cada año. Con la libreia PyPDF2 lo que hacemos es leer estos PDF y leer todos los hipervinculos que se encuentran dentro.
Posteriormente creamos un txt con todos los links.

In [None]:
import PyPDF2


def nombres_archivos(carpeta):
    # Lista para almacenar los nombres de archivos
    nombres_archivos = []
    # Itera sobre los archivos en la carpeta
    for nombre_archivo in os.listdir(carpeta):
        # Verifica si el nombre representa un archivo (no una subcarpeta)
        if os.path.isfile(os.path.join(carpeta, nombre_archivo)):
            nombres_archivos.append(nombre_archivo)
    return nombres_archivos


def obtener_hipervinculos(path):
    # Abre el archivo PDF en modo lectura binaria
    with open(f'/content/pdf/{path}', 'rb') as file:
        # Crea un objeto PdfReader

        reader = PyPDF2.PdfReader(file)

        # Itera sobre cada página del PDF
        for page_num in range(len(reader.pages)):
            # Obtiene el objeto de página actual
            page = reader.pages[page_num]

            # Busca en los recursos de la página los enlaces
            if '/Annots' in page:
                for annot in page['/Annots']:
                    annotation_object = annot.get_object()
                    if '/A' in annotation_object:
                        if annotation_object['/A']['/S'] == '/URI':
                            with open('/content/links.txt', 'a') as archivo:
                                # Escribe el string en el archivo
                                # Agrega un salto de línea al final
                                archivo.write(
                                    annotation_object['/A']['/URI'] + '\n')


# Lista para almacenar los nombres de archivos
nombres_pdf = nombres_archivos('/content/pdf')

for nombre in nombres_pdf:
    obtener_hipervinculos(nombre)

Ya una vez obtenido nuestros links procedemos a descargar cada uno de los links

In [None]:
with open('/content/links.txt', 'r') as archivo:
    for linea in archivo:
        descargar_url(linea.strip(), '/content/zip/')

# descargar_url('https://datosabiertos.salud.gob.mx/gobmx/salud/datos_abiertos/historicos/2020/COVID19MEXICO2020.zip', '/content/zip/')

Archivo datos_abiertos_covid19_02.04.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_09.04.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_16.04.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_23.04.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_30.04.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_05.03.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_12.03.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_19.03.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_26.03.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_06.02.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_13.02.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_20.02.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_27.02.2024.zip descargado correctamente.
Archivo datos_abiertos_covid19_02.01.2024.zip descargado correct

En nuestro directorio `/content/zip` hemos guardado todos los archivos ZIP, procedemos a descomprimir cada uno.

In [None]:
import zipfile


def descomprimir_zip(name, archivo_zip, directorio_destino):
    # Abre el archivo ZIP en modo lectura
    with zipfile.ZipFile(archivo_zip, 'r') as zip_ref:
            # Extrae todos los archivos en el directorio especificado
            zip_ref.extractall(directorio_destino)
            nombres_archivos_extraidos = zip_ref.namelist()
            # Renombra el directorio descomprimido con el nombre del archivo original
            os.rename(
                f'/content/csv/{nombres_archivos_extraidos[-1]}',
                f'/content/csv/{name.replace(".zip", ".csv")}')


# Ruta del archivo ZIP que deseas descomprimir
archivos_zip = nombres_archivos('/content/zip')

# Directorio donde deseas descomprimir los archivos
directorio_destino = '/content/csv'


for archivo in archivos_zip:
    archivo_zip = f'/content/zip/{archivo}'
    # Llama a la función para descomprimir el archivo ZIP
    descomprimir_zip(archivo, archivo_zip, directorio_destino)


Hemos extraido con exito todos los CSV dando fin al paso de extracción.

In [None]:
# Itera sobre los archivos y directorios en el directorio
for elemento in os.listdir("/content/csv"):
    ruta_elemento = os.path.join(directorio, elemento)

    # Si es un archivo, verifica si es un archivo CSV
    if os.path.isfile(ruta_elemento):
        if not elemento.endswith('.csv'):
            os.remove(ruta_elemento)

    # Si es un directorio, elimínalo recursivamente
    elif os.path.isdir(ruta_elemento):
        os.removedirs(ruta_elemento)



## **Transform**

Hemos entrado ya a la parte de transformacion de los datos. Es aqui donde aplicaremos todos los filtros para limpiar y/o ordenar nuestros datos.
Vamos a utilizar Spark para este apartado.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=2a9eb0d88a217019e846faabaae2d71f3040867936d371486c3ec32c2c723c50
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


Como todo proyecto de spark empezaremos por crear unestro `SparkSession` y nuestro `sparkContext`.

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("ETL_basico_BigData") \
        .getOrCreate()

sc = spark.sparkContext


In [None]:
for i, archivo in enumerate(nombres_archivos('/content/csv')):

    with open(f'/content/csv/{archivo}', 'r', encoding='utf-8') as entrada:
        contenido = entrada.read()

    contenido_sin_comillas = contenido.replace(
        'Países de la Ex-U.R.S.S., excepto Ucrania y Bielorusia', 'Países de la Ex-U.R.S.S. excepto Ucrania y Bielorusia')

    with open(f'/content/csv/{archivo}', 'w', encoding='utf-8') as salida:
        salida.write(contenido_sin_comillas)

    print(f"{i+1}-> {archivo}")

1-> datos_abiertos_covid19_22.08.2023.csv
2-> datos_abiertos_covid19_06.02.2024.csv
3-> datos_abiertos_covid19_08.08.2023.csv
4-> datos_abiertos_covid19_04.04.2023.csv
5-> datos_abiertos_covid19_30.01.2024.csv
6-> datos_abiertos_covid19_16.01.2024.csv
7-> datos_abiertos_covid19_02.04.2024.csv
8-> datos_abiertos_covid19_13.06.2023.csv
9-> datos_abiertos_covid19_18.04.2023.csv
10-> datos_abiertos_covid19_28.02.2023.csv
11-> datos_abiertos_covid19_24.01.2023.csv
12-> datos_abiertos_covid19_16.04.2024.csv
13-> datos_abiertos_covid19_17.01.2023.csv
14-> datos_abiertos_covid19_14.03.2023.csv
15-> datos_abiertos_covid19_15.08.2023.csv
16-> datos_abiertos_covid19_11.04.2023.csv
17-> datos_abiertos_covid19_23.04.2024.csv
18-> datos_abiertos_covid19_21.11.2023.csv
19-> datos_abiertos_covid19_13.02.2024.csv
20-> datos_abiertos_covid19_09.05.2023.csv
21-> datos_abiertos_covid19_07.11.2023.csv
22-> datos_abiertos_covid19_23.05.2023.csv
23-> datos_abiertos_covid19_28.03.2023.csv
24-> datos_abiertos_

En pasos anteriores hemos descargado los csv, utilizando spark vamos a unificar estos CSV para convertirlo en un solo dataframe.

In [None]:
# Obtenemos los nombres de los ficheros dentro de nuestro directorio csv
csv_covid = nombres_archivos('/content/csv/')
# Hacemos un dataframe unificado
df = spark.read.option("header", "true")\
    .csv(list(map(lambda x: "/content/csv/" + x, csv_covid)),
         encoding='utf-8')


### **Visualizacion los de Datos**

Checamos cuantos datos tenemos en nuestro dataframe

In [None]:
df.count()

227757980

In [None]:
df.show(10)

+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+----------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+---------------------+------------------+-------------------+--------+-----------------+-----------+---+
|FECHA_ACTUALIZACION|ID_REGISTRO|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO|FECHA_SINTOMAS| FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_LAB|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI|
+-------------------+-----------+------+------+-------

### **Limpieza de Datos**


Damos comienzo a la limpieza de los datos cambiando el formato de la fecha. Primero tenemos que analizar cuantos tipos de formatos de fechas contiene nuestro DataSet ya que estos vienen variados.

En el DataSet tenemos 4 posibles formatos de fecha
- dd-MM-yyyy
- yyyy-MM-dd
- yyyy/MM/dd
- dd/MM/yyyy

Por lo que pasaremos primero a definir que esas columnas son de tipo `Date` y posteriormente le asignamos el formato que queremos.
En este dataset nos encontraremos con un valor `9999-99-99` por lo que lo transformaremos de valor `None`.

In [None]:
from pyspark.sql.functions import when, col, to_date

# Columnas con fechas
col_fechas = ['FECHA_ACTUALIZACION','FECHA_INGRESO','FECHA_SINTOMAS','FECHA_DEF']

for columna in col_fechas:
    df = df.withColumn(columna,
                        when(col(columna).rlike(r'^\d{2}-\d{2}-\d{4}$'),
                             to_date(col(columna), 'dd-MM-yyyy'))
                       .when(col(columna).rlike(r'^\d{4}-\d{2}-\d{2}$'),
                             to_date(col(columna), 'yyyy-MM-dd'))
                       .when(col(columna).rlike(r'^\d{2}/\d{2}/\d{4}$'),
                             to_date(col(columna), 'dd/MM/yyyy'))
                       .when(col(columna).rlike(r'^\d{4}/\d{2}/\d{2}$'),
                             to_date(col(columna), 'yyyy/MM/dd'))
                       .otherwise(None))

for columna in col_fechas:
    df = df.withColumn(columna, to_date(columna, 'yyyy-MM-dd'))


Pasaremos a utilizar otro filtro que nos ayudara a ordenar los datos. Esto lo haremos con la columna `FECHA_ACTUALIZACION`.

In [None]:
df = df.sort("FECHA_ACTUALIZACION")


Mostraremos la primera entrada y la ultima para confirmar que el Dataframe fue ordenado correctamente.

In [None]:
# df.show(10)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

Dentro de la documentacion https://www.gob.mx/salud/documentos/datos-abiertos-152127 nos especifican que existe una categoria que lo llaman `"SI_NO"`, los cuales incluye:
- 1 = SI
- 2 = NO
- 97 = NO APLICA
- 98 = SE IGNORA
- 99 = NO ESPECIFICADO

Al tratarse de un modelo que se utilizara en un futuro para entrenar un modelo predictivo tenemos que considerar en eliminar los valores diferentes a partir de interpretarlos como otros, es decir las variables '`NO APLICA`', '`SE IGNORA`' y '`NO ESPECIFICADO`' vamos a interpretarlas directamente como '`NO`'; para despues pasar a una convención el cual es trabajar con 0 y 1, lo que pasaremos el 2 a 0.

Revisando la documentacion, columnas que pertenecen a la categoria `"SI_NO"`:
- INTUBADO
- NEUMONIA
- HABLA_LENGUA_INDIG
- INDIGENA
- DIABETES
- EPOC
- ASMA
- INMUSUPR
- HIPERTENSION
- OTRAS_COM
- CARDIOVASCULAR
- OBESIDAD
- RENAL_CRONICA
- TABAQUISMO
- OTRO_CASO
- TOMA_MUESTRA_LAB
- TOMA_MUESTRA_ANTIGENO
- MIGRANTE
- UCI

In [None]:
columnas_modificar = ["INTUBADO",
                    "NEUMONIA",
                    "HABLA_LENGUA_INDIG",
                    "INDIGENA",
                    "DIABETES",
                    "EPOC",
                    "ASMA",
                    "INMUSUPR",
                    "HIPERTENSION",
                    "CARDIOVASCULAR",
                    "OBESIDAD",
                    "RENAL_CRONICA",
                    "TABAQUISMO",
                    "OTRO_CASO",
                    "OTRA_COM",
                    "TOMA_MUESTRA_LAB",
                    "TOMA_MUESTRA_ANTIGENO",
                    "MIGRANTE",
                    "ORIGEN",
                    "EMBARAZO",
                    "UCI"]

for columna in columnas_modificar:
    df = df.withColumn(columna,
                       when(df[columna] == 2, 0)
                       .otherwise(df[columna]))


Tambien necesitamos modificar la variable de '`SEXO`', estos vienen de la siguiente manera:
- 1 = Mujer
- 2 = Hombre
- 99 = No especificado

Los casos de no especificados los pasaremos a `None`.

In [None]:
df = df.withColumnRenamed("SEXO", "SEXO_MUJER")
df = df.withColumn("SEXO_MUJER",
                   when(df["SEXO_MUJER"] == 2, 0)
                   .otherwise(df["SEXO_MUJER"]))


Para evitar confusiones en futuras consultas vamos a especificar más nuestra columna, pasaremos de llamarlo "SEXO" a "SEXO_MUJER" para que cuando el valor sea 1 sepamos que ese paciente era Mujer y si es 0 no es mujer por ende es Hombre.

In [None]:
df = df.withColumnRenamed("SEXO", "SEXO_MUJER")

Haremos lo mismo en las columnas que lo necesiten

In [None]:
df = df.withColumnRenamed("ORIGEN", "USMER")
df = df.withColumn("USMER",
                   when(df["USMER"] == 2, 0)
                   .otherwise(df["USMER"]))

In [None]:
df = df.withColumnRenamed("TIPO_PACIENTE", "HOSPITALIZADO")
df = df.withColumn("HOSPITALIZADO",
                   when(df["HOSPITALIZADO"] == 1, 0)
                   .when(df["HOSPITALIZADO"] == 2, 1)
                   .otherwise(df["HOSPITALIZADO"]))

In [None]:
df = df.withColumnRenamed("NACIONALIDAD", "EXTRANJERA")
df = df.withColumn("EXTRANJERA",
                   when(df["EXTRANJERA"] == 1, 0)
                   .when(df["EXTRANJERA"] == 2, 1)
                   .otherwise(df["EXTRANJERA"]))

Podemos generalizar las demas columnas ya que los valores 97, 98, 99 siempre demuestran un valor nulo

In [None]:
columnas_a_ignorar = ["EDAD","FECHA_ACTUALIZACION",
                      "FECHA_INGRESO","FECHA_SINTOMAS",
                      "FECHA_DEF", "ID_REGISTRO"]

columnas_a_modificar = [elemento for elemento in df.columns
                        if elemento not in columnas_a_ignorar]

# Aplicar el reemplazo para cada columna excepto "edad" y columnas con fechas
for col_name in columnas_a_modificar:
    df = df.withColumn(col_name,
                       when((col(col_name).isin([97, 98, 99])), None)
                       .otherwise(col(col_name)))


In [None]:
df.write.csv("/content/covid_20.csv", header=True)

In [None]:
spark.stop()