In [1]:
# Instalar SDK Java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Descargar Spark 3.2.2

!wget -q https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

In [3]:
# Descomprimir el archivo descargado de Spark

!tar xf spark-3.2.3-bin-hadoop3.2.tgz

In [4]:
# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

#### Hasta aqui la instalación, a partir de aquí el desarrollo del proyecto.

In [5]:
# Instalar la librería findspark

!pip install -q findspark

In [6]:
# Instalar pyspark

!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [7]:
### verificar la instalación ###
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("whr_spark").master("local[*]").getOrCreate()
spark


#### Lectura de ficheros de datos

In [9]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DoubleType
#Leemos el cvs general de mi drive, solo los campos necesarios y con el formato necesaario
schema_whr = StructType() \
      .add("Country name",StringType(),True) \
      .add("year",IntegerType(),True) \
      .add("Life Ladder",DoubleType(),True) \
      .add("Log GDP per capita",DoubleType(),True) \
      .add("Social support",DoubleType(),True) \
      .add("Healthy life expectancy at birth",DoubleType(),True) \
      # .add("Freedom to make life choices",DoubleType(),True) \
      # .add("Generosity",DoubleType(),True) \
      # .add("Perceptions of corruption",DoubleType(),True) \
      # .add("Positive affect",DoubleType(),True) \
      # .add("Negative affect",DoubleType(),True)

df_whr = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema_whr) \
      .load('/content/drive/MyDrive/practicas_BCKC/practica_BD_processing/Datasets/world-happiness-report.csv')

schema_whr21 = StructType() \
      .add("Country name",StringType(),True) \
      .add("Regional indicator",StringType(),True) \
      .add("Ladder score",DoubleType(),True) \
      .add("Standard error of ladder score",DoubleType(),True) \
      .add("upperwhisker",DoubleType(),True) \
      .add("lowerwhisker",DoubleType(),True) \
      .add("Logged GDP per capita",DoubleType(),True) \
      .add("Healthy life expectancy",DoubleType(),True) \
      # .add("Freedom to make life choices",DoubleType(),True) \
      # .add("Generosity",DoubleType(),True) \
      # .add("Perceptions of corruption",DoubleType(),True) \
      # .add("Ladder score in Dystopia",DoubleType(),True) \
      # .add("Explained by: Log GDP per capita",DoubleType(),True) \
      # .add("Explained by: Social support",DoubleType(),True) \
      # .add("Explained by: Healthy life expectancy",DoubleType(),True) \
      # .add("Explained by: Freedom to make life choices",DoubleType(),True) \
      # .add("Explained by: Generosity",DoubleType(),True) \
      # .add("Explained by: Perceptions of corruption",DoubleType(),True) \
      # .add("Dystopia + residual",DoubleType(),True)

df_whr2021 = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema_whr21) \
      .load('/content/drive/MyDrive/practicas_BCKC/practica_BD_processing/Datasets/world-happiness-report-2021.csv')


schema_countries = StructType() \
      .add("country",StringType(),True) \
      .add("region",StringType(),True)

df_countries = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema_countries) \
      .load('/content/drive/MyDrive/practicas_BCKC/practica_BD_processing/Datasets/list-of-countries-by-continent-2024.csv')
# df_whr.show(2)
# df_whr2021.show(2)
# df_countries.show(2)

#### Limpìeza y estandarización de datos

In [10]:
from pyspark.sql import functions as F

In [11]:
# Renombrar las columnas para normalizarlas
df_whr = df_whr.select('Country name','Life Ladder','Log GDP per capita','Healthy life expectancy at birth','year')
df_whr = df_whr.toDF(*("country", "ladder","gdp","healthy", "year"))
# df_whr.show(2)
df_whr2021 = df_whr2021.select('Country name','Ladder score','Logged GDP per capita','Healthy life expectancy')
df_whr2021 = df_whr2021.toDF(*("country", "ladder","gdp","healthy"))
# df_whr2021.show(2)


In [12]:
# Añadir el año de los datos en una columna
df_whr2021 = df_whr2021.withColumn("year", F.lit(2021))
# df_whr2021.show(2)

