In [2]:
import pyspark

#Creamos una sesion de spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

#Creamos un objeto de spark session
spark = SparkSession.builder.appName("pysparkdf").getOrCreate()

In [3]:
#Leemos los ficheros cvs
df_21 = spark.read.option("header", "true").csv("C:/Users/CARLES/1.CARLES/FORMACIONES/BOOTCAMP KEEPCODING/5. Data Processing/PROYECTO FINAL/Datasets/world-happiness-report-2021.csv")
df_total = spark.read.option("header", "true").csv("C:/Users/CARLES/1.CARLES/FORMACIONES/BOOTCAMP KEEPCODING/5. Data Processing/PROYECTO FINAL/Datasets/world-happiness-report.csv")

In [4]:
#Mostramos los primeros 10 registros
df_total.show(10)

+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
|Country name|year|Life Ladder|Log GDP per capita|Social support|Healthy life expectancy at birth|Freedom to make life choices|Generosity|Perceptions of corruption|Positive affect|Negative affect|
+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
| Afghanistan|2008|      3.724|             7.370|         0.451|                          50.800|                       0.718|     0.168|                    0.882|          0.518|          0.258|
| Afghanistan|2009|      4.402|             7.540|         0.552|                          51.200|                       0.679|     0.190|                    0.850|          0.584|          0.237|
| Afghanistan|2

In [5]:
df_21.show(10)

+------------+--------------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+
|Country name|  Regional indicator|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|
+------------+--------------

In [6]:
# 1. ¿Cuál es el país más “feliz” del 2021 según la data? (considerar que la columna “Ladder score” mayor número más feliz es el país)

df_21.select("Country name", "Ladder score").orderBy("Ladder score", ascending = False).show(1)


+------------+------------+
|Country name|Ladder score|
+------------+------------+
|     Finland|       7.842|
+------------+------------+
only showing top 1 row



In [9]:
#2. ¿Cuál es el país más “feliz” del 2021 por continente según la data?

max_ladder_per_continent = df_21.groupBy("Regional indicator").agg(max(col("Ladder score")).alias("Max_Ladder_score"))

max_ladder_country = max_ladder_per_continent.join(df_21, (max_ladder_per_continent["Regional indicator"] == df_21["Regional indicator"]) & (max_ladder_per_continent["Max_Ladder_score"] 
== df_21["Ladder score"]), "inner")

max_ladder_country_result = max_ladder_country.select(max_ladder_per_continent["Regional indicator"], "Country name", "Ladder score")

max_ladder_country_result.show(1)

+------------------+------------+------------+
|Regional indicator|Country name|Ladder score|
+------------------+------------+------------+
|    Western Europe|     Finland|       7.842|
+------------------+------------+------------+
only showing top 1 row



In [10]:
# 3. ¿Cuál es el país que más veces ocupó el primer lugar en todos los años?

# Filtramos por el máximo Life Ladder de cada año
max_year_leader = df_total.groupBy("year").agg(max(col("Life Ladder")).alias("Max_Leader_score"))

# Renombramos la columna year para que ambas tablas no tengan el mismo nombre
max_year_leader = max_year_leader.withColumnRenamed("year", "year_df")

# Cruzamos ambas tablas por el año y el valor máximo creado anteriormente
max_leader_country = max_year_leader.join(df_total, (max_year_leader["year_df"] == df_total["year"]) & 
(max_year_leader["Max_Leader_score"] == df_total["Life Ladder"]), "inner")

# Filtramos año, Life Ladder y el país con el máximo Life Ladder
max_leader_country_select = max_leader_country.select("year", "Life Ladder", "Country name").orderBy("year", ascending = False)

# Agrupamos por país y hacemos un contar de cada pais
max_leader_country_filter = max_leader_country_select.groupBy("Country name").count().orderBy("count", ascending=False).show(1)


+------------+-----+
|Country name|count|
+------------+-----+
|     Denmark|    7|
+------------+-----+
only showing top 1 row



In [11]:
# 4. ¿Qué puesto de Felicidad tiene el país con mayor GDP del 2020?

