In [81]:
from pyspark.sql import SparkSession, SQLContext
import os
import shutil
import io
# to download data
import wget
# storing and analysing data
import pandas as pd
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


class DownloadData():
    """Download remote files and clean the CSV / Parquet working folders.

    Args:
        file: Iterable of URLs to download.
        folder_csv: Folder the downloaded files are moved into.
        folder_source: Folder prefix joined to the file name returned by
            ``wget.download`` to locate the freshly downloaded file.
        folder_parquet: Folder whose entries are removed by
            ``delete_file_parquet``.
    """

    def __init__(self, file, folder_csv, folder_source, folder_parquet):
        self.file           = file
        self.folder_csv     = folder_csv
        self.folder_source  = folder_source
        self.folder_parquet = folder_parquet

    @staticmethod
    def _visible_entries(folder):
        """Return the entry names of *folder* that do not start with a dot."""
        return [name for name in os.listdir(folder) if not name.startswith(".")]

    def delete_file_csv(self):
        """Remove every non-hidden file in ``folder_csv``.

        Returns:
            The status string ``"Delete File CSV"``.
        """
        for name in self._visible_entries(self.folder_csv):
            os.remove(os.path.join(self.folder_csv, name))
        return "Delete File CSV"

    def delete_file_parquet(self):
        """Remove every non-hidden directory tree in ``folder_parquet``.

        Returns:
            The status string ``"Delete File Parquet"``.
        """
        for name in self._visible_entries(self.folder_parquet):
            shutil.rmtree(os.path.join(self.folder_parquet, name))
        return "Delete File Parquet"

    def download_file(self):
        """Download every URL in ``self.file`` and move it into ``folder_csv``.

        Returns:
            The status string ``"Download file"``.
        """
        for url in self.file:
            filename = wget.download(url)
            # Use the folders given to this instance; the bare names
            # ``folder_source`` / ``folder_csv`` only resolved when module
            # globals with those names happened to exist.
            os.replace(
                os.path.join(self.folder_source, filename),
                os.path.join(self.folder_csv, filename),
            )
        return "Download file"



class TransformFile(DownloadData):
    """Convert the CSV files of a folder into Parquet datasets through Spark.

    Args:
        spark_session: Spark session used to build and write the DataFrames.
        folder_parquet: Prefix the Parquet dataset name is appended to.
        folder_csv: Folder scanned for CSV files.
        folder_source: Forwarded to ``DownloadData``.
        file: Forwarded to ``DownloadData``.
    """

    def __init__(self,spark_session, folder_parquet=None,folder_csv=None, folder_source=None, file=None):
        super().__init__(file, folder_csv, folder_source, folder_parquet)
        self.spark_session   = spark_session

    def _resolve_folder(self, attribute):
        """Return the folder stored in *attribute*, or the same-named global.

        Callers that build the instance with only a Spark session rely on
        module-level ``folder_csv`` / ``folder_parquet`` variables, so those
        remain the fallback when the attribute was left as ``None``.
        """
        configured = getattr(self, attribute)
        if configured is not None:
            return configured
        return globals()[attribute]

    def csv_to_parquet(self):
        """Write one Parquet dataset per non-hidden file found in the CSV folder.

        Returns:
            A list with the dataset names (file name up to its first dot), or
            the string ``"Error <message>"`` when any step raises.
        """
        try:
            csv_dir = self._resolve_folder("folder_csv")
            parquet_dir = self._resolve_folder("folder_parquet")
            list_name = []
            for file in os.listdir(csv_dir):
                if file.startswith("."):
                    continue
                dataset_name = file.split(".")[0]
                list_name.append(dataset_name)
                pandas_df = pd.read_csv(os.path.join(csv_dir, file))
                # Convert the pandas DataFrame into a Spark DataFrame
                spark_df = self.spark_session.createDataFrame(pandas_df)
                # Fill null values with a default value (adjust to your needs)
                spark_df = spark_df.fillna("Null")
                # Save the DataFrame as Parquet. Plain concatenation is kept
                # here because ChangeColumnType rebuilds the dataset path the
                # same way (folder prefix + dataset name).
                spark_df.write.mode("append").parquet(parquet_dir + dataset_name)
            return list_name
        except Exception as e:
            # NOTE(review): callers receive a str instead of a list on failure
            # and must check for it before iterating the result.
            return f"Error {e}"