In [13]:
# Unir los dataframes ahora que tienen la misma estructura y tendremos todos los datos de todos los años
df_whr_all = df_whr.union(df_whr2021)
df_whr_all.show(2)


+-----------+------+----+-------+----+
|    country|ladder| gdp|healthy|year|
+-----------+------+----+-------+----+
|Afghanistan| 3.724|7.37|   50.8|2008|
|Afghanistan| 4.402|7.54|   51.2|2009|
+-----------+------+----+-------+----+
only showing top 2 rows



In [14]:
# completar los datos del dataframe con la región

df_whr_c = df_whr_all.join(df_countries,['country'],"left")
df_whr_c.show(2)


+-----------+------+----+-------+----+------+
|    country|ladder| gdp|healthy|year|region|
+-----------+------+----+-------+----+------+
|Afghanistan| 3.724|7.37|   50.8|2008|  Asia|
|Afghanistan| 4.402|7.54|   51.2|2009|  Asia|
+-----------+------+----+-------+----+------+
only showing top 2 rows



In [15]:
# Observar los datos si contienen nulos, para eviar errores posteriores
print(f'nulos en Region: {df_whr_c.filter(df_whr_c.region.isNull()).count()}')
print(f'nulos en ladder: {df_whr_c.filter(df_whr_c.ladder.isNull()).count()}')
print(f'nulos en gdp: {df_whr_c.filter(df_whr_c.gdp.isNull()).count()}')
print(f'nulos en healthy: {df_whr_c.filter(df_whr_c.healthy.isNull()).count()}')
print(f'nulos en year: {df_whr_c.filter(df_whr_c.year.isNull()).count()}')

nulos en Region: 0
nulos en ladder: 0
nulos en gdp: 36
nulos en healthy: 55
nulos en year: 0


#### Soluciones.
Todos las soluciones las voy a buscar mediante objetos SQL de spark, pero tambien serian posibles mediante el objeto windows, en la solución a la primera pregunta pong un ejemplo.

1. ¿Cuál es el país más “feliz” del 2021 según la data? (considerar que la columna “Ladder score” mayor número más feliz es el país)

In [16]:
# Utilizando Window
from pyspark.sql import Window
w = Window.orderBy(F.col("ladder").desc())
df_res_1= df_whr_c.filter(F.col("year") == 2021).withColumn("drank", F.rank().over(w)).filter(F.col("drank") == 1)
# df_res_1.show()
# *************************************************************
print(f"El país más feliz del 2021 es: { df_res_1.select('country').first().country} ")

El país más feliz del 2021 es: Finland 


In [17]:
# Utilizando SQL
# creando la vista temporal
df_whr_c.createOrReplaceTempView("temp_whrAll")

In [18]:
Qselect = "select country, ladder from " +\
     " (select *, row_number() OVER (ORDER BY ladder DESC) as rn " +\
     " FROM temp_whrAll WHERE year = 2021) tmp where rn = 1"

df_res_1= spark.sql(Qselect)
# df_res_1.show()
print(f"El país más feliz del 2021 es: { df_res_1.select('country').first().country} ")

El país más feliz del 2021 es: Finland 


2. ¿Cuál es el país más “feliz” del 2021 por continente según la data?

In [19]:
Qselect = "select region, country, ladder from " +\
     " (select *, row_number() OVER (PARTITION BY region ORDER BY ladder DESC) as rn " +\
     " FROM temp_whrAll WHERE year = 2021) tmp where rn = 1 order by region asc"
df_res_2= spark.sql(Qselect)
df_res_2.show()

+-------------+-----------+------+
|       region|    country|ladder|
+-------------+-----------+------+
|       Africa|  Mauritius| 6.049|
|         Asia|     Israel| 7.157|
|       Europe|    Finland| 7.842|
|North America|     Canada| 7.103|
|      Oceania|New Zealand| 7.277|
|South America|    Uruguay| 6.431|
+-------------+-----------+------+



3. ¿Cuál es el país que más veces ocupó el primer lugar en todos los años?

In [20]:

