In [1]:
'''
@Authors: Vighnesh Harish Bilgi, Pavan Kalyan Yellaboina, Vipul Shete
@Date: 2023-03-10
@Last Modified by: Vighnesh Harish Bilgi 
@Last Modified time: 2023-03-10
@Title : Transform the COVID-19 dataset using Pyspark on Jupyter Notebook on AWS Glue Job
'''

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::004368799100:role/service-role/AWSGlueServiceRole-sample_dataset
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: b2e2a1bf-e9cc-402b-92dc-97a0c4dc7740
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session b2e2a1bf-e9cc-402b-92dc-97a0c4dc7740 to get into ready status...
Session b2e2a1bf-e9cc-402b-92dc-97a0c4dc7740 has been created.
'\n@Authors: Vighnesh Harish Bilgi, Pavan Kalyan Yellaboina, Vipul Shete\n@Date: 2023-03-10\

### Starting spark application

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, date_format, to_date, col, concat, isnan,when,count




In [3]:
# Create the SparkSession for this job, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName('Pyspark_covid19_transform')
    .getOrCreate()
)




### Import the COVID-19 data as a pyspark dataframe

In [9]:
# Read the raw COVID-19 CSV from S3. The first line supplies the column names
# and Spark infers each column's type from the data.
pyspark_df = spark.read.csv(
    path="s3://poc-covid-19/raw_data/covid_19_data.csv",
    header=True,
    inferSchema=True,
)




Removing records that have 'OWID' in the iso_code.

In [10]:
# Keep only the rows whose iso_code does not contain the substring 'OWID'.
pyspark_df = pyspark_df.where(
    ~col('iso_code').contains('OWID')
)




Renaming 'location' column to 'country' 

In [11]:
# Expose the 'location' column under the name 'country'.
pyspark_df = pyspark_df.withColumnRenamed(
    existing="location",
    new="country",
)




##### From the 'date' column, extract year, month and day as separate columns, then concatenate month, year and day to create the column 'date_id', which will serve as the primary key. 

##### Any duplicate records that exist have been dropped. 

##### These selected columns are then stored as a DataFrame and saved into an S3 bucket, to be loaded as a dimension table called 'dim_date' in AWS Redshift.  

In [12]:
# Build the 'dim_date' dimension:
#   1. derive year / month / day from the 'date' column,
#   2. concatenate month + year + day into the 'date_id' key,
#   3. keep one row per distinct combination of the selected columns.
dim_date = (
    pyspark_df
    .withColumn('year', year(col('date')))
    .withColumn('month', month(col('date')))
    .withColumn("day", date_format(col("date"), "d"))
    .select(
        concat(col('month'), col('year'), col('day')).alias("date_id"),
        'year',
        'month',
        'day',
        'date',
    )
    .dropDuplicates()
)




### Load 'dim_date' table as .csv file into a S3 bucket

In [13]:
# Collapse to a single partition so one CSV part file is produced, then write
# it with a header row, replacing whatever already exists at the target path.
(
    dim_date
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/dim_date/")
)




##### Select columns 'people_vaccinated', 'people_fully_vaccinated', 'total_boosters' to create dimension table 'dim_vaccination'.  

##### Extracting year, month and day from 'date' column and concatenating with the 'iso_code' to create column 'country_date_id' which will serve as a primary key for this table.   

In [14]:
# Build the 'dim_vaccination' dimension. The key 'country_date_id' is
# month + year + day of 'date' followed by the iso_code; duplicates are dropped.
dim_vaccination = (
    pyspark_df
    .select(
        concat(
            month(col('date')),
            year(col('date')),
            date_format(col("date"), "d"),
            col('iso_code'),
        ).alias("country_date_id"),
        'people_vaccinated',
        'people_fully_vaccinated',
        'total_boosters',
    )
    .dropDuplicates()
)




### Null values are replaced with 0.

In [15]:
# Replace nulls with 0 in each vaccination measure column.
dim_vaccination = dim_vaccination.fillna({
    'people_vaccinated': 0,
    'people_fully_vaccinated': 0,
    'total_boosters': 0,
})




### Load 'dim_vaccination' table as .csv file into a S3 bucket.

In [16]:
# Write 'dim_vaccination' as a single CSV part file with a header row,
# overwriting any previous output at the target path.
(
    dim_vaccination
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/dim_vaccination/")
)




##### Select columns 'hosp_patients', 'weekly_icu_admissions', 'weekly_hosp_admissions', 'diabetes_prevalence', 'cardiovasc_death_rate' to create dimension table 'dim_patient'

##### Extracting year, month and day from 'date' column and concatenating with the 'iso_code' to create column 'country_date_id' which will serve as a primary key for this table.   

In [17]:
# Build the 'dim_patient' dimension. The key 'country_date_id' is
# month + year + day of 'date' followed by the iso_code; duplicates are dropped.
dim_patient = (
    pyspark_df
    .select(
        concat(
            month(col('date')),
            year(col('date')),
            date_format(col("date"), "d"),
            col('iso_code'),
        ).alias("country_date_id"),
        'hosp_patients',
        'weekly_icu_admissions',
        'weekly_hosp_admissions',
        'diabetes_prevalence',
        'cardiovasc_death_rate',
    )
    .dropDuplicates()
)




