<h2>Creating a Spark object</h2>

In [1]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("covid_analysis").getOrCreate()

<h3>Regional Summary</h3>

In [22]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `WHO Region`, 
        SUM(`Confirmed`) AS total_cases, 
        SUM(`Deaths`) AS total_deaths, 
        SUM(`Recovered`) AS total_recoveries 
    FROM df 
    GROUP BY `WHO Region` 
    ORDER BY total_cases DESC;
    """)
result.show()
 

+--------------------+-----------+------------+----------------+
|          WHO Region|total_cases|total_deaths|total_recoveries|
+--------------------+-----------+------------+----------------+
|            Americas|  8839286.0|    342732.0|       4468616.0|
|              Europe|  3299523.0|    211144.0|       1993723.0|
|     South-East Asia|  1835297.0|     41349.0|       1156933.0|
|Eastern Mediterra...|  1490744.0|     38339.0|       1201400.0|
|              Africa|   723207.0|     12223.0|        440645.0|
|     Western Pacific|   292428.0|      8249.0|        206770.0|
+--------------------+-----------+------------+----------------+



<h3>Top 10 Countries By COVID-19 Recovery Rates</h3>

In [28]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    select `Country/Region`, 
    (`Recovered`* 100.0 / `Confirmed`) as recovery_rate 
    from df
    order by recovery_rate 
    DESC limit 10;
""")
result.show()

+--------------+----------------+
|Country/Region|   recovery_rate|
+--------------+----------------+
|      Holy See|100.000000000000|
|       Grenada|100.000000000000|
|      Dominica|100.000000000000|
|      Djibouti| 98.379126309547|
|       Iceland| 98.327939590076|
|        Brunei| 97.872340425532|
|   New Zealand| 97.238278741169|
|         Qatar| 97.017254121919|
|      Malaysia| 96.597035040431|
|     Mauritius| 96.511627906977|
+--------------+----------------+



<h3>Comparison Of Recovery And Fatality Rates By Country</h3>

In [31]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Country/Region`, (`Deaths` * 100.0 / `Confirmed`) AS death_rate, 
    (`Recovered` * 100.0 / `Confirmed`) AS recovery_rate 
    FROM df
    WHERE `Confirmed` > 0 
    ORDER BY "Country_Region";
""")
result.show()

+-------------------+---------------+---------------+
|     Country/Region|     death_rate|  recovery_rate|
+-------------------+---------------+---------------+
|        Afghanistan| 3.499434685492|69.486804732096|
|            Albania| 2.950819672131|56.250000000000|
|            Algeria| 4.157580524077|67.339934937261|
|            Andorra| 5.733186328556|88.533627342889|
|             Angola| 4.315789473684|25.473684210526|
|Antigua and Barbuda| 3.488372093023|75.581395348837|
|          Argentina| 1.827184976346|43.350097959574|
|            Armenia| 1.901577962022|71.315859855576|
|          Australia| 1.091289289682|60.844278899562|
|            Austria| 3.468236209748|88.753769821967|
|         Azerbaijan| 1.389345069960|76.338435262432|
|            Bahamas| 2.879581151832|23.821989528796|
|            Bahrain| 0.357124765716|91.459399219898|
|         Bangladesh| 1.310642059896|55.556636092386|
|           Barbados| 6.363636363636|85.454545454545|
|            Belarus| 0.7999

<h3>Countries with the Lowest Number of COVID-19 Deaths</h3>

In [38]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Country/Region`, `Deaths` 
    FROM df 
    WHERE `Deaths` > 0 
    ORDER BY `Deaths` 
    ASC LIMIT 10;
""")
result.show()

+-------------------+------+
|     Country/Region|Deaths|
+-------------------+------+
|            Burundi|     1|
|      Liechtenstein|     1|
|     Western Sahara|     1|
|             Belize|     2|
|           Botswana|     2|
|             Uganda|     2|
|Antigua and Barbuda|     3|
|             Brunei|     3|
|             Monaco|     4|
|             Rwanda|     5|
+-------------------+------+



<h3>Countries with the Highest Number of COVID-19 Cases</h3>

In [40]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Country/Region`, `Confirmed` 
    FROM df 
    ORDER BY `Confirmed` 
    DESC LIMIT 10;
""")
result.show()

