In [308]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
from datetime import datetime 
from functools import reduce
from operator import add
import pandas as pd
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

**Filter the data to keep only rows where Country/Region is Australia**

In [263]:
def filterData(df, country="Australia"):
    """Return only the rows of ``df`` whose ``Country/Region`` equals ``country``.

    Args:
        df: Spark DataFrame that has a ``Country/Region`` column.
        country: exact country name to keep. Defaults to ``"Australia"``,
            the country this notebook analyses, so existing calls are unchanged.

    Returns:
        A new filtered DataFrame; ``df`` itself is not modified.
    """
    return df.filter(col("Country/Region") == country)

**Transpose data: unpivot the per-date columns into (Date, Cases) rows**

In [302]:
def t_data(df, key_name="Date", value_name="Cases"):
    """Unpivot every column of ``df`` into (key, value) rows.

    Each input column ``c`` contributes one output row per input row.
    ``key_name`` holds the column name ``c`` as a string, and ``value_name``
    holds that row's value of ``c``. In this notebook it turns the single
    one-column-per-date state row into one row per date.

    Args:
        df: Spark DataFrame whose columns all share a single data type.
        key_name: name of the output column holding the source column names.
        value_name: name of the output column holding the cell values.

    Returns:
        A DataFrame with exactly the columns ``key_name`` and ``value_name``.

    Raises:
        AssertionError: if ``df`` has no columns, or its columns are not all
            of the same type.
    """
    dtypes = {dtype for _, dtype in df.dtypes}
    # An empty frame would otherwise fail later with an opaque error.
    assert dtypes, "DataFrame has no columns to transpose"
    # Spark arrays are homogeneous, so every column must share one type to be
    # packed into the same array of structs below.
    assert len(dtypes) == 1, "All columns have to be of the same type"

    # One struct per source column: (column name as a literal, column value),
    # exploded so each struct becomes its own row.
    kvs = explode(array([
        struct(lit(c).alias(key_name), col(c).alias(value_name))
        for c in df.columns
    ])).alias("kvs")

    # "kvs.*" expands the struct fields in order: key_name, then value_name.
    return df.select(kvs).select("kvs.*")

**Build the input file path relative to the current working directory**

In [265]:
# Resolve the input CSV against the notebook's current working directory.
DATA_FILE_NAME = 'time_series_19-covid-Confirmed_archived_0325.csv'
dirname = os.path.abspath('')
filepath = os.path.join(dirname, DATA_FILE_NAME)

**Read the CSV file into a Spark DataFrame**

In [267]:
# SparkSession has been the unified entry point since Spark 2.0, and SQLContext is
# deprecated as of Spark 3.0. getOrCreate() returns an existing session if one
# is already running, so re-running this cell is safe.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # kept for code that still expects a SparkContext handle

# The first CSV line is the header; column types are inferred from the data.
df = spark.read.csv(filepath, header=True, inferSchema=True)

In [268]:
# Keep only the Australian rows, then report and display what is left.
df = filterData(df)

n_rows = df.count()
counts = n_rows
print("Number of rows in df -> %i" % counts)

df.toPandas()

Number of rows in df -> 9


Unnamed: 0,Province/State,Country/Region,Lat,Long,1/22/20,1/23/20,1/24/20,1/25/20,1/26/20,1/27/20,...,3/14/20,3/15/20,3/16/20,3/17/20,3/18/20,3/19/20,3/20/20,3/21/20,3/22/20,3/23/20
0,New South Wales,Australia,-33.8688,151.2093,0,0,0,0,3,4,...,112,134,171,210,267,307,353,436,533,533
1,Victoria,Australia,-37.8136,144.9631,0,0,0,0,1,1,...,49,57,71,94,121,121,121,229,296,296
2,Queensland,Australia,-28.0167,153.4,0,0,0,0,0,0,...,46,61,68,78,94,144,184,221,221,221
3,South Australia,Australia,-34.9285,138.6007,0,0,0,0,0,0,...,19,20,29,29,37,42,50,67,100,100
4,From Diamond Princess,Australia,35.4437,139.638,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,Western Australia,Australia,-31.9505,115.8605,0,0,0,0,0,0,...,17,17,28,31,35,52,64,90,120,120
6,Tasmania,Australia,-41.4545,145.9707,0,0,0,0,0,0,...,5,6,7,7,10,10,10,16,22,22
7,Northern Territory,Australia,-12.4634,130.8456,0,0,0,0,0,0,...,1,1,1,1,1,1,3,3,3,3
8,Australian Capital Territory,Australia,-35.4735,149.0124,0,0,0,0,0,0,...,1,1,2,2,3,4,6,9,19,19