### Null values are replaced with 0.

In [18]:
# Replace nulls with 0 in each patient / health measure column.
dim_patient = dim_patient.fillna({
    'hosp_patients': 0,
    'weekly_icu_admissions': 0,
    'weekly_hosp_admissions': 0,
    'diabetes_prevalence': 0,
    'cardiovasc_death_rate': 0,
})




### Load 'dim_patient' table into a S3 bucket.

In [19]:
# Write 'dim_patient' as a single CSV part file with a header row, overwriting
# any previous output. Note the target folder is named 'dim_patients' (plural).
(
    dim_patient
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/dim_patients/")
)




##### Select columns 'positive_rate', 'tests_per_case', 'tests_units' to create dimension table 'dim_testing'

##### Extracting year, month and day from 'date' column and concatenating with the 'iso_code' to create column 'country_date_id' which will serve as a primary key for this table.   

In [20]:
# Build the 'dim_testing' dimension. The key 'country_date_id' is
# month + year + day of 'date' followed by the iso_code; duplicates are dropped.
dim_testing = (
    pyspark_df
    .select(
        concat(
            month(col('date')),
            year(col('date')),
            date_format(col("date"), "d"),
            col('iso_code'),
        ).alias("country_date_id"),
        'positive_rate',
        'tests_per_case',
        'tests_units',
    )
    .dropDuplicates()
)




#### Null values are replaced with 0 for numerical columns, and are replaced with "no tests performed" under tests_units. 

In [23]:
# Replace nulls per column: a placeholder label for 'tests_units' and 0 for
# the two testing measure columns.
dim_testing = dim_testing.fillna({
    'tests_units': 'no tests performed',
    'positive_rate': 0,
    'tests_per_case': 0,
})




### Load 'dim_testing' table as a .csv file into a S3 bucket. 

In [24]:
# Write 'dim_testing' as a single CSV part file with a header row,
# overwriting any previous output at the target path.
(
    dim_testing
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/dim_testing/")
)




##### Select columns 'iso_code', 'continent', 'country', 'population' to create table 'dim_country'. 'iso_code' will be used as primary key for the table. 

In [25]:
# Build the 'dim_country' dimension: one row per distinct combination of
# iso_code, continent, country and population.
dim_country = (
    pyspark_df
    .select('iso_code', 'continent', 'country', 'population')
    .dropDuplicates()
)




### Load 'dim_country' as a .csv file into a S3 bucket.

In [26]:
# Write 'dim_country' as a single CSV part file with a header row,
# overwriting any previous output at the target path.
(
    dim_country
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/dim_country/")
)




##### Select columns "iso_code","new_cases","total_cases","total_deaths","new_deaths","total_tests","new_tests","total_vaccinations","new_vaccinations","icu_patients" to create fact table covid_fact 

##### Extract day, month and year as separate columns and then concatenate columns month,year and day to create column date_id 

##### Concatenate date_id column with the iso_code to create column country_date_id. 

##### Concatenate country_date_id column with the continent to create column fact_id. This will serve as primary key for this table. 

In [27]:
# Build the 'covid_fact' fact table.
#   date_id         = month + year + day of 'date'
#   country_date_id = date_id + iso_code
#   fact_id         = country_date_id + continent (used as the table's key)
covid_fact = pyspark_df.select(
    concat(
        month(pyspark_df.date), year(pyspark_df.date), date_format(col("date"), "d"),
        'iso_code', 'continent',
    ).alias('fact_id'),
    concat(
        month(pyspark_df.date), year(pyspark_df.date), date_format(col("date"), "d"),
        'iso_code',
    ).alias('country_date_id'),
    concat(
        month(pyspark_df.date), year(pyspark_df.date), date_format(col("date"), "d"),
    ).alias('date_id'),
    "iso_code", "new_cases", "total_cases", "total_deaths", "new_deaths", "total_tests",
    "new_tests", "total_vaccinations", "new_vaccinations", "icu_patients",
)
# dropDuplicates() returns a new DataFrame and does not modify the one it is
# called on, so the result must be assigned back. Previously the result was
# discarded and duplicate rows were still written to the fact table.
covid_fact = covid_fact.dropDuplicates()

DataFrame[fact_id: string, country_date_id: string, date_id: string, iso_code: string, new_cases: double, total_cases: double, total_deaths: double, new_deaths: double, total_tests: double, new_tests: double, total_vaccinations: double, new_vaccinations: double, icu_patients: double]


### Null values are replaced with 0.

In [28]:
# Replace nulls with 0 across the fact table. With a numeric fill value Spark
# only touches numeric columns, so the string key columns are left as they are.
covid_fact = covid_fact.fillna(value=0)




### Load 'covid_fact' as a .csv file into a S3 bucket.

In [29]:
# Write 'covid_fact' as a single CSV part file with a header row,
# overwriting any previous output at the target path.
(
    covid_fact
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save("s3://poc-covid-19/processed_data/covid_fact/")
)