+--------------+---------+
|Country/Region|Confirmed|
+--------------+---------+
|            US|  4290259|
|        Brazil|  2442375|
|         India|  1480073|
|        Russia|   816680|
|  South Africa|   452529|
|        Mexico|   395489|
|          Peru|   389717|
|         Chile|   347923|
|United Kingdom|   301708|
|          Iran|   293606|
+--------------+---------+



<h3>Global Recovery Rate</h3>

In [42]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
SELECT (SUM(`TotalRecovered`)+SUM(`NewRecovered`)) * 100 / SUM(`TotalCases`) as recovery_rate 
from df;
""")
result.show()


+-----------------+
|    recovery_rate|
+-----------------+
|62.99339783483538|
+-----------------+



<h3>COVID-19 Trends By Continent</h3>

In [43]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Continent` , SUM(`TotalCases`) as TotalCases, 
    SUM(`TotalDeaths`) as TotalDeaths, 
    SUM(`TotalRecovered`) as TotalRecovered 
    from df 
    group by `Continent`;
""")
result.show()

+-----------------+----------+-----------+--------------+
|        Continent|TotalCases|TotalDeaths|TotalRecovered|
+-----------------+----------+-----------+--------------+
|           Europe|   2982576|     205232|       1587302|
|           Africa|   1011867|      22114|        693620|
|             null|       712|         13|           651|
|Australia/Oceania|     21735|        281|         12620|
|    North America|   5919209|     229855|       3151678|
|    South America|   4543273|     154885|       3116150|
|             Asia|   4689794|     100627|       3508170|
+-----------------+----------+-----------+--------------+



<h3>Continent That Has The Lowest Cases</h3>

In [44]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Continent`, 
        SUM(`TotalCases`) AS total_cases 
    FROM df 
    GROUP BY `Continent` 
    ORDER BY total_cases 
    ASC LIMIT 1;
""")
result.show()

+---------+-----------+
|Continent|total_cases|
+---------+-----------+
|     null|        712|
+---------+-----------+



<h3>Recovery Rate by Continent</h3>

In [45]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Continent`, 
        SUM(`TotalRecovered`) * 100.0 / SUM(`TotalCases`) AS recovery_rate 
    FROM df 
    WHERE `TotalCases` > 0 
    GROUP BY `Continent` 
    ORDER BY recovery_rate DESC;
""")
result.show()

+-----------------+-----------------+
|        Continent|    recovery_rate|
+-----------------+-----------------+
|             null|91.43258426966292|
|             Asia|74.80435174764606|
|    South America|68.58821822945705|
|           Africa|68.54853454060662|
|Australia/Oceania|58.06303197607545|
|    North America|53.24491836662635|
|           Europe|53.21916356867352|
+-----------------+-----------------+



<h3>Global Death %</h3>

In [47]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT SUM(`Deaths`) * 100.0 / SUM(`Confirmed`) AS global_death_percentage 
    FROM df 
    WHERE `Confirmed` > 0;
""")
result.show()

+-----------------------+
|global_death_percentage|
+-----------------------+
|       3.96854825570971|
+-----------------------+



<h3>Local Death %</h3>

