## Extract data

In [1]:
import pandas as pd

In [2]:
url = "https://raw.githubusercontent.com/datasets/covid-19/main/data/countries-aggregated.csv"
df=pd.read_csv(url)
df.to_csv("covid_data.csv",index=False)

In [3]:
df.head()

Unnamed: 0,Date,Country,Confirmed,Recovered,Deaths
0,2020-01-22,Afghanistan,0,0,0
1,2020-01-23,Afghanistan,0,0,0
2,2020-01-24,Afghanistan,0,0,0
3,2020-01-25,Afghanistan,0,0,0
4,2020-01-26,Afghanistan,0,0,0


In [4]:
from pyspark.sql import SparkSession

In [5]:
#start sparKsession
spark = SparkSession.builder\
        .appName("COVID-19 ETL Project")\
        .config("spark.jars","../mysql-connector-j-9.3.0/mysql-connector-j-9.3.0.jar")\
        .getOrCreate()

In [6]:
#Read csv
df = spark.read.csv("covid_data.csv",header=True,inferSchema=True)

In [7]:
#show sample data
df.show(5)

+----------+-----------+---------+---------+------+
|      Date|    Country|Confirmed|Recovered|Deaths|
+----------+-----------+---------+---------+------+
|2020-01-22|Afghanistan|        0|        0|     0|
|2020-01-23|Afghanistan|        0|        0|     0|
|2020-01-24|Afghanistan|        0|        0|     0|
|2020-01-25|Afghanistan|        0|        0|     0|
|2020-01-26|Afghanistan|        0|        0|     0|
+----------+-----------+---------+---------+------+
only showing top 5 rows



## Transform data

In [8]:
from pyspark.sql.functions import col,to_date

# Convert Date column to DateType
df=df.withColumn("Date",to_date(col("Date"),"yyyy-MM-dd"))

# Add Active Cases column
df=df.withColumn("Active_Cases",col("Confirmed")-(col("Recovered")+col("Deaths")))

# Filter for India and US
df_filtered = df.filter(col("Country").isin("India","US"))

df_filtered.show(10)


+----------+-------+---------+---------+------+------------+
|      Date|Country|Confirmed|Recovered|Deaths|Active_Cases|
+----------+-------+---------+---------+------+------------+
|2020-01-22|  India|        0|        0|     0|           0|
|2020-01-23|  India|        0|        0|     0|           0|
|2020-01-24|  India|        0|        0|     0|           0|
|2020-01-25|  India|        0|        0|     0|           0|
|2020-01-26|  India|        0|        0|     0|           0|
|2020-01-27|  India|        0|        0|     0|           0|
|2020-01-28|  India|        0|        0|     0|           0|
|2020-01-29|  India|        0|        0|     0|           0|
|2020-01-30|  India|        1|        0|     0|           1|
|2020-01-31|  India|        1|        0|     0|           1|
+----------+-------+---------+---------+------+------------+
only showing top 10 rows



## Load data

In [9]:
from dotenv import load_dotenv
import os

load_dotenv()

jdbc_url="jdbc:mysql://localhost:3306/covid_data"
table_name="country_covid"
username=os.getenv("MYSQL_USERNAME")
password=os.getenv("MYSQL_PASSWORD")

#Write DataFrame to MySQL table
df_filtered.write\
    .format("jdbc")\
    .option("url",jdbc_url)\
    .option("driver","com.mysql.cj.jdbc.Driver")\
    .option("dbtable",table_name)\
    .option("user",username)\
    .option("password",password)\
    .mode("append")\
    .save()

In [10]:
df_check=spark.read\
    .format("jdbc")\
    .option("url",jdbc_url)\
    .option("driver","com.mysql.cj.jdbc.Driver")\
    .option("dbtable",table_name)\
    .option("user",username)\
    .option("password",password)\
    .load()

In [11]:
df_check.show()

+----------+-------+---------+---------+------+------------+
|      date|country|confirmed|recovered|deaths|active_cases|
+----------+-------+---------+---------+------+------------+
|2020-01-22|  India|        0|        0|     0|           0|
|2020-01-22|     US|        1|        0|     0|           1|
|2020-01-23|  India|        0|        0|     0|           0|
|2020-01-23|     US|        1|        0|     0|           1|
|2020-01-24|  India|        0|        0|     0|           0|
|2020-01-24|     US|        2|        0|     0|           2|
|2020-01-25|  India|        0|        0|     0|           0|
|2020-01-25|     US|        2|        0|     0|           2|
|2020-01-26|     US|        5|        0|     0|           5|
|2020-01-26|  India|        0|        0|     0|           0|
|2020-01-27|     US|        5|        0|     0|           5|
|2020-01-27|  India|        0|        0|     0|           0|
|2020-01-28|  India|        0|        0|     0|           0|
|2020-01-28|     US|    