In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

##Importando Librarías

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark import SparkConf
from pyspark.sql import SparkSession
import json

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

##Path y Estructura de archivos .csv

Necesario definir si deseamos ingestas más archivos csv, solo es necesario modificar esto y el proceso estará preparado para más archivos

In [6]:
#Importante si deseamos lecturar más archivos .csv es importante solo configurar este archivo

# Ruta de los archivos CSV
file_paths = ["country_wise_latest.csv",
              "covid_19_clean_complete.csv",
              "day_wise.csv",
              "full_grouped.csv",
              "usa_county_wise.csv",
              "worldometer_data.csv"]

# Definir esquemas para cada archivo CSV
schema = {
    "country_wise_latest.csv": StructType([
        StructField("country_region", StringType(), True),
        StructField("confirmed", IntegerType(), False),
        StructField("deaths", IntegerType(), False),
        StructField("recovered", IntegerType(), False),
        StructField("active", IntegerType(), False),
        StructField("new_cases", IntegerType(), False),
        StructField("new_deaths", IntegerType(), False),
        StructField("new_recovered", IntegerType(), False),
        StructField("deaths_100_cases", DoubleType(), False),
        StructField("recovered_100_cases", DoubleType(), False),
        StructField("deaths_100_recovered", DoubleType(), False),
        StructField("confirmed_last_week", IntegerType(), False),
        StructField("one_week_change", IntegerType(), False),
        StructField("one_week_percentage_increase", DoubleType(), False),
        StructField("who_region", StringType(), True)
    ]),
        "covid_19_clean_complete.csv": StructType([
        StructField("province_state", StringType(), True),
        StructField("country_region", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("long", DoubleType(), True),
        StructField("date", DateType(), True),
        StructField("confirmed", IntegerType(), True),
        StructField("deaths", IntegerType(), True),
        StructField("recovered", IntegerType(), True),
        StructField("active", IntegerType(), True),
        StructField("who_region", StringType(), True)
    ]),
    "day_wise.csv": StructType([
        StructField("date", DateType(), True),
        StructField("confirmed", IntegerType(), True),
        StructField("deaths", IntegerType(), True),
        StructField("recovered", IntegerType(), True),
        StructField("active", IntegerType(), True),
        StructField("new_cases", IntegerType(), True),
        StructField("new_deaths", IntegerType(), True),
        StructField("new_recovered", IntegerType(), True),
        StructField("deaths_100_cases", DoubleType(), True),
        StructField("recovered_100_cases", DoubleType(), True),
        StructField("deaths_100_recovered", DoubleType(), True),
        StructField("num_countries", IntegerType(), True)
    ]),
    "full_grouped.csv": StructType([
        StructField("date", DateType(), True),
        StructField("country_region", StringType(), True),
        StructField("confirmed", IntegerType(), True),
        StructField("deaths", IntegerType(), True),
        StructField("recovered", IntegerType(), True),
        StructField("active", IntegerType(), True),
        StructField("new_cases", IntegerType(), True),
        StructField("new_deaths", IntegerType(), True),
        StructField("new_recovered", IntegerType(), True),
        StructField("who_region", StringType(), True)
    ]),
    "usa_county_wise.csv": StructType([
        StructField("uid", StringType(), True),
        StructField("iso2", StringType(), True),
        StructField("iso3", StringType(), True),
        StructField("code3", IntegerType(), True),
        StructField("fips", DoubleType(), True),
        StructField("admin2", StringType(), True),
        StructField("province_state", StringType(), True),
        StructField("country_region", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("long", DoubleType(), True),
        StructField("combined_key", StringType(), True),
        StructField("date", DateType(), True),
        StructField("confirmed", IntegerType(), True),
        StructField("deaths", IntegerType(), True)
    ]),
    "worldometer_data.csv": StructType([
        StructField("country_region", StringType(), True),
        StructField("continent", StringType(), True),
        StructField("population", IntegerType(), True),
        StructField("total_cases", IntegerType(), True),
        StructField("new_cases", IntegerType(), True),
        StructField("total_deaths", IntegerType(), True),
        StructField("new_deaths", IntegerType(), True),
        StructField("total_recovered", IntegerType(), True),
        StructField("new_recovered", IntegerType(), True),
        StructField("active_cases", IntegerType(), True),
        StructField("serious_critical", IntegerType(), True),
        StructField("tot_cases_1m_pop", DoubleType(), True),
        StructField("deaths_1m_pop", DoubleType(), True),
        StructField("total_tests", IntegerType(), True),
        StructField("tests_1m_pop", DoubleType(), True),
        StructField("who_region", StringType(), True)
    ])
    # Agrega esquemas para más archivos CSV si es necesario
}

###Job de Ingesta

In [7]:
class csvReader:
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()
        self.dfs = {}

    def read_csv_files(self, file_paths, schema):

        #Importante, aquí se usan los Schemas establecidos para cada .Csv
        for path in file_paths:
            df = self.spark.read.csv("data/" + path, schema=schema[path], header=True)
            self.dfs[path] = df
        print("Se aplicaron todos los Schemas descritos en structureSchema.py ")

        print("Ingesta Completada se lecturan todos los archivos .csv")

        return self.dfs

###Job de Limpieza de datos

In [8]:
class dataCleaner:
    def __init__(self):
        self.df = {}

    def clean_data(self, df: DataFrame) -> DataFrame:
        string_columns = [column_name for column_name, data_type in df.dtypes if data_type == 'string']
        # Se realiza limpieza de datos para reemplazar valores nulos por string vacio
        for col_name in string_columns:
            df = df.withColumn(col_name, when(col(col_name).isNull(), "").otherwise(col(col_name)))

        return df

###Job de Escritura y Actualización de datos


In [9]:
class dataProcess:
    def __init__(self):
        self.spark = SparkSession.builder.getOrCreate()
        self.df = {}

    def process_and_write_data(self,path, df: DataFrame) -> DataFrame:
        # Escribir el DataFrame en formato Parquet en modo 'append'
        try:
            if os.path.exists(path):
                # Si el archivo Parquet de salida ya existe, lee los datos existentes como DataFrame
                df_parquet = spark.read.parquet(path)
                # Encuentra las novedades (diferencias) entre los datos CSV y los datos Parquet existentes
                df_novedades = df.subtract(df_parquet)

                # Agrega nuevos registros en formato Parquet
                df_novedades.write.mode("append").parquet(path)
            else:
                df.write.mode("overwrite").parquet(path)

        except Exception as e:
        # Captura cualquier excepción y muestra información de error adicional.
            print("Error al guardar los datos en formato Parquet:")
            print(str(e))

In [12]:
class dataPipeline:
    def __init__(self):

        self.csv_reader = csvReader()
        self.data_cleaner = dataCleaner()
        self.data_processor = dataProcess()
        # self.file_monitor = FileMonitor()

    def run(self):
        # Quitamos el .csv de los nombres de archivos para definir el nombra de salida en parquet
        output_path = [file_path.split(".csv")[0] for file_path in file_paths]

        # Ingesta y guardado en dataframes
        dfs = self.csv_reader.read_csv_files(file_paths, schema)

        #Limpieza de datos usando la clase dataClean para eliminar nulos en campos String y reemplazar por string vacios ''
        df_cleaned = {}
        print('\n################## INICIANDO LIMPIEZA DE DATOS   ##################')
        for dataframe_name in dfs.keys():
            df_cleaned[dataframe_name] = self.data_cleaner.clean_data(dfs[dataframe_name])
            print("Limpieza de Datos completada para el dataframe "+ dataframe_name)


        # Control de Datos y Offset / option Write Append si son nuevos registros / Overwrite si no existe la tabla en parquet
        print('\n################## ACTUALIZANDO DATOS EN PARQUET ##################')
        for dataframe_name in df_cleaned.keys():
            self.data_processor.process_and_write_data("output/"+dataframe_name.split(".csv")[0]+".parquet",df_cleaned[dataframe_name])
            print("Agregando nuevos registros encontrados en " + dataframe_name)


if __name__ == "__main__":

    # Crear una instancia de la clase DataPipeline y ejecutar el flujo de datos
    data_pipeline = dataPipeline()
    data_pipeline.run()

Se aplicaron todos los Schemas descritos en structureSchema.py 
Ingesta Completada se lecturan todos los archivos .csv

################## INICIANDO LIMPIEZA DE DATOS   ##################
Limpieza de Datos completada para el dataframe country_wise_latest.csv
Limpieza de Datos completada para el dataframe covid_19_clean_complete.csv
Limpieza de Datos completada para el dataframe day_wise.csv
Limpieza de Datos completada para el dataframe full_grouped.csv
Limpieza de Datos completada para el dataframe usa_county_wise.csv
Limpieza de Datos completada para el dataframe worldometer_data.csv

################## ACTUALIZANDO DATOS EN PARQUET ##################
Agregando nuevos registros encontrados en country_wise_latest.csv
Agregando nuevos registros encontrados en covid_19_clean_complete.csv
Agregando nuevos registros encontrados en day_wise.csv
Agregando nuevos registros encontrados en full_grouped.csv
Agregando nuevos registros encontrados en usa_county_wise.csv
Agregando nuevos registros