In [0]:
# Import the COVID-19 CSV file provided in the USF Box.

# File location and type
file_location = "/FileStore/tables/covid_19_clean_complete.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# A lone space is read as NaN in floating-point columns (nanValue) and as null (nullValue).
# The option must be spelled `nullValue`: reader option names are not validated, so the
# earlier `nulValue` spelling was silently ignored.
# Reference:https://stackoverflow.com/questions/44296484/how-to-replace-null-nan-or-infinite-values-to-default-value-in-spark-scala
# Reference:https://stackoverflow.com/questions/46439410/spark-treating-null-values-in-csv-column-as-null-datatype
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option("nanValue", " ")
    .option("nullValue", " ")
    .load(file_location)
)



In [0]:
# Expose the freshly loaded COVID frame to Spark SQL as the temp view `covid`.
temp_table_name = "covid"
df.createOrReplaceTempView(name=temp_table_name)

In [0]:
# Sanity check: print the first five rows of the `covid` view to confirm it is queryable.
spark.table("covid").limit(5).show()

In [0]:
# Date parsing/formatting helpers, plus the `F` namespace used for column
# expressions (F.col, F.when) in the percentage cells below.
import pyspark.sql.functions as F
from pyspark.sql.functions import datediff,date_format,to_date,to_timestamp

In [0]:
# Parse the raw `date` strings (e.g. "1/22/2020": month/day/4-digit year) into a
# DateType column `date_mod`. 'M/d/yyyy' matches that layout under both the legacy
# and the Spark 3+ datetime parsers; the previous 'MM/dd/yy' only worked through
# lenient legacy parsing of a 4-digit year.
df = df.withColumn('date_mod', to_date(df.date, 'M/d/yyyy'))

# The `covid` view was registered before date_mod existed and still points at the
# old plan; re-register it so SQL queries against `covid` see the new column.
df.createOrReplaceTempView("covid")

In [0]:
# Time-series data: keep only the latest snapshot (2020-04-30), the most recent
# reading for every country/province.
df_new = df.where("date_mod == '2020-04-30'")

In [0]:
# Add the mortality rate `percentage` = deaths / confirmed * 100.
# The divisor is NULLed when confirmed == 0, so those rows get a NULL rate. That
# matches Spark's default (non-ANSI) division result and avoids the divide-by-zero
# error raised when ANSI mode is enabled.
safe_confirmed = F.when(F.col("confirmed") != 0, F.col("confirmed"))
df_new = df_new.withColumn("percentage", (F.col("deaths") / safe_confirmed) * 100)

In [0]:
# Register the 2020-04-30 snapshot (with its percentage column) as the SQL view `q1`.
temp_table_name = "q1"
df_new.createOrReplaceTempView(name=temp_table_name)

In [0]:
# Country with the highest mortality rate (deaths / confirmed, in %) on 2020-04-30.
# Rows in q1 can be sub-national (one per province), so aggregate to country level
# first; ranking raw rows would report a single province's rate under its country's
# name. nullif() avoids a divide-by-zero error under ANSI mode (such countries get a
# NULL rate, and NULLs sort last in a DESC ordering). `country` breaks ties
# deterministically.
spark.sql("""
    select country,
           sum(deaths) / nullif(sum(confirmed), 0) * 100 as mortality_pct
    from q1
    group by country
    order by mortality_pct desc, country
    limit 1
""").show()

# The original row-level run reported Yemen as the country with the highest death
# rate; re-check the output above after the country-level aggregation.

In [0]:
# Starting frame for the month-by-month analysis: the COVID frame that already
# carries date_mod. Note this is a plain alias of `df` (same object), not a copy.
df_month = df

In [0]:
# Import split function
from pyspark.sql.functions import split

In [0]:
# Add a `month` column holding the zero-padded month string ("01", "02", ...) of
# date_mod. date_format() formats the DateType directly instead of implicitly casting
# it to a "yyyy-MM-dd" string and splitting on '-'; the resulting values are the same.
# Deriving from `df` keeps the cell idempotent on re-run.
df_month = df.withColumn('month', date_format('date_mod', 'MM'))
display(df_month)