**Drop the date columns outside the analysis window (dates strictly between 1/31/20 and 3/22/20 are kept)**

In [269]:
# Keep the four metadata columns plus every date column strictly inside
# (start_date, end_date). Both endpoints are excluded, so with these bounds the
# surviving date columns run from 2/1/20 through 3/21/20.
# Any other column name must parse as %m/%d/%y, or strptime raises ValueError.
names = df.schema.names
start_date = datetime.strptime("1/31/20", '%m/%d/%y')
end_date = datetime.strptime("3/22/20", '%m/%d/%y')
metadata_columns = ("Province/State", "Country/Region", "Lat", "Long")

out_of_window = [
    name
    for name in names
    if name not in metadata_columns
    and not (start_date < datetime.strptime(name, '%m/%d/%y') < end_date)
]
if out_of_window:
    df = df.drop(*out_of_window)

df.toPandas()

Unnamed: 0,Province/State,Country/Region,Lat,Long,2/1/20,2/2/20,2/3/20,2/4/20,2/5/20,2/6/20,...,3/12/20,3/13/20,3/14/20,3/15/20,3/16/20,3/17/20,3/18/20,3/19/20,3/20/20,3/21/20
0,New South Wales,Australia,-33.8688,151.2093,4,4,4,4,4,4,...,65,92,112,134,171,210,267,307,353,436
1,Victoria,Australia,-37.8136,144.9631,4,4,4,4,4,4,...,21,36,49,57,71,94,121,121,121,229
2,Queensland,Australia,-28.0167,153.4,3,2,2,3,3,4,...,20,35,46,61,68,78,94,144,184,221
3,South Australia,Australia,-34.9285,138.6007,1,2,2,2,2,2,...,9,16,19,20,29,29,37,42,50,67
4,From Diamond Princess,Australia,35.4437,139.638,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,Western Australia,Australia,-31.9505,115.8605,0,0,0,0,0,0,...,9,14,17,17,28,31,35,52,64,90
6,Tasmania,Australia,-41.4545,145.9707,0,0,0,0,0,0,...,3,5,5,6,7,7,10,10,10,16
7,Northern Territory,Australia,-12.4634,130.8456,0,0,0,0,0,0,...,1,1,1,1,1,1,1,1,3,3
8,Australian Capital Territory,Australia,-35.4735,149.0124,0,0,0,0,0,0,...,0,1,1,1,2,2,3,4,6,9


In [270]:
# Per-column summary statistics, transposed so each input column becomes one row.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Province/State,9,,,Australian Capital Territory,Western Australia
Country/Region,9,,,Australia,Australia
Lat,9,-24.502866666666666,23.946964075796746,-41.4545,35.4437
Long,9,141.0555888888889,11.754214953203432,115.8605,153.4
2/1/20,9,1.3333333333333333,1.8027756377319948,0,4
2/2/20,9,1.3333333333333333,1.7320508075688774,0,4
2/3/20,9,1.3333333333333333,1.7320508075688774,0,4
2/4/20,9,1.4444444444444444,1.810463415200036,0,4
2/5/20,9,1.4444444444444444,1.810463415200036,0,4


In [271]:
# Isolate New South Wales and keep only its per-date case-count columns.
df_new_south_wales = (
    df
    .filter(col("Province/State") == "New South Wales")
    .drop("Province/State", "Country/Region", "Lat", "Long")
)
df_new_south_wales.toPandas()

Unnamed: 0,2/1/20,2/2/20,2/3/20,2/4/20,2/5/20,2/6/20,2/7/20,2/8/20,2/9/20,2/10/20,...,3/12/20,3/13/20,3/14/20,3/15/20,3/16/20,3/17/20,3/18/20,3/19/20,3/20/20,3/21/20
0,4,4,4,4,4,4,4,4,4,4,...,65,92,112,134,171,210,267,307,353,436


