## Librerías

In [1]:
import datetime, time
import logging
import os
import shutil

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, DateType

In [2]:
# Local Spark session with 4 worker threads.
# The shuffle-partition setting lives under the `spark.sql.` namespace; the
# previous key ('spark.shuffle.sql.partitions') is not a Spark option, so the
# value 1 was never applied.
spark = SparkSession.builder.appName('TestEK')\
                .config('spark.master','local[4]')\
                .config('spark.sql.shuffle.partitions',1)\
                .getOrCreate()
# SparkSession's constructor expects a SparkContext, not another session.
sqlContext = SparkSession(spark.sparkContext)

In [3]:
## spark.stop()

#### initialize logger

In [4]:
# Root logger configuration: messages of level INFO and above are written to
# Code_EK.log, which is opened in 'w' mode (truncated) when the configuration
# is applied. Each line carries a time-of-day stamp, the level and the message.
logSettings = {
    'filename': 'Code_EK.log',
    'filemode': 'w',
    'level': logging.INFO,
    'format': '[%(asctime)s] - %(levelname)s - %(message)s',
    'datefmt': '%H:%M:%S',
}
logging.basicConfig(**logSettings)

## Cargar los datasets utilizando Spark y mantenerlos en formato parquet

#### Class definitions

In [5]:
class MyFile:
    """Base loader for one CSV dataset that is persisted as parquet.

    Subclasses override ``updateSchema`` (column renames / null handling) and
    ``incrementalLoad`` (merge into an already stored parquet dataset).
    """

    def __init__(self, path, name):
        """Remember where the CSV lives and log the start of the load.

        Args:
            path: Folder that contains the source CSV.
            name: CSV file name without the ``.csv`` extension; also used as
                the name of the parquet folder under ``parquet/``.
        """
        self.path = path
        self.name = name
        # Placeholder until readNews() replaces it with a Spark DataFrame.
        self.dfNews = []
        logging.info("*********************************************************")
        logging.info(f"Iniciando carga de {self.name} con Spark.")

    def _csvPath(self):
        """Return the source CSV location; works with or without a trailing separator on ``path``."""
        return os.path.join(self.path, self.name + ".csv")

    def readNews(self):
        """Read the source CSV (header row, inferred schema) into ``self.dfNews`` and log its row count."""
        self.dfNews = spark.read.csv(self._csvPath(), sep = ',', header = True, inferSchema = True)
        logging.info(f"El archivo nuevo tiene {self.dfNews.count()} registros.")

    def updateSchema(self):
        """Hook for subclasses to adjust ``self.dfNews``; the base implementation does nothing."""
        pass

    def initialLoad(self):
        """Write ``self.dfNews`` to ``parquet/<name>``, replacing any existing data, and log the row count."""
        self.dfNews.write.parquet("parquet/"+self.name, mode='overwrite')
        logging.info(f"Carga incial completada. Se cargaron {self.dfNews.count()} registros.")

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the stored parquet dataset.

        Raises:
            NotImplementedError: always; each dataset defines its own merge keys
                in a subclass.
        """
        raise NotImplementedError(f"{type(self).__name__} does not implement incrementalLoad()")

###### CountryWiseLatest

In [6]:
class CountryWiseLatest(MyFile):
    """Loader for the ``country_wise_latest`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["Country", "WHORegion"]
    VALUE_COLUMNS = ["Confirmed", "Deaths", "Recovered", "Active",
                     "NewCases", "NewDeaths", "NewRecovered",
                     "Deaths100Cases", "Recovered100Cases", "Deaths100Recovered",
                     "ConfirmedLastWeek", "weekChange", "weekIncrease"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers to names without spaces, slashes or symbols."""
        self.dfNews = self.dfNews.withColumnsRenamed({"Country/Region":"Country",
                            "New cases":"NewCases",
                            "New deaths":"NewDeaths",
                            "New recovered":"NewRecovered",
                            "Deaths / 100 Cases":"Deaths100Cases",
                            "Recovered / 100 Cases":"Recovered100Cases",
                            "Deaths / 100 Recovered":"Deaths100Recovered",
                            "Confirmed last week":"ConfirmedLastWeek",
                            "1 week change":"weekChange",
                            "1 week % increase":"weekIncrease",
                            "WHO Region":"WHORegion"
                           })

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

###### FullGrouped

In [7]:
class FullGrouped(MyFile):
    """Loader for the ``full_grouped`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["Date", "Country", "WHORegion"]
    VALUE_COLUMNS = ["Confirmed", "Deaths", "Recovered", "Active",
                     "NewCases", "NewDeaths", "NewRecovered"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers to names without spaces or slashes."""
        self.dfNews = self.dfNews.withColumnsRenamed({"Country/Region":"Country",
                            "New cases":"NewCases",
                            "New deaths":"NewDeaths",
                            "New recovered":"NewRecovered",
                            "WHO Region":"WHORegion"
                           })

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

###### Covid19CleanComplete

In [8]:
class Covid19CleanComplete(MyFile):
    """Loader for the ``covid_19_clean_complete`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["Date", "WHORegion", "Country", "State"]
    VALUE_COLUMNS = ["Lat", "Long", "Confirmed", "Deaths", "Recovered", "Active"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers and replace missing strings with "unknown"."""
        self.dfNews = self.dfNews.withColumnsRenamed({"Province/State":"State",
                            "Country/Region":"Country",
                            "WHO Region":"WHORegion"
                           })
        # A string fill value is applied to string columns only, so key columns
        # such as State never hold NULL when they are compared for equality.
        self.dfNews = self.dfNews.fillna("unknown")

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

###### WorldometerData

In [9]:
class WorldometerData(MyFile):
    """Loader for the ``worldometer_data`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["WHORegion", "Continent", "Country"]
    VALUE_COLUMNS = ["Population", "TotalCases", "NewCases", "TotalDeaths", "NewDeaths",
                     "TotalRecovered", "NewRecovered", "ActiveCases", "Serious",
                     "TotCases1MPop", "Deaths1MPop", "TotalTests", "Tests1MPop"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers and replace missing strings with "unknown"."""
        self.dfNews = self.dfNews.withColumnsRenamed({"Country/Region":"Country",
                            "Serious,Critical":"Serious",
                            "Tot Cases/1M pop":"TotCases1MPop",
                            "Deaths/1M pop":"Deaths1MPop",
                            "Tests/1M pop":"Tests1MPop",
                            "WHO Region":"WHORegion"
                           })
        # A string fill value is applied to string columns only; numeric
        # columns keep their NULLs, hence the null-safe comparison below.
        self.dfNews = self.dfNews.fillna("unknown")

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

###### DayWise

In [10]:
class DayWise(MyFile):
    """Loader for the ``day_wise`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["Date"]
    VALUE_COLUMNS = ["Confirmed", "Deaths", "Recovered", "Active",
                     "NewCases", "NewDeaths", "NewRecovered",
                     "Deaths100Cases", "Recovered100Cases", "Deaths100Recovered",
                     "CountriesNr"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers to names without spaces, slashes or dots."""
        self.dfNews = self.dfNews.withColumnsRenamed({"New cases":"NewCases",
                            "New deaths":"NewDeaths",
                            "New recovered":"NewRecovered",
                            "Deaths / 100 Cases":"Deaths100Cases",
                            "Recovered / 100 Cases":"Recovered100Cases",
                            "Deaths / 100 Recovered":"Deaths100Recovered",
                            "No. of countries":"CountriesNr"
                           })

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

###### UsaCountyWise

In [11]:
class UsaCountyWise(MyFile):
    """Loader for the ``usa_county_wise`` dataset.

    A stored row and an incoming row are the same record when every column in
    KEY_COLUMNS is equal; the record counts as changed when any column in
    VALUE_COLUMNS differs.
    """

    KEY_COLUMNS = ["Date", "State", "Country", "Admin2"]
    # CombinedKey appears once here; the earlier hand-written select listed it twice.
    VALUE_COLUMNS = ["UID", "iso2", "iso3", "code3", "FIPS",
                     "Lat", "Long", "CombinedKey", "Confirmed", "Deaths"]

    def __init__(self,path, name):
        super().__init__(path, name)

    def updateSchema(self):
        """Rename the CSV headers and replace missing strings with "unknown"."""
        self.dfNews = self.dfNews.withColumnsRenamed({"Province_State":"State",
                            "Country_Region":"Country",
                            "Long_":"Long",
                            "Combined_Key":"CombinedKey"
                           })
        # A string fill value is applied to string columns only; numeric
        # columns keep their NULLs, hence the null-safe comparison below.
        self.dfNews = self.dfNews.fillna("unknown")

    def _sameKey(self, left, right):
        """Build the join condition 'every key column of ``left`` equals that of ``right``'."""
        condition = None
        for key in self.KEY_COLUMNS:
            pair = left[key] == right[key]
            condition = pair if condition is None else condition & pair
        return condition

    def _anyValueDiffers(self, left, right):
        """Build the condition 'at least one value column differs between ``left`` and ``right``'."""
        condition = None
        for value in self.VALUE_COLUMNS:
            # Null-safe comparison: plain != evaluates to NULL when either side
            # is NULL, which would hide a value changing to or from NULL.
            differs = ~left[value].eqNullSafe(right[value])
            condition = differs if condition is None else condition | differs
        return condition

    def incrementalLoad(self):
        """Merge ``self.dfNews`` into the parquet dataset stored under ``parquet/<name>``.

        Incoming rows whose key is stored with different values replace the
        stored row, incoming rows with an unknown key are appended, and all
        other stored rows are kept. Logs the stored, updated and inserted counts.
        """
        target = "parquet/"+self.name
        # Parquet files carry their own schema, so no CSV reader options are passed.
        current = spark.read.parquet(target)
        logging.info(f"El archivo destino tiene {current.count()} registros.")

        news = self.dfNews

        # Incoming rows that match a stored key but carry different values.
        dfToUpdate = news.join(current,
                               self._sameKey(current, news) & self._anyValueDiffers(current, news),
                               "inner").select(*[news[c] for c in self.KEY_COLUMNS + self.VALUE_COLUMNS])
        df_u = dfToUpdate.count()

        # Stored rows that are NOT being replaced.
        kept = current.join(dfToUpdate, self._sameKey(current, news), "leftanti")

        # Incoming rows absent from `kept`: the changed rows plus the brand-new ones.
        dfToInsert = news.join(kept, self._sameKey(kept, news), "leftanti")
        df_i = dfToInsert.count() - df_u

        merged = kept.union(dfToInsert)

        # `merged` is lazy and still reads from `target`; overwriting `target`
        # directly would delete its own input. Write to a staging folder first
        # and swap it in once the write has finished (local filesystem only).
        staging = target + "__staging"
        merged.write.parquet(staging, mode='overwrite')
        shutil.rmtree(target)
        os.rename(staging, target)
        logging.info(f"Se actualizaron {df_u} registros y se insertaron {df_i} nuevos.")

#### Main

In [12]:
path = "./archive/"
# Base names (extension removed) of every CSV found in the source folder.
# splitext strips only the trailing ".csv", so a name that contains other dots
# is kept whole instead of being cut at its first dot. Sorting makes the
# processing order independent of the order os.listdir happens to return.
files = sorted(os.path.splitext(f)[0] for f in os.listdir(path) if f.endswith('.csv'))

In [13]:
# Loader class for every dataset that is processed. "usa_county_wise" is left
# out on purpose: the loop has always skipped it (reason not recorded here —
# TODO confirm before enabling UsaCountyWise).
loaders = {
    "country_wise_latest": CountryWiseLatest,
    "full_grouped": FullGrouped,
    "covid_19_clean_complete": Covid19CleanComplete,
    "day_wise": DayWise,
    "worldometer_data": WorldometerData,
}

for x in files:
    loader = loaders.get(x)
    if loader is None:
        # Without this guard an unrecognised CSV would reuse the loader object
        # of the previous iteration (or fail with NameError on the first one).
        logging.info(f"Archivo {x} omitido: no tiene una clase de carga asociada.")
        continue
    file = loader(path, x)
    file.readNews()
    file.updateSchema()
    # isdir also copes with the very first run, when ./parquet does not exist yet.
    if os.path.isdir(os.path.join("parquet", x)):
        file.incrementalLoad()
    else:
        file.initialLoad()
    print(x)
        

country_wise_latest
covid_19_clean_complete
day_wise
full_grouped
worldometer_data
