In [155]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark import RDD
from pyspark.sql.functions import current_timestamp

from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [156]:
spark = SparkSession.builder\
                    .master('local')\
                    .appName('N5')\
                    .config('spark.executor.memory','1gb')\
                    .getOrCreate()

### Processing Classes

In [157]:
# Class to convert to the appropriate types according to the schema for RDD
class LineToRow:
    """Converter methods for each csv file, assuming that the input columns 
    of the CSVs always come in the same order."""
    def __float(self, s):
        if s == "":
            return None
        try:
            return float(s)
        except ValueError:
            return None
            
    def country_wise_latest(self, line):
        parts = line.split(",")
        return (str(parts[0]), int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4]), 
                int(parts[5]), int(parts[6]), int(parts[7]), self.__float(parts[8]), self.__float(parts[9]), 
                self.__float(parts[10]), int(parts[11]), int(parts[12]), self.__float(parts[13]), str(parts[14]))
    
    def covid_clean_complete(self, line):
        parts = line.split(",")
        return (str(parts[0]), str(parts[1]), self.__float(parts[2]), self.__float(parts[3]), str(parts[4]), 
                int(parts[5]), int(parts[6]), int(parts[7]), int(parts[8]), str(parts[9]))

    def day_wise(self, line):
        parts = line.split(",")
        return (str(parts[0]), int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4]), 
                int(parts[5]), int(parts[6]), int(parts[7]), self.__float(parts[8]), self.__float(parts[9]), 
                self.__float(parts[10]), int(parts[11]))
    
    def full_grouped(self, line):
        parts = line.split(",")
        return (str(parts[0]), str(parts[1]), int(parts[2]), int(parts[3]), int(parts[4]), 
                int(parts[5]), int(parts[6]), int(parts[7]), int(parts[8]), str(parts[9]))

    def usa_county_wise(self, line):
        parts = line.split(",")
        n_parts = len(parts)
        return (int(parts[0]), str(parts[1]), str(parts[2]), int(parts[3]), self.__float(parts[4]), 
                str(parts[5]), str(parts[6]), str(parts[7]), self.__float(parts[8]), self.__float(parts[9]), 
                ','.join([parts[k] for k in range(10,n_parts-3)]), str(parts[-3]), int(parts[-2]), int(parts[-1]))
        
    def worldometer_data(self, line):
        parts = line.split(",")
        return (str(parts[0]), str(parts[1]), self.__float(parts[2]), int(parts[3]), self.__float(parts[4]), 
                self.__float(parts[5]), self.__float(parts[6]), self.__float(parts[7]), self.__float(parts[8]), self.__float(parts[9]), 
                self.__float(parts[10]), self.__float(parts[11]), self.__float(parts[12]), self.__float(parts[13]), self.__float(parts[14]),
                str(parts[15]))
        
