# Used basic statistics and data manipulation methods.

In [1]:
#SparkSession is the entry point to Spark SQL. It is the very first object 
#to create while developing Spark SQL applications.
#Used the SparkSession.builder method to create an instance of SparkSession with appName('WorldHealthRprt')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('WorldHealthRprt').getOrCreate()

In [2]:
# Read the data from the three csv files(WH_2015,WH_2016,WH_2017) into a three dataframes i.e. df1,df2 & df3.
df1 = spark.read.csv ('WH_2015.csv', inferSchema=True, header =True)
df2 = spark.read.csv ('WH_2016.csv', inferSchema=True, header =True)
df3 = spark.read.csv ('WH_2017.csv', inferSchema=True, header =True)

In [3]:
#To show the schema of the dataframe df1
df1.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Happiness Rank: integer (nullable = true)
 |-- Happiness Score: double (nullable = true)
 |-- Standard Error: double (nullable = true)
 |-- Economy (GDP per Capita): double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health (Life Expectancy): double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Trust (Government Corruption): double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Dystopia Residual: double (nullable = true)



In [4]:
#Creating a new Dataframe 'df_2015' which contains only selected columns from df1(WH_2015 data) dataframe like Country,Happiness Rank 
#& Region columns and sorting them based on 'Country' in ascending order.
#Last three steps renaming the column names from ('Country','Happiness Rank','Region') to ('Country_2015','HR_2015','Region_2015')
df_2015 = df1.select(['Country','Happiness Rank','Region']).sort('Country')
df_2015 = df_2015.withColumnRenamed("Happiness Rank", "HR_2015")
df_2015 = df_2015.withColumnRenamed("Country", "Country_2015")
df_2015 = df_2015.withColumnRenamed("Region", "Region_2015")

In [5]:
#To show the schema of the dataframe df2
df2.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Happiness Rank: integer (nullable = true)
 |-- Happiness Score: double (nullable = true)
 |-- Lower Confidence Interval: double (nullable = true)
 |-- Upper Confidence Interval: double (nullable = true)
 |-- Economy (GDP per Capita): double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health (Life Expectancy): double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Trust (Government Corruption): double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Dystopia Residual: double (nullable = true)



In [6]:
#Creating a new Dataframe 'df_2016' which contains only selected columns from df2(WH_2016 data) dataframe like Country,Happiness Rank 
#& Region columns and sorting them based on 'Country' in ascending order.
#Last three steps renaming the column names from ('Country','Happiness Rank','Region') to ('Country_2016','HR_2016','Region_2016')
df_2016 = df2.select(['Country','Happiness Rank','Region']).sort('Country')
df_2016 = df_2016.withColumnRenamed("Happiness Rank", "HR_2016")
df_2016 = df_2016.withColumnRenamed("Country", "Country_2016")
df_2016 = df_2016.withColumnRenamed("Region", "Region_2016")

In [7]:
#To show the schema of the dataframe df2
df3.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Happiness.Rank: integer (nullable = true)
 |-- Happiness.Score: double (nullable = true)
 |-- Whisker.high: double (nullable = true)
 |-- Whisker.low: double (nullable = true)
 |-- Economy..GDP.per.Capita.: double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health..Life.Expectancy.: double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Trust..Government.Corruption.: double (nullable = true)
 |-- Dystopia.Residual: double (nullable = true)



In [8]:
#Importing the regularExpression module or function
import re

In [9]:
#Replacing the column names in df3 with any dots('.') with '_' and placing it new dataframe 'df_test'
#I'm doing it as in pyspark it will not take or consider any dots in column name specification, so replacing with '_'.
df_test = df3.toDF(*(re.sub(r'[\.\s]+', '_', c) for c in df3.columns))

In [10]:
#To show the schema of the dataframe df_test
df_test.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Happiness_Rank: integer (nullable = true)
 |-- Happiness_Score: double (nullable = true)
 |-- Whisker_high: double (nullable = true)
 |-- Whisker_low: double (nullable = true)
 |-- Economy_GDP_per_Capita_: double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health_Life_Expectancy_: double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Trust_Government_Corruption_: double (nullable = true)
 |-- Dystopia_Residual: double (nullable = true)



In [11]:
#Creating a new Dataframe 'df_2017' which contains only selected columns from df_test(WH_2017 data) dataframe like Country,Happiness Rank 
#columns and sorting them based on 'Country' in ascending order.
#Last two steps renaming the column names from ('Country','Happiness_Rank') to ('Country_2017','HR_2017')
df_2017 = df_test.select(['Country','Happiness_Rank']).sort('Country')
df_2017 = df_2017.withColumnRenamed("Happiness_Rank", "HR_2017")
df_2017 = df_2017.withColumnRenamed("Country", "Country_2017")

In [12]:
#Importing all modules or functions from subpackage sql.functions
from pyspark.sql.functions import *

