In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\Users\\stoic\\Desktop\\spark-3.2.0-bin-hadoop3.2'

In [2]:
#importing all the required libraries and creating the spark session

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import FloatType, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from datetime import datetime
import time

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
import numpy as np

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("query3").getOrCreate()

# Preaparing the Dataset

Before applying the linear regression on monthly increase in the no of cases we need to prepare the dataset accordingly. Following steps have been taken for adtaset preparation:

1. Conversion of the dates into single format of mm/dd/yyyy and renaming all the columns of the dataframe using datetime library and 'col' function from pyspark.
2. Creating a new Column 'States_New' Column which contains the state or otherwise country.
3. Calculating the monthly increasse in cases by subtaracting the cummulative cases of first date of consecutive months.
4. Transposing the dataframe in pandas and creating creating index against each month.

In [3]:
# creating the spark dataframe
start_time = time.time()
df = spark.read.csv('time_series_covid19_confirmed_global.csv',header = True)

In [4]:
#converting the all the date columns into a common format dd/mm/yyyy and renaming the columns of the dataframe

date_list = [_ for _ in df.columns if _ not in ['Province/State','Country/Region','Lat','Long']]
date_column_names = []
for dates in date_list:
    try:
        if datetime.strptime(dates, '%m/%d/%y'):
            dt = datetime.strptime(dates, '%m/%d/%y')
            dts = dt.strftime('%m/%d/%Y')
            date_column_names.append(dts)
    except:
        date_column_names.append(dates)
column_names = dict(zip(date_list,date_column_names))
column_names['Province/State']='State'
column_names['Country/Region']='Country'
column_names['Lat']='Lat'
column_names['Long']='Long'

df = df.select([F.col(c).alias(column_names.get(c, c)) for c in df.columns])
df.columns

