# **Data Processing using Pyspark and Google Colab.**

# **1. Configuraciones iniciales**

In [1]:
# Configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
# Eliminamos spark para asegurar que todo quede bien luego.
!rm -f spark-3.5.3-bin-hadoop3.tgz

In [3]:
# Instalamos Java y Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [4]:
# Descargamos Spark.
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz

In [5]:
# Descomprimimos el archivo.
!tar xf spark-3.5.3-bin-hadoop3.tgz

In [6]:
# Instalamos Spark.
!pip install -q findspark

In [7]:
# Importamos las variables de entorno de Java y Spark.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [8]:
# Importamos e iniciamos sesión en Spark.
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [9]:
spark

In [10]:
sc

In [11]:
%cd /content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket

/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket


In [12]:
!pwd

/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket


In [13]:
%ll

total 12
drwx------ 2 root 4096 Nov 22 22:40 [0m[01;34mraw[0m/
drwx------ 2 root 4096 Nov 22 22:40 [01;34mrefined[0m/
drwx------ 2 root 4096 Nov 22 22:40 [01;34mtrusted[0m/


In [14]:
spark.sql("SHOW TABLES").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [15]:
# Leer los tres archivos CSV y combinarlos
csv_files = [
    "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/raw/basededatos.csv",
    "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/raw/covid_api.csv",
    "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/raw/Covid19-Colombia.csv"
]

In [16]:
df_covid_init = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(csv_files)

In [17]:
# Registrar la tabla en Spark SQL
df_covid_init.createOrReplaceTempView("covid19")

In [18]:
# Consultar los datos desde SQL
spark.sql("SELECT * FROM covid19").show(10)

+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------+---------------------------+-------------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|  fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|    Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+-----------

In [19]:
# Forma del dataset. (# Filas, # Colunas)
print((df_covid_init.count(),len(df_covid_init.columns)))

(6391997, 23)


In [20]:
df_covid_init.printSchema()

root
 |-- fecha reporte web: timestamp (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: timestamp (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: timestamp (nullable = true)
 |-- Fecha de muerte: timestamp (nullable = true)
 |-- Fecha de diagnóstico: timestamp (nullable = true)
 |-- Fecha de recuperación: timestamp (nullable = tr

In [21]:
df_covid_init.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [22]:
# Eliminación de columnas que no se van a usar.
df_eliminacion_columnas = df_covid_init.drop( 'fecha reporte web',\
                                              'ID de caso',\
                                              'Código DIVIPOLA departamento',\
                                              'Código DIVIPOLA municipio',\
                                              'Unidad de medida de edad',\
                                              'Código ISO del país',\
                                              'Fecha de diagnóstico',\
                                              'Tipo de recuperación',\
                                              'Pertenencia étnica',\
                                              'Nombre del grupo étnico')

In [23]:
# Forma del dataset. (# Filas, # Colunas)
print((df_eliminacion_columnas.count(),len(df_eliminacion_columnas.columns)))

(6391997, 13)


In [24]:
df_eliminacion_columnas.columns

['Fecha de notificación',
 'Nombre departamento',
 'Nombre municipio',
 'Edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de recuperación']

In [25]:
df_cambio_columnas = df_eliminacion_columnas.withColumnRenamed('Fecha de notificación','FechaNotificacion')\
                                            .withColumnRenamed('Nombre departamento','Departamento')\
                                            .withColumnRenamed('Nombre municipio','Ciudad')\
                                            .withColumnRenamed('Tipo de contagio','TipoContagio')\
                                            .withColumnRenamed('Ubicación del caso','UbicacionPaciente')\
                                            .withColumnRenamed('Estado','EstadoPaciente')\
                                            .withColumnRenamed('Nombre del país','PaisProcedencia')\
                                            .withColumnRenamed('Recuperado','PacienteRecuperado')\
                                            .withColumnRenamed('Fecha de inicio de síntomas','FechaInicioSintomas')\
                                            .withColumnRenamed('Fecha de muerte','FechaMuerte')\
                                            .withColumnRenamed('Fecha de recuperación','FechaRecuperacion')

In [26]:
df_cambio_columnas.columns

['FechaNotificacion',
 'Departamento',
 'Ciudad',
 'Edad',
 'Sexo',
 'TipoContagio',
 'UbicacionPaciente',
 'EstadoPaciente',
 'PaisProcedencia',
 'PacienteRecuperado',
 'FechaInicioSintomas',
 'FechaMuerte',
 'FechaRecuperacion']

In [27]:
from pyspark.sql.functions import col, when, upper, date_format

In [28]:
# CAMBIAMOS EL FORMATO DE LAS FECHAS.

df_cambio_columnas = df_cambio_columnas.withColumn('FechaNotificacion',date_format(col('FechaNotificacion'), 'yyyy-MM-dd'))\
                                        .withColumn('FechaInicioSintomas',date_format(col('FechaInicioSintomas'), 'yyyy-MM-dd'))\
                                        .withColumn('FechaMuerte',date_format(col('FechaMuerte'), 'yyyy-MM-dd'))\
                                        .withColumn('FechaRecuperacion',date_format(col('FechaRecuperacion'), 'yyyy-MM-dd'))

In [29]:
# MODIFICACIÓN DE ALGUNAS ENTRADAS DE LAS COLUMNAS PARA QUE LAS 'N/A' Y NULL QUEDEN COMO 'NO REGISTRA'.
df_cambio_columnas = df_cambio_columnas.withColumn('UbicacionPaciente', when(col('UbicacionPaciente')=='N/A', 'NO REGISTRA').otherwise(col('UbicacionPaciente')))\
                                        .withColumn('EstadoPaciente', when(col('EstadoPaciente')=='N/A', 'NO REGISTRA').otherwise(col('EstadoPaciente')))\
                                        .withColumn('PaisProcedencia', when(col('PaisProcedencia').isNull(), 'COLOMBIA').otherwise(col('PaisProcedencia')))\
                                        .withColumn('PacienteRecuperado', when(col('PacienteRecuperado')=='N/A', 'NO REGISTRA').otherwise(col('PacienteRecuperado')))\
                                        .withColumn('FechaMuerte', when(col('FechaMuerte').isNull(), 'RECUPERADO').otherwise(col('FechaMuerte')))\
                                        .withColumn('FechaRecuperacion', when(col('FechaRecuperacion').isNull(), 'FALLECIDO').otherwise(col('FechaRecuperacion')))

In [30]:
# MODIFICACIÓN DE ALGUNAS COLUMNAS PARA HACER LA UNIFICACIÓN DE ENTRADAS A MAYÚSCULAS.
df_cambio_columnas = df_cambio_columnas.withColumn('Departamento',upper(col('Departamento')))\
                                        .withColumn('Ciudad',upper(col('Ciudad')))\
                                        .withColumn('Sexo',upper(col('Sexo')))\
                                        .withColumn('TipoContagio',upper(col('TipoContagio')))\
                                        .withColumn('UbicacionPaciente',upper(col('UbicacionPaciente')))\
                                        .withColumn('EstadoPaciente',upper(col('EstadoPaciente')))\
                                        .withColumn('PaisProcedencia',upper(col('PaisProcedencia')))\
                                        .withColumn('PacienteRecuperado',upper(col('PacienteRecuperado')))\
                                        .withColumn('FechaMuerte',upper(col('FechaMuerte')))\
                                        .withColumn('FechaRecuperacion',upper(col('FechaRecuperacion')))

In [49]:
df_cambio_columnas.show(10)

+-----------------+------------+------+----+----+------------+-----------------+--------------+---------------+------------------+-------------------+-----------+-----------------+
|FechaNotificacion|Departamento|Ciudad|Edad|Sexo|TipoContagio|UbicacionPaciente|EstadoPaciente|PaisProcedencia|PacienteRecuperado|FechaInicioSintomas|FechaMuerte|FechaRecuperacion|
+-----------------+------------+------+----+----+------------+-----------------+--------------+---------------+------------------+-------------------+-----------+-----------------+
|       2020-12-22|       VALLE|  CALI|  67|   F| COMUNITARIA|             CASA|          LEVE|       COLOMBIA|        RECUPERADO|         2020-12-21| RECUPERADO|       2021-01-04|
|       2020-12-19|       VALLE|  CALI|  66|   F| COMUNITARIA|             CASA|          LEVE|       COLOMBIA|        RECUPERADO|         2020-12-07| RECUPERADO|       2020-12-25|
|       2020-12-19|       VALLE|  CALI|  68|   F| COMUNITARIA|             CASA|          LEVE|

In [47]:
# CREAMOS LOS DATAFRAMES PARA CADA COLUMNA.
# 'FechaNotificacion'
dataframe_FechaNotificacion = df_cambio_columnas.groupBy('FechaNotificacion').count()
# 'Departamento'
dataframe_Departamento = df_cambio_columnas.groupBy('Departamento').count()
# 'Ciudad'
dataframe_Ciudad = df_cambio_columnas.groupBy('Ciudad').count()
# 'Edad'
dataframe_Edad = df_cambio_columnas.groupBy('Edad').count()
# 'Sexo'
dataframe_Sexo = df_cambio_columnas.groupBy('Sexo').count()
# 'TipoContagio'
dataframe_TipoContagio = df_cambio_columnas.groupBy('TipoContagio').count()
# 'UbicacionPaciente'
dataframe_UbicacionPaciente = df_cambio_columnas.groupBy('UbicacionPaciente').count()
# 'EstadoPaciente'
dataframe_EstadoPaciente = df_cambio_columnas.groupBy('EstadoPaciente').count()
# 'PaisProcedencia'
dataframe_PaisProcedencia = df_cambio_columnas.groupBy('PaisProcedencia').count()
# 'PacienteRecuperado'
dataframe_PacienteRecuperado = df_cambio_columnas.groupBy('PacienteRecuperado').count()
# 'FechaInicioSintomas'
dataframe_FechaInicioSintomas = df_cambio_columnas.groupBy('FechaInicioSintomas').count()
# 'FechaMuerte'
dataframe_FechaMuerte = df_cambio_columnas.groupBy('FechaMuerte').count()
# 'FechaRecuperacion'
dataframe_FechaRecuperacion = df_cambio_columnas.groupBy('FechaRecuperacion').count()

# **SALVAMOS LOS DATAFRAMES.**

In [67]:
!pwd

/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket


In [68]:
# Directorios para guardar los dataframes.
# Trusted.
write_uri_DataFrame_Procesado = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/trusted/DataFrame_Procesado"

# Refined.
write_uri_FechaNotificacion = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/FechaNotificacion"
write_uri_Departamento = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/Departamento"
write_uri_Ciudad = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/Ciudad"
write_uri_Edad = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/Edad"
write_uri_Sexo = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/Sexo"
write_uri_TipoContagio = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/TipoContagio"
write_uri_UbicacionPaciente = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/UbicacionPaciente"
write_uri_EstadoPaciente = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/EstadoPaciente"
write_uri_PaisProcedencia = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/PaisProcedencia"
write_uri_PacienteRecuperado = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/PacienteRecuperado"
write_uri_FechaInicioSintomas = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/FechaInicioSintomas"
write_uri_FechaMuerte = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/FechaMuerte"
write_uri_FechaRecuperacion = "/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined/FechaRecuperacion"

In [69]:
# GUARDAMOS LOS DATAFRAMES EN UN CSV
# DataFrame Completo.
df_cambio_columnas.coalesce(1).write.format("csv").option("header","true").save(write_uri_DataFrame_Procesado)

# 'FechaNotificacion'
dataframe_FechaNotificacion.coalesce(1).write.format("csv").option("header","true").save(write_uri_FechaNotificacion)
# 'Departamento'
dataframe_Departamento.coalesce(1).write.format("csv").option("header","true").save(write_uri_Departamento)
# 'Ciudad'
dataframe_Ciudad.coalesce(1).write.format("csv").option("header","true").save(write_uri_Ciudad)
# 'Edad'
dataframe_Edad.coalesce(1).write.format("csv").option("header","true").save(write_uri_Edad)
# 'Sexo'
dataframe_Sexo.coalesce(1).write.format("csv").option("header","true").save(write_uri_Sexo)
# 'TipoContagio'
dataframe_TipoContagio.coalesce(1).write.format("csv").option("header","true").save(write_uri_TipoContagio)
# 'UbicacionPaciente'
dataframe_UbicacionPaciente.coalesce(1).write.format("csv").option("header","true").save(write_uri_UbicacionPaciente)
# 'EstadoPaciente'
dataframe_EstadoPaciente.coalesce(1).write.format("csv").option("header","true").save(write_uri_EstadoPaciente)
# 'PaisProcedencia'
dataframe_PaisProcedencia.coalesce(1).write.format("csv").option("header","true").save(write_uri_PaisProcedencia)
# 'PacienteRecuperado'
dataframe_PacienteRecuperado.coalesce(1).write.format("csv").option("header","true").save(write_uri_PacienteRecuperado)
# 'FechaInicioSintomas'
dataframe_FechaInicioSintomas.coalesce(1).write.format("csv").option("header","true").save(write_uri_FechaInicioSintomas)
# 'FechaMuerte'
dataframe_FechaMuerte.coalesce(1).write.format("csv").option("header","true").save(write_uri_FechaMuerte)
# 'FechaRecuperacion'
dataframe_FechaRecuperacion.coalesce(1).write.format("csv").option("header","true").save(write_uri_FechaRecuperacion)

In [70]:
%cd refined

/content/gdrive/MyDrive/Semestre_2024.2/04._Tópicos_Espec_en_Telemática/Proyecto3/bucket/refined


In [71]:
%ls

[0m[01;34mCiudad[0m/        [01;34mEstadoPaciente[0m/       [01;34mFechaNotificacion[0m/   [01;34mPaisProcedencia[0m/  [01;34mUbicacionPaciente[0m/
[01;34mDepartamento[0m/  [01;34mFechaInicioSintomas[0m/  [01;34mFechaRecuperacion[0m/   [01;34mSexo[0m/
[01;34mEdad[0m/          [01;34mFechaMuerte[0m/          [01;34mPacienteRecuperado[0m/  [01;34mTipoContagio[0m/
