In [1]:
# Import needed libraries
from pyspark.sql import SparkSession
from datetime import datetime
import opendatasets as od
import shutil
import os
import re

In [2]:
# Create the SparkSession used by the rest of the notebook
# (getOrCreate returns the already-running session if there is one)
spark = (
    SparkSession.builder
    .appName("FormatCovidData")
    .getOrCreate()
)

In [3]:
# Download the required Kaggle dataset with opendatasets.
# The call prompts for Kaggle credentials (username and key) when it runs.
# NOTE(review): force=True presumably re-downloads even when the files already
# exist locally - confirm against the opendatasets documentation.
url = "https://www.kaggle.com/datasets/imdevskp/corona-virus-report/download?datasetVersionNumber=166"
od.download(url, force=True)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username:Your Kaggle Key:Downloading corona-virus-report.zip to ./corona-virus-report


100%|██████████| 19.0M/19.0M [00:02<00:00, 8.09MB/s]





In [4]:
# List the downloaded files found in the dataset folder.
# os.listdir returns every directory entry in arbitrary order.
folder = "corona-virus-report"
covid_file_list = os.listdir(f"{folder}/")
print(covid_file_list)

['country_wise_latest.csv', 'covid_19_clean_complete.csv', 'day_wise.csv', 'full_grouped.csv', 'usa_county_wise.csv', 'worldometer_data.csv']


In [5]:
# Patterns are compiled once and written as raw strings so that "\d" is not
# treated as an (invalid) string escape sequence.
_INT_PATTERN = re.compile(r"[+-]?\d+")
_ISO_DATE_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}")
_SHORT_DATE_PATTERN = re.compile(r"\d{1,2}/\d{1,2}/\d{2}")
# Each date pattern paired with the strptime format used to parse it
_DATE_FORMATS = (
    (_ISO_DATE_PATTERN, "%Y-%m-%d"),
    (_SHORT_DATE_PATTERN, "%m/%d/%y"),
)


def data_type(x):
    """
    Convert a raw CSV field to the most specific Python type that fits it

    Conversions are tried in this order: empty string -> None, integer
    (optionally signed) -> int, date (YYYY-MM-DD or M/D/YY) -> datetime,
    anything float() accepts -> float, otherwise the text itself with
    double quotes removed.

    Args:
        x: Field value (string) to convert
    Return:
        None, int, datetime, float or str, depending on the content of x
    """
    if x == '':
        return None
    # Signed integers are matched here so that values such as "-5" stay int
    # instead of falling through to float() and mixing types within a column.
    if _INT_PATTERN.fullmatch(x):
        return int(x)
    for pattern, date_format in _DATE_FORMATS:
        if pattern.fullmatch(x):
            try:
                return datetime.strptime(x, date_format)
            except ValueError:
                # Looks like a date but is not a valid one (e.g. month 13):
                # keep going and let it end up as plain text.
                break
    try:
        return float(x)
    except ValueError:
        return str(x).replace('"', '')

In [6]:
def parse_row(row, regex):
    """
    Split a raw text row into fields and convert each field's data type

    Args:
        row: String to be split
        regex: Regex pattern used as the field separator (intended to
            ignore commas that sit inside double quotes)
    Return:
        List with one converted value per field, in the original order
    """
    return [data_type(field) for field in re.split(regex, row)]

In [7]:
parquet_folder = "corona-virus-parquet-pyspark"
# Regex pattern that splits on commas while ignoring commas inside double quotes
regex = r',(?=(?:[^"]|"[^"]*")*$)'
# Convert every downloaded file into its own parquet dataset
for file in covid_file_list:
    print(file)
    # Read the downloaded file as an RDD of text lines
    rdd = spark.sparkContext.textFile(f"./{folder}/{file}")
    # The first line holds the column names
    header = rdd.first()
    column_names = [name.replace('"', '') for name in re.split(regex, header)]
    # Drop the header (any line identical to it) from the data
    data_rdd = rdd.filter(lambda row: row != header)
    # Split each row into fields and convert their data types
    format_rdd = data_rdd.map(lambda x: parse_row(x, regex))
    # Convert the RDD to a DataFrame so it can be written as parquet.
    # NOTE(review): without an explicit schema, toDF infers column types from
    # the leading rows only - confirm that later rows match the inferred types.
    df = format_rdd.toDF(column_names)
    print(df.dtypes)
    df.show(5)
    # splitext removes only the final extension, so "a.v1.csv" and "a.v2.csv"
    # do not end up in the same output folder
    parquet_path = f"{parquet_folder}/{os.path.splitext(file)[0]}/"
    # Let Spark replace any previous output itself: unlike a local
    # os.path.exists/shutil.rmtree check, this goes through the same
    # filesystem Spark writes to.
    df.write.mode("overwrite").parquet(parquet_path)

country_wise_latest.csv
[('Country/Region', 'string'), ('Confirmed', 'bigint'), ('Deaths', 'bigint'), ('Recovered', 'bigint'), ('Active', 'bigint'), ('New cases', 'bigint'), ('New deaths', 'bigint'), ('New recovered', 'bigint'), ('Deaths / 100 Cases', 'double'), ('Recovered / 100 Cases', 'double'), ('Deaths / 100 Recovered', 'double'), ('Confirmed last week', 'bigint'), ('1 week change', 'bigint'), ('1 week % increase', 'double'), ('WHO Region', 'string')]
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+--------

In [8]:
# Stop the SparkSession now that every file has been processed
spark.stop()