# Ordenamos por Log GDP per capita y Life Ladder. Filtramos el año 2020
order_highest_GDP = df_total.select("Country name", "year", "Log GDP per capita").filter(df_total["year"] == "2020").orderBy("Log GDP per capita", ascending = False)
order_highest_Life_Ladder = df_total.select("Country name", "year", "Life Ladder").filter(df_total["year"] == "2020").orderBy("Life Ladder", ascending = False)

# Creamos dos window function para ordenar por GDP y Life Ladder
window_GDP = Window.orderBy(order_highest_GDP["Log GDP per capita"].desc())
window_Life_Ladder = Window.orderBy(order_highest_Life_Ladder["Life Ladder"].desc())

# Añadimos una nueva columna con la posición
row_number_highest_GDP = order_highest_GDP.withColumn("position_GDP", row_number().over(window_GDP))
row_number_highest_Life_Ladder = order_highest_Life_Ladder.withColumn("position_Life_Ladder", row_number().over(window_Life_Ladder))

#Cruzamos ambas tablas para asignar la posición de Felicidad al país con mayor GDP
Life_Ladder_position_highest_GDP = row_number_highest_GDP.join(
    row_number_highest_Life_Ladder,
    row_number_highest_GDP["Country name"] == row_number_highest_Life_Ladder["Country name"],
    "inner"
).orderBy("position_GDP", ascending=True).select(
    row_number_highest_GDP["Country name"], 
    row_number_highest_GDP["year"], 
    row_number_highest_GDP["position_GDP"],
    row_number_highest_Life_Ladder["position_Life_Ladder"]
).show(1)


+------------+----+------------+--------------------+
|Country name|year|position_GDP|position_Life_Ladder|
+------------+----+------------+--------------------+
|    Bulgaria|2020|           1|                  56|
+------------+----+------------+--------------------+
only showing top 1 row



In [12]:
# 5. ¿En que porcentaje a variado a nivel mundial el GDP promedio del 2020 respecto al 2021? ¿Aumentó o disminuyó?

# Hacemos un cast en los dos datasets, ya que no reconoce como número el campo a comparar
df_total_with_GDP_Cast = df_total.withColumn("Log GDP per capita cast", df_total["Log GDP per capita"].cast("float"))
df_total_with_GDP_Cast_21 = df_21.withColumn("Log GDP per capita cast", df_21["Logged GDP per capita"].cast("float"))

# Obtenemos el promedio de cada año
order_highest_GDP_2020 = df_total_with_GDP_Cast.filter(df_total_with_GDP_Cast["year"] == "2020").groupBy().avg("Log GDP per capita cast").collect()[0][0]
order_highest_GDP_2021 = df_total_with_GDP_Cast_21.groupBy().avg("Log GDP per capita cast").collect()[0][0]

# Calculamos la variación de los promedios entre el año 2021 y 2021
variacion = order_highest_GDP_2021 / order_highest_GDP_2020 - 1

# Imprimir la variación de los promedios entre el año 2020 y 2021
print("AVG 2020", order_highest_GDP_2020)
print("AVG 2020", order_highest_GDP_2021)
print("Variación entre 2021 y 2020",variacion)

AVG 2020 9.751329579136588
AVG 2020 9.432208032416018
Variación entre 2021 y 2020 -0.03272595230534969


In [13]:
# 6. ¿Cuál es el país con mayor expectativa de vide (“Healthy life expectancy at birth”)? Y ¿Cuánto tenia en ese indicador en el 2019?

# Seleccionamos el pais y Healthy life expectancy, y ordenamos de mayor a menor para ver cuál es el mayor
df_21_Healthy = df_21.select("Country name", "Healthy life expectancy").orderBy("Healthy life expectancy", ascending = False).show(1)

# Vemos que es Singapore, por lo que ahora filtramos el dataset por 2019 y por Singapore
df_19_Singapore_Healtyt = df_total.select("Country name", "year", "Healthy life expectancy at birth").filter((df_total["Country name"] == "Singapore") \
    & (df_total["year"] == "2019")).show()

+------------+-----------------------+
|Country name|Healthy life expectancy|
+------------+-----------------------+
|   Singapore|                 76.953|
+------------+-----------------------+
only showing top 1 row

+------------+----+--------------------------------+
|Country name|year|Healthy life expectancy at birth|
+------------+----+--------------------------------+
|   Singapore|2019|                          77.100|
+------------+----+--------------------------------+

