# Proyecto: Big Data Processing

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=c8634abe508fe982096b50a84905d99f753e352719665e039c94f6d1345f009c
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
# Upload data files
from google.colab import files
uploaded = files.upload()

Saving world-happiness-report.csv to world-happiness-report.csv
Saving world-happiness-report-2021.csv to world-happiness-report-2021.csv


In [3]:
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
    .appName("World Happineess 2005-2021") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
from pyspark.sql.types import *

# Define data schemas
world_happiness_2005_2020_schema = StructType([
    StructField("Country name", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Life Ladder", FloatType(), True),
    StructField("Log GDP per capita", FloatType(), True),
    StructField("Social support", FloatType(), True),
    StructField("Healthy life expectancy at birth", FloatType(), True),
    StructField("Freedom to make life choices", FloatType(), True),
    StructField("Generosity", FloatType(), True),
    StructField("Perceptions of corruption", FloatType(), True),
    StructField("Positive affect", FloatType(), True),
    StructField("Negative affect", FloatType(), True),
]) 

world_happiness_2021_schema = StructType([
    StructField("Country name", StringType(), True),
    StructField("Regional indicator", StringType(), True),
    StructField("Ladder score", FloatType(), True),
    StructField("Standard error of ladder score", FloatType(), True),
    StructField("upperwhisker", FloatType(), True),
    StructField("lowerwhisker", FloatType(), True),
    StructField("Logged GDP per capita", FloatType(), True),
    StructField("Social support", FloatType(), True),
    StructField("Healthy life expectancy", FloatType(), True),
    StructField("Freedom to make life choices", FloatType(), True),
    StructField("Generosity", FloatType(), True),
    StructField("Ladder score in Dystopia", FloatType(), True),
    StructField("Explained by: Log GDP per capita", FloatType(), True),
    StructField("Explained by: Social support", FloatType(), True),
    StructField("Explained by: Healthy life expectancy", FloatType(), True),
    StructField("Explained by: Freedom to make life choices", FloatType(), True),
    StructField("Explained by: Generosity", FloatType(), True),
    StructField("Explained by: Perceptions of corruption", FloatType(), True),
    StructField("Dystopia + residual", FloatType(), True),
]) 

In [5]:
# Read csv data files and create DataFrames
world_happiness_2005_2020_df = spark.read.csv(
    "world-happiness-report.csv", 
    header=True, schema=world_happiness_2005_2020_schema, sep=",")

world_happiness_2021_df = spark.read.csv(
    "world-happiness-report-2021.csv", 
    header=True, schema=world_happiness_2021_schema, sep=",")

In [6]:
# Create Country-Region DataFrame
country_region_df = world_happiness_2021_df.groupBy("Country name", "Regional indicator").count() \
    .withColumnRenamed("Country name", "Country") 


world_happiness_2005_2020_cols = (
    "Country name",
    "Regional indicator",
    "Year",
    "Life Ladder",
    "Log GDP per capita",
    "Healthy life expectancy at birth"
)

# Create Region column, select and rename target columns
world_2005_2020_df = world_happiness_2005_2020_df.join(country_region_df, world_happiness_2005_2020_df["Country name"] == country_region_df["Country"], "left") \
    .select(*world_happiness_2005_2020_cols) \
    .withColumnRenamed("Country name", "Country") \
    .withColumnRenamed("Regional indicator", "Region") \
    .withColumnRenamed("Life Ladder", "Ladder Score") \
    .withColumnRenamed("Healthy life expectancy at birth", "Healthy life expectancy"
    )

world_2005_2020_df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Ladder Score: float (nullable = true)
 |-- Log GDP per capita: float (nullable = true)
 |-- Healthy life expectancy: float (nullable = true)



In [7]:
from pyspark.sql.functions import lit

world_happiness_2021_cols = (
    "Country name",
    "Regional indicator",
    "Year",
    "Ladder score",
    "Logged GDP per capita",
    "Healthy life expectancy"
)

# Create Year column and select taget columns
world_2021_df = world_happiness_2021_df.withColumn("Year", lit(2021)) \
    .select(*world_happiness_2021_cols) \

world_2021_df.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Year: integer (nullable = false)
 |-- Ladder score: float (nullable = true)
 |-- Logged GDP per capita: float (nullable = true)
 |-- Healthy life expectancy: float (nullable = true)



In [8]:
# Union 2005-2020 and 2021 DataFrames
world_happiness_2005_2021_df = world_2005_2020_df.union(world_2021_df) \
    .sort("Country", "Year")
    
world_happiness_2005_2021_df.show(15)

+-----------+--------------------+----+------------+------------------+-----------------------+
|    Country|              Region|Year|Ladder Score|Log GDP per capita|Healthy life expectancy|
+-----------+--------------------+----+------------+------------------+-----------------------+
|Afghanistan|          South Asia|2008|       3.724|              7.37|                   50.8|
|Afghanistan|          South Asia|2009|       4.402|              7.54|                   51.2|
|Afghanistan|          South Asia|2010|       4.758|             7.647|                   51.6|
|Afghanistan|          South Asia|2011|       3.832|              7.62|                  51.92|
|Afghanistan|          South Asia|2012|       3.783|             7.705|                  52.24|
|Afghanistan|          South Asia|2013|       3.572|             7.725|                  52.56|
|Afghanistan|          South Asia|2014|       3.131|             7.718|                  52.88|
|Afghanistan|          South Asia|2015| 

In [9]:
from pyspark.sql.functions import col, when, count

# Missing values
world_happiness_2005_2021_df.select([count(when(col(c).isNull(), c)).alias(c) for c in world_happiness_2005_2021_df.columns]).show()

+-------+------+----+------------+------------------+-----------------------+
|Country|Region|Year|Ladder Score|Log GDP per capita|Healthy life expectancy|
+-------+------+----+------------+------------------+-----------------------+
|      0|    63|   0|           0|                36|                     55|
+-------+------+----+------------+------------------+-----------------------+



In [10]:
# Countries with no Region value
world_happiness_2005_2021_df.filter(col("Region").isNull()) \
    .groupBy("Country").count() \
    .drop("count") \
    .show() 

+--------------------+
|             Country|
+--------------------+
|              Guyana|
|            Djibouti|
|               Sudan|
|              Angola|
|               Qatar|
|             Somalia|
|            Suriname|
|                Cuba|
|    Congo (Kinshasa)|
|Central African R...|
|              Bhutan|
|                Oman|
|               Syria|
| Trinidad and Tobago|
|   Somaliland region|
|         South Sudan|
|              Belize|
+--------------------+





---



#### 1. ¿Cuál es el país más “feliz” del 2021 según la data?

In [11]:
from pyspark.sql.functions import desc

happiness_2021_df = world_happiness_2005_2021_df.filter(col("Year") == 2021) 
happiness_2021_df.select("Country", "Region", "Year", "Ladder Score") \
    .sort(desc("Ladder Score")) \
    .show(10, False)  # Finland

+-----------+---------------------+----+------------+
|Country    |Region               |Year|Ladder Score|
+-----------+---------------------+----+------------+
|Finland    |Western Europe       |2021|7.842       |
|Denmark    |Western Europe       |2021|7.62        |
|Switzerland|Western Europe       |2021|7.571       |
|Iceland    |Western Europe       |2021|7.554       |
|Netherlands|Western Europe       |2021|7.464       |
|Norway     |Western Europe       |2021|7.392       |
|Sweden     |Western Europe       |2021|7.363       |
|Luxembourg |Western Europe       |2021|7.324       |
|New Zealand|North America and ANZ|2021|7.277       |
|Austria    |Western Europe       |2021|7.268       |
+-----------+---------------------+----+------------+
only showing top 10 rows



#### 2. ¿Cuál es el país más “feliz” del 2021 por continente según la data?

In [12]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_region = Window.partitionBy("Region").orderBy(desc("Ladder Score"))
happiness_2021_df.withColumn("row", row_number().over(window_region)) \
  .filter(col("row") == 1).drop("row") \
  .sort(desc("Ladder Score")) \
  .select("Country", "Region", "Ladder Score") \
  .show(truncate=False)

+------------------------+----------------------------------+------------+
|Country                 |Region                            |Ladder Score|
+------------------------+----------------------------------+------------+
|Finland                 |Western Europe                    |7.842       |
|New Zealand             |North America and ANZ             |7.277       |
|Israel                  |Middle East and North Africa      |7.157       |
|Costa Rica              |Latin America and Caribbean       |7.069       |
|Czech Republic          |Central and Eastern Europe        |6.965       |
|Taiwan Province of China|East Asia                         |6.584       |
|Singapore               |Southeast Asia                    |6.377       |
|Uzbekistan              |Commonwealth of Independent States|6.179       |
|Mauritius               |Sub-Saharan Africa                |6.049       |
|Nepal                   |South Asia                        |5.269       |
+------------------------

#### 3. ¿Cuál es el país que más veces ocupó el primer lugar en todos los años?

In [13]:
window_year = Window.partitionBy("Year").orderBy(desc("Ladder Score"))
world_happiness_2005_2021_df.withColumn("row", row_number().over(window_year)) \
  .filter(col("row") == 1).drop("row") \
  .groupBy("Country").count() \
  .sort(desc("count")) \
  .show(truncate=False)  # Finland & Denmark

+-----------+-----+
|Country    |count|
+-----------+-----+
|Finland    |7    |
|Denmark    |7    |
|Norway     |1    |
|Switzerland|1    |
|Canada     |1    |
+-----------+-----+



#### 4. ¿Qué puesto de Felicidad tiene el país con mayor GDP del 2020?

In [14]:
window = Window.orderBy(desc("Ladder Score"))
world_happiness_2005_2021_df.filter(col("Year") == 2020) \
    .withColumn("Hapinness Rank", row_number().over(window)) \
    .drop("Healthy life expectancy") \
    .sort(desc("Log GDP per capita")) \
    .show(10)  # 13

+--------------------+--------------------+----+------------+------------------+--------------+
|             Country|              Region|Year|Ladder Score|Log GDP per capita|Hapinness Rank|
+--------------------+--------------------+----+------------+------------------+--------------+
|             Ireland|      Western Europe|2020|       7.035|            11.323|            13|
|         Switzerland|      Western Europe|2020|       7.508|            11.081|             4|
|United Arab Emirates|Middle East and N...|2020|       6.458|            11.053|            27|
|              Norway|      Western Europe|2020|        7.29|            11.042|             8|
|       United States|North America and...|2020|       7.028|            11.001|            14|
|             Denmark|      Western Europe|2020|       7.515|             10.91|             3|
|         Netherlands|      Western Europe|2020|       7.504|            10.901|             5|
|             Austria|      Western Euro

#### 5. ¿En que porcentaje ha variado a nivel mundial el GDP promedio del 2020 respecto al 2021? ¿Aumentó o disminuyó?

In [15]:
from pyspark.sql.functions import exp, avg

happiness_2020_df = world_happiness_2005_2021_df.filter(col("Year") == 2020)
gdp_2020 = happiness_2020_df.select(avg(exp("Log GDP per capita"))).first()[0]
gdp_2021 = happiness_2021_df.select(avg(exp("Log GDP per capita"))).first()[0]
rel_variation = gdp_2021 / gdp_2020 - 1
print(f"{rel_variation * 100:.2f}%")  # -13.01%

-13.01%


#### 6. ¿Cuál es el país con mayor expectativa de vida? Y ¿Cuánto tenía en ese indicador en el 2019?

In [16]:
life_expectancy_2021_df = happiness_2021_df.sort(desc("Healthy life expectancy"))
life_expectancy_2021_df.show(10, False)

highest_life_expectancy_country = life_expectancy_2021_df.first()["Country"]  # Singapore
world_happiness_2005_2021_df.filter((col("Country") == highest_life_expectancy_country) & (col("Year") == 2019)) \
    .select("Country", "Year", "Healthy life expectancy") \
    .show()  # 77.1 years

+-------------------------+---------------------+----+------------+------------------+-----------------------+
|Country                  |Region               |Year|Ladder Score|Log GDP per capita|Healthy life expectancy|
+-------------------------+---------------------+----+------------+------------------+-----------------------+
|Singapore                |Southeast Asia       |2021|6.377       |11.488            |76.953                 |
|Hong Kong S.A.R. of China|East Asia            |2021|5.477       |11.0              |76.82                  |
|Japan                    |East Asia            |2021|5.94        |10.611            |75.1                   |
|Spain                    |Western Europe       |2021|6.491       |10.571            |74.7                   |
|Switzerland              |Western Europe       |2021|7.571       |11.117            |74.4                   |
|France                   |Western Europe       |2021|6.69        |10.704            |74.0                   |
|