In [1]:
"""
    Constants used throughout the file
"""

covid_data_url = "https://raw.githubusercontent.com/datasets/covid-19/main/data/countries-aggregated.csv"

covid_raw_data_path = "./data/raw_data.csv"

In [2]:
import requests

def fetchData(url, file_path = covid_raw_data_path):
    file = open(file_path, 'w')
    file.write(requests.get(url).text)
    file.close()

fetchData(covid_data_url)

In [12]:
# from pyspark import SparkConf, SparkContext

# appName = "Testing"
# conf = SparkConf().setAppName(appName).setMaster("local")
# sc = SparkContext(conf=conf)
# print(type(sc))

# distFile = sc.textFile(covid_raw_data_path)
# print(distFile.count())
# distFile.take(10)

<class 'pyspark.context.SparkContext'>
91393


['Date,Country,Confirmed,Recovered,Deaths',
 '2020-01-22,Afghanistan,0,0,0',
 '2020-01-23,Afghanistan,0,0,0',
 '2020-01-24,Afghanistan,0,0,0',
 '2020-01-25,Afghanistan,0,0,0',
 '2020-01-26,Afghanistan,0,0,0',
 '2020-01-27,Afghanistan,0,0,0',
 '2020-01-28,Afghanistan,0,0,0',
 '2020-01-29,Afghanistan,0,0,0',
 '2020-01-30,Afghanistan,0,0,0']

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("DE_project_1").getOrCreate()
print('spark var type: ', type(spark))

spark var type:  <class 'pyspark.sql.session.SparkSession'>


In [4]:
data_schema = [StructField('Date', DateType(), True), 
                StructField('Country', StringType(), True),
                StructField('Confirmed', IntegerType(), True),
                StructField('Recovered', IntegerType(), True),
                StructField('Deaths', IntegerType(), True)
            ]
schema = StructType(fields=data_schema)

df = spark.read.csv(covid_raw_data_path, header=True, schema=schema)
df.show()

+----------+-----------+---------+---------+------+
|      Date|    Country|Confirmed|Recovered|Deaths|
+----------+-----------+---------+---------+------+
|2020-01-22|Afghanistan|        0|        0|     0|
|2020-01-23|Afghanistan|        0|        0|     0|
|2020-01-24|Afghanistan|        0|        0|     0|
|2020-01-25|Afghanistan|        0|        0|     0|
|2020-01-26|Afghanistan|        0|        0|     0|
|2020-01-27|Afghanistan|        0|        0|     0|
|2020-01-28|Afghanistan|        0|        0|     0|
|2020-01-29|Afghanistan|        0|        0|     0|
|2020-01-30|Afghanistan|        0|        0|     0|
|2020-01-31|Afghanistan|        0|        0|     0|
|2020-02-01|Afghanistan|        0|        0|     0|
|2020-02-02|Afghanistan|        0|        0|     0|
|2020-02-03|Afghanistan|        0|        0|     0|
|2020-02-04|Afghanistan|        0|        0|     0|
|2020-02-05|Afghanistan|        0|        0|     0|
|2020-02-06|Afghanistan|        0|        0|     0|
|2020-02-07|

#### Country code has to be used in Grafana WorldMap. So the codes are fetched from the file and added to the data.

In [5]:
from pyspark.sql.types import Row
df_country_code = []
country = Row('Country', '2_letter_code', '3_letter_code')
with open('./country_code.txt', 'r') as codes:
    lines = codes.readlines()
    for line in lines:
        row = line.split('\t')[:3]
        df_country_code.append(country(row[0], row[1], row[2]))
        
print('No of Countries code present in data: ', len(df_country_code))
df_country_code = spark.createDataFrame(df_country_code)
df_country_code.show()

No of Countries code present in data:  249
+-------------------+-------------+-------------+
|            Country|2_letter_code|3_letter_code|
+-------------------+-------------+-------------+
|        Afghanistan|           AF|          AFG|
|            Albania|           AL|          ALB|
|            Algeria|           DZ|          DZA|
|     American Samoa|           AS|          ASM|
|            Andorra|           AD|          AND|
|             Angola|           AO|          AGO|
|           Anguilla|           AI|          AIA|
|         Antarctica|           AQ|          ATA|
|Antigua and Barbuda|           AG|          ATG|
|          Argentina|           AR|          ARG|
|            Armenia|           AM|          ARM|
|              Aruba|           AW|          ABW|
|          Australia|           AU|          AUS|
|            Austria|           AT|          AUT|
|         Azerbaijan|           AZ|          AZE|
|            Bahamas|           BS|          BHS|
|      

#### Adding country code to data

In [6]:
dff = df.join(df_country_code, on='Country')
dff.count()

89280

In [7]:
dff.select("Country").distinct().count()

186

## SparkSQL

In [8]:
dff.createOrReplaceTempView("covid")

In [9]:
query = """
            SELECT COUNT(*) FROM covid
        """
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|   89280|
+--------+



In [10]:
query = """
            SELECT COUNT(DISTINCT Date) FROM covid
        """
spark.sql(query).show()

+--------------------+
|count(DISTINCT Date)|
+--------------------+
|                 480|
+--------------------+



In [11]:
query = """
            SELECT MIN(Date), MAX(Date) FROM covid
        """
spark.sql(query).show()

+----------+----------+
| min(Date)| max(Date)|
+----------+----------+
|2020-01-22|2021-05-15|
+----------+----------+



In [12]:
query = """
            SELECT COUNT(DISTINCT Country) FROM covid
        """
spark.sql(query).show()

+-----------------------+
|count(DISTINCT Country)|
+-----------------------+
|                    186|
+-----------------------+



