This notebook uses data world covid 19 data which is fetch from https://www.mygov.in/covid-19 website
 This is daily aggregate data statewise updated daily around 12 noon.

In [0]:
# Reading the data directly from data.world url as a workaround because spark dataframe is unable to read data directly 
# from url and hence pandas is used to convert csv to pandas df.
import pandas as pd
df = pd.read_csv('https://query.data.world/s/irxczuog4sdx6zhyxfu62i7lmrdoxj')
df.head()
#df.columns

Unnamed: 0,StateCode,State Name,Active Cases (Yesterday),Positive Cases (Yesterday),Cured Cases (Yesterday),Death Cases (Yesterday),Active Cases (Today),Positive Cases (Today),Cured Cases (Today),Death Cases (Today),Last Updated (IST)
0,35,Andaman and Nicobar Islands,44,10586,10413,129,38,10590,10423,129,26-Aug-2022 11:23:57
1,28,Andhra Pradesh,965,2336266,2320568,14733,901,2336386,2320752,14733,26-Aug-2022 11:23:57
2,12,Arunachal Pradesh,96,66599,66207,296,84,66614,66234,296,26-Aug-2022 11:23:57
3,18,Assam,2926,743801,732846,8029,2977,743934,732927,8030,26-Aug-2022 11:23:57
4,10,Bihar,805,847221,834124,12292,848,847411,834271,12292,26-Aug-2022 11:23:57


In [0]:
#converting pandas dataframe df to spark dataframe sparkDF
df=spark.createDataFrame(df) 
df.printSchema()
df.show()

In [0]:
%sql
show tables;
--%fs ls /FileStore/tables
--describe formatted agg_covid_19_india_status;
--drop table agg_covid_19_india_status;
--drop table latest_covid_19_india_status_csv;

database,tableName,isTemporary
default,covid19_india_aggregate,False


In [0]:
%fs ls /user/hive/warehouse/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/covid19_india_aggregate/,covid19_india_aggregate/,0,1661513998573


In [0]:
# to create spark DF from csv file already downloaded but since this method is replaced with
# pandas df and then converted it to spark df, it has been commented now
# File location and type
#file_location = "/FileStore/tables/India_COVID_19_Status.csv"

#file_type = "csv"

# CSV options
#infer_schema = "true"
#first_row_is_header = "true"
#delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
#df = spark.read.format(file_type) \
  #.option("inferSchema", infer_schema) \
  #.option("header", first_row_is_header) \
  #.option("sep", delimiter) \
  #.load(file_location)

#print(type(df))

#display(df)

In [0]:
df.columns

In [0]:
#Create the final df to be saved into the covid19_india_aggregate_data table.
# new columns are derived which indicates the covid status of new cases/deaths/recoveries reported on previous day 
# along with aggregate data.
from pyspark.sql.functions import round

df = df.select(df["State Name"].alias("State_Name"), 
               df["Positive Cases (Today)"].alias("Total_Cases"), 
               df["Active Cases (Today)"].alias("Total_Active_Cases"),
               df["Cured Cases (Today)"].alias("Total_Recoveries"), 
               df["Death Cases (Today)"].alias("Total_Deaths"),
               (df["Positive Cases (Today)"] - df["Positive Cases (Yesterday)"]).alias("New_Cases"),
               (df["Active Cases (Today)"] - df["Active Cases (Yesterday)"]).alias("New_Active_Cases"),
               (df["Cured Cases (Today)"] - df["Cured Cases (Yesterday)"]).alias("New_Recoveries"), 
               (df["Death Cases (Today)"] - df["Death Cases (Yesterday)"]).alias("New_Deaths"), 
               (round((df["Active Cases (Today)"]/df["Positive Cases (Today)"])*100)).alias("Active_ratio"),
               df["Last Updated (IST)"].alias("Last_Updated_IST") )

In [0]:
# This will only be used for inserting the data first time when table does not exist
# We will be appending the data rather than overwritining so that previous days aggregate is 
# also present for any furture analysis. using last_updated column we can identify data belongs to which date.
# df.write.mode("append").saveAsTable("covid19_india_aggregate")

In [0]:
#create a dataframe of existing data in covid19_india_aggregate
#df_temp = spark.sql("select * from covid19_india_aggregate")
#df_temp.count()

In [0]:
# create new df after union of new data df and existing data in table covid19_india_aggregate
df_final = df.union(spark.sql("select * from covid19_india_aggregate")).drop_duplicates()
df_final.count()

In [0]:
display(df_final.orderBy("State_Name"))

