<a href="https://colab.research.google.com/github/bry4/test_pyspark/blob/main/Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# TESTEADO EN GOOGLE COLAB
# Estructura de archivos en capas en base a proyectos en Databricks
!rm -r sample_data/
!mkdir 00_raw
!mkdir 01_bronze
!mkdir 02_silver
!mkdir 03_gold

In [2]:
# PARA AMBIENTES DE PRUEBA O DESARROLLO - Testeado en Google Colab
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=d7fb4f8e1322964a4360851fcb261dc425b44d2796b751034c449788aee2d226
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
# Importando librerías a considerar
# Armando Clase con funciones útiles para desarrollar el ETL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

class sparkChallenge:
  """Helper around a local SparkSession with the read/write/cleaning steps used by the ETL.

  DataFrame annotations are quoted so that defining the class does not depend on
  the name ``DataFrame`` happening to be in scope through a wildcard import.
  """

  # Java regex for a complete numeric literal (optional sign, decimals, exponent).
  # A non-null value that does NOT match it marks the column as text.
  _NUMERIC_PATTERN = r"^\s*[+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)?\s*$"

  # Ordered (old, new) substitutions applied to the lower-cased column name.
  # " / " must be handled before " " and "/" so it collapses to a single "_".
  _NAME_REPLACEMENTS = ((" / ", "_"), (" ", "_"), (".", ""), ("/", "_"))

  def __init__(self):
    """Create (or reuse, via getOrCreate) a ``local[*]`` session named ``appChallenge``."""
    self._spark = SparkSession.builder.master("local[*]").appName("appChallenge").getOrCreate()

  def get_session(self):
    """Return the SparkSession held by this instance."""
    return self._spark

  def read_csv_rdd(self, path:str):
    """Read ``path`` as an RDD of text lines (no CSV parsing is done here)."""
    return self._spark.sparkContext.textFile(path)

  def read_csv_df(self, path:str):
    """Read ``path`` as a CSV DataFrame, taking column names from the header row."""
    return self._spark.read.format("csv").option("header","true").load(path)

  def read_parquet(self, path:str):
    """Read the parquet data at ``path`` into a DataFrame.

    Uses the instance's own session rather than a notebook-level ``spark``
    variable, so the method works regardless of which cells have been run.
    """
    return self._spark.read.parquet(path)

  def write_parquet(self, df_input:"DataFrame", path:str):
    """Write ``df_input`` to ``path`` as parquet, overwriting existing data."""
    df_input.write.mode("overwrite").parquet(path)

  def clean_column_names(self, df_input:"DataFrame")->"DataFrame":
    """Return ``df_input`` with normalised column names.

    Each name is lower-cased, then " / " -> "_", " " -> "_", "." removed and
    "/" -> "_", in that order. All columns are renamed in a single ``toDF``
    call instead of one rename per column per rule.
    """
    new_names = []
    for name in df_input.columns:
      cleaned = name.lower()
      for old, new in self._NAME_REPLACEMENTS:
        cleaned = cleaned.replace(old, new)
      new_names.append(cleaned)
    return df_input.toDF(*new_names)

  def find_null_columns(self, df_input:"DataFrame", txt_input:str)->list:
    """Return the names of the columns of ``df_input`` that contain at least one null.

    ``txt_input`` is only a label used in the message printed when no column
    has nulls. All columns are counted in one aggregation (a single Spark job)
    rather than one filter/count job per column.
    """
    names = df_input.columns
    list_col = []

    if names:
      # count() ignores nulls, and when() without otherwise() yields null for
      # non-matching rows, so each expression counts exactly the null rows.
      null_exprs = [
        count(when(col(name).isNull(), 1)).alias("c"+str(idx))
        for idx, name in enumerate(names)
      ]
      null_counts = df_input.agg(*null_exprs).first()
      list_col = [name for name, n_null in zip(names, null_counts) if n_null > 0]

    if len(list_col)==0:
      print("No hay columnas con valores nulos en: "+txt_input)

    return list_col

  def replace_null_values(self, df_input:"DataFrame", list_col:list)->"DataFrame":
    """Fill nulls in the columns named in ``list_col``.

    A column holding at least one non-null value that is not a numeric literal
    is treated as text and filled with "Otros"; otherwise it is filled with 0.
    Decimals, signs and exponents ("60.0", "-3", "1e5") count as numeric, so
    such columns are no longer misclassified as text.
    """
    for name in list_col:
      column = df_input[name]
      # rlike on a null yields null, so null rows never count as text here.
      has_text = df_input.filter(~column.rlike(self._NUMERIC_PATTERN)).limit(1).count() > 0
      filler = "Otros" if has_text else 0
      df_input = df_input.withColumn(name, when(column.isNull(), filler).otherwise(column))

    return df_input

  def transform_type(self, df_input:"DataFrame", typeTF:str, listVal:list)->"DataFrame":
    """Cast the columns named in ``listVal`` according to ``typeTF``.

    Supported values: "integer", "date", "decimal" (decimal(22,3)) and
    "double"; the last two strip spaces from the value before casting.
    Any other ``typeTF`` returns ``df_input`` unchanged.
    """
    builders = {
      "integer": lambda name: col(name).cast("integer"),
      "date": lambda name: to_date(col(name)),
      "decimal": lambda name: regexp_replace(col(name),' ','').cast('decimal(22,3)'),
      "double": lambda name: regexp_replace(col(name),' ','').cast('double'),
    }
    build = builders.get(typeTF)
    if build is None:
      return df_input

    for name in listVal:
      df_input = df_input.withColumn(name, build(name))

    return df_input