province,country,lat,long,date,confirmed,deaths,recovered,date_mod,month
,Afghanistan,33.0,65.0,1/22/2020,0,0,0,2020-01-22,1
,Albania,41.1533,20.1683,1/22/2020,0,0,0,2020-01-22,1
,Algeria,28.0339,1.6596,1/22/2020,0,0,0,2020-01-22,1
,Andorra,42.5063,1.5218,1/22/2020,0,0,0,2020-01-22,1
,Angola,-11.2027,17.8739,1/22/2020,0,0,0,2020-01-22,1
,Antigua and Barbuda,17.0608,-61.7964,1/22/2020,0,0,0,2020-01-22,1
,Argentina,-38.4161,-63.6167,1/22/2020,0,0,0,2020-01-22,1
,Armenia,40.0691,45.0382,1/22/2020,0,0,0,2020-01-22,1
Australian Capital Territory,Australia,-35.4735,149.0124,1/22/2020,0,0,0,2020-01-22,1
New South Wales,Australia,-33.8688,151.2093,1/22/2020,0,0,0,2020-01-22,1


In [0]:
# One frame per month, each keeping only that month's last day, whose reading
# represents the final reading for the month.
month_end_filters = (
    "date_mod == '2020-01-31'",
    "date_mod == '2020-02-29'",
    "date_mod == '2020-03-31'",
    "date_mod == '2020-04-30'",
)
df_jan, df_feb, df_march, df_apr = [df_month.filter(cond) for cond in month_end_filters]

In [0]:
# Add the mortality rate `percentage` = deaths / confirmed * 100 to each month-end frame.
# The divisor is NULLed when confirmed == 0. Many early rows have zero confirmed
# cases; they get a NULL rate (as with Spark's default division) instead of a
# divide-by-zero error under ANSI mode. The expression is built once and reused.
safe_confirmed = F.when(F.col("confirmed") != 0, F.col("confirmed"))
mortality_pct = (F.col("deaths") / safe_confirmed) * 100

df_jan = df_jan.withColumn("percentage", mortality_pct)
df_feb = df_feb.withColumn("percentage", mortality_pct)
df_march = df_march.withColumn("percentage", mortality_pct)
df_apr = df_apr.withColumn("percentage", mortality_pct)

In [0]:
# Register each month-end frame as its own SQL view (jan, feb, march, april).
for temp_table_name, month_frame in (
    ("jan", df_jan),
    ("feb", df_feb),
    ("march", df_march),
    ("april", df_apr),
):
    month_frame.createOrReplaceTempView(temp_table_name)

In [0]:
# For each month-end view, report the country with the highest mortality rate.
# Rows are sub-national (one per province), so aggregate per country before ranking;
# otherwise one province's rate is reported under its country's name. nullif() avoids
# a divide-by-zero error under ANSI mode. `country` breaks ties deterministically.
# The view names are fixed constants, so interpolating them into the SQL is safe.
for month_view in ("jan", "feb", "march", "april"):
    spark.sql(f"""
        select country, month,
               sum(deaths) / nullif(sum(confirmed), 0) * 100 as mortality_pct
        from {month_view}
        group by country, month
        order by mortality_pct desc, country
        limit 1
    """).show()

In [0]:
# Use the following data set to identify which country belongs to which continent:
#
# https://github.com/dbouquin/IS_608/blob/master/NanosatDB_munging/Countries-Continents.csv

In [0]:
# Load the country -> continent mapping CSV.

# File location and type
file_location = "/FileStore/tables/continentmapping.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# A lone space is read as NaN in floating-point columns (nanValue) and as null (nullValue).
# The option must be spelled `nullValue`: reader option names are not validated, so the
# earlier `nulValue` spelling was silently ignored.
# Reference:https://stackoverflow.com/questions/44296484/how-to-replace-null-nan-or-infinite-values-to-default-value-in-spark-scala
# Reference:https://stackoverflow.com/questions/46439410/spark-treating-null-values-in-csv-column-as-null-datatype
contdf = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option("nanValue", " ")
    .option("nullValue", " ")
    .load(file_location)
)

In [0]:
# Register the continent mapping as the SQL view `conts`.
temp_table_name = "conts"
contdf.createOrReplaceTempView(name=temp_table_name)

In [0]:
# Attach a Continent to every COVID row by joining on the country name.
# - Built from `df` (which has date_mod) rather than the `covid` SQL view, which was
#   registered before date_mod was added and so lacks the column the Asia filters use.
# - Only Continent is taken from the mapping. Its `Country` key is renamed so the
#   result has a single `country` column, instead of `country`/`Country` columns that
#   collide under Spark's default case-insensitive name resolution.
# - A left join keeps every COVID row. Unmatched countries get a NULL Continent.
#   NOTE(review): check the two files for country-name mismatches.
continent_lookup = contdf.withColumnRenamed("Country", "country").select("country", "Continent")
mapped_df = df.join(continent_lookup, on="country", how="left")