Qselect =  "select country, repe from " +\
      "(select country, count(*) as repe from " +\
     " (select *, row_number() OVER (PARTITION BY year ORDER BY ladder DESC) as rn " +\
     " FROM temp_whrAll) tmp where rn = 1 group by country)"+\
     " WHERE repe = "+\
      "(select max(nveces) from (select count(*) as nveces from " +\
     " (select *, row_number() OVER (PARTITION BY year ORDER BY ladder DESC) as rn " +\
     " FROM temp_whrAll) tmp where rn = 1 group by country))"

df_res_3= spark.sql(Qselect)
# df_res_3.show()
# como pueden ser varios los registros, creo lista de valores del campo necesario
countrys=df_res_3.select(df_res_3.country).rdd.flatMap(lambda x: x).collect()
# *************************************************************
print(f'Los paises que mas han ocupado el primer lugar son: {", ".join(countrys)}')

Los paises que mas han ocupado el primer lugar son: Finland, Denmark


4. ¿Qué puesto de Felicidad tiene el país con mayor GDP del 2020?

In [21]:
Qselect = "select country, rn from " +\
     " (select *, row_number() OVER (ORDER BY ladder DESC) as rn " +\
     " FROM temp_whrAll WHERE year = 2021) tmp " +\
     " where country = " +\
     " (select country from " +\
     " (select *, row_number() OVER (ORDER BY gdp DESC) as rn " +\
     " FROM temp_whrAll WHERE year = 2020) tmp where rn = 1)"

df_res_4= spark.sql(Qselect)
# Aqui el resultado solo puede ser uno, otro metodo de acceso seria ir a la fila y sus valores.
row_list = df_res_4.collect()
print(f'El Pais con mejor ranking GDP del 2020 es {row_list[0].__getitem__("country")} y ocupa la posición {row_list[0].__getitem__("rn")} del ranking de paises más feleces del 2021')

El Pais con mejor ranking GDP del 2020 es Ireland y ocupa la posición 15 del ranking de paises más feleces del 2021


5. ¿En que porcentaje a variado a nivel mundial el GDP promedio del 2020 respecto al 2021? ¿Aumentó o disminuyó?

In [22]:
Qselect = "select (select avg(gdp) from " +\
     " temp_whrAll WHERE year = 2021 and gdp is not null) as gdp2021, (select avg(gdp) from " +\
     " temp_whrAll WHERE year = 2020 and gdp is not null) as gdp2020 "

df_res_5= spark.sql(Qselect)
# df_res_w.show()
row_list = df_res_5.collect()
promedio2021 = row_list[0].__getitem__("gdp2021")
promedio2020 = row_list[0].__getitem__("gdp2020")
if promedio2021 > promedio2020:
  print(f'El el promedio de gdp anual aumentó en un:  {"{:.2f}".format((promedio2021-promedio2020)/promedio2021*100)} %')
else:
  print(f'El el promedio de gdp anual disminuyó en un:  {"{:.2f}".format(abs((promedio2021-promedio2020)/promedio2021*100))} %')

El el promedio de gdp anual disminuyó en un:  3.38 %


6. ¿Cuál es el país con mayor expectativa de vide (“Healthy life expectancy at birth”)? Y ¿Cuánto tenia en ese indicador en el 2019?

In [23]:
Qselect = "select country, healthy as healthy19 from " +\
      " temp_whrAll WHERE year = 2021 and country = " +\
      " (select country from " +\
      " (select *, row_number() OVER (ORDER BY healthy DESC) as rn " +\
      " FROM temp_whrAll WHERE year = 2021) tmp where rn = 1)"

df_res_6= spark.sql(Qselect)
# df_res_6.show()
# Aqui el resultado solo puede ser uno, otro metodo de acceso seria ir a la fila y sus valores. healthy
row_list = df_res_6.collect()
print(f'El Pais  con mayor expectativa de vida en 2021 es {row_list[0].__getitem__("country")} y en 2019 su esperanza de vida era de {row_list[0].__getitem__("healthy19")}')

El Pais  con mayor expectativa de vida en 2021 es Iceland y en 2019 su esperanza de vida era de 0.983