# Class to acquire the schemas
class FileSchemas:
    """Schemas for each csv file"""
    COUNTRY_WISE_LATEST = StructType([
        StructField("Country/Region", StringType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True),
        StructField("Recovered", IntegerType(), True),
        StructField("Active", IntegerType(), True),
        StructField("New cases", IntegerType(), True),
        StructField("New deaths", IntegerType(), True),
        StructField("New recovered", IntegerType(), True),
        StructField("Deaths / 100 Cases", FloatType(), True),
        StructField("Recovered / 100 Cases", FloatType(), True),
        StructField("Deaths / 100 Recovered", FloatType(), True),
        StructField("Confirmed last week", IntegerType(), True),
        StructField("1 week change", IntegerType(), True),
        StructField("1 week % increase", FloatType(), True),
        StructField("WHO Region", StringType(), True)
    ])
    
    COVID_CLEAN_COMPLETE = StructType([
        StructField("Province/State", StringType(), True),
        StructField("Country/Region", StringType(), True),
        StructField("Lat", FloatType(), True),
        StructField("Long", FloatType(), True),
        StructField("Date", StringType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True),
        StructField("Recovered", IntegerType(), True),
        StructField("Active", IntegerType(), True),
        StructField("WHO Region", StringType(), True)
    ])
    
    DAY_WISE = StructType([
        StructField("Date", StringType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True),
        StructField("Recovered", IntegerType(), True),
        StructField("Active", IntegerType(), True),
        StructField("New cases", IntegerType(), True),
        StructField("New deaths", IntegerType(), True),
        StructField("New recovered", IntegerType(), True),
        StructField("Deaths / 100 Cases", FloatType(), True),
        StructField("Recovered / 100 Cases", FloatType(), True),
        StructField("Deaths / 100 Recovered", FloatType(), True),
        StructField("No. of countries", IntegerType(), True)
    ])
    
    FULL_GROUPED = StructType([
        StructField("Date", StringType(), True),
        StructField("Country/Region", StringType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True),
        StructField("Recovered", IntegerType(), True),
        StructField("Active", IntegerType(), True),
        StructField("New cases", IntegerType(), True),
        StructField("New deaths", IntegerType(), True),
        StructField("New recovered", IntegerType(), True),
        StructField("WHO Region", StringType(), True)
    ])
    
    USA_COUNTY_WISE = StructType([
        StructField("UID", IntegerType(), True),
        StructField("iso2", StringType(), True),
        StructField("iso3", StringType(), True),
        StructField("code3", IntegerType(), True),
        StructField("FIPS", FloatType(), True),       
        StructField("Admin2", StringType(), True),
        StructField("Province_State", StringType(), True),
        StructField("Country_Region", StringType(), True),
        StructField("Lat", FloatType(), True),
        StructField("Long_", FloatType(), True),
        StructField("Combined_Key", StringType(), True),
        StructField("Date", StringType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True)
    ])
    
    WORLDOMETER_DATA = StructType([
        StructField("Country/Region", StringType(), True),
        StructField("Continent", StringType(), True),
        StructField("Population", FloatType(), True),
        StructField("TotalCases", IntegerType(), True),
        StructField("NewCases", FloatType(), True),
        StructField("TotalDeaths", FloatType(), True),
        StructField("NewDeaths", FloatType(), True),
        StructField("TotalRecovered", FloatType(), True),
        StructField("NewRecovered", FloatType(), True),
        StructField("ActiveCases", FloatType(), True),
        StructField("Serious,Critical", FloatType(), True),
        StructField("Tot Cases/1M pop", FloatType(), True),
        StructField("Deaths/1M pop", FloatType(), True),
        StructField("TotalTests", FloatType(), True),
        StructField("Tests/1M pop", FloatType(), True),
        StructField("WHO Region", StringType(), True)
    ])