In [4]:
# Instantiate the helper class
SC = sparkChallenge()
spark = SC.get_session() # Keep a handle on the session for Spark DataFrame features used directly in later cells

In [5]:
# Load each raw dataset as an RDD of text lines
(
    rdd_country,
    rdd_covid_clean,
    rdd_day,
    rdd_full_grouped,
    rdd_usa_county,
    rdd_worldmeter,
) = [
    SC.read_csv_rdd(raw_path)
    for raw_path in (
        "/content/00_raw/country_wise_latest.csv",
        "/content/00_raw/covid_19_clean_complete.csv",
        "/content/00_raw/day_wise.csv",
        "/content/00_raw/full_grouped.csv",
        "/content/00_raw/usa_county_wise.csv",
        "/content/00_raw/worldometer_data.csv",
    )
]

In [6]:
# Record count per source file: number of text lines minus one
# (presumably the header line - confirm against the raw files)
_rdd_by_label = {
    "country": rdd_country,
    "covid_clean": rdd_covid_clean,
    "day": rdd_day,
    "full_grouped": rdd_full_grouped,
    "usa_county": rdd_usa_county,
    "worldmeter": rdd_worldmeter,
}
rdd_count = [{label: lines.count()-1 for label, lines in _rdd_by_label.items()}]

# Single-row DataFrame with one column per source, persisted in the bronze layer
df_rdd_count = spark.createDataFrame(rdd_count)
SC.write_parquet(df_rdd_count,"/content/01_bronze/rowcount_files.parquet")

In [7]:
# Load each raw dataset as a DataFrame
(
    df_country,
    df_covid_clean,
    df_day,
    df_full_grouped,
    df_usa_county,
    df_worldmeter,
) = [
    SC.read_csv_df(raw_path)
    for raw_path in (
        "/content/00_raw/country_wise_latest.csv",
        "/content/00_raw/covid_19_clean_complete.csv",
        "/content/00_raw/day_wise.csv",
        "/content/00_raw/full_grouped.csv",
        "/content/00_raw/usa_county_wise.csv",
        "/content/00_raw/worldometer_data.csv",
    )
]

# Persist every frame, as loaded, in parquet format in the bronze layer
for _frame, _bronze_path in (
    (df_country, "/content/01_bronze/country_wise_latest.parquet"),
    (df_covid_clean, "/content/01_bronze/covid_19_clean_complete.parquet"),
    (df_day, "/content/01_bronze/day_wise.parquet"),
    (df_full_grouped, "/content/01_bronze/full_grouped.parquet"),
    (df_usa_county, "/content/01_bronze/usa_county_wise.parquet"),
    (df_worldmeter, "/content/01_bronze/worldometer_data.parquet"),
):
    SC.write_parquet(_frame, _bronze_path)


In [8]:
# Normalise the column names of every frame
(
    df_country,
    df_covid_clean,
    df_day,
    df_full_grouped,
    df_usa_county,
    df_worldmeter,
) = [
    SC.clean_column_names(frame)
    for frame in (
        df_country,
        df_covid_clean,
        df_day,
        df_full_grouped,
        df_usa_county,
        df_worldmeter,
    )
]

In [9]:
# Collect, per frame, the columns that contain nulls so they can be handled in the next step
(
    list_country,
    list_covid_clean,
    list_day,
    list_full_grouped,
    list_usa_county,
    list_worldmeter,
) = [
    SC.find_null_columns(frame, label)
    for frame, label in (
        (df_country, "country"),
        (df_covid_clean, "covid_clean"),
        (df_day, "day"),
        (df_full_grouped, "full_grouped"),
        (df_usa_county, "usa_county"),
        (df_worldmeter, "worldmeter"),
    )
]