class ChangeColumnType():
    """Rewrite Parquet part files with their int/float columns cast to object.

    Args:
        folder_parquet: Prefix each dataset name is appended to.
        list_name: Dataset (sub-folder) names to process.
    """

    def __init__(self, folder_parquet, list_name):
        self.folder_parquet = folder_parquet
        self.list_name      = list_name

    def update_parquet(self):
        """Cast int/float columns to ``object`` in every part file, in place.

        Entries whose name starts with ``.`` or ``_`` are skipped.

        Returns:
            The dtypes read back from the last rewritten part file, or
            ``None`` when nothing was processed or an error was reported.
        """
        # Stays None when no part file is found, so the final return cannot
        # hit an unbound local and report a spurious read error.
        last_dtypes = None
        try:
            for dataset_name in self.list_name:
                dataset_dir = self.folder_parquet + dataset_name
                for file in os.listdir(dataset_dir):
                    if file.startswith((".", "_")):
                        continue
                    part_path = dataset_dir + "/" + file
                    df_parquet = pd.read_parquet(part_path)
                    columns_int = df_parquet.select_dtypes(include=['int', 'float']).columns
                    df_parquet[columns_int] = df_parquet[columns_int].astype(object)
                    df_parquet.to_parquet(part_path, index=True, compression="snappy", engine='auto')
                    # Read the file back so the reported dtypes are the ones
                    # actually stored on disk.
                    last_dtypes = pd.read_parquet(part_path).dtypes
        except Exception as e:
            print(f"Error al leer el archivo Parquet: {e}")
            return None
        return last_dtypes

                



In [82]:
if __name__ == "__main__":

    # Spark session used for the CSV -> Parquet conversion
    spark = SparkSession.builder.appName("N5ProcesadorDeDatos").getOrCreate()

    # Base URLs of the datasets to process
    data_time_series      = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/"
    data_iso_fips         = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/"
    data_situation_report = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/who_covid_19_situation_reports/who_covid_19_sit_rep_time_series/"

    # Working folders.
    # NOTE(review): the paths are empty here; the classes append file names
    # directly to folder_parquet, so it presumably needs a trailing separator
    # once set - confirm before running.
    folder_source = ""
    folder_csv = ""
    folder_parquet = ""

    # Files to process
    urls = [data_time_series+'time_series_covid19_confirmed_global.csv',
            data_time_series+'time_series_covid19_deaths_global.csv',
            data_time_series+'time_series_covid19_recovered_global.csv',
            data_iso_fips+'UID_ISO_FIPS_LookUp_Table.csv',
            data_situation_report+'who_covid_19_sit_rep_time_series.csv'
            ]

    # Clean the working folders, then download the data from GitHub
    data = DownloadData(urls, folder_csv, folder_source, folder_parquet)
    delete_file_csv = data.delete_file_csv()
    delete_folder_parquet = data.delete_file_parquet()
    get_data = data.download_file()

    # Convert the CSV files to Parquet. The folders are passed explicitly so
    # the instance carries its own configuration instead of depending on the
    # module-level variables above.
    process = TransformFile(
        spark,
        folder_parquet=folder_parquet,
        folder_csv=folder_csv,
        folder_source=folder_source,
        file=urls,
    )
    process_file = process.csv_to_parquet()

    # Update the Parquet column types. csv_to_parquet returns an error string
    # instead of a list when it fails; report it rather than iterating over
    # its characters as if they were dataset names.
    # NOTE(review): 'updtae_column' is a typo, kept in case later cells
    # reference the name.
    update = None
    updtae_column = None
    if isinstance(process_file, str):
        print(process_file)
    else:
        update = ChangeColumnType(folder_parquet, process_file)
        updtae_column = update.update_parquet()
    

       

100% [............................................................................] 159506 / 159506

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


FOLDERA 0
FILE in if part-00005-7a24424f-e547-437d-a7c6-03ab650f59ca-c000.snappy.parquet
Province/State     object
Country/Region     object
Lat               float64
Long              float64
1/22/20             int64
                   ...   
3/5/23              int64
3/6/23              int64
3/7/23              int64
3/8/23              int64
3/9/23              int64
Length: 1147, dtype: object
FILE in if part-00009-7a24424f-e547-437d-a7c6-03ab650f59ca-c000.snappy.parquet
Province/State     object
Country/Region     object
Lat               float64
Long              float64
1/22/20             int64
                   ...   
3/5/23              int64
3/6/23              int64
3/7/23              int64
3/8/23              int64
3/9/23              int64
Length: 1147, dtype: object
FILE in if part-00002-7a24424f-e547-437d-a7c6-03ab650f59ca-c000.snappy.parquet
Province/State     object
Country/Region     object
Lat               float64
Long              float64
1/22/20             i