In [13]:
query = """
            SELECT Date, Country, Confirmed as Cum_Confirmed, Recovered as Cum_Recovered, Deaths as Cum_Deaths, 2_letter_code, 3_letter_code
            FROM covid
            WHERE Country='India'
            ORDER BY Country ASC, Date ASC
        """
dff = spark.sql(query)
dff.show()

+----------+-------+-------------+-------------+----------+-------------+-------------+
|      Date|Country|Cum_Confirmed|Cum_Recovered|Cum_Deaths|2_letter_code|3_letter_code|
+----------+-------+-------------+-------------+----------+-------------+-------------+
|2020-01-22|  India|            0|            0|         0|           IN|          IND|
|2020-01-23|  India|            0|            0|         0|           IN|          IND|
|2020-01-24|  India|            0|            0|         0|           IN|          IND|
|2020-01-25|  India|            0|            0|         0|           IN|          IND|
|2020-01-26|  India|            0|            0|         0|           IN|          IND|
|2020-01-27|  India|            0|            0|         0|           IN|          IND|
|2020-01-28|  India|            0|            0|         0|           IN|          IND|
|2020-01-29|  India|            0|            0|         0|           IN|          IND|
|2020-01-30|  India|            

In [14]:
query = """
            SELECT Date,
            Country,
            Confirmed,
            Recovered,
            Deaths,
            Confirmed - LAG(Confirmed,1) OVER(PARTITION BY Country  ORDER BY Date) AS Cur_Confirmed,
            Recovered - LAG(Recovered,1) OVER(PARTITION BY Country  ORDER BY Date) AS Cur_Recovered,
            Deaths - LAG(Deaths,1) OVER(PARTITION BY Country  ORDER BY Date) AS Cur_Deaths,
            2_letter_code, 
            3_letter_code
            FROM covid
            ORDER BY Country ASC, Date ASC
        """
df_country_wise_agg = spark.sql(query)
df_country_wise_agg = df_country_wise_agg.dropna()
df_country_wise_agg.show()

+----------+-----------+---------+---------+------+-------------+-------------+----------+-------------+-------------+
|      Date|    Country|Confirmed|Recovered|Deaths|Cur_Confirmed|Cur_Recovered|Cur_Deaths|2_letter_code|3_letter_code|
+----------+-----------+---------+---------+------+-------------+-------------+----------+-------------+-------------+
|2020-01-23|Afghanistan|        0|        0|     0|            0|            0|         0|           AF|          AFG|
|2020-01-24|Afghanistan|        0|        0|     0|            0|            0|         0|           AF|          AFG|
|2020-01-25|Afghanistan|        0|        0|     0|            0|            0|         0|           AF|          AFG|
|2020-01-26|Afghanistan|        0|        0|     0|            0|            0|         0|           AF|          AFG|
|2020-01-27|Afghanistan|        0|        0|     0|            0|            0|         0|           AF|          AFG|
|2020-01-28|Afghanistan|        0|        0|    

In [15]:
df_country_wise_agg.count()

89094

### Saving data to MySQL DB

In [17]:
from tqdm import tqdm
import mysql.connector as mySQL

In [18]:
DB_config = {
    'user': "Ubuntu",
    'password': "password",
}
DB_NAME = 'COVID_19_data'

In [19]:
print("#------------------> SQL <------------------#")

TABLES = {}
TABLES['country_wise_data'] = (
    "CREATE TABLE `country_wise_data` ("
    "  `Date` DATE,"
    "  `Country` VARCHAR(100),"
    "  `Tot_Confirmed` BIGINT,"
    "  `Tot_Recovered` BIGINT,"
    "  `Tot_Deaths` BIGINT,"
    "  `Confirmed` BIGINT,"
    "  `Recovered` BIGINT,"
    "  `Deaths` BIGINT,"
    "  `2_letter_code` VARCHAR(2),"
    "  `3_letter_code` VARCHAR(3)"
    ") ENGINE=InnoDB")


cnx = mySQL.connect(**DB_config)
cursor = cnx.cursor(buffered=True)
# cursor.execute('SET GLOBAL max_allowed_packet=67108864')

def create_database(cursor):
    try:
        cursor.execute(
            "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
    except mysql.connector.Error as err:
        print("Failed creating database: {}".format(err))
        exit(1)

try:
    cursor.execute("USE {}".format(DB_NAME))
except mySQL.Error as err:
    print("Database {} does not exists.".format(DB_NAME))
    if err.errno == mySQL.errorcode.ER_BAD_DB_ERROR:
        create_database(cursor)
        print("Database {} created successfully.".format(DB_NAME))
        cnx.database = DB_NAME
    else:
        print(err)
        exit(1)

for table_name in TABLES:
    table_description = TABLES[table_name]
    try:
        print("Creating table {}: ".format(table_name), end='')
        cursor.execute(table_description)
    except mySQL.Error as err:
        if err.errno == mySQL.errorcode.ER_TABLE_EXISTS_ERROR:
            print("already exists.")
        else:
            print(err.msg)
    else:
        print("OK")

print("Saving data to SQL...", end=' ')

insert_query = """
INSERT INTO country_wise_data(Date, Country, Tot_Confirmed, Tot_Recovered, Tot_Deaths, Confirmed, Recovered, Deaths, 2_letter_code, 3_letter_code) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
for row in tqdm(df_country_wise_agg.rdd.collect()):
    cursor.execute(insert_query ,tuple(row))
    
print('Done')
cnx.commit()
cursor.close()
cnx.close()

#------------------> SQL <------------------#
Database COVID_19_data does not exists.
Database COVID_19_data created successfully.
Creating table country_wise_data: OK
Saving data to SQL... 

100%|██████████| 89094/89094 [00:38<00:00, 2334.08it/s]


Done