State_Name,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Recoveries,New_Deaths,Active_ratio,Last_Updated_IST
Andaman and Nicobar Islands,10590,38,10423,129,4,-6,10,0,0.0,26-Aug-2022 11:23:57
Andaman and Nicobar Islands,10586,44,10413,129,7,-1,8,0,0.0,25-Aug-2022 21:43:06
Andhra Pradesh,2336266,965,2320568,14733,136,-74,210,0,0.0,25-Aug-2022 21:43:06
Andhra Pradesh,2336386,901,2320752,14733,120,-64,184,0,0.0,26-Aug-2022 11:23:57
Arunachal Pradesh,66614,84,66234,296,15,-12,27,0,0.0,26-Aug-2022 11:23:57
Arunachal Pradesh,66599,96,66207,296,14,-11,25,0,0.0,25-Aug-2022 21:43:06
Assam,743934,2977,732927,8030,133,51,81,1,0.0,26-Aug-2022 11:23:57
Assam,743801,2926,732846,8029,124,-34,158,0,0.0,25-Aug-2022 21:43:06
Bihar,847221,805,834124,12292,133,10,123,0,0.0,25-Aug-2022 21:43:06
Bihar,847411,848,834271,12292,190,43,147,0,0.0,26-Aug-2022 11:23:57


In [0]:
df_final.write.mode("overwrite").saveAsTable("covid19_india_aggregate")

In [0]:
%sql
select count(*) from covid19_india_aggregate;

count(1)
72


In [0]:
#%fs rm -r /user/hive/warehouse/covid19_india_aggregate_data/

In [0]:
%sql
show tables;
describe formatted covid19_india_aggregate;


col_name,data_type,comment
State_Name,string,
Total_Cases,bigint,
Total_Active_Cases,bigint,
Total_Recoveries,bigint,
Total_Deaths,bigint,
New_Cases,bigint,
New_Active_Cases,bigint,
New_Recoveries,bigint,
New_Deaths,bigint,
Active_ratio,double,


In [0]:
print(spark.catalog.listTables())

In [0]:
%sql
select * from covid19_india_aggregate

State_Name,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Recoveries,New_Deaths,Active_ratio,Last_Updated_IST
Andhra Pradesh,2336386,901,2320752,14733,120,-64,184,0,0.0,26-Aug-2022 11:23:57
Arunachal Pradesh,66614,84,66234,296,15,-12,27,0,0.0,26-Aug-2022 11:23:57
Andaman and Nicobar Islands,10590,38,10423,129,4,-6,10,0,0.0,26-Aug-2022 11:23:57
Assam,743934,2977,732927,8030,133,51,81,1,0.0,26-Aug-2022 11:23:57
Haryana,1050053,2737,1036644,10672,496,-203,698,1,0.0,26-Aug-2022 11:23:57
Goa*,255562,1113,250591,3858,154,-27,180,1,0.0,26-Aug-2022 11:23:57
Gujarat,1268832,1894,1255937,11001,282,-71,352,1,0.0,26-Aug-2022 11:23:57
Himachal Pradesh,310271,1413,304664,4194,207,-168,375,0,0.0,26-Aug-2022 11:23:57
Madhya Pradesh,1053134,561,1041804,10769,63,-45,108,0,0.0,26-Aug-2022 11:23:57
Maharashtra,8091276,12269,7930793,148214,1887,-309,2190,6,0.0,26-Aug-2022 11:23:57


In [0]:
display(df)

State_Name,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Recoveries,New_Deaths,Active_ratio,Last_Updated_IST
Andaman and Nicobar Islands,10590,38,10423,129,4,-6,10,0,0.0,26-Aug-2022 11:23:57
Andhra Pradesh,2336386,901,2320752,14733,120,-64,184,0,0.0,26-Aug-2022 11:23:57
Arunachal Pradesh,66614,84,66234,296,15,-12,27,0,0.0,26-Aug-2022 11:23:57
Assam,743934,2977,732927,8030,133,51,81,1,0.0,26-Aug-2022 11:23:57
Bihar,847411,848,834271,12292,190,43,147,0,0.0,26-Aug-2022 11:23:57
Chandigarh,98590,367,97043,1180,64,-17,80,1,0.0,26-Aug-2022 11:23:57
Chhattisgarh,1173064,1291,1157669,14104,173,-178,348,3,0.0,26-Aug-2022 11:23:57
Dadra and Nagar Haveli and Daman and Diu,11574,5,11565,4,0,-2,2,0,0.0,26-Aug-2022 11:23:57
Delhi,1997054,3654,1966954,26446,702,-656,1354,4,0.0,26-Aug-2022 11:23:57
Goa*,255562,1113,250591,3858,154,-27,180,1,0.0,26-Aug-2022 11:23:57


In [0]:

%sql
--# Pie chart showing top 10 states in terms Total Cases State-wise
select state_name as `States`, Total_Cases, Total_Active_Cases, Total_Recoveries, Total_Deaths, New_Cases, New_Active_Cases, New_Deaths
from covid19_india_aggregate order by Total_Cases desc limit 10;

