In [61]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
sesion_spark = SparkSession.builder.appName("pysparkdf").getOrCreate()
from pyspark.sql.window import Window

In [62]:

ruta_2021 = 'Datasets/world-happiness-report-2021.csv'
ruta_historico = 'Datasets/world-happiness-report.csv'

df_hapinnes_2021 = sesion_spark.read.csv(ruta_2021, header=True, inferSchema=True)
df_hapinnes_historico = sesion_spark.read.csv(ruta_historico, header=True, inferSchema=True)


In [63]:

# Mostrando los primeros registros de los DataFrames
df_hapinnes_2021.show(5)
df_hapinnes_historico.show(5)

# Mostrando el esquema del DataFrame de felicidad 2021
df_hapinnes_2021.printSchema()


+------------+------------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+
|Country name|Regional indicator|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|
+------------+------------------

In [64]:

# Ejercicio 1: País con mayor índice de felicidad
pais_mas_feliz_2021 = df_hapinnes_2021.orderBy(desc("Ladder score")).select("Country name", "Ladder score").limit(1)
pais_mas_feliz_2021.show()



+------------+------------+
|Country name|Ladder score|
+------------+------------+
|     Finland|       7.842|
+------------+------------+



In [65]:
#EJERCICIO2
df_continentes = sesion_spark.createDataFrame([
    ("Western Europe", "Europa"), 
    ("North America and ANZ", "America"),
    ("Middle East and North Africa", "Africa"), 
    ("Latin America and Caribbean", "America"),
    ("Central and Eastern Europe", "Europa"), 
    ("East Asia", "Asia"),
    ("Southeast Asia", "Asia"), 
    ("Commonwealth of Independent States", "Asia"),
    ("Sub-Saharan Africa", "Africa"), 
    ("South Asia","Asia")],
    ["Indicador Regional", "Continente"])
df_continentes.show(truncate=False)


+----------------------------------+----------+
|Indicador Regional                |Continente|
+----------------------------------+----------+
|Western Europe                    |Europa    |
|North America and ANZ             |America   |
|Middle East and North Africa      |Africa    |
|Latin America and Caribbean       |America   |
|Central and Eastern Europe        |Europa    |
|East Asia                         |Asia      |
|Southeast Asia                    |Asia      |
|Commonwealth of Independent States|Asia      |
|Sub-Saharan Africa                |Africa    |
|South Asia                        |Asia      |
+----------------------------------+----------+



In [66]:

columnas_continentes = ["Regional Indicator", "Continente"]
df_continentes = sesion_spark.createDataFrame(datos_continentes, columnas_continentes)


In [67]:
from pyspark.sql.functions import col, max, desc

# Unión de los DataFrames
df_union = df_hapinnes_2021.join(df_continentes, "Regional Indicator")

# Encontrar el puntaje máximo de felicidad por continente
df_max_ladder_por_continente = df_union.groupBy("Continente")\
    .agg(max("Ladder score").alias("Max Ladder Score"))

# Unir con el DataFrame original para obtener el nombre del país
df_pais_mas_feliz_por_continente = df_max_ladder_por_continente.join(df_union, 
                                                                     (df_union.Continente == df_max_ladder_por_continente.Continente) &
                                                                     (df_union["Ladder score"] == df_max_ladder_por_continente["Max Ladder Score"]))\
    .select(df_max_ladder_por_continente.Continente, "Country name", "Max Ladder Score")\
    .orderBy("Continente")

# Mostrar el resultado
df_pais_mas_feliz_por_continente.show()


+----------+--------------------+----------------+
|Continente|        Country name|Max Ladder Score|
+----------+--------------------+----------------+
|   América|         New Zealand|           7.277|
|      Asia|Taiwan Province o...|           6.584|
|    Europa|             Finland|           7.842|
|    África|              Israel|           7.157|
+----------+--------------------+----------------+



In [68]:
#EJERCICIO3
from pyspark.sql.functions import col, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

windowSpec = Window.partitionBy("year").orderBy(desc("Life Ladder"))

pais_primer_lugar = df_hapinnes_historico.withColumn("rank", rank().over(windowSpec))\
    .filter(col("rank") == 1)\
    .groupBy("Country name").count()\
    .orderBy(desc("count"))

max_count = pais_primer_lugar.agg({"count": "max"}).collect()[0][0]
pais_primer_lugar.filter(col("count") == max_count).show()


+------------+-----+
|Country name|count|
+------------+-----+
|     Denmark|    7|
+------------+-----+



In [69]:
#EJERCICIO4
from pyspark.sql.functions import desc, rank
from pyspark.sql.window import Window

# Ventana para el ranking de felicidad
windowSpec = Window.partitionBy("year").orderBy(desc("Life Ladder"))


df_ranking_2020 = df_hapinnes_historico.filter(df_hapinnes_historico["year"] == 2020)\
    .withColumn("Happiness Rank", rank().over(windowSpec))

# Encontrar el país con el mayor GDP en 2020
pais_mayor_gdp_2020 = df_ranking_2020.orderBy(desc("Log GDP per capita")).select("Country name").limit(1)
pais_mayor_gdp_2020_nombre = pais_mayor_gdp_2020.collect()[0][0]  # Obtiene el nombre del país

# Obtener el puesto de felicidad de ese país
puesto_felicidad = df_ranking_2020.filter(df_ranking_2020["Country name"] == pais_mayor_gdp_2020_nombre)\
    .select("Country name", "Life Ladder", "Happiness Rank")

puesto_felicidad.show()


+------------+-----------+--------------+
|Country name|Life Ladder|Happiness Rank|
+------------+-----------+--------------+
|     Ireland|      7.035|            13|
+------------+-----------+--------------+



In [70]:
from pyspark.sql.functions import col

# Calculando el PIB promedio para 2020
gdp_promedio_2020 = df_hapinnes_historico.filter(df_hapinnes_historico["year"] == 2020)\
    .agg({"Log GDP per capita": "avg"})\
    .withColumnRenamed("avg(Log GDP per capita)", "avg_gdp_2020")

# Calculando el PIB promedio para 2021
gdp_promedio_2021 = df_hapinnes_2021.agg({"Logged GDP per capita": "avg"})\
    .withColumnRenamed("avg(Logged GDP per capita)", "avg_gdp_2021")

# Calculando la variación del PIB promedio del 2020 al 2021
variacion_gdp = gdp_promedio_2020.join(gdp_promedio_2021)\
    .withColumn("variacion_gdp", (col("avg_gdp_2021") - col("avg_gdp_2020")) / col("avg_gdp_2020") * 100)

variacion_gdp.show()




+-----------------+-----------------+-------------------+
|     avg_gdp_2020|     avg_gdp_2021|      variacion_gdp|
+-----------------+-----------------+-------------------+
|9.751329545454546|9.432208053691273|-3.2725946782511013|
+-----------------+-----------------+-------------------+

