# COVID-19 - Dataset

#### N5 Now Challenge

##### Objetivos
1. Cargar los datasets utilizando Spark y almacenarlos en formato Parquet.
1. Cargar los datasets utilizando Pandas y almacenarlos en formato Parquet.
1. Desarrollar un diagrama entidad-relación (DER) del modelo de datos.

##### Librerías

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F
from pathlib import Path

##### Clases

In [0]:
class FileInfo:
    """Metadata describing one source CSV file.

    Attributes:
        path: full path/URI of the file, exactly as supplied by the caller.
        full_name: file name including its extension.
        name: file name without its extension.
        schema: whatever the caller passes; load_file_info passes the schema
            variable's *name* as a string (e.g. 'day_wise_schema'), not a StructType.
    """

    def __init__(self, path, full_name, name, schema):
        self.path, self.full_name, self.name, self.schema = path, full_name, name, schema

##### Esquemas

In [0]:
def _nullable_schema(*columns):
    """Build a StructType in which every field is nullable.

    :param columns: (column_name, type_class) pairs in the CSV's column order;
        each type class (e.g. StringType) is instantiated once per field.
    :return: StructType with one nullable StructField per pair.
    """
    return StructType([StructField(column_name, column_type(), True)
                       for column_name, column_type in columns])


# country_wise_latest.csv -- the per-100 and percentage columns are declared as strings.
country_wise_latest_schema = _nullable_schema(
    ("Country", StringType),
    ("Confirmed", IntegerType),
    ("Deaths", IntegerType),
    ("Recovered", IntegerType),
    ("Active", IntegerType),
    ("New_Cases", IntegerType),
    ("New_Deaths", IntegerType),
    ("New_Recovered", IntegerType),
    ("Deaths_100_Cases", StringType),
    ("Recovered_100_Cases", StringType),
    ("Deaths_100_Recovered", StringType),
    ("Confirmed_Last_Week", IntegerType),
    ("One_Week_Change", IntegerType),
    ("One_Week_Percentage_Increase", StringType),
    ("WHO_Region", StringType),
)

# covid_19_clean_complete.csv -- coordinates and date are kept as strings.
# NOTE(review): "long" is lowercase here but "Long" in usa_county_wise_schema; the
# name is kept as-is because it becomes the column name of the written Parquet output.
covid_19_clean_complete_schema = _nullable_schema(
    ("Province", StringType),
    ("Country", StringType),
    ("Lat", StringType),
    ("long", StringType),
    ("Date", StringType),
    ("Confirmed", IntegerType),
    ("Deaths", IntegerType),
    ("Recovered", IntegerType),
    ("Active", IntegerType),
    ("WHO_Region", StringType),
)

# day_wise.csv -- one row per date (global totals, presumably; confirm against the data).
day_wise_schema = _nullable_schema(
    ("Date", StringType),
    ("Confirmed", IntegerType),
    ("Deaths", IntegerType),
    ("Recovered", IntegerType),
    ("Active", IntegerType),
    ("New_Cases", IntegerType),
    ("New_Deaths", IntegerType),
    ("New_Recovered", IntegerType),
    ("Deaths_100_Cases", StringType),
    ("Recovered_100_Cases", StringType),
    ("Deaths_100_Recovered", StringType),
    ("No_Countries", IntegerType),
)

# full_grouped.csv -- date x country counts.
full_grouped_schema = _nullable_schema(
    ("Date", StringType),
    ("Country", StringType),
    ("Confirmed", IntegerType),
    ("Deaths", IntegerType),
    ("Recovered", IntegerType),
    ("Active", IntegerType),
    ("New_Cases", IntegerType),
    ("New_Deaths", IntegerType),
    ("New_Recovered", IntegerType),
    ("WHO_Region", StringType),
)

# usa_county_wise.csv -- every column, including Confirmed/Deaths, is declared as a string.
usa_county_wise_schema = _nullable_schema(
    ("UID", StringType),
    ("ISO2", StringType),
    ("ISO3", StringType),
    ("Code3", StringType),
    ("FIPS", StringType),
    ("Admin2", StringType),
    ("Province_State", StringType),
    ("Country_Region", StringType),
    ("Lat", StringType),
    ("Long", StringType),
    ("Combined_Key", StringType),
    ("Date", StringType),
    ("Confirmed", StringType),
    ("Deaths", StringType),
)

# worldometer_data.csv
# NOTE(review): if any of these numeric columns store decimal text in the CSV
# (e.g. '123.0'), IntegerType will not parse it -- confirm against the data.
worldometer_data_schema = _nullable_schema(
    ("Country", StringType),
    ("Continent", StringType),
    ("Population", IntegerType),
    ("TotalCases", IntegerType),
    ("NewCases", IntegerType),
    ("TotalDeaths", IntegerType),
    ("NewDeaths", IntegerType),
    ("TotalRecovered", IntegerType),
    ("NewRecovered", IntegerType),
    ("ActiveCases", IntegerType),
    ("Critical", IntegerType),
    ("TotCases1M", IntegerType),
    ("Deaths1M", IntegerType),
    ("TotalTests", IntegerType),
    ("Tests1M", IntegerType),
    ("WHO_Region", StringType),
)

##### Funciones