In [303]:
# Unpivot the single NSW row into one (Date, Cases) row per date column.
df_new_south_wales_t = t_data(df_new_south_wales)

# Day index used in the regression: whole days since the first date column.
# It is derived from the Date string rather than monotonically_increasing_id(),
# which is only guaranteed unique and increasing, not consecutive, once the
# data spans several partitions.
# Assumes date columns are in chronological order, so columns[0] is the earliest.
first_date = df_new_south_wales.columns[0]
df_new_south_wales_t = df_new_south_wales_t.withColumn(
    "Date_value",
    datediff(to_date(col("Date"), "M/d/yy"), to_date(lit(first_date), "M/d/yy")),
)
df_new_south_wales_t.toPandas()

Unnamed: 0,Date,Cases,Date_value
0,2/1/20,4,0
1,2/2/20,4,1
2,2/3/20,4,2
3,2/4/20,4,3
4,2/5/20,4,4
5,2/6/20,4,5
6,2/7/20,4,6
7,2/8/20,4,7
8,2/9/20,4,8
9,2/10/20,4,9


In [304]:
# Pack Cases into the single-element vector column that Spark ML estimators expect.
# NOTE(review): Cases is the feature and Date_value the label, so the model below
# predicts the day index from the case count. Confirm this direction is intended.
vectorAssembler = VectorAssembler(inputCols=["Cases"], outputCol="features")
v_df = (
    vectorAssembler
    .transform(df_new_south_wales_t)
    .select("features", "Date_value")
)
v_df.show(3)

+--------+----------+
|features|Date_value|
+--------+----------+
|   [4.0]|         0|
|   [4.0]|         1|
|   [4.0]|         2|
+--------+----------+
only showing top 3 rows



In [306]:
# Regularised linear regression of Date_value on the assembled features.
# regParam sets the penalty strength; elasticNetParam mixes L1 (1.0) and L2 (0.0).
lr = LinearRegression(
    featuresCol='features',
    labelCol='Date_value',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lrModel = lr.fit(v_df)

# Fitted parameters.
coefficients_text = str(lrModel.coefficients)
intercept_text = str(lrModel.intercept)
print("Coefficients: %s" % coefficients_text)
print("Intercept: %s" % intercept_text)

# In-sample diagnostics from the training summary.
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.09947314890089527]
Intercept: 19.34331196097759
numIterations: 3
objectiveHistory: [0.4999999999999998, 0.4360246316911617, 0.2726352066098193]
+-------------------+
|          residuals|
+-------------------+
| -19.74120455658117|
| -18.74120455658117|
| -17.74120455658117|
| -16.74120455658117|
| -15.74120455658117|
| -14.74120455658117|
| -13.74120455658117|
| -12.74120455658117|
| -11.74120455658117|
| -10.74120455658117|
|  -9.74120455658117|
|  -8.74120455658117|
|  -7.74120455658117|
|  -6.74120455658117|
|  -5.74120455658117|
|  -4.74120455658117|
|-3.7412045565811702|
|-2.7412045565811702|
|-1.7412045565811702|
|-0.7412045565811702|
+-------------------+
only showing top 20 rows

RMSE: 10.416292
r2: 0.478996


In [311]:
# Score the same rows the model was fitted on. There is no hold-out split, so
# these are in-sample predictions and the R2 equals trainingSummary.r2.
lr_predictions = lrModel.transform(v_df)
lr_predictions.select("prediction", "Date_value", "features").show(5)

lr_evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="Date_value", metricName="r2"
)
# Label fixed: this was previously reported as "on test data".
print("R Squared (R2) on training data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+----------+--------+
|       prediction|Date_value|features|
+-----------------+----------+--------+
|19.74120455658117|         0|   [4.0]|
|19.74120455658117|         1|   [4.0]|
|19.74120455658117|         2|   [4.0]|
|19.74120455658117|         3|   [4.0]|
|19.74120455658117|         4|   [4.0]|
+-----------------+----------+--------+
only showing top 5 rows

R Squared (R2) on test data = 0.478996
