In [40]:
from pyspark.sql import SparkSession

In [41]:
spark = SparkSession.builder.appName('Happiness_data').getOrCreate()

In [42]:
df1 = spark.read.csv("hdfs://localhost:9000/input2/happiness_data.csv", header=True, inferSchema=True)

In [43]:
df1.show(5) 

+-----------+--------------+-------------+--------------+-------------+-------------------+-------+--------------------+-------+-------------------------+----------+----------------+
|    Country|        Region|HappinessRank|HappinessScore|StandardError|EconomyGDPperCapita| Family|HealthLifeExpectancy|Freedom|TrustGovernmentCorruption|Generosity|DystopiaResidual|
+-----------+--------------+-------------+--------------+-------------+-------------------+-------+--------------------+-------+-------------------------+----------+----------------+
|Switzerland|Western Europe|            1|         7.587|      0.03411|            1.39651|1.34951|             0.94143|0.66557|                  0.41978|   0.29678|         2.51738|
|    Iceland|Western Europe|            2|         7.561|      0.04884|            1.30232|1.40223|             0.94784|0.62877|                  0.14145|    0.4363|         2.70201|
|    Denmark|Western Europe|            3|         7.527|      0.03328|            1.

In [44]:
df1.createOrReplaceTempView("Happiness_table")

In [27]:
result = spark.sql("select * from Happiness_table")
result.write.csv('hdfs://localhost:9000/data/result.csv', mode='overwrite', header=True)

In [8]:
result

DataFrame[Country: string, Region: string, HappinessRank: int, HappinessScore: double, StandardError: double, EconomyGDPperCapita: double, Family: double, HealthLifeExpectancy: double, Freedom: double, TrustGovernmentCorruption: double, Generosity: double, DystopiaResidual: double]

In [30]:
#result.show()

In [10]:
average_happiness_score = spark.sql("""
  SELECT AVG(HappinessScore) as AvgHappinessScore
  FROM happiness_table""")
average_happiness_score.write.csv('hdfs://localhost:9000/data/average_happiness_score.csv', mode='overwrite', header=True)

In [11]:
average_happiness_score.show()

+------------------+
| AvgHappinessScore|
+------------------+
|5.3757341772151905|
+------------------+



In [45]:
#Correlation between happiness score and contributing factors (e.g., economy, family, health, freedom, trust, generosity)


Correlation_Metrics = spark.sql("""
  SELECT
    corr(`HappinessScore`, `EconomyGDPperCapita`) AS Correlation_Economy,
    corr(`HappinessScore`, `Family`) AS Correlation_Family,
    corr(`HappinessScore`, `HealthLifeExpectancy`) AS Correlation_Health,
    corr(`HappinessScore`, `Freedom`) AS Correlation_Freedom,
    corr(`HappinessScore`, `TrustGovernmentCorruption`) AS Correlation_Trust,
    corr(`HappinessScore`, `Generosity`) AS Correlation_Generosity
  FROM happiness_table
""")
Correlation_Metrics.write.csv('hdfs://localhost:9000/data/Correlation_Metrics.csv', mode='overwrite', header=True)

Correlation_Metrics.show()



+-------------------+------------------+------------------+-------------------+-------------------+----------------------+
|Correlation_Economy|Correlation_Family|Correlation_Health|Correlation_Freedom|  Correlation_Trust|Correlation_Generosity|
+-------------------+------------------+------------------+-------------------+-------------------+----------------------+
| 0.7809655268660222|0.7406051972367849|0.7241995951050696| 0.5682109041925183|0.39519858383691153|     0.180318526697175|
+-------------------+------------------+------------------+-------------------+-------------------+----------------------+



In [12]:
happiness_per_country = spark.sql("""
  SELECT Country, HappinessScore
  FROM happiness_table""")
happiness_per_country.write.csv('hdfs://localhost:9000/data/happiness_per_country.csv', mode='overwrite', header=True)

In [13]:
happiness_per_country.show()


+--------------------+--------------+
|             Country|HappinessScore|
+--------------------+--------------+
|         Switzerland|         7.587|
|             Iceland|         7.561|
|             Denmark|         7.527|
|              Norway|         7.522|
|              Canada|         7.427|
|             Finland|         7.406|
|         Netherlands|         7.378|
|              Sweden|         7.364|
|         New Zealand|         7.286|
|           Australia|         7.284|
|              Israel|         7.278|
|          Costa Rica|         7.226|
|             Austria|           7.2|
|              Mexico|         7.187|
|       United States|         7.119|
|              Brazil|         6.983|
|          Luxembourg|         6.946|
|             Ireland|          6.94|
|             Belgium|         6.937|
|United Arab Emirates|         6.901|
+--------------------+--------------+
only showing top 20 rows



In [16]:
average_happiness_by_region = spark.sql("""
  SELECT
    Region,
    AVG(HappinessScore) AS AverageHappinessScore
  FROM happiness_table
  GROUP BY Region
  ORDER BY
    AverageHappinessScore DESC
""")
average_happiness_by_region.write.csv('hdfs://localhost:9000/data/average_happiness_by_region.csv', mode='overwrite', header=True)
average_happiness_by_region.show()


