In [1]:
#Instalacion spark
!pip install pyspark



In [25]:
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import *
import pandas as pd
import os
import re
import shutil

In [3]:
#Montando la ruta de google drive para el almacenamiento de los archivos
drive.mount('/content/drive')
spark = SparkSession.builder.master("local[1]").appName("ChallengeN5").getOrCreate()
sc=spark.sparkContext

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [40]:
#Clase que maneja todas las operaciones para el cargado de los archivos.
class Challenge:
  def __init__(self,ruta_origen,ruta_raw,ruta_curated,fec_proceso,tipo_carga):
    self.ruta_origen=ruta_origen
    self.ruta_raw=ruta_raw
    self.ruta_curated=ruta_curated
    self.fec_proceso = fec_proceso
    self.tipo_carga = tipo_carga
  def valida_archivos_carga(self):
    with os.scandir(self.ruta_origen) as ficheros:
      lst_ficheros=[fichero.name for fichero in ficheros if fichero.is_file()]
    return lst_ficheros
  #Carga de archivos en carpeta raw en formato parquet, despues de pasarlo a parquet lo mueve a una ruta de procesado.
  def cargaraw(self,nombrearchivo):
    df_archivo=pd.read_csv(self.ruta_origen+nombrearchivo)
    df_archivo.to_parquet(self.ruta_raw+nombrearchivo.replace('csv','parquet'))
    shutil.move(self.ruta_origen+nombrearchivo,f'{self.ruta_origen}procesado/{nombrearchivo.replace(".csv",str(self.fec_proceso))}.csv')

  #Lee los archivos guardados en formato parquet.
  def readfile(self,nomfile):
    df_archivo=spark.read.parquet(self.ruta_raw+nomfile.replace('csv','parquet'))
    return df_archivo

  #Procesa la informacion cargada y genera un archivo resumen.
  def proceso_data(self,df_full_grouped , df_covid_clean):
    dff_full_grouped= df_full_grouped.withColumnsRenamed({'Date':'date_group','Country/Region':'country_group','WHO Region':'region','New cases':'new_cases','New deaths':'new_deaths','New recovered':'new_recovered'}) \
    .select('date_group','region','country_group','Confirmed','Deaths','Recovered','Active','new_cases','new_deaths','new_recovered')

    dff_covid_clean=df_covid_clean.withColumnsRenamed({'Province/State':'province','Lat':'latitude','Long':'Longitude','Date':'date_covid','Country/Region':'country_covid'}) \
    .select('date_covid','province','latitude','longitude','country_covid')

    df_resumen = dff_covid_clean.join(dff_full_grouped,(dff_covid_clean.date_covid==dff_full_grouped.date_group) & (dff_covid_clean.country_covid ==dff_full_grouped.country_group)) \
    .select(col('date_group').alias('date'),'region',col('country_group').alias('country'),'province','latitude','longitude','Confirmed','Deaths','Recovered','Active' \
            ,'new_cases','new_deaths','new_recovered')

    dff_resumen = df_resumen.withColumn('Confirmed',df_resumen.Confirmed.cast(IntegerType())) \
    .withColumn('Deaths',df_resumen.Deaths.cast(IntegerType())) \
    .withColumn('Recovered',df_resumen.Recovered.cast(IntegerType())) \
    .withColumn('Active',df_resumen.Active.cast(IntegerType())) \
    .withColumn('new_cases',df_resumen.new_cases.cast(IntegerType())) \
    .withColumn('new_deaths',df_resumen.new_deaths.cast(IntegerType())) \
    .withColumn('new_recovered',df_resumen.new_recovered.cast(IntegerType())) \
    .withColumn('date',df_resumen.date.cast(DateType())) \
    .withColumn('año',date_format(col('date'),'y')) \
    .withColumn("mes",date_format(col('date'),'MM')) \
    .select('date','año','mes','region','country',col('province').alias('state'),'latitude','longitude' \
            ,'Confirmed','Deaths','Recovered','Active','new_cases','new_deaths','new_recovered')

    return dff_resumen
  #Funcion que guarda el archivo resumen en formato parquet particionado
  def writefile(self,df_resul,nomfile):
    df_resul.write.partitionBy('año','mes').parquet(self.ruta_curated+nomfile.replace('csv','parquet'))

In [41]:
#Instancia la clase challenge y ejecuta la carga de los archivos.
if __name__ == "__main__":
  ruta_archivo = '/content/drive/MyDrive/Test_N5/archivos/'
  ruta_raw = '/content/drive/MyDrive/Test_N5/Raw/'
  ruta_curated = '/content/drive/MyDrive/Test_N5/Curated/'
  tipo_carga = 'Full' #Full
  fec_proceso = date.today()
  try:
    #Creacion del objeto de la clase Challenge
    file=Challenge(ruta_archivo,ruta_raw,ruta_curated,fec_proceso,tipo_carga)
    #Valida que existan archivos para cargar.
    val_file = file.valida_archivos_carga()
    if val_file:
      for x in range(len(val_file)):
        file.cargaraw(val_file[x])
    else:
      raise FileNotFoundError

    #Lectura de archivos en formato parquet.
    df_covid19_clean=file.readfile('covid_19_clean_complete.parquet')
    df_country_wise_latest=file.readfile('country_wise_latest.parquet')
    df_grouped=file.readfile('full_grouped.parquet')

    #Proceso de generacion de archivo resumen.
    df_resultado = file.proceso_data(df_grouped,df_covid19_clean)

    #Guardado de resultado de archivo resumen particionado por año y mes.
    nomfile = f'resultado_covid_{fec_proceso}.parquet'
    file.writefile(df_resultado,nomfile)

  except FileNotFoundError:
    print(f'No se encuentran archivos para cargar en la ruta {ruta_archivo}')