In [13]:
#Making alias for dataframes df_2015 & df_2016 with names df_2015_1 & df_2016_1
#Using this alias names in joining tables for refering the keys that is involved.
df_2015 = df_2015.alias("df_2015_1")
df_2016 = df_2016.alias("df_2016_1")

In [14]:
#Doing here full outer join on dataframes df_2015 & df_2016, so as not to miss any data on the key field 'Country' i.e
#Country_2015 & Country_2016 and placing the result in new dataframe 'join'
join = df_2015.join(df_2016, col("df_2015_1.Country_2015") == col("df_2016_1.Country_2016"),'outer')

In [15]:
#It will show the count or number of rows present in the dataframe join.
join.count()

164

In [16]:
#Creating a new column name 'Country_1' to maintain global country names from the year wh_2015 & wh_2016
#I'm using the function 'coalesce' it will merge the two columns country names into one, where if you have null it will replace 
#the contents from one of those columns(WH_2015, WH_2016) that don't have null value. If both have null content then it will
#replace the word 'Not_Available' in new Country_1 column and placing into new dataframe 'join1'
join1 = join.withColumn('Country_1',coalesce(join["Country_2015"],join["Country_2016"], lit("Not_Avaiable")))

In [17]:
#To show the schema of the dataframe join1
join1.printSchema()

root
 |-- Country_2015: string (nullable = true)
 |-- HR_2015: integer (nullable = true)
 |-- Region_2015: string (nullable = true)
 |-- Country_2016: string (nullable = true)
 |-- HR_2016: integer (nullable = true)
 |-- Region_2016: string (nullable = true)
 |-- Country_1: string (nullable = false)



In [18]:
#Making alias for dataframes df_2017 & join1(df_2015 & df_2016 data) with names df_2017_1 & join_1
#Using this alias names in joining tables for refering the keys that is involved.
df_2017 = df_2017.alias("df_2017_1")
join1 = join1.alias("join_1")

In [19]:
#Doing here full outer join on dataframes df_2017 & join1, so as not to miss any data on the key field 'Country' i.e
#Country_2017 & Country_1 and placing the result in new dataframe 'Final_join'
Final_join = join1.join(df_2017, col("join_1.Country_1") == col("df_2017_1.Country_2017"),'outer')

In [20]:
#It will show the count or number of rows present in the dataframe Final_join.
Final_join.count()

166

In [21]:
#Creating a new column name 'Country' to maintain global country names from df_2017 & Final_join
#I'm using the function 'coalesce' it will merge the two columns country names into one, where if you have null it will replace 
#the contents from one of those columns(WH_2017, final_join) that don't have null value. If both have null content then it will
#replace the word 'Not_Available' in new Country column and placing into new dataframe 'join2'
join2 = Final_join.withColumn('Country',coalesce(Final_join["Country_1"],Final_join["Country_2017"], lit("Not_Available")))

In [22]:
#Creating a new Dataframe 'Last' which contains only selected columns from join2 dataframe like Country,HR_2015,HR_2016 & HR_2017
Last = join2.select(['Country','HR_2015','HR_2016','HR_2017'])

In [23]:
#Filling any values with value zero and placing it in new dataframe 'Final'
Final = Last.na.fill(0)

# Q1) Gives Happiness Ranking that increased the most from 2015 to 2017

In [24]:
#Here filtering the ranks where HR_2015 is less than HR_2017 and discarding the rows whose happiness ranks are zero in the year 2015,2016 & 2017
#Creating a new column diff, it will store the difference of two ranks i.e(HR_2017 - HR_2015) and places it in Increased_rnk dataframe.
Increased_rnk = Final.filter((Final['HR_2015'] < Final['HR_2017']) & ~(Final['HR_2015'] == '0') & ~(Final['HR_2016'] == '0') & ~(Final['HR_2017'] == '0')).withColumn('diff',Final.HR_2017 - Final.HR_2015)

In [25]:
#Displays the result on sorting based on diff in descending order
Increased_rnk.sort(desc("diff")).show()

+----------+-------+-------+-------+----+
|   Country|HR_2015|HR_2016|HR_2017|diff|
+----------+-------+-------+-------+----+
| Venezuela|     23|     44|     82|  59|
|   Liberia|    116|    150|    148|  32|
|    Zambia|     85|    106|    116|  31|
|     Haiti|    119|    136|    145|  26|
|  Zimbabwe|    115|    131|    138|  23|
|   Ukraine|    111|    123|    132|  21|
|Kyrgyzstan|     77|     85|     98|  21|
|   Vietnam|     75|     96|     94|  19|
|    Bhutan|     79|     84|     97|  18|
|  Paraguay|     53|     70|     70|  17|
|   Nigeria|     78|    103|     95|  17|
|     Ghana|    114|    124|    131|  17|
|   Croatia|     62|     74|     77|  15|
|  Botswana|    128|    137|    142|  14|
|   Albania|     95|    109|    109|  14|
|     Sudan|    118|    133|    130|  12|
|    Mexico|     14|     21|     25|  11|
|   Jamaica|     65|     73|     76|  11|
|     Yemen|    136|    147|    146|  10|
|    Kosovo|     69|     77|     78|   9|
+----------+-------+-------+------

