# Solución 2: Ejercicio resuelto con Spark usando PySpark

## Apartado 1: Importación de las librerías necesarias

In [1]:
# pip install PyArrow

In [3]:
# Cargamos librerías y creamos una sesión de Spark
import findspark
findspark.init()
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pysparkdf").getOrCreate()
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
import pyspark.pandas as ps
import pyspark.sql.types as T

<br>
<br>

## Apartado 2: Importamos los datasets a dataframes

In [4]:
# Importamos el dataset hasta el año 2020
df1 = spark.read.option("header","true").csv("/Users/dj/Downloads/9 Processing Big Data/PROYECTO FINAL/world-happiness-report.csv")
display(df1)
df1.show()

DataFrame[Country name: string, year: string, Life Ladder: string, Log GDP per capita: string, Social support: string, Healthy life expectancy at birth: string, Freedom to make life choices: string, Generosity: string, Perceptions of corruption: string, Positive affect: string, Negative affect: string]

+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
|Country name|year|Life Ladder|Log GDP per capita|Social support|Healthy life expectancy at birth|Freedom to make life choices|Generosity|Perceptions of corruption|Positive affect|Negative affect|
+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
| Afghanistan|2008|      3.724|             7.370|         0.451|                          50.800|                       0.718|     0.168|                    0.882|          0.518|          0.258|
| Afghanistan|2009|      4.402|             7.540|         0.552|                          51.200|                       0.679|     0.190|                    0.850|          0.584|          0.237|
| Afghanistan|2

In [5]:
df1.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- Life Ladder: string (nullable = true)
 |-- Log GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy at birth: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)
 |-- Positive affect: string (nullable = true)
 |-- Negative affect: string (nullable = true)



In [6]:
df2 = spark.read.option("header","true").csv("/Users/dj/Downloads/9 Processing Big Data/PROYECTO FINAL/world-happiness-report-2021.csv")
display(df2)
df2.show()

DataFrame[Country name: string, Regional indicator: string, Ladder score: string, Standard error of ladder score: string, upperwhisker: string, lowerwhisker: string, Logged GDP per capita: string, Social support: string, Healthy life expectancy: string, Freedom to make life choices: string, Generosity: string, Perceptions of corruption: string, Ladder score in Dystopia: string, Explained by: Log GDP per capita: string, Explained by: Social support: string, Explained by: Healthy life expectancy: string, Explained by: Freedom to make life choices: string, Explained by: Generosity: string, Explained by: Perceptions of corruption: string, Dystopia + residual: string]

+--------------+--------------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+
|  Country name|  Regional indicator|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|
+--------------+--------

In [7]:
df2.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Ladder score: string (nullable = true)
 |-- Standard error of ladder score: string (nullable = true)
 |-- upperwhisker: string (nullable = true)
 |-- lowerwhisker: string (nullable = true)
 |-- Logged GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)
 |-- Ladder score in Dystopia: string (nullable = true)
 |-- Explained by: Log GDP per capita: string (nullable = true)
 |-- Explained by: Social support: string (nullable = true)
 |-- Explained by: Healthy life expectancy: string (nullable = true)
 |-- Explained by: Freedom to make life choices: string (nullable = true)
 |-- Explained by: Generosity: string (nullable = true)
 |-- Explained 

#### Vamos a hacer un filtrado de ambos dataframes para quedarnos con las columnas que nos interesan para responder a las preguntas planteadas. Además, juntaremos los dos datasets en uno solo.

In [8]:
# Selecciono las columnas de interés y las renombro
df1 = df1.select("Country name", "year", "Life Ladder", "Log GDP per capita", "Healthy life expectancy at birth")
df1 = df1.withColumnsRenamed({"Country name": "Country", "year": "Year", "Life Ladder": "Ladder", "Log GDP per capita": "GDP", "Healthy life expectancy at birth": "Life expectancy"})
df1.show()

+-----------+----+------+-----+---------------+
|    Country|Year|Ladder|  GDP|Life expectancy|
+-----------+----+------+-----+---------------+
|Afghanistan|2008| 3.724|7.370|         50.800|
|Afghanistan|2009| 4.402|7.540|         51.200|
|Afghanistan|2010| 4.758|7.647|         51.600|
|Afghanistan|2011| 3.832|7.620|         51.920|
|Afghanistan|2012| 3.783|7.705|         52.240|
|Afghanistan|2013| 3.572|7.725|         52.560|
|Afghanistan|2014| 3.131|7.718|         52.880|
|Afghanistan|2015| 3.983|7.702|         53.200|
|Afghanistan|2016| 4.220|7.697|         53.000|
|Afghanistan|2017| 2.662|7.697|         52.800|
|Afghanistan|2018| 2.694|7.692|         52.600|
|Afghanistan|2019| 2.375|7.697|         52.400|
|    Albania|2007| 4.634|9.142|         65.800|
|    Albania|2009| 5.485|9.262|         66.200|
|    Albania|2010| 5.269|9.303|         66.400|
|    Albania|2011| 5.867|9.331|         66.680|
|    Albania|2012| 5.510|9.347|         66.960|
|    Albania|2013| 4.551|9.359|         

