In [None]:
# COVID-by-country load: flattens Files/covid_test.json and saves it as the covid_by_country table.
# PySpark imports shared by the cells below.
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

In [None]:
# Read the JSON file at "Files/covid_test.json" into a Spark DataFrame.
# The "multiline" reader option is switched on so that a JSON document laid out
# over several lines is parsed as one document rather than line by line.
df = (
    spark.read
    .option("multiline", "true")
    .json("Files/covid_test.json")
)

In [None]:
# Data transformation: flatten the nested JSON fields into top-level columns.
# Each selected sub-field of 'cases', 'deaths' and 'tests' is exposed as its own
# column named <field>_<parent>; the remaining columns are passed through as-is.
df = df.select(
    # --- cases ---
    col('cases')['1M_pop'].alias('1M_pop_cases'),
    col('cases')['active'].alias('active_cases'),
    col('cases')['recovered'].alias('recovered_cases'),
    col('cases')['total'].alias('total_cases'),
    # --- plain top-level fields ---
    'continent',
    'country',
    'day',
    # --- deaths ---
    col('deaths')['1M_pop'].alias('1M_pop_deaths'),
    col('deaths')['total'].alias('total_deaths'),
    'population',
    # --- tests ---
    col('tests')['1M_pop'].alias('1M_pop_tests'),
    col('tests')['total'].alias('total_tests'),
    'time',
)

In [None]:
# Split the combined date-time string in 'time' into two separate columns:
#   Update_date - everything before the 'T' separator
#   Update_time - everything between the 'T' separator and the first '+'
# NOTE(review): assumes 'time' looks like "<date>T<time>+<offset>" - confirm
# against the feed; a 'Z' or negative offset would be left in Update_time.
from pyspark.sql.functions import col, split

# Both new columns are derived from the same split on 'T', so build it once.
time_parts = split(col('time'), 'T')

df = (
    df
    .withColumn('Update_date', time_parts.getItem(0))
    # split() takes a regular expression, so the '+' has to be escaped. A raw
    # string keeps the backslash literal; the plain '\+' form is an invalid
    # Python escape sequence and triggers a warning on recent interpreters.
    .withColumn('Update_time', split(time_parts.getItem(1), r'\+').getItem(0))
)

In [None]:
# The combined 'time' column has already been split into Update_date and
# Update_time, so it is removed from the DataFrame here.
df = df.drop('time')

In [None]:
# Convert the count / per-million / population columns to a numeric (double) type.
from pyspark.sql.functions import col

# Columns to convert, in the order the casts are applied.
numeric_columns = [
    "1M_pop_cases",
    "active_cases",
    "recovered_cases",
    "total_cases",
    "1M_pop_deaths",
    "total_deaths",
    "population",
    "1M_pop_tests",
    "total_tests",
]

# Replace each column in place with its double-typed version.
for numeric_column in numeric_columns:
    df = df.withColumn(numeric_column, col(numeric_column).cast("double"))

In [None]:
# Create properly typed versions of the split date and time strings:
#   date      - Update_date parsed as a date
#   timestamp - Update_time parsed as a timestamp
from pyspark.sql.functions import col, to_date, to_timestamp

df = (
    df
    .withColumn("date", to_date(col("Update_date"), "yyyy-MM-dd"))
    # Update_time only holds the time-of-day text that sits between 'T' and '+'
    # in the original value, so it must be parsed with a time-only pattern. The
    # previous full "yyyy-MM-dd'T'HH:mm:ssXXX" pattern could never match it.
    # NOTE(review): assumes whole seconds ("HH:mm:ss") - confirm against the feed.
    # NOTE(review): with no date part Spark presumably fills in a default date
    # (1970-01-01) - verify if the date portion of this column matters.
    .withColumn("timestamp", to_timestamp(col("Update_time"), "HH:mm:ss"))
)

In [None]:
# Remove the columns that should not be part of the saved table.
# NOTE(review): 'date' and 'timestamp' are the typed columns created in the
# previous cell; dropping them here means only Update_date / Update_time are
# kept - confirm this is intended.
df = df.drop('day', 'date', 'timestamp')

In [None]:
# The API data is refreshed daily, so the table is overwritten on every run:
# only the latest copy of the data is kept and no history is retained.
(
    df.write
    .mode('overwrite')
    .saveAsTable('covid_by_country')
)