In [26]:
#Displays the country's happiness ranking that increased the most.
Increased_rnk.sort(desc("diff")).select(['Country']).show()

+----------+
|   Country|
+----------+
| Venezuela|
|   Liberia|
|    Zambia|
|     Haiti|
|  Zimbabwe|
|   Ukraine|
|Kyrgyzstan|
|   Vietnam|
|    Bhutan|
|   Nigeria|
|  Paraguay|
|     Ghana|
|   Croatia|
|  Botswana|
|   Albania|
|     Sudan|
|    Mexico|
|   Jamaica|
|     Yemen|
|    Kosovo|
+----------+
only showing top 20 rows



# Q2) Gives Happiness Ranking that decreased the most from 2015 to 2017

In [27]:
#Here filtering the ranks where HR_2015 is greater than HR_2017 and discarding the rows whose happiness ranks are zero in the year 2015,2016 & 2017
#Creating a new column diff, it will store the difference of two ranks i.e(HR_2015 - HR_2016) and places it in Decreased_rnk dataframe.
Decreased_rnk = Final.filter((Final['HR_2015'] > Final['HR_2017']) & ~(Final['HR_2015'] == '0') & ~(Final['HR_2016'] == '0') & ~(Final['HR_2017'] == '0')).withColumn('diff',Final.HR_2015 - Final.HR_2017)

In [28]:
#Displays the result on sorting based on diff in descending order
Decreased_rnk.sort(desc("diff")).show()

+-------------------+-------+-------+-------+----+
|            Country|HR_2015|HR_2016|HR_2017|diff|
+-------------------+-------+-------+-------+----+
|             Latvia|     89|     68|     54|  35|
|              Egypt|    135|    120|    104|  31|
|            Hungary|    104|     91|     75|  29|
|            Romania|     86|     71|     57|  29|
|           Bulgaria|    134|    129|    105|  29|
|            Senegal|    142|    128|    115|  27|
|           Cameroon|    133|    114|    107|  26|
|              Gabon|    143|    134|    118|  25|
|        Ivory Coast|    151|    139|    128|  23|
|              Nepal|    121|    107|     99|  22|
|           Malaysia|     61|     47|     42|  19|
|       Burkina Faso|    152|    145|    134|  18|
|        Philippines|     90|     82|     72|  18|
|       Sierra Leone|    123|    111|    106|  17|
|           Cambodia|    145|    140|    129|  16|
|            Myanmar|    129|    119|    114|  15|
|            Algeria|     68|  

In [29]:
#Displays the country's happiness ranking that decreased the most.
Decreased_rnk.sort(desc("diff")).select(['Country']).show()

+-------------------+
|            Country|
+-------------------+
|             Latvia|
|              Egypt|
|            Hungary|
|           Bulgaria|
|            Romania|
|            Senegal|
|           Cameroon|
|              Gabon|
|        Ivory Coast|
|              Nepal|
|           Malaysia|
|        Philippines|
|       Burkina Faso|
|       Sierra Leone|
|           Cambodia|
|            Algeria|
|Congo (Brazzaville)|
|            Myanmar|
|             Greece|
|            Lebanon|
+-------------------+
only showing top 20 rows



In [30]:
df_2017 = df_2017.alias("df_2017_2")
join2 = join2.alias("join_2")

In [31]:
#Doing inner join on df_2017 dataframe & join2 dataframe to get the regions for the df_2017 data and dropping off the 'HR_2017, which is duplicate column'
joiner = df_2017.join(join2, col("df_2017_2.Country_2017") == col("join_2.Country"),'inner').drop(df_2017['HR_2017'])

In [32]:
#Selecting the required columns ['Country','HR_2017','Region_2015','Region_2016'] from the joiner dataframe into Final_joiner dataframe.
Final_joiner = joiner.select(['Country','HR_2017','Region_2015','Region_2016'])

In [33]:
#To display null regions for the year 2015
Final_joiner.where(col('Region_2015').isNull()).show()

+--------------------+-------+-----------+--------------------+
|             Country|HR_2017|Region_2015|         Region_2016|
+--------------------+-------+-----------+--------------------+
|             Somalia|     93|       null|  Sub-Saharan Africa|
|Taiwan Province o...|     33|       null|                null|
|Hong Kong S.A.R.,...|     71|       null|                null|
|             Namibia|    111|       null|  Sub-Saharan Africa|
|         South Sudan|    147|       null|  Sub-Saharan Africa|
|              Belize|     50|       null|Latin America and...|
+--------------------+-------+-----------+--------------------+