# Class to normalize the names
class NormalizeColumns:
    """Normalizing methods for each csv file"""
    def country_wise_latest(self, data):
        return data.withColumnRenamed('Country/Region','country_region')\
                   .withColumnRenamed('Confirmed','confirmed')\
                   .withColumnRenamed('Deaths','deaths')\
                   .withColumnRenamed('Recovered','recovered')\
                   .withColumnRenamed('Active','active')\
                   .withColumnRenamed('New cases','new_cases')\
                   .withColumnRenamed('New deaths','new_deaths')\
                   .withColumnRenamed('New recovered','new_recovered')\
                   .withColumnRenamed('Deaths / 100 Cases','deaths_per_hundred_cases')\
                   .withColumnRenamed('Recovered / 100 Cases','recovered_per_hundred_cases')\
                   .withColumnRenamed('Deaths / 100 Recovered','deaths_per_hundred_recovered')\
                   .withColumnRenamed('Confirmed last week','confirmed_last_week')\
                   .withColumnRenamed('1 week change','one_week_change')\
                   .withColumnRenamed('1 week % increase','perc_one_week_increse')\
                   .withColumnRenamed('WHO Region','who_region')
    
    def covid_clean_complete(self, data):
        return data.withColumnRenamed('Province/State','province_state')\
                   .withColumnRenamed('Country/Region','country_region')\
                   .withColumnRenamed('Lat','lat')\
                   .withColumnRenamed('Long','long')\
                   .withColumnRenamed('Date','date')\
                   .withColumnRenamed('Confirmed','confirmed')\
                   .withColumnRenamed('Deaths','deaths')\
                   .withColumnRenamed('Recovered','recovered')\
                   .withColumnRenamed('Active','active')\
                   .withColumnRenamed('WHO Region','who_region')
    
    def day_wise(self, data):
        return data.withColumnRenamed('Date','date')\
                   .withColumnRenamed('Confirmed','confirmed')\
                   .withColumnRenamed('Deaths','deaths')\
                   .withColumnRenamed('Recovered','recovered')\
                   .withColumnRenamed('Active','active')\
                   .withColumnRenamed('New cases','new_cases')\
                   .withColumnRenamed('New deaths','new_deaths')\
                   .withColumnRenamed('New recovered','new_recovered')\
                   .withColumnRenamed('Deaths / 100 Cases','deaths_per_hundred_cases')\
                   .withColumnRenamed('Recovered / 100 Cases','recovered_per_hundred_cases')\
                   .withColumnRenamed('Deaths / 100 Recovered','deaths_per_hundred_recovered')\
                   .withColumnRenamed('No. of countries','number_countries')
    
    def full_grouped(self, data):
        return data.withColumnRenamed('Date','date')\
                   .withColumnRenamed('Country/Region','country_region')\
                   .withColumnRenamed('Confirmed','confirmed')\
                   .withColumnRenamed('Deaths','deaths')\
                   .withColumnRenamed('Recovered','recovered')\
                   .withColumnRenamed('Active','active')\
                   .withColumnRenamed('New cases','new_cases')\
                   .withColumnRenamed('New deaths','new_deaths')\
                   .withColumnRenamed('New recovered','new_recovered')\
                   .withColumnRenamed('WHO Region','who_region')

    def usa_county_wise(self, data):
        return data.withColumnRenamed('UID','uid')\
                   .withColumnRenamed('iso2','iso2')\
                   .withColumnRenamed('iso3','iso3')\
                   .withColumnRenamed('code3','code3')\
                   .withColumnRenamed('FIPS','fips')\
                   .withColumnRenamed('Admin2','admin2')\
                   .withColumnRenamed('Province_State','province_state')\
                   .withColumnRenamed('Country_Region','country_region')\
                   .withColumnRenamed('Lat','lat')\
                   .withColumnRenamed('Long_','long')\
                   .withColumnRenamed('Combined_Key','combined_key')\
                   .withColumnRenamed('Date','date')\
                   .withColumnRenamed('Confirmed','confirmed')\
                   .withColumnRenamed('Deaths','deaths')
    
    def worldometer_data(self, data):
        return data.withColumnRenamed('Country/Region','country_region')\
                   .withColumnRenamed('Continent','continent')\
                   .withColumnRenamed('Population','population')\
                   .withColumnRenamed('TotalCases','total_cases')\
                   .withColumnRenamed('NewCases','new_cases')\
                   .withColumnRenamed('TotalDeaths','total_deaths')\
                   .withColumnRenamed('NewDeaths','new_deaths')\
                   .withColumnRenamed('TotalRecovered','total_recovered')\
                   .withColumnRenamed('NewRecovered','new_recovered')\
                   .withColumnRenamed('ActiveCases','active_cases')\
                   .withColumnRenamed('Serious,Critical','serious_critical')\
                   .withColumnRenamed('Tot Cases/1M pop','tot_cases_per_one_million_pop')\
                   .withColumnRenamed('Deaths/1M pop','deaths_per_one_million_pop')\
                   .withColumnRenamed('TotalTests','total_tests')\
                   .withColumnRenamed('Tests/1M pop','tests_per_one_million_pop')\
                   .withColumnRenamed('WHO Region','who_region')
    
	