In [9]:
# Ahora selecciono las columnas de interés del df2, las renombro y le añado la columna "Year"
df2 = df2.select("Country name", "Regional indicator", "Ladder score", "Logged GDP per capita", "Healthy life expectancy")
df2 = df2.withColumnsRenamed({"Country name": "Country", "Regional indicator": "Continental zone", "Ladder score": "Ladder", "Logged GDP per capita": "GDP", "Healthy life expectancy": "Life expectancy"})
df2 = df2.withColumn("Year", F.lit(2021))
# Voy a reordenar las columnas para que coincidan con el otro df
df2 = df2.select("Country", "Year", "Ladder", "GDP", "Life Expectancy", "Continental zone")
df2.show()

+--------------+----+------+------+---------------+--------------------+
|       Country|Year|Ladder|   GDP|Life Expectancy|    Continental zone|
+--------------+----+------+------+---------------+--------------------+
|       Finland|2021| 7.842|10.775|         72.000|      Western Europe|
|       Denmark|2021| 7.620|10.933|         72.700|      Western Europe|
|   Switzerland|2021| 7.571|11.117|         74.400|      Western Europe|
|       Iceland|2021| 7.554|10.878|         73.000|      Western Europe|
|   Netherlands|2021| 7.464|10.932|         72.400|      Western Europe|
|        Norway|2021| 7.392|11.053|         73.300|      Western Europe|
|        Sweden|2021| 7.363|10.867|         72.700|      Western Europe|
|    Luxembourg|2021| 7.324|11.647|         72.600|      Western Europe|
|   New Zealand|2021| 7.277|10.643|         73.400|North America and...|
|       Austria|2021| 7.268|10.906|         73.300|      Western Europe|
|     Australia|2021| 7.183|10.796|         73.900|

In [10]:
# Ahora hay que añadirle a df1 la columna de Zonas Continentales que le falta

# Para ello primero voy a crear un diccionario con las claves de los continentes y los valores de los países correspondientes:
# Primero agrupamos por Zona Continental las columnas que necesitamos, luego le pasamos la función para crear una lista con los valores correspondientes al índice de cada grupo
grupos = df2.select("Continental zone", "Country").groupBy("Continental zone").agg(F.collect_list("Country").alias("Paises")).collect()
#Ahora con un bucle llenamos el diccionario con la clave del continente y los valores las listas de países creadas para cada continente en el paso anterior
diccionario = {row["Continental zone"]: row["Paises"] for row in grupos}

In [11]:
#Ahora vamos a unir los dos dataframes en uno total sin las Zonas Continentales, y luego se lo añadiremos a todo desde df2

# Guardo una copia modificada de df2 para poder eliminar fácilmente la columna repetida después del join
df2_mod = df2.withColumnRenamed("Country", "Pais")
# Unimos df1 y df2 sin Zonas Continentales y lo ordenamos
df_union_parcial = df1.union(df2.select("Country", "Year", "Ladder", "GDP", "Life expectancy")).orderBy("Country", "Year")
# Hago un Left Join de df1 y df2_mod en base a las columnas de países y lo ordenamos de nuevo.
# El Left Join conseguirá hacer una relación one-to-many hacia la izquierda, consiguiendo replicar los valores de los continentes en el país que le corresponde.
df_maestro = df_union_parcial.join(df2_mod.select("Pais", "Continental zone"), df1.Country == df2_mod.Pais, "left_outer").orderBy("Country", "Year")
# Ahora quitamos la columna repetida de los países
df_maestro = df_maestro.drop("Pais")
df_maestro = df_maestro  \
                .withColumn("Year", F.col("Year").cast(T.IntegerType()))  \
                .withColumn("Ladder", F.col("Ladder").cast(T.FloatType()))  \
                .withColumn("GDP", F.col("GDP").cast(T.FloatType()))  \
                .withColumn("Life expectancy", F.col("Life expectancy").cast(T.FloatType()))

display(df_maestro)
df_maestro.show()

DataFrame[Country: string, Year: int, Ladder: float, GDP: float, Life expectancy: float, Continental zone: string]