In [49]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Country/Region`, 
        `Deaths` * 100.0 / `Confirmed` AS death_percentage 
    FROM df 
    WHERE `Confirmed` > 0 
    ORDER BY `Country/Region`;
""")
result.show()

+-------------------+----------------+
|     Country/Region|death_percentage|
+-------------------+----------------+
|        Afghanistan|  3.499434685492|
|            Albania|  2.950819672131|
|            Algeria|  4.157580524077|
|            Andorra|  5.733186328556|
|             Angola|  4.315789473684|
|Antigua and Barbuda|  3.488372093023|
|          Argentina|  1.827184976346|
|            Armenia|  1.901577962022|
|          Australia|  1.091289289682|
|            Austria|  3.468236209748|
|         Azerbaijan|  1.389345069960|
|            Bahamas|  2.879581151832|
|            Bahrain|  0.357124765716|
|         Bangladesh|  1.310642059896|
|           Barbados|  6.363636363636|
|            Belarus|  0.799988104266|
|            Belgium| 14.785933642440|
|             Belize|  4.166666666667|
|              Benin|  1.977401129944|
|             Bhutan|           0E-12|
+-------------------+----------------+
only showing top 20 rows



<h3>Global Infection %</h3>

In [50]:
df1= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df1.createOrReplaceTempView("df1")
df2= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df2.createOrReplaceTempView("df2")
result=spark.sql("""
    SELECT SUM(cwl.`Confirmed`)*100.0/ NULLIF(SUM(wd.`Population`), 0) AS infect_percent
    from df1 cwl left join df2 wd 
    on cwl.`Country/Region`=wd.`Country/Region`;
""")
result.show()

+----------------+
|  infect_percent|
+----------------+
|0.29201419840115|
+----------------+



<h3>Local Infection %</h3>

In [51]:
df1= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df1.createOrReplaceTempView("df1")
df2= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df2.createOrReplaceTempView("df2")
result=spark.sql("""
    SELECT cwl.`Country/Region`, (cwl.`Confirmed`*100.0 / wd.`Population`) as infect_percent 
    from df1 cwl left join df2 wd on cwl.`Country/Region`=wd.`Country/Region` 
    order by `Country/Region`;
""")
result.show()

+-------------------+--------------+
|     Country/Region|infect_percent|
+-------------------+--------------+
|        Afghanistan|0.092959533623|
|            Albania|0.169593427560|
|            Algeria|0.063681987186|
|            Andorra|1.173684619167|
|             Angola|0.002882605147|
|Antigua and Barbuda|0.087746148352|
|          Argentina|0.370087382676|
|            Armenia|1.261551428212|
|          Australia|0.059943912898|
|            Austria|0.228128772578|
|         Azerbaijan|0.300012524335|
|            Bahamas|0.097048900451|
|            Bahrain|2.313395274655|
|         Bangladesh|0.137229649629|
|           Barbados|0.038272717467|
|            Belarus|0.711726033260|
|            Belgium|0.572915009126|
|             Belize|0.012050854606|
|              Benin|0.014565532387|
|             Bhutan|0.012816479663|
+-------------------+--------------+
only showing top 20 rows



<h3>To find out the countries with the highest infection rates</h3>

In [52]:
df1= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/country_wise_latest.csv",header=True,inferSchema=True)
df1.createOrReplaceTempView("df1")
df2= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df2.createOrReplaceTempView("df2")
result=spark.sql("""
    SELECT cwl.`Country/Region`, 
        (cwl.`Confirmed`*100.0 / NULLIF(wd.`Population`,0)) as infect_percent 
    from df1 cwl left join df2 wd on cwl.`Country/Region`=wd.`Country/Region` 
    where cwl.`Confirmed`>0 AND 
    wd.`Population` is not null 
    order by infect_percent 
    DESC limit 1;
""")
result.show()

+--------------+--------------+
|Country/Region|infect_percent|
+--------------+--------------+
|         Qatar|3.903298127897|
+--------------+--------------+



<h3>To find out the countries and continents with the highest death counts</h3>

In [55]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    SELECT `Country/Region`, 
        `Continent`, `TotalDeaths` from df 
    where `TotalDeaths` is not null 
    order by `TotalDeaths` DESC;
""")
result.show()

+--------------+-------------+-----------+
|Country/Region|    Continent|TotalDeaths|
+--------------+-------------+-----------+
|           USA|North America|     162804|
|        Brazil|South America|      98644|
|        Mexico|North America|      50517|
|            UK|       Europe|      46413|
|         India|         Asia|      41638|
|         Italy|       Europe|      35187|
|        France|       Europe|      30312|
|         Spain|       Europe|      28500|
|          Peru|South America|      20424|
|          Iran|         Asia|      17976|
|        Russia|       Europe|      14606|
|      Colombia|South America|      11939|
|         Chile|South America|       9889|
|       Belgium|       Europe|       9859|
|  South Africa|       Africa|       9604|
|       Germany|       Europe|       9252|
|        Canada|North America|       8966|
|   Netherlands|       Europe|       6153|
|      Pakistan|         Asia|       6035|
|       Ecuador|South America|       5877|
+----------

<h3>Average number of deaths by day (Continents and Countries)(global)</h3>

In [58]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/full_grouped.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    select `Country/Region`, 
        `Date`,avg(`Deaths`) as average_death 
    from df 
    where `Deaths` is not null 
    group by `Country/Region`,`Date` 
    order by `Date` desc;
""")
result.show()

+--------------+----------+-------------+
|Country/Region|      Date|average_death|
+--------------+----------+-------------+
|   Timor-Leste|2020-07-27|          0.0|
|         China|2020-07-27|       4656.0|
|     Australia|2020-07-27|        167.0|
|      Cameroon|2020-07-27|        391.0|
|        Greece|2020-07-27|        202.0|
|     Greenland|2020-07-27|          0.0|
|       Iceland|2020-07-27|         10.0|
|      Cambodia|2020-07-27|          0.0|
|   Afghanistan|2020-07-27|       1269.0|
|       Namibia|2020-07-27|          8.0|
|         Haiti|2020-07-27|        158.0|
|       Bahrain|2020-07-27|        141.0|
|       Ecuador|2020-07-27|       5532.0|
|     Mauritius|2020-07-27|         10.0|
|          Iraq|2020-07-27|       4458.0|
|     Indonesia|2020-07-27|       4838.0|
|        Russia|2020-07-27|      13334.0|
|      Honduras|2020-07-27|       1166.0|
|       Estonia|2020-07-27|         69.0|
|        Uganda|2020-07-27|          2.0|
+--------------+----------+-------

<h3>Average number of deaths by day (Continents and Countries)(local)</h3>

In [61]:
df1= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/full_grouped.csv",header=True,inferSchema=True)
df1.createOrReplaceTempView("df1")
df2= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df2.createOrReplaceTempView("df2")
result=spark.sql("""
    SELECT fg.`Country/Region`, 
        wd.`Continent`, 
        fg.`Date`, 
        avg(fg.`Deaths`) as average_death 
    from df1 fg left join df2 wd on fg.`Country/Region`=wd.`Country/Region` 
    where fg.`Deaths` is not null 
    group by fg.`Country/Region`, wd.`Continent`,fg.`Date` 
    order by average_death DESC;
""")
result.show()

+--------------+---------+----------+-------------+
|Country/Region|Continent|      Date|average_death|
+--------------+---------+----------+-------------+
|            US|     null|2020-07-27|     148011.0|
|            US|     null|2020-07-26|     146935.0|
|            US|     null|2020-07-25|     146465.0|
|            US|     null|2020-07-24|     145560.0|
|            US|     null|2020-07-23|     144430.0|
|            US|     null|2020-07-22|     143316.0|
|            US|     null|2020-07-21|     142121.0|
|            US|     null|2020-07-20|     141025.0|
|            US|     null|2020-07-19|     140534.0|
|            US|     null|2020-07-18|     140119.0|
|            US|     null|2020-07-17|     139266.0|
|            US|     null|2020-07-16|     138358.0|
|            US|     null|2020-07-15|     137415.0|
|            US|     null|2020-07-14|     136466.0|
|            US|     null|2020-07-13|     135566.0|
|            US|     null|2020-07-12|     135205.0|
|           

<h3>Average of cases divided by the number of population of each country (TOP 10)</h3>

In [63]:
df= spark.read.csv(r"hdfs://localhost:9000/user/hadoop/covid_dataset/archive/worldometer_data.csv",header=True,inferSchema=True)
df.createOrReplaceTempView("df")
result=spark.sql("""
    select `Country/Region`, 
        avg(`TotalCases`* 100.0 /NULLIF(`Population`,0)) as average_case 
    from df 
    where `Population` is not null 
    group by `Country/Region` 
    order by average_case DESC limit 10;
""")
result.show()

+--------------+------------------+
|Country/Region|      average_case|
+--------------+------------------+
|         Qatar|3.9921575750450000|
| French Guiana|2.7145648579590000|
|       Bahrain|2.5130239079750000|
|    San Marino|2.0596381637100000|
|         Chile|1.9164810228280000|
|        Panama|1.6527039892330000|
|        Kuwait|1.6378443167540000|
|          Oman|1.5769043963730000|
|           USA|1.5193862960520000|
|  Vatican City|1.4981273408240000|
+--------------+------------------+