In [158]:
# Main Class for the whole process
class BaseDataProcessor:
    """Main Processor for save csv to parquet using RDD and spark.read"""
    def __init__(self, name, file_path_input, file_path_output):
        """ Contructor 
        params: 
            name: Process name.
            file_path_input: CSV file name with extension.
            file_path_output: Directory name for saving Parquet files
        """
        self.file_path_input  = file_path_input
        self.file_path_output = file_path_output
        self.name             = name
    
    def get_schema(self):
        """Schemas for each csv file."""
        if self.name == 'COUNTRY_WISE_LATEST':
            return SCHEMAS.COUNTRY_WISE_LATEST
        elif self.name == 'COVID_CLEAN_COMPLETE':
            return SCHEMAS.COVID_CLEAN_COMPLETE
        elif self.name == 'DAY_WISE':
            return SCHEMAS.DAY_WISE
        elif self.name == 'FULL_GROUPED':
            return SCHEMAS.FULL_GROUPED
        elif self.name == 'USA_COUNTY_WISE':
            return SCHEMAS.USA_COUNTY_WISE
        elif self.name == 'WORLDOMETER_DATA':
            return SCHEMAS.WORLDOMETER_DATA
        else:
            raise Exception("Valid process name is needed.") 
            
    def normalize_columns(self, data):
        """Column names normalization for saving."""
        if self.name == 'COUNTRY_WISE_LATEST':
            return NORMALIZE.country_wise_latest(data)
        elif self.name == 'COVID_CLEAN_COMPLETE':
            return NORMALIZE.covid_clean_complete(data)
        elif self.name == 'DAY_WISE':
            return NORMALIZE.day_wise(data)
        elif self.name == 'FULL_GROUPED':
            return NORMALIZE.full_grouped(data)
        elif self.name == 'USA_COUNTY_WISE':
            return NORMALIZE.usa_county_wise(data)
        elif self.name == 'WORLDOMETER_DATA':
            return NORMALIZE.worldometer_data(data)
        else:
            raise Exception("Valid process name is needed.") 

    def add_timestamp(self, data):
        """Add update date for possible deltas."""
        return data.withColumn("update_date", current_timestamp())
            
    def rdd_load_file(self, schema):
        """Get Spark Dataframe using RDD."""
        rdd = spark.sparkContext.textFile(self.file_path_input)
        header = rdd.first()
        rdd = rdd.filter(lambda line: line != header)
        if self.name == 'COUNTRY_WISE_LATEST':
            rdd = rdd.map(LINE2ROW.country_wise_latest)
        elif self.name == 'COVID_CLEAN_COMPLETE':
            rdd = rdd.map(LINE2ROW.covid_clean_complete)
        elif self.name == 'DAY_WISE':
            rdd = rdd.map(LINE2ROW.day_wise)
        elif self.name == 'FULL_GROUPED':
            rdd = rdd.map(LINE2ROW.full_grouped)
        elif self.name == 'USA_COUNTY_WISE':
            rdd = rdd.map(LINE2ROW.usa_county_wise)
        elif self.name == 'WORLDOMETER_DATA':
            rdd = rdd.map(LINE2ROW.worldometer_data)
        else:
            raise Exception("Valid process name is needed.") 
        data = spark.createDataFrame(rdd, schema)
        return data
    
    def get_dataframe(self, schema):
        """Get Spark Dataframe using spark.read."""
        return spark.read.csv(self.file_path_input, sep=',', schema=schema, header=True)
    
    def save_as_parquet(self, data, output_path):
        """Save the DataFrame in Parquet format."""
        data.write.mode('overwrite').parquet(output_path)
               
    def process_in_rdd(self):
        """Process using RDD loading"""
        schema = self.get_schema()
        data   = self.rdd_load_file(schema)
        data   = self.normalize_columns(data)
        data   = self.add_timestamp(data)
        self.save_as_parquet(data, self.file_path_output)

    def process(self):
        """Process using spark.read loading"""
        schema = self.get_schema()
        data   = self.get_dataframe(schema)
        data   = self.normalize_columns(data)
        data   = self.add_timestamp(data)
        self.save_as_parquet(data, self.file_path_output)


### Process Initialization

In [159]:
LINE2ROW  = LineToRow()
SCHEMAS   = FileSchemas()
NORMALIZE = NormalizeColumns()

In [160]:
BaseDataProcessor('COUNTRY_WISE_LATEST','data/raw/country_wise_latest.csv', 'data/processed/country_wise_latest').process_in_rdd()
BaseDataProcessor('COVID_CLEAN_COMPLETE','data/raw/covid_19_clean_complete.csv', 'data/processed/covid_19_clean_complete').process_in_rdd()
BaseDataProcessor('DAY_WISE','data/raw/day_wise.csv', 'data/processed/day_wise').process_in_rdd()
BaseDataProcessor('FULL_GROUPED','data/raw/full_grouped.csv', 'data/processed/full_grouped').process_in_rdd()
BaseDataProcessor('USA_COUNTY_WISE','data/raw/usa_county_wise.csv', 'data/processed/usa_county_wise').process_in_rdd()
BaseDataProcessor('WORLDOMETER_DATA','data/raw/worldometer_data.csv', 'data/processed/worldometer_data').process_in_rdd()

### Reading saved partitions

In [161]:
data = spark.read.parquet("data/processed/country_wise_latest")
data.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------------+---------------------------+----------------------------+-------------------+---------------+---------------------+--------------------+--------------------+
|     country_region|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|deaths_per_hundred_cases|recovered_per_hundred_cases|deaths_per_hundred_recovered|confirmed_last_week|one_week_change|perc_one_week_increse|          who_region|         update_date|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------------+---------------------------+----------------------------+-------------------+---------------+---------------------+--------------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|                     3.5|                      69.49|                        5.04|         

In [162]:
data = spark.read.parquet("data/processed/covid_19_clean_complete")
data.show()