+-----------+----+------+-----+---------------+--------------------+
|    Country|Year|Ladder|  GDP|Life expectancy|    Continental zone|
+-----------+----+------+-----+---------------+--------------------+
|Afghanistan|2008| 3.724| 7.37|           50.8|          South Asia|
|Afghanistan|2009| 4.402| 7.54|           51.2|          South Asia|
|Afghanistan|2010| 4.758|7.647|           51.6|          South Asia|
|Afghanistan|2011| 3.832| 7.62|          51.92|          South Asia|
|Afghanistan|2012| 3.783|7.705|          52.24|          South Asia|
|Afghanistan|2013| 3.572|7.725|          52.56|          South Asia|
|Afghanistan|2014| 3.131|7.718|          52.88|          South Asia|
|Afghanistan|2015| 3.983|7.702|           53.2|          South Asia|
|Afghanistan|2016|  4.22|7.697|           53.0|          South Asia|
|Afghanistan|2017| 2.662|7.697|           52.8|          South Asia|
|Afghanistan|2018| 2.694|7.692|           52.6|          South Asia|
|Afghanistan|2019| 2.375|7.697|   

#### Ahora ya tenemos todos los datos en un solo dataframe y uniformes.
<br>
<br>

## Apartado 3: Empezamos a responder las preguntas del enunciado

### *Pregunta 1: País más feliz del 2021*

In [12]:
# Filtramos por año, ordenamos descendentemente y mostramos el primero
df_maestro.filter(df_maestro.Year == 2021).orderBy(F.desc("Ladder")).show(1)

+-------+----+------+------+---------------+----------------+
|Country|Year|Ladder|   GDP|Life expectancy|Continental zone|
+-------+----+------+------+---------------+----------------+
|Finland|2021| 7.842|10.775|           72.0|  Western Europe|
+-------+----+------+------+---------------+----------------+
only showing top 1 row



#### Respuesta: El país más feliz del 2021 fue Finlandia
<br>
<br>

### *Pregunta 2: País más feliz por continente del 2021*

In [13]:
# Una de las opciones es crear un ranking basado en el Ladder segmentado por Zonas Continentales. Para ello, necesitamos una Windoe function.
from pyspark.sql.window import Window
# Guardamos aparte la información de partición por Zona Continental y ordenamiento descendente por claridad.
wind = Window.partitionBy("Continental zone").orderBy(F.desc("Ladder"))
# Ahora filtramos por año y creamos una columna con el ranking, añadiendo las columnas que queremos ver.
max_2021_conts = df_maestro.filter(df_maestro.Year == 2021).withColumn("Ladder rank", F.rank().over(wind)).select("Continental zone", "Country", "Ladder", "Ladder rank").orderBy(("Ladder rank"))
# Nos quedamos con los números 1 de cada partición.
max_2021_conts = max_2021_conts.filter(F.col("Ladder rank") == 1)


#### Respuesta:

In [14]:
max_2021_conts.show()

+--------------------+--------------------+------+-----------+
|    Continental zone|             Country|Ladder|Ladder rank|
+--------------------+--------------------+------+-----------+
|Central and Easte...|      Czech Republic| 6.965|          1|
|Commonwealth of I...|          Uzbekistan| 6.179|          1|
|           East Asia|Taiwan Province o...| 6.584|          1|
|Latin America and...|          Costa Rica| 7.069|          1|
|Middle East and N...|              Israel| 7.157|          1|
|North America and...|         New Zealand| 7.277|          1|
|          South Asia|               Nepal| 5.269|          1|
|      Southeast Asia|           Singapore| 6.377|          1|
|  Sub-Saharan Africa|           Mauritius| 6.049|          1|
|      Western Europe|             Finland| 7.842|          1|
+--------------------+--------------------+------+-----------+



# 

### *Pregunta 3: País que más veces ha sido el más feliz en la totalidad de años*

In [15]:
# Creamos otro ranking particionando por año esta vez.
wind2 = Window.partitionBy("Year").orderBy(F.desc("Ladder"))
total_ranking = df_maestro.withColumn("Ranking", F.rank().over(wind2)).filter(F.col("Ranking") == 1).select("Year","Country","Ladder", "Ranking")
total_ranking.show()

+----+-----------+------+-------+
|Year|    Country|Ladder|Ranking|
+----+-----------+------+-------+
|2005|    Denmark| 8.019|      1|
|2006|    Finland| 7.672|      1|
|2007|    Denmark| 7.834|      1|
|2008|    Denmark| 7.971|      1|
|2009|    Denmark| 7.683|      1|
|2010|    Denmark| 7.771|      1|
|2011|    Denmark| 7.788|      1|
|2012|Switzerland| 7.776|      1|
|2013|     Canada| 7.594|      1|
|2014|    Denmark| 7.508|      1|
|2015|     Norway| 7.603|      1|
|2016|    Finland|  7.66|      1|
|2017|    Finland| 7.788|      1|
|2018|    Finland| 7.858|      1|
|2019|    Finland|  7.78|      1|
|2020|    Finland| 7.889|      1|
|2021|    Finland| 7.842|      1|
+----+-----------+------+-------+



