In [None]:
# Initialise PySpark (findspark.init() is run before pyspark is imported) and start a local session
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Eager evaluation makes a DataFrame render as a formatted table when it is a cell's last expression
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [None]:
# Load the dataset: the first CSV line is the header and column types are inferred
csv_path = 'Latest Covid-19 India Status.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df

State/UTs,Total Cases,Active,Discharged,Deaths,Active Ratio,Discharge Ratio,Death Ratio
Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
Andhra Pradesh,2059708,6453,2038960,14295,0.31,98.99,0.69
Arunachal Pradesh,54964,183,54501,280,0.33,99.16,0.51
Assam,605944,3458,596547,5939,0.57,98.45,0.98
Bihar,726021,46,716314,9661,0.01,98.66,1.33
Chandigarh,65297,30,64447,820,0.05,98.7,1.26
Chhattisgarh,1005624,197,991857,13570,0.02,98.63,1.35
Dadra and Nagar H...,10675,3,10668,4,0.03,99.93,0.04
Delhi,1439337,327,1413921,25089,0.02,98.23,1.74
Goa,177410,648,173423,3339,0.37,97.75,1.88


In [None]:
# Replace the spaces in every column name with underscores.
# Only spaces are handled here; the '/' in 'State/UTs' is dealt with in a later cell.
from pyspark.sql import functions as F

underscored = [F.col(name).alias(name.replace(' ', '_')) for name in df.columns]
renamed_df = df.select(underscored)
renamed_df

State/UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
Andhra Pradesh,2059708,6453,2038960,14295,0.31,98.99,0.69
Arunachal Pradesh,54964,183,54501,280,0.33,99.16,0.51
Assam,605944,3458,596547,5939,0.57,98.45,0.98
Bihar,726021,46,716314,9661,0.01,98.66,1.33
Chandigarh,65297,30,64447,820,0.05,98.7,1.26
Chhattisgarh,1005624,197,991857,13570,0.02,98.63,1.35
Dadra and Nagar H...,10675,3,10668,4,0.03,99.93,0.04
Delhi,1439337,327,1413921,25089,0.02,98.23,1.74
Goa,177410,648,173423,3339,0.37,97.75,1.88


In [None]:
# Strip the remaining special characters ('/' and the '_(%)' suffix) from the column names
column_fixes = {
    'State/UTs': 'State_UTs',
    'Active_Ratio_(%)': 'Active_Ratio',
    'Discharge_Ratio_(%)': 'Discharge_Ratio',
    'Death_Ratio_(%)': 'Death_Ratio',
}
for old_name, new_name in column_fixes.items():
    renamed_df = renamed_df.withColumnRenamed(old_name, new_name)
renamed_df

State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
Andhra Pradesh,2059708,6453,2038960,14295,0.31,98.99,0.69
Arunachal Pradesh,54964,183,54501,280,0.33,99.16,0.51
Assam,605944,3458,596547,5939,0.57,98.45,0.98
Bihar,726021,46,716314,9661,0.01,98.66,1.33
Chandigarh,65297,30,64447,820,0.05,98.7,1.26
Chhattisgarh,1005624,197,991857,13570,0.02,98.63,1.35
Dadra and Nagar H...,10675,3,10668,4,0.03,99.93,0.04
Delhi,1439337,327,1413921,25089,0.02,98.23,1.74
Goa,177410,648,173423,3339,0.37,97.75,1.88


In [None]:
# Register the renamed DataFrame as a temporary view named 'Data' so it can be queried with spark.sql
renamed_df.createOrReplaceTempView('Data')

In [None]:
# Read every row of the 'Data' view through Spark SQL
all_rows_query = 'Select * from Data'
spark.sql(all_rows_query)

State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
Andhra Pradesh,2059708,6453,2038960,14295,0.31,98.99,0.69
Arunachal Pradesh,54964,183,54501,280,0.33,99.16,0.51
Assam,605944,3458,596547,5939,0.57,98.45,0.98
Bihar,726021,46,716314,9661,0.01,98.66,1.33
Chandigarh,65297,30,64447,820,0.05,98.7,1.26
Chhattisgarh,1005624,197,991857,13570,0.02,98.63,1.35
Dadra and Nagar H...,10675,3,10668,4,0.03,99.93,0.04
Delhi,1439337,327,1413921,25089,0.02,98.23,1.74
Goa,177410,648,173423,3339,0.37,97.75,1.88


In [None]:
# Total number of states/UTs: counts the State_UTs entries in the view
state_count_query = 'select count(State_UTs) from Data'
spark.sql(state_count_query)

count(State_UTs)
36


In [None]:
# The 5 states/UTs with the most active cases (sorted by Active, descending)
top_active_query = 'select * from Data order by Active desc limit 5'
spark.sql(top_active_query)

State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Kerala,4838811,95349,4716728,26734,1.97,97.48,0.55
Maharashtra,6588429,33379,6415316,139734,0.51,97.37,2.12
Tamil Nadu,2684641,15238,2633534,35869,0.57,98.1,1.34
Mizoram,111651,13316,97955,380,11.93,87.73,0.34
Karnataka,2982869,9700,2935238,37931,0.33,98.4,1.27


In [None]:
# The 5 states/UTs with the highest death ratio (sorted by Death_Ratio, descending)
top_death_ratio_query = 'select * from Data order by Death_Ratio desc limit 5'
spark.sql(top_death_ratio_query)

State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Punjab,601992,228,585224,16540,0.04,97.21,2.75
Uttarakhand,343739,179,336163,7397,0.05,97.8,2.15
Nagaland,31547,228,30645,674,0.72,97.14,2.14
Maharashtra,6588429,33379,6415316,139734,0.51,97.37,2.12
Goa,177410,648,173423,3339,0.37,97.75,1.88


In [None]:
# Country-wide totals: sum of all recorded cases and sum of all deaths
totals_query = 'select sum(Total_Cases), sum(Deaths) from data'
spark.sql(totals_query)

sum(Total_Cases),sum(Deaths)
34053573,451980


In [None]:
# The 5 states/UTs with the fewest active cases (sorted by Active, ascending),
# used here as an indication of the safest places
lowest_active_query = 'select * from Data order by Active asc limit 5'
spark.sql(lowest_active_query)

State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Lakshadweep,10365,1,10313,51,0.01,99.5,0.49
Dadra and Nagar H...,10675,3,10668,4,0.03,99.93,0.04
Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
Chandigarh,65297,30,64447,820,0.05,98.7,1.26
Rajasthan,954383,40,945389,8954,0.0,99.06,0.94


#### Visualizations


In [None]:
# Convert the Spark DataFrame into a pandas DataFrame; the statistics and plots below work on pd_df
import pandas as pd

pd_df = renamed_df.toPandas()
pd_df.head()

Unnamed: 0,State_UTs,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
0,Andaman and Nicobar,7641,9,7503,129,0.12,98.19,1.69
1,Andhra Pradesh,2059708,6453,2038960,14295,0.31,98.99,0.69
2,Arunachal Pradesh,54964,183,54501,280,0.33,99.16,0.51
3,Assam,605944,3458,596547,5939,0.57,98.45,0.98
4,Bihar,726021,46,716314,9661,0.01,98.66,1.33


In [None]:
# Summarize the data: count, mean, std, min, quartiles and max of each numeric column
pd_df.describe()

Unnamed: 0,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
count,36.0,36.0,36.0,36.0,36.0,36.0,36.0
mean,945932.6,5600.888889,927776.7,12555.0,0.671667,98.0575,1.270833
std,1409612.0,16678.791917,1376146.0,24029.282581,1.973788,1.889917,0.56656
min,7641.0,1.0,7503.0,4.0,0.0,87.73,0.04
25%,83928.0,113.5,82658.25,819.0,0.03,97.7875,0.9625
50%,475203.5,228.0,464187.0,5537.0,0.29,98.425,1.315
75%,1012787.0,3607.5,999187.8,13751.25,0.57,98.735,1.575
max,6588429.0,95349.0,6415316.0,139734.0,11.93,99.93,2.75


In [None]:
# Pairwise correlation between the numeric columns.
# State_UTs holds text, so keep only the numeric columns first: pandas 2.x raises on
# non-numeric data in DataFrame.corr() instead of silently dropping it.
pd_df.select_dtypes(include='number').corr()

Unnamed: 0,Total_Cases,Active,Discharged,Deaths,Active_Ratio,Discharge_Ratio,Death_Ratio
Total_Cases,1.0,0.728514,0.99998,0.888156,-0.027262,0.008026,0.067935
Active,0.728514,1.0,0.726699,0.424456,0.223211,-0.179168,-0.179703
Discharged,0.99998,0.726699,1.0,0.887149,-0.02952,0.010698,0.066877
Deaths,0.888156,0.424456,0.887149,1.0,-0.063609,-0.017499,0.279919
Active_Ratio,-0.027262,0.223211,-0.02952,-0.063609,1.0,-0.957929,-0.288299
Discharge_Ratio,0.008026,-0.179168,0.010698,-0.017499,-0.957929,1.0,0.001358
Death_Ratio,0.067935,-0.179703,0.066877,0.279919,-0.288299,0.001358,1.0


In [None]:
# Make 4 histograms on a 2x2 grid: 'Total Cases', 'Deaths', 'Active', 'Discharged'
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Subplot title -> pd_df column, listed in row-major grid order.
# Titles and traces both come from this mapping, so they cannot drift apart.
hist_metrics = {
    'Total Cases': 'Total_Cases',
    'Deaths': 'Deaths',
    'Active': 'Active',
    'Discharged': 'Discharged',
}
GRID_COLS = 2

fig = make_subplots(rows=2, cols=GRID_COLS, subplot_titles=list(hist_metrics))

for position, (title, column) in enumerate(hist_metrics.items()):
    # divmod turns the 0-based position into 0-based (row, col); plotly grids are 1-based
    grid_row, grid_col = divmod(position, GRID_COLS)
    fig.add_trace(
        go.Histogram(x=pd_df[column], nbinsx=20, name=title),
        row=grid_row + 1,
        col=grid_col + 1,
    )

# The subplot titles already identify each panel, so the legend is redundant
fig.update_layout(showlegend=False)
fig.show()

In [None]:
# Box plots of the four count columns; each point carries its state/UT name as text
def state_box(column, label):
    """Build a horizontal box trace for one pd_df column, labelled with the state names."""
    return go.Box(x=pd_df[column], name=label, text=pd_df['State_UTs'])


total_cases = state_box('Total_Cases', 'Total Cases')
deaths = state_box('Deaths', 'Deaths')
active = state_box('Active', 'Active')
discharged = state_box('Discharged', 'Discharged')

fig = make_subplots(rows=2, cols=2, subplot_titles=['Total Cases', 'Deaths', 'Active', 'Discharged'])

# Place the traces on the 2x2 grid in row-major order
grid_slots = [(1, 1), (1, 2), (2, 1), (2, 2)]
for trace, (grid_row, grid_col) in zip((total_cases, deaths, active, discharged), grid_slots):
    fig.add_trace(trace, grid_row, grid_col)

fig.update_layout(showlegend=False)
fig.show()

In [None]:
# Bar charts: one bar per state/UT for each of the four count columns
bar_metrics = [
    ('Total Cases', 'Total_Cases'),
    ('Deaths', 'Deaths'),
    ('Active', 'Active'),
    ('Discharged', 'Discharged'),
]
states = pd_df['State_UTs']

fig = make_subplots(rows=2, cols=2, subplot_titles=[title for title, _ in bar_metrics])

for slot, (title, column) in enumerate(bar_metrics):
    # x=states labels the axis with state names (as the grouped bar chart does);
    # without it the bars are plotted against bare row indices.
    fig.add_trace(
        go.Bar(x=states, y=pd_df[column], name=title, hovertext=states),
        row=slot // 2 + 1,
        col=slot % 2 + 1,
    )

fig.update_layout(showlegend=False)
fig.show()

In [None]:
# Grouped bar chart: total vs discharged cases side by side for every state/UT
state_names = pd_df['State_UTs']
grouped_traces = [
    go.Bar(y=pd_df['Total_Cases'], name='Total Cases', hovertext=state_names, x=state_names),
    go.Bar(y=pd_df['Discharged'], name='Discharged', hovertext=state_names, x=state_names),
]

fig = go.Figure(grouped_traces)
fig.update_layout(barmode='group', title='Total & Discharged Cases')
fig.show()

In [None]:
# Pie charts: each state's/UT's share of the four count columns
pie_metrics = [
    ('Total Cases', 'Total_Cases'),
    ('Deaths', 'Deaths'),
    ('Active', 'Active'),
    ('Discharged', 'Discharged'),
]

# Pie traces need 'domain'-type subplot cells rather than the default cartesian ones
fig = make_subplots(rows=2, cols=2,
                    subplot_titles=[title for title, _ in pie_metrics],
                    specs=[[{'type': 'domain'}, {'type': 'domain'}],
                           [{'type': 'domain'}, {'type': 'domain'}]])

for slot, (title, column) in enumerate(pie_metrics):
    fig.add_trace(
        go.Pie(values=pd_df[column], name=title, labels=pd_df['State_UTs']),
        row=slot // 2 + 1,
        col=slot % 2 + 1,
    )

# Hover shows percentage and state name; slice text is kept inside the slices
fig.update_traces(hoverinfo='percent+label', textposition='inside')
fig.update_layout(showlegend=False)

# make_subplots already returns a Figure, so it is shown directly without re-wrapping
fig.show()

In [None]:
# Relationship between deaths and total cases (scatter plot, one marker per state/UT)
deaths_vs_cases = go.Scatter(
    x=pd_df['Total_Cases'],
    y=pd_df['Deaths'],
    mode='markers',
    text=pd_df['State_UTs'],
)
fig = go.Figure([deaths_vs_cases])
fig.update_layout(
    title='Deaths vs Total Cases',
    xaxis_title='Total Cases',
    yaxis_title='Deaths',
)
fig.show()

#### Fit a simple linear regression model

In [None]:
# Pull the predictor (total cases) and the target (deaths) out of pd_df as NumPy arrays
X, y = (pd_df[column].to_numpy() for column in ('Total_Cases', 'Deaths'))

X.shape, y.shape

((36,), (36,))

In [None]:
# Fit a straight line: Deaths as a linear function of Total_Cases
from sklearn.linear_model import LinearRegression

# X is 1-D; reshape it into a single-column 2-D array, the feature-matrix layout fit() is given here
model = LinearRegression().fit(X.reshape(-1, 1), y)
(model.intercept_, model.coef_[0])

(-1766.5668659538696, 0.015140155988164275)

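These coefficients define the regression line through the scatter plot above: Deaths ≈ -1766.57 + 0.01514 × Total Cases, i.e. roughly 1.5 additional deaths per 100 additional cases.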

In [None]:
# R^2 of the fitted line, evaluated on the same data it was trained on
model.score(X.reshape(-1, 1), y)

0.7888206072169359

In [None]:
# Sanity check: for a one-predictor linear regression, R^2 equals the squared correlation
# between predictor and target. Computed from the data instead of a rounded value copied
# by hand from the correlation matrix, so it stays correct if the dataset changes.
pd_df['Total_Cases'].corr(pd_df['Deaths']) ** 2

0.788821080336

In [None]:
# Predicted deaths for the same Total_Cases values the model was fitted on
predictions = model.predict(X.reshape(-1, 1))
predictions.shape

(36,)

In [None]:
# Plot the observed deaths (markers) together with the model's predictions
actual_trace = go.Scatter(
    x=pd_df['Total_Cases'],
    y=pd_df['Deaths'],
    mode='markers',
    text=pd_df['State_UTs'],
    name='Actual Deaths',
)
predicted_trace = go.Scatter(
    x=pd_df['Total_Cases'],
    y=predictions,
    name='Predicted Deaths',
)

fig = go.Figure([actual_trace, predicted_trace])
fig.update_layout(
    title='Deaths vs Total Cases',
    xaxis_title='Total Cases',
    yaxis_title='Deaths',
)
fig.show()