+--------------------+-------------------+---------+---------+----------+---------+------+---------+------+--------------------+--------------------+
|      province_state|     country_region|      lat|     long|      date|confirmed|deaths|recovered|active|          who_region|         update_date|
+--------------------+-------------------+---------+---------+----------+---------+------+---------+------+--------------------+--------------------+
|                    |        Afghanistan| 33.93911| 67.70995|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|2023-11-02 02:04:...|
|                    |            Albania|  41.1533|  20.1683|2020-01-22|        0|     0|        0|     0|              Europe|2023-11-02 02:04:...|
|                    |            Algeria|  28.0339|   1.6596|2020-01-22|        0|     0|        0|     0|              Africa|2023-11-02 02:04:...|
|                    |            Andorra|  42.5063|   1.5218|2020-01-22|        0|     0|        0|

In [163]:
data = spark.read.parquet("data/processed/day_wise")
data.show()

+----------+---------+------+---------+------+---------+----------+-------------+------------------------+---------------------------+----------------------------+----------------+--------------------+
|      date|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|deaths_per_hundred_cases|recovered_per_hundred_cases|deaths_per_hundred_recovered|number_countries|         update_date|
+----------+---------+------+---------+------+---------+----------+-------------+------------------------+---------------------------+----------------------------+----------------+--------------------+
|2020-01-22|      555|    17|       28|   510|        0|         0|            0|                    3.06|                       5.05|                       60.71|               6|2023-11-02 02:04:...|
|2020-01-23|      654|    18|       30|   606|       99|         1|            2|                    2.75|                       4.59|                        60.0|               8|2023-11-02 0

In [164]:
data = spark.read.parquet("data/processed/full_grouped")
data.show()

+----------+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+--------------------+
|      date|     country_region|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|          who_region|         update_date|
+----------+-------------------+---------+------+---------+------+---------+----------+-------------+--------------------+--------------------+
|2020-01-22|        Afghanistan|        0|     0|        0|     0|        0|         0|            0|Eastern Mediterra...|2023-11-02 02:04:...|
|2020-01-22|            Albania|        0|     0|        0|     0|        0|         0|            0|              Europe|2023-11-02 02:04:...|
|2020-01-22|            Algeria|        0|     0|        0|     0|        0|         0|            0|              Africa|2023-11-02 02:04:...|
|2020-01-22|            Andorra|        0|     0|        0|     0|        0|         0|            0|              Europe|2023-11-02 02:

In [165]:
data = spark.read.parquet("data/processed/usa_county_wise")
data.show()

+--------+----+----+-----+-------+------------+--------------+--------------+---------+----------+--------------------+-------+---------+------+--------------------+
|     uid|iso2|iso3|code3|   fips|      admin2|province_state|country_region|      lat|      long|        combined_key|   date|confirmed|deaths|         update_date|
+--------+----+----+-----+-------+------------+--------------+--------------+---------+----------+--------------------+-------+---------+------+--------------------+
|84051035|  US| USA|  840|51035.0|     Carroll|      Virginia|            US|36.730667| -80.73456|"Carroll, Virgini...|4/21/20|        3|     0|2023-11-02 02:04:...|
|84051036|  US| USA|  840|51036.0|Charles City|      Virginia|            US| 37.35358| -77.05634|"Charles City, Vi...|4/21/20|       11|     0|2023-11-02 02:04:...|
|84051037|  US| USA|  840|51037.0|   Charlotte|      Virginia|            US|37.019096| -78.66314|"Charlotte, Virgi...|4/21/20|        8|     0|2023-11-02 02:04:...|
|840

In [166]:
data = spark.read.parquet("data/processed/worldometer_data")
data.show()

+--------------+-------------+------------+-----------+---------+------------+----------+---------------+-------------+------------+----------------+-----------------------------+--------------------------+-----------+-------------------------+--------------------+--------------------+
|country_region|    continent|  population|total_cases|new_cases|total_deaths|new_deaths|total_recovered|new_recovered|active_cases|serious_critical|tot_cases_per_one_million_pop|deaths_per_one_million_pop|total_tests|tests_per_one_million_pop|          who_region|         update_date|
+--------------+-------------+------------+-----------+---------+------------+----------+---------------+-------------+------------+----------------+-----------------------------+--------------------------+-----------+-------------------------+--------------------+--------------------+
|           USA|North America|3.31198144E8|    5032179|     NULL|    162804.0|      NULL|      2576668.0|         NULL|   2292707.0|       