In [0]:
# Restrict the continent-tagged rows to those mapped to Asia.
asia = mapped_df.where("Continent == 'Asia'")

In [0]:
# Split the Asia rows into one frame per month, keeping only the month's last day
# since that represents the final reading for the month.
asia_month_end_filters = (
    "date_mod == '2020-01-31'",
    "date_mod == '2020-02-29'",
    "date_mod == '2020-03-31'",
    "date_mod == '2020-04-30'",
)
asia_jan, asia_feb, asia_mar, asia_apr = [asia.filter(cond) for cond in asia_month_end_filters]

In [0]:
# Tag each Asia month-end frame with its zero-padded month string ("01".."04"),
# computed from the frame's own date_mod. The original split a column of the parent
# `asia` frame and applied it to the filtered children, which only resolved because
# filtering preserves the parent's column lineage.
month_col = date_format("date_mod", "MM")
asia_jan = asia_jan.withColumn("month", month_col)
asia_feb = asia_feb.withColumn("month", month_col)
asia_mar = asia_mar.withColumn("month", month_col)
asia_apr = asia_apr.withColumn("month", month_col)

In [0]:
# Register each Asia month-end frame as its own SQL view.
for temp_table_name, asia_frame in (
    ("janas", asia_jan),
    ("febas", asia_feb),
    ("marchas", asia_mar),
    ("aprilas", asia_apr),
):
    asia_frame.createOrReplaceTempView(temp_table_name)

In [0]:
# Total confirmed infections in Asia at each month end, one result table per month.
for asia_view in ("janas", "febas", "marchas", "aprilas"):
    spark.sql(
        f"select continent, month, sum(confirmed) as Infections from {asia_view} group by continent, month"
    ).show()

In [0]:
# The Tropics lie between the Tropic of Capricorn (about 23.5 deg S, i.e. latitude
# -23.5) and the Tropic of Cancer (about 23.5 deg N). Peek at one row of `covid`
# to inspect the signed `lat` column before filtering on it.
spark.table("covid").limit(1).show()

In [0]:
# Tropical rows: latitude inside the tropics band (negative latitude = south of the
# equator). Built from `df` so the rows keep date_mod, which the next filter needs;
# the `covid` SQL view was registered before date_mod was added and lacks it.
# NOTE(review): the band is asymmetric (-23.5 .. 23.55). Both tropics sit at about
# 23.44 deg, so 23.55 looks like a typo. Confirm, and keep it in sync with the
# non-tropical filter.
tropical = df.where("lat between -23.5 and 23.55")

In [0]:
# Non-tropical rows: the complement of the tropical latitude band (rows with a NULL
# lat fall in neither set). Built from `df` so the rows keep date_mod; the `covid`
# SQL view was registered before date_mod was added and lacks it.
# NOTE(review): keep these bounds identical to the tropical filter's.
non_tropical = df.where("lat not between -23.5 and 23.55")

In [0]:
# Time-series data: keep only the latest snapshot (2020-04-30) in both latitude groups.
latest_snapshot = "date_mod == '2020-04-30'"
tropical, non_tropical = tropical.filter(latest_snapshot), non_tropical.filter(latest_snapshot)

In [0]:
# Add the mortality rate `percentage` = deaths / confirmed * 100 to both latitude groups.
# The divisor is NULLed when confirmed == 0: those rows get a NULL rate (as with Spark's
# default division) instead of a divide-by-zero error under ANSI mode.
safe_confirmed = F.when(F.col("confirmed") != 0, F.col("confirmed"))
mortality_pct = (F.col("deaths") / safe_confirmed) * 100

tropical = tropical.withColumn("percentage", mortality_pct)
non_tropical = non_tropical.withColumn("percentage", mortality_pct)

In [0]:
# Register both latitude groups as SQL views.
for temp_table_name, region_frame in (("tropical", tropical), ("nontropical", non_tropical)):
    region_frame.createOrReplaceTempView(temp_table_name)

In [0]:
# Total confirmed cases and deaths in the tropical vs. non-tropical regions.
region_total_queries = (
    "select sum(confirmed) as InfectedInTropicalRegions, sum(deaths) as DeathsInTropical from tropical",
    "select sum(confirmed) as InfectedIn_NonTropicalRegions, sum(deaths) as DeathsinNonTropical from nontropical",
)
for totals_sql in region_total_queries:
    spark.sql(totals_sql).show()