In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import desc
from pyspark.sql.types import DecimalType, FloatType
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt 
import geopandas
import geoplot
from pyproj import CRS
import pandas_bokeh
pandas_bokeh.output_notebook()
import seaborn as sns
sns.set()

In [None]:
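# Pipeline: load the confirmed-cases time series, keep Australia's rows, aggregate
# them into daily totals, fit a linear regression on the day index, and compare its
# predictions with the actual counts for the last two days.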

### Import the data into Spark

In [None]:
# Start (or reuse) a Spark session and load the confirmed-cases time series.
# inferSchema=True lets Spark type the per-date count columns as numbers; the
# later groupBy().sum() aggregation only sums numeric columns.
CONFIRMED_CSV = 'time_series_19-covid-Confirmed_archived_0325.csv'

spark = SparkSession.builder.appName("COVID").getOrCreate()
confirmed = spark.read.csv(CONFIRMED_CSV, header=True, inferSchema=True)

# Register a temporary view so the data can also be queried with SQL.
confirmed.createOrReplaceTempView('COVID_Confirmed')
sparkConfirmedDF = spark.sql('''SELECT * FROM COVID_Confirmed''')

### Preprocess the data

###### Handle null values & select only the relevant rows (i.e., those containing data for Australia)

In [None]:
# Drop every row that contains a NULL / None value in any column.
ConfirmedDF = sparkConfirmedDF.na.drop()

# Keep only Australia's rows. Filter the null-dropped frame so the cleaning step
# above actually feeds the rest of the pipeline.
# NOTE(review): na.drop() removes a row if *any* column is null; confirm no
# Australia row is lost by comparing against the unfiltered row count.
confirmed = ConfirmedDF.where(F.col('Country/Region') == 'Australia')
confirmed.show()

In [None]:
# Bar chart of each province/state's count per date column.
# Only the per-date count columns go on the y-axis: the metadata columns are
# not counts (Lat/Long would otherwise be drawn as bars next to the case counts).
confirmed2 = confirmed.toPandas()
META_COLUMNS = ['Province/State', 'Country/Region', 'Lat', 'Long']
count_columns = [col for col in confirmed2.columns if col not in META_COLUMNS]
confirmed2.plot_bokeh(kind='bar', x='Province/State', y=count_columns,
              xlabel='Provinces / States', ylabel='Frequency',
              legend=False, title='Frequency by Days', figsize=(1000, 500))

In [None]:
# Turn each row's Long/Lat into a point geometry (EPSG:4326, i.e. lon/lat degrees)
# and draw the locations on an interactive bokeh map.
row_points = geopandas.points_from_xy(x=confirmed2.Long, y=confirmed2.Lat)
gpdf = geopandas.GeoDataFrame(
    confirmed2,
    geometry=row_points,
    crs=CRS.from_string("epsg:4326"),
)
gpdf.plot_bokeh(legend=False)

In [None]:
# Keep only the date columns that are used for modelling.
# NOTE(review): magic offset - presumably the first 4 columns are metadata
# (Province/State, Country/Region, Lat, Long), so [14:] also skips the first
# 10 date columns; confirm this is the intended start date.
FIRST_MODEL_COLUMN = 14
columns_to_select = confirmed.columns[FIRST_MODEL_COLUMN:]
confirmed = confirmed.select(*columns_to_select)

In [None]:
# Number of date columns kept for modelling.
# The Australia-wide aggregation (groupBy().sum()) is intentionally left to the
# "Merge the data to one row" cell so that it runs exactly once.
len(confirmed.columns)

###### Merge the data into one row, since we want to consider Australia as a whole

In [None]:
# Collapse all of Australia's rows into a single row of per-date totals.
confirmed = confirmed.groupBy().sum()

# Replace the generated `sum(<date>)` names with positional labels "0", "1", ...
confirmed = confirmed.toDF(*(str(position) for position in range(len(confirmed.columns))))

# Transpose via pandas: one row per date, a single column (label 0) holding the total.
confirmed_transpose = confirmed.toPandas().transpose()
confirmed_transpose

In [None]:
# Creating a Spark DataFrame from our Pandas DataFrame
# Back to Spark: name the totals column and attach a 0-based day index that
# follows the original row order.
day_index = F.row_number().over(Window.orderBy(F.monotonically_increasing_id())) - 1
confirmedSparkDF = (
    spark.createDataFrame(confirmed_transpose)
    .withColumnRenamed("0", "total_counts")
    .withColumn("indexes", day_index)
)
confirmedSparkDF.show()

In [None]:
# Spark ML regressors take a vector feature column: wrap the integer day index
# into a one-element vector named "Days".
day_assembler = VectorAssembler(inputCols=["indexes"], outputCol="Days")
output = day_assembler.transform(confirmedSparkDF)
finalized_data = output.select("Days", "total_counts")

###### Splitting into Train & Test data

In [None]:
# Chronological split: the first 50 days train the model and the final 2 days are
# held out. ORDER BY makes "the first 50" explicit, because LIMIT on its own does
# not guarantee which rows are returned.
finalized_data.createOrReplaceTempView("Finalized_Cases")
train_data = spark.sql("SELECT * FROM Finalized_Cases ORDER BY Days LIMIT 50")
# Test rows are returned newest day first (DESC).
test_data = spark.sql("SELECT * FROM Finalized_Cases ORDER BY Days DESC LIMIT 2")

train_data.show()
test_data.show()

In [None]:
# Confirmed cases per day index. Plotted from `output`, whose `indexes` column is
# a plain integer; `finalized_data.Days` holds one-element vectors, which become
# an object-typed column in pandas rather than a numeric axis.
output.select("indexes", "total_counts").toPandas().plot_bokeh(
    kind='line', x='indexes', y='total_counts',
    xlabel='Day index', ylabel='Total confirmed cases',
    title='Confirmed cases in Australia', legend=False, figsize=(1000, 500))

### Apply Regression Method

In [None]:
# Linear regression of total_counts on the day vector, with elastic-net
# regularisation (regParam=0.8, L1/L2 mix elasticNetParam=0.5).
lr_estimator = LinearRegression(
    featuresCol='Days',
    labelCol='total_counts',
    maxIter=10,
    regParam=0.8,
    elasticNetParam=0.5,
)
# From here on `regressor` is the fitted model, not the estimator.
regressor = lr_estimator.fit(train_data)

print('Regressor coefficient: ', regressor.coefficients)
print('Regressor intercept: ', regressor.intercept)

# Training-set diagnostics.
Summary = regressor.summary
for template, value in (
    ("total Iterations: %d", Summary.totalIterations),
    ("RMSE: %f", Summary.rootMeanSquaredError),
    ("R-squared: %f", Summary.r2),
):
    print(template % value)
train_data.describe().show()

In [None]:
# Score the held-out days. Only the model's input columns are passed in, so
# re-running this cell (when `test_data` already carries a `prediction` column)
# does not fail on a duplicate output column.
pred_results = regressor.evaluate(test_data.select("Days", "total_counts"))
test_data = pred_results.predictions  # columns: Days, total_counts, prediction
test_data.show()

### Compute the difference between actual and predicted cases

In [None]:
# Read actual and predicted values from the same rows in one collect, so each
# pair is guaranteed to belong to the same day. Two separate collects would rely
# on both Spark jobs returning rows in the same order.
scored_rows = test_data.select("total_counts", "prediction").collect()
actual_cases = [float(row.total_counts) for row in scored_rows]
predicted_cases = [row.prediction for row in scored_rows]

# Residual per test day: actual - predicted.
difference = [actual - predicted for actual, predicted in zip(actual_cases, predicted_cases)]
differenceDF = (
    spark.createDataFrame(difference, FloatType())
    .withColumnRenamed("value", "difference_values")
)
differenceDF.show()

### Visualizing Results

In [None]:
# Full series with a plain integer day axis (`indexes`, renamed to Days).
actual_DF = output.select("indexes", "total_counts").withColumnRenamed("indexes", "Days")

# Converting to Pandas to plot scatter and regression plots
actual_data = actual_DF.toPandas()
Predicted_df = test_data.toPandas()

# After toPandas() the Days column holds one-element vectors; unwrap them to the
# integer day index instead of hard-coding which days were held out.
Predicted_df['Days'] = Predicted_df['Days'].apply(lambda vec: int(vec[0]))
# Draw the seaborn trend line through the non-test days only.
train_actual = actual_data[~actual_data['Days'].isin(Predicted_df['Days'])]

f, ax = plt.subplots(figsize=(12, 8))
sns.regplot(x='Days', y='total_counts', data=train_actual, ax=ax)
sns.scatterplot(x='Days', y='total_counts', data=Predicted_df, ax=ax)
sns.scatterplot(x='Days', y='prediction', data=Predicted_df, marker='x', ax=ax)
ax.set(xlabel='Day index', ylabel='Total confirmed cases',
       title='Actual vs predicted confirmed cases');

In [None]:
# Residual plot (actual - predicted) for the Spark model fitted above:
# dots = training days, crosses = held-out test days.
# sns.residplot is not used because it fits its own unregularised regression,
# so its residuals would not belong to the elastic-net model being evaluated.
def _residual_frame(scored_df):
    """Return a pandas frame of scored rows with integer `day` and `residual` columns."""
    pdf = scored_df.toPandas()
    pdf['day'] = pdf['Days'].apply(lambda vec: int(vec[0]))
    pdf['residual'] = pdf['total_counts'] - pdf['prediction']
    return pdf


train_residuals = _residual_frame(regressor.transform(train_data))
test_residuals = _residual_frame(test_data)

f, ax = plt.subplots(figsize=(12, 8))
sns.scatterplot(x='day', y='residual', data=train_residuals, ax=ax, label='train')
sns.scatterplot(x='day', y='residual', data=test_residuals, ax=ax, marker='x', label='test')
ax.axhline(0, color='grey', linewidth=1)
ax.set(xlabel='Day index', ylabel='Residual (actual - predicted)',
       title='Linear regression residuals');

In [None]:
#End