No hay columnas con valores nulos en: country
No hay columnas con valores nulos en: day
No hay columnas con valores nulos en: full_grouped


In [10]:
# Replace nulls in the listed columns, whether they are String or Numeric fields
(
    df_country,
    df_covid_clean,
    df_day,
    df_full_grouped,
    df_usa_county,
    df_worldmeter,
) = [
    SC.replace_null_values(frame, null_columns)
    for frame, null_columns in (
        (df_country, list_country),
        (df_covid_clean, list_covid_clean),
        (df_day, list_day),
        (df_full_grouped, list_full_grouped),
        (df_usa_county, list_usa_county),
        (df_worldmeter, list_worldmeter),
    )
]

In [11]:
# SILVER LAYER
# Shape the tables for the silver-layer model.
# Modelled to keep as many columns per entity as possible, taking advantage of parquet.

# id_province is the plain concatenation of country_region and province_state
_id_province_expr = concat(
    df_covid_clean.country_region,
    df_covid_clean.province_state,
).alias("id_province")
df_covid_clean = df_covid_clean.select(
    _id_province_expr,
    "country_region","province_state","lat","long","date","confirmed","deaths","recovered","active",
)

# Table province_covid: one row per province with the metrics summed over all rows
group_col = ["id_province","country_region","province_state","lat","long"]
_metric_totals = [
    sum(metric).alias(metric)
    for metric in ("confirmed","deaths","recovered","active")
]
df_province_covid = df_covid_clean.groupBy(group_col).agg(*_metric_totals)

# Data type formatting
list1_double = ["lat","long"]
# NOTE(review): list1_int is defined but never applied in this cell - confirm whether
# the summed metrics were meant to be cast to integer.
list1_int = ["confirmed","deaths","recovered","active"]
df_province_covid = SC.transform_type(df_province_covid,"double",list1_double)

SC.write_parquet(df_province_covid,"/content/02_silver/province_covid.parquet")

# Table daily_province_covid: metrics per province and date
list2_time = ["date"]
list2_double = ["confirmed","deaths","recovered","active"]
df_daily_province_covid = SC.transform_type(
    SC.transform_type(
        df_covid_clean.select("id_province","date","confirmed","deaths","recovered","active"),
        "date",
        list2_time,
    ),
    "double",
    list2_double,
)

SC.write_parquet(df_daily_province_covid,"/content/02_silver/daily_province_covid.parquet")

# Table daily_covid: metrics per date, taken from the day-wise dataset
list3_time = ["date"]
list3_double = ["confirmed","deaths","recovered","active"]
df_daily_covid = SC.transform_type(
    SC.transform_type(
        df_day.select("date","confirmed","deaths","recovered","active"),
        "date",
        list3_time,
    ),
    "double",
    list3_double,
)

SC.write_parquet(df_daily_covid,"/content/02_silver/daily_covid.parquet")

# Table country: region, continent and population per country
list4_int = ["population"]
df_country_world = SC.transform_type(
    df_worldmeter.select("who_region","continent","country_region","population"),
    "integer",
    list4_int,
)

SC.write_parquet(df_country_world,"/content/02_silver/country.parquet")



In [12]:
# GOLD LAYER
# For reporting this should be a simpler model (STAR schema, wide table) so it can be consumed efficiently.
# Here all the data is joined into a single table at Date level.

# Reload the silver tables from parquet
(
    df_province_covid,
    df_daily_province_covid,
    df_daily_covid,
    df_country,
) = [
    SC.read_parquet(silver_path)
    for silver_path in (
        "/content/02_silver/province_covid.parquet",
        "/content/02_silver/daily_province_covid.parquet",
        "/content/02_silver/daily_covid.parquet",
        "/content/02_silver/country.parquet",
    )
]

# Rename the join key on the country side so both sides keep distinct column names
df_country = df_country.withColumnRenamed("country_region","country_reg")

# Province attributes enriched with country-level data (left join keeps every province)
df_f1 = df_province_covid.join(
    df_country,
    df_province_covid.country_region==df_country.country_reg,
    "left",
)
df_f2 = (
    df_f1
    .select("id_province","who_region","continent","country_region","population","province_state","lat","long")
    .withColumnRenamed("id_province","id_prov")
)

# Daily metrics per province joined with the province / country attributes
df_final = (
    df_daily_province_covid
    .join(df_f2, df_daily_province_covid.id_province==df_f2.id_prov, "left")
    .select("id_province","who_region","continent","country_region","population","province_state","lat","long","date","confirmed","deaths","recovered","active")
)

SC.write_parquet(df_final,"/content/03_gold/report.parquet")