In [0]:
def load_file_info(files):
    """Build FileInfo metadata for each CSV file path.

    :param files: list -> file paths, e.g. 'dbfs:/FileStore/tables/day_wise.csv'
    :return: list of FileInfo, in the same order as ``files``; each carries the
        original path, the file name, the stem, and the schema variable name
        ('<stem>_schema') as a string.
    """
    return [_file_info_from_path(file_path) for file_path in files]


def _file_info_from_path(file_path):
    """Build one FileInfo from a single path, parsing the path only once."""
    parsed = Path(file_path)
    return FileInfo(file_path, parsed.name, parsed.stem, parsed.stem + '_schema')

def load_csv_files(fl):
    """Read one of the known CSV files into a Spark DataFrame using its explicit schema.

    The schema is chosen by ``fl.name`` (the file stem), and the file is read with
    ``header=True``.

    :param fl: FileInfo -> file metadata; ``fl.name`` selects the schema and
        ``fl.path`` is the file that is read.
    :return: Spark DataFrame with the file contents.
    :raises ValueError: if no schema is registered for ``fl.name``. Previously this
        surfaced as an UnboundLocalError on ``df``.
    """
    schemas_by_name = {
        'country_wise_latest': country_wise_latest_schema,
        'covid_19_clean_complete': covid_19_clean_complete_schema,
        'day_wise': day_wise_schema,
        'full_grouped': full_grouped_schema,
        'usa_county_wise': usa_county_wise_schema,
        'worldometer_data': worldometer_data_schema,
    }
    try:
        schema = schemas_by_name[fl.name]
    except KeyError:
        raise ValueError(f"No schema defined for file '{fl.name}' ({fl.path})") from None

    # `spark` is not defined in this file; presumably the notebook-provided SparkSession.
    return (spark
            .read
            .option("header", True)
            .schema(schema)
            .csv(fl.path))

def save_parquet_files(df_write, fl, base_dir='dbfs:/tmp/challenge'):
    """Write a Spark DataFrame as a Parquet dataset named after its source file.

    Any existing output at the target path is replaced (write mode "overwrite").

    :param df_write: Spark DataFrame to persist.
    :param fl: FileInfo -> file metadata; ``fl.name`` becomes the output directory name.
    :param base_dir: directory under which the dataset is written. Defaults to
        'dbfs:/tmp/challenge', the previously hard-coded location.
    """
    # Strip a trailing slash so 'root/' and 'root' yield the same target path.
    full_path = f'{base_dir.rstrip("/")}/{fl.name}'
    df_write.write.mode("overwrite").parquet(full_path)

def read_pandas(fl):
    """Read a CSV file with Spark and return it as a pandas DataFrame.

    No schema is passed and schema inference is not enabled, so Spark's CSV
    defaults apply (string columns). ``toPandas()`` collects the whole dataset
    into the driver's memory.

    :param fl: FileInfo -> file metadata; ``fl.path`` is the file that is read.
    :return: pandas DataFrame with the file contents.
    """
    csv_reader = spark.read.format('csv').options(header='true')
    spark_frame = csv_reader.load(fl.path)
    return spark_frame.toPandas()

def save_parquet_pandas(df_write, fl, base_dir='/dbfs/tmp/challenge_pandas'):
    """Write a pandas DataFrame to '<base_dir>/<fl.name>.parquet'.

    pandas writes through local/fsspec file APIs, which do not resolve the
    Spark-style 'dbfs:/...' URI; a single-slash 'dbfs:/...' string would be
    treated as a relative local path. On Databricks, the same DBFS location is
    reachable locally under '/dbfs/', so that form is the default. A 'dbfs:/'
    prefix passed by the caller is translated for the same reason.

    :param df_write: pandas DataFrame to persist.
    :param fl: FileInfo -> file metadata; ``fl.name`` becomes the output file stem.
    :param base_dir: output directory; created if it does not exist.
    """
    if base_dir.startswith('dbfs:/'):
        base_dir = '/dbfs/' + base_dir[len('dbfs:/'):]
    out_dir = Path(base_dir)
    # pandas/pyarrow do not create missing parent directories.
    out_dir.mkdir(parents=True, exist_ok=True)
    df_write.to_parquet(str(out_dir / f'{fl.name}.parquet'))

##### Método principal

In [0]:
if __name__ == '__main__':
    # Source CSVs in the workspace FileStore; load_csv_files picks the schema by file stem.
    source_dir = 'dbfs:/FileStore/tables'
    dataset_names = ('country_wise_latest', 'covid_19_clean_complete', 'day_wise',
                     'full_grouped', 'usa_county_wise', 'worldometer_data')
    file_path_lst = [f'{source_dir}/{dataset}.csv' for dataset in dataset_names]

    # Names (filesInfo, fl, df, df_pandas) are kept because later notebook cells may use them.
    filesInfo = load_file_info(file_path_lst)
    for fl in filesInfo:
        # Spark path: schema-typed read, then a Parquet dataset per file.
        df = load_csv_files(fl)
        save_parquet_files(df, fl)
        # pandas path: schema-less read collected into pandas, then one .parquet file.
        df_pandas = read_pandas(fl)
        save_parquet_pandas(df_pandas, fl)