+--------------------+---------------------+
|              Region|AverageHappinessScore|
+--------------------+---------------------+
|Australia and New...|                7.285|
|       North America|                7.273|
|      Western Europe|    6.689619047619048|
|Latin America and...|   6.1446818181818195|
|        Eastern Asia|    5.626166666666666|
|Middle East and N...|    5.406899999999999|
|Central and Easte...|    5.332931034482758|
|   Southeastern Asia|    5.317444444444445|
|       Southern Asia|    4.580857142857143|
|  Sub-Saharan Africa|    4.202800000000001|
+--------------------+---------------------+



In [17]:
average_happiness_by_region

DataFrame[Region: string, AverageHappinessScore: double]

In [18]:
average_factors = spark.sql("""
  SELECT
    AVG(EconomyGDPperCapita) AS AvgEconomy,
    AVG(Family) AS AvgFamily,
    AVG(Freedom) AS AvgFreedom,
    AVG(Generosity) AS AvgGenerosity,
    AVG(HealthLifeExpectancy) AS AvgHealth
  FROM happiness_table
""")
average_factors.show()

+------------------+------------------+------------------+-------------------+------------------+
|        AvgEconomy|         AvgFamily|        AvgFreedom|      AvgGenerosity|         AvgHealth|
+------------------+------------------+------------------+-------------------+------------------+
|0.8461372151898726|0.9910459493670887|0.4286149367088611|0.23729550632911403|0.6302593670886079|
+------------------+------------------+------------------+-------------------+------------------+



In [19]:
number_of_countries = spark.sql("SELECT COUNT(DISTINCT Country) AS NumberOfCountries FROM happiness_table")
number_of_countries.show()


+-----------------+
|NumberOfCountries|
+-----------------+
|              158|
+-----------------+



In [20]:
top_10_happiest_countries = spark.sql("""
  SELECT Country, `HappinessScore` as `HappinessScore`
  FROM happiness_table
  ORDER BY `HappinessScore` DESC
  LIMIT 10
""")
top_10_happiest_countries.show()
top_10_happiest_countries.write.csv('hdfs://localhost:9000/data/top_10_happiest_countries.csv', mode='overwrite', header=True)



+-----------+--------------+
|    Country|HappinessScore|
+-----------+--------------+
|Switzerland|         7.587|
|    Iceland|         7.561|
|    Denmark|         7.527|
|     Norway|         7.522|
|     Canada|         7.427|
|    Finland|         7.406|
|Netherlands|         7.378|
|     Sweden|         7.364|
|New Zealand|         7.286|
|  Australia|         7.284|
+-----------+--------------+



In [21]:
top_10_happiest_countries

DataFrame[Country: string, HappinessScore: double]

In [22]:
bottom_10_happiness_countries = spark.sql("""
  SELECT Country, `HappinessScore`
  FROM happiness_table
  ORDER BY `HappinessScore`
  LIMIT 10
""")
bottom_10_happiness_countries.show()
bottom_10_happiness_countries.write.csv('hdfs://localhost:9000/data/bottom_10_happiness_countries.csv', mode='overwrite', header=True)


+------------+--------------+
|     Country|HappinessScore|
+------------+--------------+
|        Togo|         2.839|
|     Burundi|         2.905|
|       Syria|         3.006|
|       Benin|          3.34|
|      Rwanda|         3.465|
| Afghanistan|         3.575|
|Burkina Faso|         3.587|
| Ivory Coast|         3.655|
|      Guinea|         3.656|
|        Chad|         3.667|
+------------+--------------+



+-------------------+------------------+------------------+-------------------+-------------------+----------------------+
|Correlation_Economy|Correlation_Family|Correlation_Health|Correlation_Freedom|  Correlation_Trust|Correlation_Generosity|
+-------------------+------------------+------------------+-------------------+-------------------+----------------------+
| 0.7809655268660222|0.7406051972367849|0.7241995951050696| 0.5682109041925183|0.39519858383691153|     0.180318526697175|
+-------------------+------------------+------------------+-------------------+-------------------+----------------------+



In [24]:
Correlation_Metrics

DataFrame[Correlation_Economy: double, Correlation_Family: double, Correlation_Health: double, Correlation_Freedom: double, Correlation_Trust: double, Correlation_Generosity: double]

In [25]:
highest_happiness_country = spark.sql("""
  SELECT Country, MAX(HappinessScore) AS HappiestCountry
  FROM happiness_table
  GROUP BY Country
  ORDER BY HappiestCountry DESC
  LIMIT 1
""")
highest_happiness_country.show()


+-----------+---------------+
|    Country|HappiestCountry|
+-----------+---------------+
|Switzerland|          7.587|
+-----------+---------------+



In [26]:
lowest_happiness_country = spark.sql("""
  SELECT Country, MIN(HappinessScore) AS LeastHappyCountry
  FROM happiness_table
  GROUP BY Country
  ORDER BY LeastHappyCountry ASC
  LIMIT 1
""")
lowest_happiness_country.show()


+-------+-----------------+
|Country|LeastHappyCountry|
+-------+-----------------+
|   Togo|            2.839|
+-------+-----------------+

