## LAB1: PySpark+MySQL(JDBC)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window

In [2]:
# Extract data from HDFS (CSV)
df_death = spark.read.csv("hdfs://devenv/user/spark/pyspark_etl/Covid_Death.csv", header=True, inferSchema=True)
df_vaccine = spark.read.csv("hdfs://devenv/user/spark/pyspark_etl/Covid_Vaccine.csv", header=True, inferSchema=True)

In [3]:
df_death.select('location','date','population','new_cases','new_deaths')\
        .show(5)

+-----------+---------+----------+---------+----------+
|   location|     date|population|new_cases|new_deaths|
+-----------+---------+----------+---------+----------+
|Afghanistan|2020/2/24|  38928341|        1|      null|
|Afghanistan|2020/2/25|  38928341|        0|      null|
|Afghanistan|2020/2/26|  38928341|        0|      null|
|Afghanistan|2020/2/27|  38928341|        0|      null|
|Afghanistan|2020/2/28|  38928341|        0|      null|
+-----------+---------+----------+---------+----------+
only showing top 5 rows



In [4]:
df_vaccine.select('location','date','new_vaccinations')\
          .show(5)

+-----------+---------+----------------+
|   location|     date|new_vaccinations|
+-----------+---------+----------------+
|Afghanistan|2020/2/24|            null|
|Afghanistan|2020/2/25|            null|
|Afghanistan|2020/2/26|            null|
|Afghanistan|2020/2/27|            null|
|Afghanistan|2020/2/28|            null|
+-----------+---------+----------------+
only showing top 5 rows



### 1. Tracking covid-19 Vaccinations across countries

In [5]:
# Transform: Tracking Coronavirus Vaccinations Around the World (by country)
cond=[df_death.location==df_vaccine.location,df_death.date==df_vaccine.date]
window=Window.partitionBy('location').orderBy('location','date') 
df_vaccine_total=df_death.filter('continent IS NOT NULL')\
               .join(df_vaccine,cond)\
               .select(df_death.location,df_death.date,df_death.population,df_death.new_cases,df_death.new_deaths,df_vaccine.new_vaccinations)\
               .withColumn('total_vaccinations',sum('new_vaccinations').over(window))

In [7]:
df_vaccine_total.select(df_vaccine_total.location,df_vaccine_total.date,df_vaccine_total.new_vaccinations,df_vaccine_total.total_vaccinations)\
                .filter(df_vaccine_total.location=='Taiwan')\
                .filter(df_vaccine_total.new_vaccinations.isNotNull())\
                .show()

+--------+---------+----------------+------------------+
|location|     date|new_vaccinations|total_vaccinations|
+--------+---------+----------------+------------------+
|  Taiwan|2021/3/22|            1578|              1578|
|  Taiwan|2021/3/23|            1640|              3218|
|  Taiwan|2021/3/24|            1951|              5169|
|  Taiwan|2021/3/25|            1884|              7053|
|  Taiwan|2021/3/26|            2179|              9232|
|  Taiwan|2021/3/27|             145|              9377|
|  Taiwan|2021/3/28|              35|              9412|
|  Taiwan|2021/3/29|            1479|             10891|
|  Taiwan|2021/3/30|            1757|             12648|
|  Taiwan|2021/3/31|            1752|             14400|
|  Taiwan| 2021/4/1|            2050|             16450|
|  Taiwan|2021/4/10|             208|             16658|
|  Taiwan|2021/4/11|              43|             16701|
|  Taiwan|2021/4/12|            1181|             17882|
|  Taiwan|2021/4/13|           

In [8]:
#Load: output df to mySQL database by jdbc connector
df_vaccine_total.write \
                .option("driver", "com.mysql.jdbc.Driver") \
                .jdbc("jdbc:mysql://localhost:3306", "covid.vaccination",
                 properties={"user": "root", "password": "root"}) # input username and password

### 2. Tracking  covid-19 inflection rate and death rate  across countries

In [9]:
#Transformation: Tracking covid-19 inflection rate and death rate across countries
df_inflection_total=df_death.select(df_death.location,df_death.continent,df_death.population,df_death.new_cases,df_death.new_deaths)\
                            .filter(df_death.continent.isNotNull())\
                            .groupBy(df_death.location,df_death.continent,df_death.population)\
                            .agg({'new_cases':'sum','new_deaths':'sum'})\
                            .withColumnRenamed('sum(new_cases)','total_cases')\
                            .withColumnRenamed('sum(new_deaths)','total_deaths')\
                            .na.fill(0)

In [11]:
# Top 5 most affected countries with the highest number of COVID-19 cases
df_inflection_total.orderBy(desc('total_cases')).show(5)

+-------------+-------------+----------+-----------+------------+
|     location|    continent|population|total_cases|total_deaths|
+-------------+-------------+----------+-----------+------------+
|United States|North America| 331002647|   32557439|      579276|
|        India|         Asia|1380004385|   20665148|      226188|
|       Brazil|South America| 212559409|   14930183|      414399|
|       France|       Europe|  68147687|    5767541|      105792|
|       Russia|       Europe| 145934460|    4792354|      110022|
+-------------+-------------+----------+-----------+------------+
only showing top 5 rows