['State',
 'Country',
 'Lat',
 'Long',
 '01/22/2020',
 '01/23/2020',
 '01/24/2020',
 '01/25/2020',
 '01/26/2020',
 '01/27/2020',
 '01/28/2020',
 '01/29/2020',
 '01/30/2020',
 '01/31/2020',
 '02/01/2020',
 '02/02/2020',
 '02/03/2020',
 '02/04/2020',
 '02/05/2020',
 '02/06/2020',
 '02/07/2020',
 '02/08/2020',
 '02/09/2020',
 '02/10/2020',
 '02/11/2020',
 '02/12/2020',
 '02/13/2020',
 '02/14/2020',
 '02/15/2020',
 '02/16/2020',
 '02/17/2020',
 '02/18/2020',
 '02/19/2020',
 '02/20/2020',
 '02/21/2020',
 '02/22/2020',
 '02/23/2020',
 '02/24/2020',
 '02/25/2020',
 '02/26/2020',
 '02/27/2020',
 '02/28/2020',
 '02/29/2020',
 '03/01/2020',
 '03/02/2020',
 '03/03/2020',
 '03/04/2020',
 '03/05/2020',
 '03/06/2020',
 '03/07/2020',
 '03/08/2020',
 '03/09/2020',
 '03/10/2020',
 '03/11/2020',
 '03/12/2020',
 '03/13/2020',
 '03/14/2020',
 '03/15/2020',
 '03/16/2020',
 '03/17/2020',
 '03/18/2020',
 '03/19/2020',
 '03/20/2020',
 '03/21/2020',
 '03/22/2020',
 '03/23/2020',
 '03/24/2020',
 '03/25/2020',
 

In [5]:
#creating a dictionary which contains all the dates of current month and first date of next month
date_list = [_ for _ in df.columns if _ not in ['State','Country','Lat','Long']]
dates_by_month ={}
for dates in date_list:
    try:
        if datetime.strptime(dates, '%m/%d/%Y'):
            date_split = dates.split("/")
            month_year_split = date_split[0]+"/"+date_split[-1]
            if month_year_split in dates_by_month.keys():
                dates_by_month[month_year_split].append(dates)
            else:
                dates_by_month[month_year_split] = [dates]
    except:
        pass
dates_by_month_keys = list(dates_by_month.keys())
for index,key in enumerate(dates_by_month_keys):
    if (index+1)<len(dates_by_month.keys()):
        dates_by_month[key].append(dates_by_month[dates_by_month_keys[index+1]][0])

In [6]:
#dataframe prepartion like renaming columns and droping duplicates 
df=df.withColumn('States_New',when(df.State.isNull(),df.Country).otherwise(df.State))
df=df.dropDuplicates(['States_New'])

In [7]:
#typecasting the date columns into integer type
date_list = [_ for _ in df.columns if _ not in ['State','Country','Lat','Long','States_New']]
for column in date_list:    
    df = df.withColumn(column,df[column].cast('Integer'))
    
df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- 01/22/2020: integer (nullable = true)
 |-- 01/23/2020: integer (nullable = true)
 |-- 01/24/2020: integer (nullable = true)
 |-- 01/25/2020: integer (nullable = true)
 |-- 01/26/2020: integer (nullable = true)
 |-- 01/27/2020: integer (nullable = true)
 |-- 01/28/2020: integer (nullable = true)
 |-- 01/29/2020: integer (nullable = true)
 |-- 01/30/2020: integer (nullable = true)
 |-- 01/31/2020: integer (nullable = true)
 |-- 02/01/2020: integer (nullable = true)
 |-- 02/02/2020: integer (nullable = true)
 |-- 02/03/2020: integer (nullable = true)
 |-- 02/04/2020: integer (nullable = true)
 |-- 02/05/2020: integer (nullable = true)
 |-- 02/06/2020: integer (nullable = true)
 |-- 02/07/2020: integer (nullable = true)
 |-- 02/08/2020: integer (nullable = true)
 |-- 02/09/2020: integer (nullable = true)
 |-- 02/10/2020: integer (nulla

In [8]:
#Calculating monthly increase in the number of cases and appending the as new column in month/year format
for k,v in dates_by_month.items():
    print(v[-1],v[0])
    df = df.withColumn(k, (df[v[-1]] - df[v[0]]))


02/01/2020 01/22/2020
03/01/2020 02/01/2020
04/01/2020 03/01/2020
05/01/2020 04/01/2020
06/01/2020 05/01/2020
07/01/2020 06/01/2020
08/01/2020 07/01/2020
09/01/2020 08/01/2020
10/01/2020 09/01/2020
11/01/2020 10/01/2020
12/01/2020 11/01/2020
01/01/2021 12/01/2020
02/01/2021 01/01/2021
03/01/2021 02/01/2021
04/01/2021 03/01/2021
05/01/2021 04/01/2021
06/01/2021 05/01/2021
07/01/2021 06/01/2021
08/01/2021 07/01/2021
09/01/2021 08/01/2021
10/01/2021 09/01/2021
11/01/2021 10/01/2021
11/18/2021 11/01/2021


In [9]:
#conversion of dataframe into pandas dataframe
dates_by_month_keys.insert(0,"States_New")
df_temp=df.select(dates_by_month_keys)
psdf = df_temp.toPandas()
display(psdf)

Unnamed: 0,States_New,01/2020,02/2020,03/2020,04/2020,05/2020,06/2020,07/2020,08/2020,09/2020,...,02/2021,03/2021,04/2021,05/2021,06/2021,07/2021,08/2021,09/2021,10/2021,11/2021
0,Afghanistan,0,5,187,1979,13665,16012,4948,1486,1089,...,674,784,3422,13038,47239,27285,5759,1931,1093,455
1,Albania,0,0,259,523,361,1437,2816,4210,4200,...,28939,17575,5679,1152,186,598,14248,23409,14719,9524
2,Alberta,0,0,754,4819,1471,1064,2735,3223,4169,...,9232,15412,43960,34551,4269,2308,19950,45557,24397,7015
3,Algeria,0,1,846,3307,5359,4759,16678,13883,6857,...,5677,4049,5007,6907,10857,32489,23963,6990,3049,1966
4,Andorra,0,0,390,355,20,90,70,259,866,...,917,1164,1179,497,189,760,368,176,294,519
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
274,Yukon,0,0,5,6,0,0,3,1,0,...,2,2,7,3,273,239,97,65,186,405
275,Yunnan,92,81,8,3,0,0,6,8,10,...,2,18,93,9,66,412,271,336,131,84
276,Zambia,0,0,36,73,980,543,4596,6153,2421,...,23960,9547,3121,4151,62011,38461,10229,2592,625,297
277,Zhejiang,589,606,52,11,0,1,1,8,4,...,5,2,21,20,22,10,36,17,46,5


In [10]:
#pandas dataframe manipulations
psdf = psdf.transpose()
psdf=psdf.reset_index()
header_row = 0
psdf.columns = psdf.iloc[header_row]
psdf = psdf.drop(header_row)
psdf=psdf.reset_index()
psdf.rename(columns={'index': 'Month_index'}, inplace=True)
psdf.rename(columns = {'States_New':'Dates'}, inplace = True)
display(psdf)

Unnamed: 0,Month_index,Dates,Afghanistan,Albania,Alberta,Algeria,Andorra,Angola,Anguilla,Anhui,...,Wallis and Futuna,West Bank and Gaza,Western Australia,Xinjiang,Yemen,Yukon,Yunnan,Zambia,Zhejiang,Zimbabwe
0,1,01/2020,0,0,0,0,0,0,0,296,...,0,0,0,18,0,0,92,0,589,0
1,2,02/2020,5,0,0,1,0,0,0,693,...,0,0,2,58,0,0,81,0,606,0
2,3,03/2020,187,259,754,846,390,8,2,0,...,0,134,390,0,0,5,8,36,52,8
3,4,04/2020,1979,523,4819,3307,355,22,1,1,...,0,219,159,0,7,6,3,73,11,32
4,5,05/2020,13665,361,1471,5359,20,56,0,0,...,0,96,40,0,347,0,0,980,0,163
5,6,06/2020,16012,1437,1064,4759,90,205,0,0,...,0,2309,20,0,836,0,0,543,1,402
6,7,07/2020,4948,2816,2735,16678,70,873,0,0,...,0,9402,30,592,540,3,6,4596,1,3054
7,8,08/2020,1486,4210,3223,13883,259,1565,0,0,...,0,11121,14,234,232,1,8,6153,8,2900
8,9,09/2020,1089,4200,4169,6857,866,2385,0,0,...,0,17041,30,0,77,0,10,2421,4,1291
9,10,10/2020,2054,7396,9429,6582,2775,5921,0,0,...,1,13738,84,57,24,8,4,1678,4,524


In [11]:
#conversion of pandas dataframe back to pyspark dataframe after doing the transpose  of dataframe
df_sp= spark.createDataFrame(psdf)


# Calculating Monthly Trendline Coefficients

The authors have used pyspark MLlib for calculating the linaer regression for all the states.

1. Conversion of Month_index into dense vectors with the help of udf implementation in spark.
2. Calculating the trendline coefficient for each state column as target column and and Month_index as predictor column using Pyspark Linear regression library.
3. Selecting the top 50 states and saving the results into csv file.

In [12]:
#Creation of UDF to convert column into dense vector
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
data = df_sp.withColumn("Feature", F.array("Month_index")).withColumn("Feature", to_vector("Feature"))
data.select(['Feature','Month_index']).show()


+-------+-----------+
|Feature|Month_index|
+-------+-----------+
|  [1.0]|          1|
|  [2.0]|          2|
|  [3.0]|          3|
|  [4.0]|          4|
|  [5.0]|          5|
|  [6.0]|          6|
|  [7.0]|          7|
|  [8.0]|          8|
|  [9.0]|          9|
| [10.0]|         10|
| [11.0]|         11|
| [12.0]|         12|
| [13.0]|         13|
| [14.0]|         14|
| [15.0]|         15|
| [16.0]|         16|
| [17.0]|         17|
| [18.0]|         18|
| [19.0]|         19|
| [20.0]|         20|
+-------+-----------+
only showing top 20 rows



In [13]:
data.printSchema()

root
 |-- Month_index: long (nullable = true)
 |-- Dates: string (nullable = true)
 |-- Afghanistan: long (nullable = true)
 |-- Albania: long (nullable = true)
 |-- Alberta: long (nullable = true)
 |-- Algeria: long (nullable = true)
 |-- Andorra: long (nullable = true)
 |-- Angola: long (nullable = true)
 |-- Anguilla: long (nullable = true)
 |-- Anhui: long (nullable = true)
 |-- Antigua and Barbuda: long (nullable = true)
 |-- Argentina: long (nullable = true)
 |-- Armenia: long (nullable = true)
 |-- Aruba: long (nullable = true)
 |-- Australian Capital Territory: long (nullable = true)
 |-- Austria: long (nullable = true)
 |-- Azerbaijan: long (nullable = true)
 |-- Bahamas: long (nullable = true)
 |-- Bahrain: long (nullable = true)
 |-- Bangladesh: long (nullable = true)
 |-- Barbados: long (nullable = true)
 |-- Beijing: long (nullable = true)
 |-- Belarus: long (nullable = true)
 |-- Belgium: long (nullable = true)
 |-- Belize: long (nullable = true)
 |-- Benin: long (nullabl

In [14]:
#function for calulation of linear regression coefficients using Spark ML-Lib
def calculate_trendline_coefficient(data,target_column):
    """
Description: This function takes a pyspark dataframe object and calculates the linear regression coefficient
on the target column.



Args:
data (pyspark dataframe object): The dataset which needs to be predictor('feature') and target column.
target_column (string): The target column for the linear regression coefficient needs to be calculated.



Returns:
Vector: Returns a dense vector.
"""
    lin_reg = LinearRegression(featuresCol='Feature',labelCol=target_column, maxIter=10, regParam=0.3, elasticNetParam=0.8)
    lr_model = lin_reg.fit(data)
    return lr_model.coefficients

In [15]:
#calulation of trendline coefficient for each state
item_names = df.select(F.col('States_New')).collect()
state_list=df.select(['States_New']).toPandas()['States_New'].to_list()
coefficient_list = []
count =1
for state in state_list:
    print(count)
    coefficient_list.append([state, calculate_trendline_coefficient(data,state)])
    count+=1
coefficient_list 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277


[['Chad', DenseVector([-0.7542])],
 ['Manitoba', DenseVector([190.2885])],
 ['Paraguay', DenseVector([1335.5563])],
 ['Russia', DenseVector([31351.045])],
 ['Anguilla', DenseVector([10.6442])],
 ['Yemen', DenseVector([32.6532])],
 ['Guangdong', DenseVector([-9.6186])],
 ['Senegal', DenseVector([200.051])],
 ['Cabo Verde', DenseVector([94.7895])],
 ['Sweden', DenseVector([2101.8134])],
 ['Kiribati', DenseVector([0.0])],
 ['Hunan', DenseVector([-9.1567])],
 ['Shanxi', DenseVector([-1.71])],
 ['Guyana', DenseVector([191.1502])],
 ['Burma', DenseVector([2813.967])],
 ['Tibet', DenseVector([0.0])],
 ['Eritrea', DenseVector([25.3772])],
 ['Philippines', DenseVector([14350.5495])],
 ['Djibouti', DenseVector([1.8471])],
 ['Tonga', DenseVector([0.0])],
 ['Malaysia', DenseVector([17536.5494])],
 ['Singapore', DenseVector([1437.0854])],
 ['Hubei', DenseVector([-672.7224])],
 ['Fiji', DenseVector([385.6023])],
 ['Turkey', DenseVector([37021.6771])],
 ['Yukon', DenseVector([10.7266])],
 ['Tianjin',

In [16]:
import pandas as pd
#Creating a pyspark dataframe which contains the respective coefficients
coeff_list = [x[1][0] for x in coefficient_list ]
country_list = [x[0] for x in coefficient_list]
df_pd = pd.DataFrame(list(zip(country_list, coeff_list)),columns =['States_New', 'Coefficient'])
df_pd = spark.createDataFrame(df_pd)


In [17]:
#Creating a new dataframe by joining and selecting only top 50 states
df_join=df_pd.join(df_temp,['States_New']).orderBy(desc("Coefficient")).limit(50)
df_join.show()

+--------------+------------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|    States_New|       Coefficient|01/2020|02/2020|03/2020|04/2020|05/2020|06/2020|07/2020|08/2020|09/2020|10/2020|11/2020|12/2020|01/2021|02/2021|03/2021|04/2021|05/2021|06/2021|07/2021|08/2021|09/2021|10/2021|11/2021|
+--------------+------------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|            US|104410.55620012921|      7|     24| 224280| 891683| 700111| 877939|1931381|1442890|1212060|1995306|4605141|6449335|6115363|2327626|1838852|1858787| 895919| 393260|1347109|4448815|4100022|2466810|1408598|
|         India| 92328.14206792765|      1|      2|   1995|  35259| 161113| 406271|1146082|2018800|2624545|1835245|12701

In [18]:
#Saving the results into a csv file
df_join.toPandas().to_csv("monthly_coefficients.csv")

# Applying K-Means on Top 50 States

Before calculating the kmeans for top 50 states, some preprocessing steps have been taken:

1. Calculating daily increase in cases by taking the diffence between the columns in pandas library.
2. Converting the the dataframe back to pyspark dataframe.
3. Conversion of mm/dd/yyyy date format to mm_dd_yyyy format to avoid any error in kmeans implementation.

In [19]:
#Calculating daily increase in the number of cases and appending them in the same column
df_join_pd=df_pd.join(df,['States_New']).orderBy(desc("Coefficient")).limit(50)
df_join_pd = df_join_pd.toPandas()
display(df_join_pd)

  df[column_name] = series


Unnamed: 0,States_New,Coefficient,State,Country,Lat,Long,01/22/2020,01/23/2020,01/24/2020,01/25/2020,...,02/2021,03/2021,04/2021,05/2021,06/2021,07/2021,08/2021,09/2021,10/2021,11/2021
0,US,104410.5562,,US,40.0,-100.0,1,1,2,2,...,2327626,1838852,1858787,895919,393260,1347109,4448815,4100022,2466810,1408598
1,India,92328.142068,,India,20.593684,78.96288,0,0,0,0,...,358282,1178604,7254326,8750375,2150419,1237707,1161979,933124,505176,193386
2,United Kingdom,43579.447951,,United Kingdom,55.3781,-3.436,0,0,0,0,...,346226,168257,68264,71908,338025,1052204,944407,1016551,1255686,624605
3,Brazil,41545.937375,,Brazil,-14.235,-51.9253,0,0,0,0,...,1357679,2252843,1886131,1898505,1997824,1316054,865857,641436,369042,175269
4,Turkey,37021.67714,,Turkey,38.9637,35.2433,0,0,0,0,...,226297,646509,1491420,407108,174424,316995,664312,770666,878723,443554
5,Russia,31351.044983,,Russia,61.52401,105.318756,0,0,0,0,...,384111,293441,256611,262979,449841,734791,631139,586405,992248,646013
6,Iran,27781.429875,,Iran,32.427908,53.688046,0,0,0,0,...,215083,257635,618843,407666,295037,684659,1121714,576332,332930,129280
7,Indonesia,18516.881656,,Indonesia,-0.7893,113.9213,0,0,0,0,...,252006,176540,155026,153647,376581,1237288,659742,116590,28033,7584
8,Germany,17965.842339,,Germany,51.165691,10.451526,0,0,0,0,...,223242,410754,557577,269008,44051,41318,201562,269222,370212,652688
9,Malaysia,17536.549414,,Malaysia,4.210484,101.975766,0,0,0,4,...,83407,44098,64916,167868,179505,371455,634594,492568,218684,93265


In [20]:
#calculating the daily increase in cases
df_join_pd1 = df_join_pd.iloc[:,:7]
df_join_pd2 = df_join_pd.iloc[:,6:673].diff(axis=1)
df_join_pd2=df_join_pd2.drop(columns=['01/22/2020'])
df_join_concat = pd.concat([df_join_pd1,df_join_pd2],axis =1, sort=False)
for i in df_join_concat.columns[7:]:
    df_join_concat=df_join_concat.astype({i: 'int64'})
display(df_join_concat)


  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,States_New,Coefficient,State,Country,Lat,Long,01/22/2020,01/23/2020,01/24/2020,01/25/2020,...,11/09/2021,11/10/2021,11/11/2021,11/12/2021,11/13/2021,11/14/2021,11/15/2021,11/16/2021,11/17/2021,11/18/2021
0,US,104410.5562,,US,40.0,-100.0,1,0,1,0,...,80039,94696,55314,146251,40833,30161,138787,87586,111404,109578
1,India,92328.142068,,India,20.593684,78.96288,0,0,0,0,...,11466,13091,12516,11850,11271,10229,8865,10197,11919,11106
2,United Kingdom,43579.447951,,United Kingdom,55.3781,-3.436,0,0,0,0,...,32785,39325,42401,38900,37669,36128,39270,36821,37868,46858
3,Brazil,41545.937375,,Brazil,-14.235,-51.9253,0,0,0,0,...,10948,12273,15300,14598,14642,4129,2799,4918,11977,12301
4,Turkey,37021.67714,,Turkey,38.9637,35.2433,0,0,0,0,...,28662,27259,24898,23637,22583,21624,23852,25101,23867,22234
5,Russia,31351.044983,,Russia,61.52401,105.318756,0,0,0,0,...,37999,36896,39584,38941,38068,37620,37210,35612,35415,36155
6,Iran,27781.429875,,Iran,32.427908,53.688046,0,0,0,0,...,8305,7948,7539,7322,4306,6143,7494,6430,6251,5882
7,Indonesia,18516.881656,,Indonesia,-0.7893,113.9213,0,0,0,0,...,434,480,435,399,359,339,221,347,522,400
8,Germany,17965.842339,,Germany,51.165691,10.451526,0,0,0,0,...,41286,51077,48834,45356,34309,19203,34958,53627,68366,58768
9,Malaysia,17536.549414,,Malaysia,4.210484,101.975766,0,0,0,4,...,6321,5325,6323,6517,5809,5162,5143,5413,6288,6380


In [21]:
for i in df_join_concat.columns[:6]:
    df_join_concat=df_join_concat.astype({i: 'string'})
df_join = spark.createDataFrame(df_join_concat)
df_join.printSchema()

root
 |-- States_New: string (nullable = true)
 |-- Coefficient: string (nullable = true)
 |-- State: struct (nullable = true)
 |-- Country: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- 01/22/2020: long (nullable = true)
 |-- 01/23/2020: long (nullable = true)
 |-- 01/24/2020: long (nullable = true)
 |-- 01/25/2020: long (nullable = true)
 |-- 01/26/2020: long (nullable = true)
 |-- 01/27/2020: long (nullable = true)
 |-- 01/28/2020: long (nullable = true)
 |-- 01/29/2020: long (nullable = true)
 |-- 01/30/2020: long (nullable = true)
 |-- 01/31/2020: long (nullable = true)
 |-- 02/01/2020: long (nullable = true)
 |-- 02/02/2020: long (nullable = true)
 |-- 02/03/2020: long (nullable = true)
 |-- 02/04/2020: long (nullable = true)
 |-- 02/05/2020: long (nullable = true)
 |-- 02/06/2020: long (nullable = true)
 |-- 02/07/2020: long (nullable = true)
 |-- 02/08/2020: long (nullable = true)
 |-- 02/09/2020: long (nullable = true)
 |-

In [22]:
#replacing the / in date columns with _ to avoid any error with kmeans
date_column_names = []
for dates in date_list:
    dates = dates.replace('/','_')
    date_column_names.append(dates)
column_names = dict(zip(date_list,date_column_names))
column_names['States_New']='States_New'
column_names['State']='State'
column_names['Country']='Country'
column_names['Lat']='Lat'
column_names['Long']='Long'
column_names['Coefficient']='Coefficient'

df_join = df_join.select([F.col(c).alias(column_names.get(c, c)) for c in df_join.columns])
df_join.columns
date_list = [x.replace('/','_') for x in date_list]
date_list

['01_22_2020',
 '01_23_2020',
 '01_24_2020',
 '01_25_2020',
 '01_26_2020',
 '01_27_2020',
 '01_28_2020',
 '01_29_2020',
 '01_30_2020',
 '01_31_2020',
 '02_01_2020',
 '02_02_2020',
 '02_03_2020',
 '02_04_2020',
 '02_05_2020',
 '02_06_2020',
 '02_07_2020',
 '02_08_2020',
 '02_09_2020',
 '02_10_2020',
 '02_11_2020',
 '02_12_2020',
 '02_13_2020',
 '02_14_2020',
 '02_15_2020',
 '02_16_2020',
 '02_17_2020',
 '02_18_2020',
 '02_19_2020',
 '02_20_2020',
 '02_21_2020',
 '02_22_2020',
 '02_23_2020',
 '02_24_2020',
 '02_25_2020',
 '02_26_2020',
 '02_27_2020',
 '02_28_2020',
 '02_29_2020',
 '03_01_2020',
 '03_02_2020',
 '03_03_2020',
 '03_04_2020',
 '03_05_2020',
 '03_06_2020',
 '03_07_2020',
 '03_08_2020',
 '03_09_2020',
 '03_10_2020',
 '03_11_2020',
 '03_12_2020',
 '03_13_2020',
 '03_14_2020',
 '03_15_2020',
 '03_16_2020',
 '03_17_2020',
 '03_18_2020',
 '03_19_2020',
 '03_20_2020',
 '03_21_2020',
 '03_22_2020',
 '03_23_2020',
 '03_24_2020',
 '03_25_2020',
 '03_26_2020',
 '03_27_2020',
 '03_28_20

# Monthly Kmeans clusters

Following steps have been followed to create monthly clusters using pyspark Kmeans 

1. Conversion of all the dates of of a month into a single vector column as 'features' using VectorAssembler library.
2. Training the kmeans model using the transformed dstaset for each month.
3. Predicting the labels using the trained model to dicide the states into repective clusters for each month.
4. Saving the results into csv file.

In [24]:
#kmeans clusters calculated on a monthly basis
labels_dict = {}
for k,v in dates_by_month.items():
    print(k)
    v=[x.replace('/','_') for x in v]
    v=v[:-1]
    #creating feature column for kmeans
    vector_assembler = VectorAssembler(inputCols = v, outputCol='features')
    df_join_sp=vector_assembler.transform(df_join)


    # Trains a k-means model.
    kmeans = KMeans().setK(4).setSeed(1)
    model = kmeans.fit(df_join_sp)

    # predicting the labels with the trained model
    prediction = model.transform(df_join_sp).select('prediction').collect()
    labels = [p.prediction for p in prediction ]
    labels_dict[k] = labels

01/2020
02/2020
03/2020
04/2020
05/2020
06/2020
07/2020
08/2020
09/2020
10/2020
11/2020
12/2020
01/2021
02/2021
03/2021
04/2021
05/2021
06/2021
07/2021
08/2021
09/2021
10/2021
11/2021


In [25]:
#Saving the results into csv file
df_final=df_join_sp.toPandas()
for k,v in labels_dict.items():
    df_final[k+'_labels']=v

df_final.to_csv("Query_3_monthly_clusters.csv")
print("--- %s seconds ---" % (time.time() - start_time))

--- 6580.238069534302 seconds ---


  df[column_name] = series
  after removing the cwd from sys.path.
