In [49]:
import findspark
findspark.init()

# Import Modules

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DateType, IntegerType

spark = SparkSession.builder.getOrCreate()
# df = spark.sql("select 'spark' as hello ")
# df.show()

# Load and Analyze the data

In [51]:
covid_schema = StructType([StructField('Date', DateType(), True),
                           StructField('State', StringType(), True),
                           StructField('Region', StringType(), True),
                           StructField('Confirmed', IntegerType(), True),
                           StructField('Deaths', IntegerType(), True),
                           StructField('Recovered', IntegerType(), True)])

In [53]:
covid_df = spark.read.csv('covid_19_data.csv', header=True, schema=covid_schema)

In [79]:
csv_df = spark.read.csv('covid_19_data.csv', header=True)

In [80]:
csv_df.show()

+---------+-----+-------------------+---------+------+---------+
|     Date|State|             Region|Confirmed|Deaths|Recovered|
+---------+-----+-------------------+---------+------+---------+
|4/29/2020| null|        Afghanistan|     1939|    60|      252|
|4/29/2020| null|            Albania|      766|    30|      455|
|4/29/2020| null|            Algeria|     3848|   444|     1702|
|4/29/2020| null|            Andorra|      743|    42|      423|
|4/29/2020| null|             Angola|       27|     2|        7|
|4/29/2020| null|Antigua and Barbuda|       24|     3|       11|
|4/29/2020| null|          Argentina|     4285|   214|     1192|
|4/29/2020| null|            Armenia|     1932|    30|      900|
|4/29/2020| null|            Austria|    15402|   580|    12779|
|4/29/2020| null|         Azerbaijan|     1766|    23|     1267|
|4/29/2020| null|            Bahamas|       80|    11|       23|
|4/29/2020| null|            Bahrain|     2921|     8|     1455|
|4/29/2020| null|        

In [55]:
csv_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)



# Answers

## Q1. Show number of confiremed, death and recovered cases

In [57]:
df1 = covid_df.select("Region","Confirmed","Deaths","Recovered").groupBy("Region").sum("Confirmed","Deaths","Recovered").withColumnRenamed("sum(Confirmed)","Total_Cases").withColumnRenamed("sum(Deaths)","Death_Cases").withColumnRenamed("sum(Recovered)","Recovered_Cases").orderBy("Region")

In [58]:
df1.show()

+-------------------+-----------+-----------+---------------+
|             Region|Total_Cases|Death_Cases|Recovered_Cases|
+-------------------+-----------+-----------+---------------+
|        Afghanistan|       1939|         60|            252|
|            Albania|        766|         30|            455|
|            Algeria|       3848|        444|           1702|
|            Andorra|        743|         42|            423|
|             Angola|         27|          2|              7|
|Antigua and Barbuda|         24|          3|             11|
|          Argentina|       4285|        214|           1192|
|            Armenia|       1932|         30|            900|
|          Australia|       6752|         91|           5715|
|            Austria|      15402|        580|          12779|
|         Azerbaijan|       1766|         23|           1267|
|            Bahamas|         80|         11|             23|
|            Bahrain|       2921|          8|           1455|
|       

## Q2. In which region maximum number of case were recorded

In [66]:
df2 = covid_df.select("Region","Confirmed").groupBy("Region").sum("Confirmed").withColumnRenamed("sum(Confirmed)","Number_of_Cases").orderBy("Number_of_Cases",ascending=False)

In [68]:
df2.take(1)

[Row(Region='US', Number_of_Cases=1039909)]

## Q3. In which region maximum number of deaths were recorded

In [70]:
df3 = covid_df.select("Region","Deaths").groupBy("Region").sum("Deaths").withColumnRenamed("sum(Deaths)","Number_of_Deaths").orderBy("Number_of_Deaths",ascending=False)

In [71]:
df3.take(1)

[Row(Region='US', Number_of_Deaths=60967)]

## Q4. Show number of confiremed, death and recovered cases for PAK till April 29, 2020

In [88]:
df4 = covid_df.select("Region","Confirmed","Deaths","Recovered").filter(covid_df.Region == "Pakistan").groupBy("Region").sum("Confirmed","Deaths","Recovered").withColumnRenamed("sum(Confirmed)","Total_Cases").withColumnRenamed("sum(Deaths)","Death_Cases").withColumnRenamed("sum(Recovered)","Recovered_Cases")
df4.show()

+--------+-----------+-----------+---------------+
|  Region|Total_Cases|Death_Cases|Recovered_Cases|
+--------+-----------+-----------+---------------+
|Pakistan|      15525|        343|           3425|
+--------+-----------+-----------+---------------+



## Q5. Filter records with confiremed cases less than 10

In [86]:
df5 = covid_df[covid_df.Confirmed < 10]
df5.show()

+----+--------------------+--------------------+---------+------+---------+
|Date|               State|              Region|Confirmed|Deaths|Recovered|
+----+--------------------+--------------------+---------+------+---------+
|null|                null|              Bhutan|        7|     0|        5|
|null|                null|          MS Zaandam|        9|     2|        0|
|null|                null|          Mauritania|        8|     1|        6|
|null|                null|    Papua New Guinea|        8|     0|        0|
|null|                null|Sao Tome and Prin...|        8|     0|        4|
|null|                null|      Western Sahara|        6|     0|        5|
|null|                null|               Yemen|        6|     0|        1|
|null|            Anguilla|                  UK|        3|     0|        3|
|null|Bonaire, Sint Eus...|         Netherlands|        5|     0|        0|
|null|British Virgin Is...|                  UK|        6|     1|        3|
|null|Diamon