In [12]:
df_inflection_percentage=df_inflection_total.withColumn('inflection_rate',format_number(df_inflection_total['total_cases']/df_inflection_total['population']*100,2))\
                                            .withColumn('death_rate',format_number(df_inflection_total['total_deaths']/df_inflection_total['total_cases']*100,2))          

In [13]:
# Top 5 most affected countries with the highest rate of COVID-19 Inflection
df_inflection_percentage.select('location','continent','population','inflection_rate','death_rate')\
                        .orderBy(desc('inflection_rate'))\
                        .show(5)

+-------------+-------------+----------+---------------+----------+
|     location|    continent|population|inflection_rate|death_rate|
+-------------+-------------+----------+---------------+----------+
|       Sweden|       Europe|  10099270|           9.86|      1.42|
|United States|North America| 331002647|           9.84|      1.78|
|       Israel|         Asia|   8655541|           9.69|      0.76|
|      Estonia|       Europe|   1326539|           9.33|      0.96|
|    Lithuania|       Europe|   2722291|           9.28|      1.58|
+-------------+-------------+----------+---------------+----------+
only showing top 5 rows



In [14]:
#Load: output df to mySQL database by jdbc connector
df_inflection_percentage.write \
                .option("driver", "com.mysql.jdbc.Driver") \
                .jdbc("jdbc:mysql://localhost:3306", "covid.inflection_percentage", 
                 properties={"user": "root", "password": "root"}) # input username and password

## LAB2: PySpark+PostgreSQL(Docker-Compose)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import psycopg2

In [2]:
# Extract data from HDFS (CSV)
df_death = spark.read.csv("hdfs://devenv/user/spark/pyspark_etl/Covid_Death.csv", header=True, inferSchema=True)
df_vaccine = spark.read.csv("hdfs://devenv/user/spark/pyspark_etl/Covid_Vaccine.csv", header=True, inferSchema=True)

In [3]:
# Transform1: Tracking Coronavirus Vaccinations Around the World (by country)
cond=[df_death.location==df_vaccine.location,df_death.date==df_vaccine.date]
window=Window.partitionBy('location').orderBy('location','date') 
df_vaccine_total=df_death.filter('continent IS NOT NULL')\
               .join(df_vaccine,cond)\
               .select(df_death.location,df_death.date,df_death.population,df_death.new_cases,df_death.new_deaths,df_vaccine.new_vaccinations)\
               .withColumn('total_vaccinations',sum('new_vaccinations').over(window))

In [4]:
#Transformation2: Tracking covid-19 inflection rate and death rate across countries
df_inflection_total=df_death.select(df_death.location,df_death.continent,df_death.population,df_death.new_cases,df_death.new_deaths)\
                            .filter(df_death.continent.isNotNull())\
                            .groupBy(df_death.location,df_death.continent,df_death.population)\
                            .agg({'new_cases':'sum','new_deaths':'sum'})\
                            .withColumnRenamed('sum(new_cases)','total_cases')\
                            .withColumnRenamed('sum(new_deaths)','total_deaths')\
                            .na.fill(0)

In [5]:
df_inflection_percentage=df_inflection_total.withColumn('inflection_rate',format_number(df_inflection_total['total_cases']/df_inflection_total['population']*100,2))\
                                            .withColumn('death_rate',format_number(df_inflection_total['total_deaths']/df_inflection_total['total_cases']*100,2))  

In [6]:
#Load to postgresql database(docker-compose)
conn = psycopg2.connect(
        host = "localhost",
        database = "covid",
        user = "root",     # input username
        password = "root") # input password

In [7]:
cursor = conn.cursor()

sql_query='''

CREATE TABLE IF NOT EXISTS vaccination( \
        location VARCHAR(255),
        date Date,
        population int,
        new_cases int,
        new_deaths int,
        new_vaccinations int,
        total_vaccinations int
)
'''

cursor.execute(sql_query)
conn.commit()
cursor.close()
# conn.close()

In [8]:
cursor = conn.cursor()

vaccination_seq = [tuple(x) for x in df_vaccine_total.collect()]

records_list_template = ','.join(['%s'] * len(vaccination_seq))

insert_query = "INSERT INTO vaccination ( \
                location, date, population, new_cases,\
                new_deaths, new_vaccinations, total_vaccinations \
                ) VALUES {}".format(records_list_template)



cursor.execute(insert_query, vaccination_seq) 
conn.commit()
cursor.close()
# conn.close()

In [9]:
cursor = conn.cursor()

sql_query='''

CREATE TABLE IF NOT EXISTS inflection_percentage( \
        location VARCHAR(255),
        continent VARCHAR(255),
        population int,
        total_cases int,
        total_deaths int,
        inflection_rate decimal,
        death_rate decimal
)
'''

cursor.execute(sql_query)
conn.commit()
cursor.close()
# conn.close()

In [10]:
cursor = conn.cursor()

inflection_seq = [tuple(x) for x in df_inflection_percentage.collect()]

records_list_template = ','.join(['%s'] * len(inflection_seq))

insert_query = "INSERT INTO inflection_percentage ( \
                location, continent, population, total_cases,\
                total_deaths, inflection_rate, death_rate \
                ) VALUES {}".format(records_list_template)



cursor.execute(insert_query, inflection_seq) 
conn.commit()
cursor.close()
conn.close()