In [16]:
# Esta vez habrá repetición de valores, asi que contamos cuántas veces aparece cada país número 1
contador = total_ranking.groupBy("Country").count()
contador.orderBy(F.desc(F.col("count"))).show()

+-----------+-----+
|    Country|count|
+-----------+-----+
|    Finland|    7|
|    Denmark|    7|
|     Norway|    1|
|Switzerland|    1|
|     Canada|    1|
+-----------+-----+



#### Respuesta: Hay dos ganadores absolutos con el mismo número de años en primer puesto: Dinamarca y Finlandia
<br>
<br>

### *Pregunta 4: ¿Qué puesto de felicidad tiene el país con mayor GDP del mundo en 2020?*

In [17]:
# Ahora filtramos por el año 2020, hacemos ranking por Año y ordenaremos por Ladder
wind3 = Window.partitionBy("Year").orderBy(F.desc("Ladder"))
ladder_2020 = df_maestro.filter(F.col("Year") == 2020).withColumn("Ranking", F.rank().over(wind3)).select("Country", "GDP", "Ladder", "Ranking", "Year").orderBy(F.desc(F.col("GDP")))
# Mostramos el primero del ranking
ladder_2020.show(1)

23/11/12 00:11:47 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+-------+------+------+-------+----+
|Country|   GDP|Ladder|Ranking|Year|
+-------+------+------+-------+----+
|Ireland|11.323| 7.035|     13|2020|
+-------+------+------+-------+----+
only showing top 1 row



#### Respuesta: Irlanda fue el país con más GDP en 2020, y su ránking de felicidad fue el nº 13
<br>
<br>

### *Pregunta 5: Porcentaje de variación del GDP medio mundial del 2020 al 2021*

In [18]:
# Calculamos los promedios de los dos años.
avg_2020 = df_maestro.filter(F.col("Year") == 2020).agg(F.avg("GDP")).withColumnRenamed('avg(GDP)', "Avg 2020")

In [19]:
avg_2021 = df_maestro.filter(F.col("Year") == 2021).agg(F.avg("GDP")).withColumnRenamed('avg(GDP)', "Avg 2021")

In [20]:
# Ahora, ya que no se puede operar entre distintos dataframes de sparks, hay que juntarlos en un solo dataframe para operar entre columnas
operacion = avg_2020.crossJoin(avg_2021)
operacion.show()

+-----------------+-----------------+
|         Avg 2020|         Avg 2021|
+-----------------+-----------------+
|9.751329579136588|9.432208032416018|
+-----------------+-----------------+



In [21]:
# Y ahora operamos dentro del dataframe
operacion = operacion.withColumn("Porcentaje", (F.col("Avg 2021") - F.col("Avg 2020")) / F.col("Avg 2020") * 100)
operacion.show()

+-----------------+-----------------+------------------+
|         Avg 2020|         Avg 2021|        Porcentaje|
+-----------------+-----------------+------------------+
|9.751329579136588|9.432208032416018|-3.272595230534974|
+-----------------+-----------------+------------------+



#### Respuesta: El GDP mundial disminuyó un 3.272595 aproximadamente.
<br>
<br>

### *Pregunta 6: ¿Cuál es el país con mayor expectativa de vida en 2021, y sus resultados en 2020 y 2019?*

In [22]:
# Haremos un ranking para la Life expectancy, y filtraremos por los tres años de uno en uno
wind4 = Window.partitionBy("Year").orderBy(F.desc("Life expectancy"))
elderlest = df_maestro.filter(F.col("Year") == 2021).withColumn("Life ranking", F.rank().over(wind4)).select("Country", "Year", "Life expectancy", "Life ranking")
elderlest.show(1)

+---------+----+---------------+------------+
|  Country|Year|Life expectancy|Life ranking|
+---------+----+---------------+------------+
|Singapore|2021|         76.953|           1|
+---------+----+---------------+------------+
only showing top 1 row



In [51]:
# Ahora usamos el país para filtrar
df_maestro.filter(df_maestro.Country == "Singapore").filter(df_maestro.Year == 2020).select("Country", "Year", "Life expectancy").show()

+-------+----+---------------+
|Country|Year|Life expectancy|
+-------+----+---------------+
+-------+----+---------------+



In [53]:
df_maestro.filter(df_maestro.Country == "Singapore").filter(df_maestro.Year == 2019).select("Country", "Year", "Life expectancy").show()

+---------+----+---------------+
|  Country|Year|Life expectancy|
+---------+----+---------------+
|Singapore|2019|           77.1|
+---------+----+---------------+



#### Respuesta: El país con mayor esperanza de vida en 2021 es Singapur, con 76,95 años. Para 2020 no tiene datos, y para 2019 tenía 77,1, algo mayor. El descenso podría estar causado por la epidemia del Covid en gran medida.