In [144]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower

In [145]:
spark = SparkSession.builder.appName('CovidData').getOrCreate(); 

In [146]:
df = spark.read.csv('./complete.csv', sep=',',
                         inferSchema=True, header=True); 
df.printSchema(); 


root
 |-- Date: date (nullable = true)
 |-- Name of State / UT: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Total Confirmed cases: double (nullable = true)
 |-- Death: string (nullable = true)
 |-- Cured/Discharged/Migrated: double (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)



In [147]:
low_df = df.withColumn("Name of State / UT", lower(df["Name of State / UT"])); 



low_df.show(); 

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|        1|         0|            0|
|2020-02-03|        

In [148]:
low_df.createOrReplaceTempView("covidData"); 
gretest_df = spark.sql("select * from covidData order by `Total Confirmed cases` desc limit 1; "); 
gretest_df.show();


+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+
|2020-08-06|       maharashtra| 19.7515|  75.7139|             468265.0|16476|                 305521.0|    10309|         0|         6165|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+



In [149]:
second_greatest_df = spark.sql("""select `Name of State / UT` as state from covidData where `Total Confirmed cases` = (
    select max(`Total Confirmed cases`) from covidData where `Total Confirmed cases` < 
    (
        select max(`Total Confirmed cases`) from covidData
    )
)
"""); 
second_greatest_df.show(); 

+-----------+
|      state|
+-----------+
|maharashtra|
+-----------+



In [150]:
least_death_df = spark.sql("""select `Name of State / UT`, death_int / `Total Confirmed cases` as ratio
from (
    select `Name of State / UT`, 
           cast(Death AS double) as death_int, 
           `Total Confirmed cases`
    from covidData
    where `Name of State / UT` not like '%union%'
) as converted_data where death_int >0 and `Total Confirmed cases` >0
order by ratio ASC
limit 1 
"""); 

print(least_death_df.show());

+------------------+--------------------+
|Name of State / UT|               ratio|
+------------------+--------------------+
|           tripura|5.213764337851929E-4|
+------------------+--------------------+

None


In [151]:
month_newer_df = spark.sql("""select concat(year(Date), '/', month(Date)) as Month, sum(`New recovered`) as Recovered from covidData
group by year(Date), month(Date)
order by Recovered DESC
limit 1"""); 
month_newer_df.show(); 

+------+---------+
| Month|Recovered|
+------+---------+
|2020/7|   722983|
+------+---------+



In [152]:
parse_date_df = spark.sql("""
select *, case 
           when month(Date) = 1 then 'January'
           when month(Date) = 2 then 'February'
           when month(Date) = 3 then 'March'
           when month(Date) = 4 then 'April'
           when month(Date) = 5 then 'May'
           when month(Date) = 6 then 'June'
           when month(Date) = 7 then 'July'
           when month(Date) = 8 then 'August'
           when month(Date) = 9 then 'September'
           when month(Date) = 10 then 'October'
           when month(Date) = 11 then 'November'
           when month(Date) = 12 then 'December'
       end as month
from covidData                   
"""); 

parse_date_df.show(); 

+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+--------+
|      Date|Name of State / UT|Latitude|Longitude|Total Confirmed cases|Death|Cured/Discharged/Migrated|New cases|New deaths|New recovered|   month|
+----------+------------------+--------+---------+---------------------+-----+-------------------------+---------+----------+-------------+--------+
|2020-01-30|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0| January|
|2020-01-31|            kerala| 10.8505|  76.2711|                  1.0|    0|                      0.0|        0|         0|            0| January|
|2020-02-01|            kerala| 10.8505|  76.2711|                  2.0|    0|                      0.0|        1|         0|            0|February|
|2020-02-02|            kerala| 10.8505|  76.2711|                  3.0|    0|                      0.0|  