In [0]:
# Connection settings for the Azure SQL Database holding the covid_reporting schema.
#
# SECURITY: credentials must never be committed in a notebook. They are read from
# environment variables here (a Databricks secret scope is a common alternative).
# The password that used to be hardcoded in this cell has been exposed in the
# notebook's history and should be rotated.
import os
import warnings

jdbcHostname = "ukeucovidreport-server.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "covidreport-db"
jdbcUsername = os.environ.get("COVIDREPORT_DB_USER", "")
jdbcPassword = os.environ.get("COVIDREPORT_DB_PASSWORD", "")
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Fail loudly (but without aborting the cell) so a missing credential is obvious
# before the first JDBC read fails with an authentication error.
if not (jdbcUsername and jdbcPassword):
    warnings.warn(
        "COVIDREPORT_DB_USER / COVIDREPORT_DB_PASSWORD are not set; "
        "JDBC reads against the Azure SQL Database will fail to authenticate."
    )

# NOTE: the URL embeds the password, so never display or log jdbcURL.
# NOTE(review): values containing ';' would break this URL format; the SQL Server
# JDBC driver supports brace-escaping ({...}) if that becomes necessary.
jdbcURL = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase};user={jdbcUsername};password={jdbcPassword}"

Data Wrangling

In [0]:
# Load each table of the covid_reporting schema into its own Spark DataFrame.
def _read_covid_table(table_name):
    """Read ``covid_reporting.<table_name>`` over JDBC using the connection cell's settings.

    The driver class (jdbcDriver) is passed explicitly; it was declared in the
    connection cell but previously never handed to the reader.
    """
    return (
        spark.read.format("jdbc")
        .option("url", jdbcURL)
        .option("driver", jdbcDriver)
        .option("dbtable", f"covid_reporting.{table_name}")
        .load()
    )


df_population = _read_covid_table("population")
df_lookup = _read_covid_table("lookup")
df_cases_deaths = _read_covid_table("cases_deaths")
df_testing = _read_covid_table("testing")
df_hospital_admissions = _read_covid_table("hospital_admissions_weekly")

In [0]:
# Combine all tables into one frame. The weekly tables share the
# (reported_year_week, country_code_2_digit) key; population is joined on the
# country code alone. All joins are Spark's default inner join, so a
# (week, country) pair missing from any table is dropped.
week_country_key = ["reported_year_week", "country_code_2_digit"]
join_test = (
    df_lookup
    .join(df_cases_deaths, week_country_key)
    .join(df_hospital_admissions, week_country_key)
    .join(df_testing, week_country_key)
    .join(df_population, ["country_code_2_digit"])
)

In [0]:
# Remove descriptive / provenance columns that are not model inputs
# (the original author notes some of these are duplicated by the joins).
columns_to_drop = [
    'country',
    'source',
    'testing_data_source',
]
df = join_test.drop(*columns_to_drop)

In [0]:
# Handle missing values.
# 1. Drop rows whose target (weekly_deaths_count) is missing. Filling the label
#    with 0 would fabricate "zero deaths" observations in both train and test data.
# 2. Fill the remaining nulls with 0. With an int value, Spark's na.fill only
#    touches numeric columns.
df = df.dropna(subset=['weekly_deaths_count']).na.fill(0)

Modelling

In [0]:
from pyspark.ml.feature import StringIndexer

# Label-encode the two key columns into *_encoded index columns.
# StringIndexer orders labels by descending frequency by default.
# NOTE(review): no OneHotEncoder is applied, and the *_encoded columns are not
# among the VectorAssembler inputs used for modelling; confirm they are needed.
indexer_1 = StringIndexer(inputCol='country_code_2_digit', outputCol='country_code_2_digit_encoded')
indexer_2 = StringIndexer(inputCol='reported_year_week', outputCol='reported_year_week_encoded')

indexed_1 = indexer_1.fit(df).transform(df)
indexed = indexer_2.fit(indexed_1).transform(indexed_1)

In [0]:
# Drop the raw string key columns; their *_encoded counterparts remain.
columns_to_drop = [
    'country_code_2_digit',
    'reported_year_week',
]
final_df = indexed.drop(*columns_to_drop)

In [0]:
# Assemble the numeric predictors into a single 'features' vector column and keep
# only (features, label) for modelling.
# VectorAssembler must be imported here: no other cell imports it (only
# StringIndexer and VectorIndexer are), so this cell raised NameError on a fresh kernel.
from pyspark.ml.feature import VectorAssembler

feature_cols = [
    'testing_rate',
    'positivity_rate',
    'hospital_occupancy_count',
    'icu_occupancy_count',
    'age_group_0_14',
    'age_group_15_24',
    'age_group_25_49',
    'age_group_50_64',
    'age_group_65_79',
    'age_group_80_max',
    'weekly_cases_count',
]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
ml_df = vectorAssembler.transform(final_df).select(['features', 'weekly_deaths_count'])
ml_df.show(3)

In [0]:
# 70/30 train/test split with a fixed seed. Without a seed, randomSplit draws a new
# split on each run, so the reported RMSE/R2 values changed between executions.
# NOTE(review): Spark only guarantees a repeatable split if the input ordering is
# deterministic; consider caching ml_df if results still vary.
SPLIT_SEED = 42
splits = ml_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
train_df, test_df = splits

Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression

# Fit a linear regression of weekly deaths on the assembled feature vector,
# using Spark's default hyperparameters, on the training split only.
lr = LinearRegression(labelCol='weekly_deaths_count', featuresCol='features')
lr_model = lr.fit(train_df)

In [0]:
# Score the held-out test split with the trained linear model (no fitting here)
# and show a few predictions next to the observed label.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select(["prediction", "weekly_deaths_count", "features"]).show(5)

In [0]:
# Test-set performance of the linear model. RMSE comes from the model's evaluation
# summary; R2 comes from a RegressionEvaluator applied to the test predictions.
# RegressionEvaluator is imported here because its only other import is in a later
# (Random Forest) cell, so Restart & Run All raised NameError at this point.
from pyspark.ml.evaluation import RegressionEvaluator

test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="weekly_deaths_count", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

Random Forest

In [0]:
# Train a random forest regressor on the same features/label as the linear model.
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer  # NOTE(review): not used by any visible cell
from pyspark.ml.evaluation import RegressionEvaluator

# A fixed seed makes the forest's random sampling reproducible across runs.
rf = RandomForestRegressor(featuresCol='features', labelCol='weekly_deaths_count', seed=42)
rf_model = rf.fit(train_df)

In [0]:
# Score the held-out test split with the trained random forest (no fitting here).
predictions = rf_model.transform(test_df)

# Show a handful of predictions next to the observed weekly deaths.
predictions.select(["prediction", "weekly_deaths_count", "features"]).show(5)

In [0]:
# Test-set performance of the random forest, reported with the same two metrics
# (RMSE and R2) and the same wording as the linear-regression evaluation.
evaluator = RegressionEvaluator(
    labelCol="weekly_deaths_count", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

evaluator_2 = RegressionEvaluator(
    labelCol="weekly_deaths_count", predictionCol="prediction", metricName="r2")
r2 = evaluator_2.evaluate(predictions)
print("R Squared (R2) on test data = %g" % r2)