In [34]:
#Creating the new column 'Region_2017', by using the coalsce function on 2015,2016 regions to populate the region for 2017
#If it is not present for corresponding country it will replace with 'Not Available'
Converter = Final_joiner.withColumn('Region_2017',coalesce(Final_joiner["Region_2015"],Final_joiner["Region_2016"], lit("Not_Available")))

In [35]:
#Displays the schema of a Converter dataframe.
Converter.printSchema()

root
 |-- Country: string (nullable = false)
 |-- HR_2017: integer (nullable = true)
 |-- Region_2015: string (nullable = true)
 |-- Region_2016: string (nullable = true)
 |-- Region_2017: string (nullable = false)



In [36]:
#Selecting required fields like 'Country','HR_2017','Region_2017' for the year 2017 data and placing in in Doer_2017 data.
Doer_2017 = Converter.select(['Country','HR_2017','Region_2017'])

In [37]:
#Here grouping based on Region and calculating avg or mean on HR_Ranks and placing it in new dataframes
#for all years 2015,2016 & 2017
df_2015_R = df_2015.groupBy("Region_2015").agg(avg(col("HR_2015"))).alias('HR_2015')
df_2015_CR = df_2015_R.sort("Region_2015")
df_2016_R = df_2016.groupBy("Region_2016").agg(avg(col("HR_2016"))).alias('HR_2016')
df_2016_CR = df_2016_R.sort("Region_2016")
df_2017_R = Doer_2017.groupBy("Region_2017").agg(avg(col("HR_2017"))).alias('HR_2017')
df_2017_CR = df_2017_R.sort("Region_2017")

In [38]:
#Making table alias for dataframes df_2015_CR & df_2016_CR for joining purpose.
df_2015_CR = df_2015_CR.alias("df_2015_CR1")
df_2016_CR = df_2016_CR.alias("df_2016_CR1")

In [39]:
#Doing outer join on df_2015_CR & df_2016_CR dataframes on region as a key and dropping of 2016_region which is duplicate one.
df_CR = df_2015_CR.join(df_2016_CR, col("df_2015_CR1.Region_2015") == col("df_2016_CR1.Region_2016"),'outer').drop(df_2016_CR['Region_2016'])

In [40]:
#Making table alias for dataframes df_CR & df_2017_CR for joining purpose.
df_CR = df_CR.alias("df_CR1")
df_2017_CR = df_2017_CR.alias("df_2017_CR1")

In [41]:
#Doing outer join on df_2017_CR & df_CR dataframes on region as a key and dropping of 2015_region which is duplicate one.
df_CR_last = df_CR.join(df_2017_CR, col("df_CR1.Region_2015") == col("df_2017_CR1.Region_2017"),'outer').drop(df_CR['Region_2015'])

# Q3) Provide the ranking of the happiest continents for all years

In [42]:
#Renaming the selected columns into meaning full one as specified below.
df_CR_last = df_CR_last.withColumnRenamed("Region_2017", "Continent")
df_CR_last = df_CR_last.withColumnRenamed("avg(HR_2017)", "Mean_HR_2017")
df_CR_last = df_CR_last.withColumnRenamed("avg(HR_2016)", "Mean_HR_2016")
df_CR_last = df_CR_last.withColumnRenamed("avg(HR_2015)", "Mean_HR_2015")

In [43]:
#It will fill if we have nulls in the data and places in new dataframe 'Continent_Ranking'
Continent_Ranking = df_CR_last.na.fill(0)

In [44]:
#It will show the ranking of the happiest continents which is based on the mean calculated for each year on groupby continents.
Continent_Ranking.select(['Continent','Mean_HR_2015','Mean_HR_2016','Mean_HR_2017']).sort(desc("Mean_HR_2015")).show()

+--------------------+------------------+------------------+------------------+
|           Continent|      Mean_HR_2015|      Mean_HR_2016|      Mean_HR_2017|
+--------------------+------------------+------------------+------------------+
|  Sub-Saharan Africa|             127.9| 129.6578947368421|127.87179487179488|
|       Southern Asia|113.14285714285714|111.71428571428571|109.85714285714286|
|   Southeastern Asia| 81.22222222222223|              80.0|             73.75|
|Central and Easte...|              79.0| 78.44827586206897| 75.06896551724138|
|Middle East and N...|              77.6| 78.10526315789474| 76.42105263157895|
|        Eastern Asia|              64.5| 67.16666666666667|             71.25|
|Latin America and...| 46.90909090909091|48.333333333333336| 50.77272727272727|
|      Western Europe|29.523809523809526| 29.19047619047619|27.142857142857142|
|       North America|              10.0|               9.5|              10.5|
|Australia and New...|               9.5