States,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Deaths
Maharashtra,8091276,12269,7930793,148214,1887,-309,6
Maharashtra,8089389,12578,7928603,148208,1913,223,5
Kerala,6748557,8291,6669507,70759,1186,54,2
Kerala,6747371,8237,6668406,70728,1154,363,0
Karnataka,4046105,8700,3997181,40224,1286,-1075,3
Karnataka,4044819,9775,3994823,40221,1255,-934,3
Tamil Nadu,3565562,5496,3522032,38034,542,-134,1
Tamil Nadu,3565020,5630,3521357,38033,547,-102,0
Andhra Pradesh,2336386,901,2320752,14733,120,-64,0
Andhra Pradesh,2336266,965,2320568,14733,136,-74,0


In [0]:

%sql
--Pie chart showing top 10 states in terms Total Deaths State-wise
select State_name as `States`, Total_Cases, Total_Active_Cases, Total_Recoveries, Total_Deaths, New_Cases, New_Active_Cases, New_Deaths
from covid19_india_aggregate order by Total_Deaths desc limit 10;

States,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Deaths
Maharashtra,8091276,12269,7930793,148214,1887,-309,6
Maharashtra,8089389,12578,7928603,148208,1913,223,5
Kerala,6748557,8291,6669507,70759,1186,54,2
Kerala,6747371,8237,6668406,70728,1154,363,0
Karnataka,4046105,8700,3997181,40224,1286,-1075,3
Karnataka,4044819,9775,3994823,40221,1255,-934,3
Tamil Nadu,3565562,5496,3522032,38034,542,-134,1
Tamil Nadu,3565020,5630,3521357,38033,547,-102,0
Delhi,1997054,3654,1966954,26446,702,-656,4
Delhi,1996352,4310,1965600,26442,945,-346,6


In [0]:

%sql
--# Bar chart showing top 10 states in terms pf Total Cases vs Total Recoveries
select state_name as `States`, Total_Cases, Total_Active_Cases, Total_Recoveries, Total_Deaths, New_Cases, New_Active_Cases, New_Deaths
from covid19_india_aggregate order by Total_Cases desc limit 10;

States,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Deaths
Maharashtra,8091276,12269,7930793,148214,1887,-309,6
Maharashtra,8089389,12578,7928603,148208,1913,223,5
Kerala,6748557,8291,6669507,70759,1186,54,2
Kerala,6747371,8237,6668406,70728,1154,363,0
Karnataka,4046105,8700,3997181,40224,1286,-1075,3
Karnataka,4044819,9775,3994823,40221,1255,-934,3
Tamil Nadu,3565562,5496,3522032,38034,542,-134,1
Tamil Nadu,3565020,5630,3521357,38033,547,-102,0
Andhra Pradesh,2336386,901,2320752,14733,120,-64,0
Andhra Pradesh,2336266,965,2320568,14733,136,-74,0


In [0]:

%sql
--# Pie chart showing top 10 states in terms of Total Active Cases State-wise
select state_name as `States`, Total_Cases, Total_Active_Cases, Total_Recoveries, Total_Deaths, New_Cases, New_Active_Cases, New_Deaths
from covid19_india_aggregate order by Total_Active_Cases desc limit 10;

States,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Deaths
Punjab**,782201,17210,747101,17890,233,231,2
Punjab**,781968,16979,747101,17888,244,242,2
Maharashtra,8089389,12578,7928603,148208,1913,223,5
Maharashtra,8091276,12269,7930793,148214,1887,-309,6
Karnataka,4044819,9775,3994823,40221,1255,-934,3
Karnataka,4046105,8700,3997181,40224,1286,-1075,3
Kerala,6748557,8291,6669507,70759,1186,54,2
Kerala,6747371,8237,6668406,70728,1154,363,0
Tamil Nadu,3565020,5630,3521357,38033,547,-102,0
Tamil Nadu,3565562,5496,3522032,38034,542,-134,1


In [0]:

%sql
--# Bar chart showing top 10 states in terms of New Active cases vs New Active Cases
select state_name as `States`, Total_Cases, Total_Active_Cases, Total_Recoveries, Total_Deaths, New_Cases, New_Active_Cases, New_Deaths
from covid19_india_aggregate order by New_Cases desc limit 10;

States,Total_Cases,Total_Active_Cases,Total_Recoveries,Total_Deaths,New_Cases,New_Active_Cases,New_Deaths
Maharashtra,8089389,12578,7928603,148208,1913,223,5
Maharashtra,8091276,12269,7930793,148214,1887,-309,6
Karnataka,4046105,8700,3997181,40224,1286,-1075,3
Karnataka,4044819,9775,3994823,40221,1255,-934,3
Kerala,6748557,8291,6669507,70759,1186,54,2
Kerala,6747371,8237,6668406,70728,1154,363,0
Delhi,1996352,4310,1965600,26442,945,-346,6
Delhi,1997054,3654,1966954,26446,702,-656,4
Haryana,1049557,2940,1035946,10671,623,-267,2
Tamil Nadu,3565020,5630,3521357,38033,547,-102,0
