Parte 2 Obligatorio_ Análisis de datos con Spark

Preparación del ambiente

In [1]:
# Instalación de Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar e instalar Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Instalación de findspark
!pip install -q findspark

# Configuración de las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# Inicialización de findspark
import findspark
findspark.init()

In [4]:
from pyspark.sql.functions import *
from google.colab import drive
drive.mount('/content/drive')

# Crear una sesión de Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Rutas en Drive
RawInput = '/content/drive/MyDrive/BigData/RawData'
RefinedOutput = '/content/drive/MyDrive/BigData/SparkRefinedData'

Carga de tablas

In [5]:
# Ruta a tablas
costLivingIndex_access = RawInput + '/Cost of living index by country 2020.csv'
countriesAgeStructure_access = RawInput + '/Coutries age structure.csv'
crimeIndex_access = RawInput + '/Crime index by countries 2020.csv'
healthCareIndex_access = RawInput + '/Health care index by countries 2020.csv'
propertiesPriceIndex_access= RawInput + '/Properties price index by countries 2020.csv'
populationDensityIndex_access= RawInput + '/Pupulation density by countries.csv'
qualityLifeIndex_access = RawInput + '/Quality of life index by countries 2020.csv'



# Cargar la tabla de datos
df_Cost_of_living_index_by_country_2020 = spark.read.csv(costLivingIndex_access, header=True, inferSchema=True)
df_Coutries_age_structure= spark.read.csv(countriesAgeStructure_access, header=True, inferSchema=True)
df_Crime_index_by_countries_2020=spark.read.csv(crimeIndex_access, header=True, inferSchema=True)
df_Health_care_index_by_countries_2020=spark.read.csv(healthCareIndex_access, header=True, inferSchema=True)
df_Properties_price_index_by_countries_2020=spark.read.csv(propertiesPriceIndex_access, header=True, inferSchema=True)
df_Pupulation_density_by_countries=spark.read.csv(populationDensityIndex_access, header=True, inferSchema=True)
df_Quality_of_life_index_by_countries_2020=spark.read.csv(qualityLifeIndex_access, header=True, inferSchema=True)



////////////////////////////////////////////////////
Cost_of_living_index_by_country_2020.csv
////////////////////////////////////////////////////

In [None]:
from pyspark.sql.functions import col, count, when


# Visualización de las primeras 10 líneas
df_Cost_of_living_index_by_country_2020.show(10)



+-----------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|    Country|Cost of Living Index|Rent Index|Cost of Living Plus Rent Index|Groceries Index|Restaurant Price Index|Local Purchasing Power Index|
+-----------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|Switzerland|               122.4|     50.25|                         87.89|         120.27|                123.01|                      119.53|
|     Norway|              101.43|     36.15|                         70.21|          91.14|                109.28|                       88.38|
|    Iceland|              100.48|     46.95|                         74.88|          86.89|                113.74|                       79.44|
|      Japan|               83.35|     25.97|                          55.9|          81.82|                 48.95|               

In [None]:
# Forma del dataset (conteo de filas y columnas)
row_count = df_Cost_of_living_index_by_country_2020.count()
column_count = len(df_Cost_of_living_index_by_country_2020.columns)
print(f"El dataset tiene {row_count} filas y {column_count} columnas.")


El dataset tiene 132 filas y 7 columnas.


In [None]:

# Verificar valores nulos por columna
df_Cost_of_living_index_by_country_2020.select([count(when(col(c).isNull(), c)).alias(c) for c in df_Cost_of_living_index_by_country_2020.columns]).show()


+-------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|Country|Cost of Living Index|Rent Index|Cost of Living Plus Rent Index|Groceries Index|Restaurant Price Index|Local Purchasing Power Index|
+-------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|      0|                   0|         0|                             0|              0|                     0|                           0|
+-------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+



In [None]:

# Identificación de columnas, contenidos no nulos y tipos de datos
df_Cost_of_living_index_by_country_2020.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Cost of Living Index: double (nullable = true)
 |-- Rent Index: double (nullable = true)
 |-- Cost of Living Plus Rent Index: double (nullable = true)
 |-- Groceries Index: double (nullable = true)
 |-- Restaurant Price Index: double (nullable = true)
 |-- Local Purchasing Power Index: double (nullable = true)



In [None]:

# Visualización general de la tabla
df_Cost_of_living_index_by_country_2020.show(truncate=False)


+-------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|Country      |Cost of Living Index|Rent Index|Cost of Living Plus Rent Index|Groceries Index|Restaurant Price Index|Local Purchasing Power Index|
+-------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|Switzerland  |122.4               |50.25     |87.89                         |120.27         |123.01                |119.53                      |
|Norway       |101.43              |36.15     |70.21                         |91.14          |109.28                |88.38                       |
|Iceland      |100.48              |46.95     |74.88                         |86.89          |113.74                |79.44                       |
|Japan        |83.35               |25.97     |55.9                          |81.82          |48.95                 |8

In [None]:

# Verificar duplicados en la columna 'Country'
has_duplicates = df_Cost_of_living_index_by_country_2020.select("Country").distinct().count() < df_Cost_of_living_index_by_country_2020.count()
print(f"¿Existen duplicados en la columna 'Country'? {has_duplicates}")


¿Existen duplicados en la columna 'Country'? False


In [None]:

# Estadísticas descriptivas de las métricas numéricas
df_Cost_of_living_index_by_country_2020.describe().show()


+-------+-----------+--------------------+------------------+------------------------------+------------------+----------------------+----------------------------+
|summary|    Country|Cost of Living Index|        Rent Index|Cost of Living Plus Rent Index|   Groceries Index|Restaurant Price Index|Local Purchasing Power Index|
+-------+-----------+--------------------+------------------+------------------------------+------------------+----------------------+----------------------------+
|  count|        132|                 132|               132|                           132|               132|                   132|                         132|
|   mean|       null|     49.214696969697| 18.08946969696969|             34.32727272727272| 42.58303030303032|     42.27143939393938|           50.32469696969696|
| stddev|       null|  18.404922318671357|12.808608169043291|            14.989051988972005|17.302168065152994|    22.423585325017957|           27.35711145636023|
|    min|Afghani

In [None]:

# Eliminar el país Kosovo (Disputed Territory)
df_Cost_of_living_index_by_country_2020 = df_Cost_of_living_index_by_country_2020.filter(~col("Country").rlike(r'\(|\)'))


In [None]:

# Verificar que se eliminó correctamente y revisar estructura
print(f"Cantidad de filas después de la eliminación: {df_Cost_of_living_index_by_country_2020.count()}")
df_Cost_of_living_index_by_country_2020.printSchema()


Cantidad de filas después de la eliminación: 131
root
 |-- Country: string (nullable = true)
 |-- Cost of Living Index: double (nullable = true)
 |-- Rent Index: double (nullable = true)
 |-- Cost of Living Plus Rent Index: double (nullable = true)
 |-- Groceries Index: double (nullable = true)
 |-- Restaurant Price Index: double (nullable = true)
 |-- Local Purchasing Power Index: double (nullable = true)



In [None]:

# Eliminar espacios en blanco en las columnas de texto
from pyspark.sql.functions import trim
df_Cost_of_living_index_by_country_2020 = df_Cost_of_living_index_by_country_2020.select([trim(col(c)).alias(c) if df_Cost_of_living_index_by_country_2020.schema[c].dataType == "StringType" else col(c) for c in df_Cost_of_living_index_by_country_2020.columns])


In [None]:

# Guardar la tabla refinada en la carpeta RefinedData
df_Cost_of_living_index_by_country_2020.write.csv(RefinedOutput + "/RefData_Cost_of_living_index_by_country_2020.csv", header=True)



////////////////////////////////////////////////////
Coutries_age_structure.csv
////////////////////////////////////////////////////

In [None]:
from pyspark.sql.functions import col, count, when, regexp_replace, trim



# Visualización de las primeras 10 líneas
df_Coutries_age_structure.show(10)


+--------+-----------------+------------------+------------------+
| Country|Age 0 to 14 Years|Age 15 to 64 Years|Age above 65 Years|
+--------+-----------------+------------------+------------------+
|   Japan|           12.90%|            60.10%|               27%|
|   Italy|           13.50%|            63.50%|               23%|
|Portugal|           13.60%|            64.90%|               22%|
| Germany|           13.10%|            65.50%|               22%|
| Finland|           16.40%|            62.40%|               21%|
|Bulgaria|           14.20%|            65.00%|               21%|
|  Greece|           14.20%|            65.40%|               20%|
|  Sweden|           17.50%|            62.50%|               20%|
|  Latvia|           15.40%|            64.80%|               20%|
| Denmark|           16.50%|            63.80%|               20%|
+--------+-----------------+------------------+------------------+
only showing top 10 rows



In [None]:

# Forma del dataset (cantidad de filas y columnas)
row_count = df_Coutries_age_structure.count()
column_count = len(df_Coutries_age_structure.columns)
print(f"El dataset tiene {row_count} filas y {column_count} columnas.")


El dataset tiene 191 filas y 4 columnas.


In [None]:

# Identificación de columnas, contenidos no nulos y tipos de datos
df_Coutries_age_structure.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Age 0 to 14 Years: string (nullable = true)
 |-- Age 15 to 64 Years: string (nullable = true)
 |-- Age above 65 Years: string (nullable = true)



In [None]:

# Verificar duplicados en la columna 'Country'
has_duplicates = df_Coutries_age_structure.select("Country").distinct().count() < df_Coutries_age_structure.count()
print(f"¿Existen duplicados en la columna 'Country'? {has_duplicates}")


¿Existen duplicados en la columna 'Country'? False


In [None]:

# Estadísticas descriptivas iniciales
df_Coutries_age_structure.describe().show()


+-------+-----------+-----------------+------------------+------------------+
|summary|    Country|Age 0 to 14 Years|Age 15 to 64 Years|Age above 65 Years|
+-------+-----------+-----------------+------------------+------------------+
|  count|        191|              191|               191|               191|
|   mean|       null|             null|              null|              null|
| stddev|       null|             null|              null|              null|
|    min|Afghanistan|           11.50%|            47.20%|                1%|
|    max|   Zimbabwe|           50.20%|            85.00%|                9%|
+-------+-----------+-----------------+------------------+------------------+



In [None]:

# Convertir columnas de porcentaje (string) a float
columns_to_convert = ['Age 0 to 14 Years', 'Age 15 to 64 Years', 'Age above 65 Years']
for col_name in columns_to_convert:
    df_Coutries_age_structure = df_Coutries_age_structure.withColumn(col_name, regexp_replace(col(col_name), '%', '').cast('float'))

# Visualización después de la conversión
df_Coutries_age_structure.show(15)


+--------+-----------------+------------------+------------------+
| Country|Age 0 to 14 Years|Age 15 to 64 Years|Age above 65 Years|
+--------+-----------------+------------------+------------------+
|   Japan|             12.9|              60.1|              27.0|
|   Italy|             13.5|              63.5|              23.0|
|Portugal|             13.6|              64.9|              22.0|
| Germany|             13.1|              65.5|              22.0|
| Finland|             16.4|              62.4|              21.0|
|Bulgaria|             14.2|              65.0|              21.0|
|  Greece|             14.2|              65.4|              20.0|
|  Sweden|             17.5|              62.5|              20.0|
|  Latvia|             15.4|              64.8|              20.0|
| Denmark|             16.5|              63.8|              20.0|
|  France|             18.1|              62.2|              20.0|
| Croatia|             14.7|              65.6|              2

In [None]:

# Estadísticas descriptivas después de la conversión
df_Coutries_age_structure.describe().show()


+-------+-----------+------------------+------------------+------------------+
|summary|    Country| Age 0 to 14 Years|Age 15 to 64 Years|Age above 65 Years|
+-------+-----------+------------------+------------------+------------------+
|  count|        191|               191|               191|               191|
|   mean|       null| 27.64722510532559|63.622617751515975| 8.780104712041885|
| stddev|       null|10.535807221661532| 6.584602865457367| 6.154486766367989|
|    min|Afghanistan|              11.5|              47.2|               1.0|
|    max|   Zimbabwe|              50.2|              85.0|              27.0|
+-------+-----------+------------------+------------------+------------------+



In [None]:

# Eliminar filas donde el país tiene paréntesis (ej. territorios)
df_Coutries_age_structure = df_Coutries_age_structure.filter(~col("Country").rlike(r'\(|\)'))


In [None]:

# Eliminar espacios en blanco de columnas de texto
df_Coutries_age_structure = df_Coutries_age_structure.select([trim(col(c)).alias(c) if df_Coutries_age_structure.schema[c].dataType == "StringType" else col(c) for c in df_Coutries_age_structure.columns])


In [None]:

# Verificar cuántos países quedaron después de la eliminación
print(f"Número de filas después de la eliminación: {df_Coutries_age_structure.count()}")
df_Coutries_age_structure.printSchema()


Número de filas después de la eliminación: 186
root
 |-- Country: string (nullable = true)
 |-- Age 0 to 14 Years: float (nullable = true)
 |-- Age 15 to 64 Years: float (nullable = true)
 |-- Age above 65 Years: float (nullable = true)



In [None]:

# Estadísticas descriptivas finales
df_Coutries_age_structure.describe().show()


+-------+-----------+------------------+------------------+------------------+
|summary|    Country| Age 0 to 14 Years|Age 15 to 64 Years|Age above 65 Years|
+-------+-----------+------------------+------------------+------------------+
|  count|        186|               186|               186|               186|
|   mean|       null| 27.75709674178913|63.598494580996935| 8.693548387096774|
| stddev|       null|10.588481760691469| 6.646668062790967| 6.151041334851478|
|    min|Afghanistan|              11.5|              47.2|               1.0|
|    max|   Zimbabwe|              50.2|              85.0|              27.0|
+-------+-----------+------------------+------------------+------------------+



In [None]:

# Guardar el dataset refinado en la carpeta RefinedData
df_Coutries_age_structure.write.csv(RefinedOutput +"/RefData_Coutries_age_structure.csv", header=True)

////////////////////////////////////////////////////
Crime_index_by_countries_2020.csv
////////////////////////////////////////////////////

In [None]:

from pyspark.sql.functions import col, regexp_replace, trim



# Visualizar las primeras 10 líneas
df_Crime_index_by_countries_2020.show(10)


+-------------------+-----------+------------+
|            Country|Crime Index|Safety Index|
+-------------------+-----------+------------+
|          Venezuela|      84.49|       15.51|
|   Papua New Guinea|      81.93|       18.07|
|       South Africa|      77.49|       22.51|
|        Afghanistan|      76.23|       23.77|
|           Honduras|      76.11|       23.89|
|Trinidad And Tobago|      73.19|       26.81|
|             Brazil|      68.88|       31.12|
|               Peru|      68.15|       31.85|
|        El Salvador|      67.96|       32.04|
|             Guyana|      67.66|       32.34|
+-------------------+-----------+------------+
only showing top 10 rows



In [None]:

# Forma del dataset: número de filas y columnas
row_count = df_Crime_index_by_countries_2020.count()
column_count = len(df_Crime_index_by_countries_2020.columns)
print(f"El dataset tiene {row_count} filas y {column_count} columnas.")


El dataset tiene 129 filas y 3 columnas.


In [None]:

# Información del dataset: columnas, contenidos no nulos y tipos de datos
df_Crime_index_by_countries_2020.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Crime Index: double (nullable = true)
 |-- Safety Index: double (nullable = true)



In [None]:

# Verificar si existen duplicados en la columna 'Country'
has_duplicates = df_Crime_index_by_countries_2020.select("Country").distinct().count() < df_Crime_index_by_countries_2020.count()
print(f"¿Existen duplicados en la columna 'Country'? {has_duplicates}")


¿Existen duplicados en la columna 'Country'? False


In [None]:

# Estadísticas descriptivas
df_Crime_index_by_countries_2020.describe().show()


+-------+-----------+------------------+------------------+
|summary|    Country|       Crime Index|      Safety Index|
+-------+-----------+------------------+------------------+
|  count|        129|               129|               129|
|   mean|       null| 44.22248062015505| 55.77751937984494|
| stddev|       null|15.690481441337273|15.690481441337274|
|    min|Afghanistan|             11.86|             15.51|
|    max|   Zimbabwe|             84.49|             88.14|
+-------+-----------+------------------+------------------+



In [None]:

# Filtrar filas donde 'Country' tiene paréntesis (ej., Kosovo)
df_Crime_index_by_countries_2020 = df_Crime_index_by_countries_2020.filter(~col("Country").rlike(r'\(|\)'))


In [None]:

# Aplicar `trim` para eliminar espacios en blanco de columnas de texto
df_Crime_index_by_countries_2020 = df_Crime_index_by_countries_2020.select([trim(col(c)).alias(c) if df_Crime_index_by_countries_2020.schema[c].dataType == "StringType" else col(c) for c in df_Crime_index_by_countries_2020.columns])

# Ver cuántos países quedaron luego del filtrado
print(f"Número de filas después del filtrado: {df_Crime_index_by_countries_2020.count()}")
df_Crime_index_by_countries_2020.printSchema()


Número de filas después del filtrado: 128
root
 |-- Country: string (nullable = true)
 |-- Crime Index: double (nullable = true)
 |-- Safety Index: double (nullable = true)



In [None]:

# Estadísticas descriptivas finales
df_Crime_index_by_countries_2020.describe().show()


+-------+-----------+------------------+------------------+
|summary|    Country|       Crime Index|      Safety Index|
+-------+-----------+------------------+------------------+
|  count|        128|               128|               128|
|   mean|       null|44.310859375000014| 55.68914062499998|
| stddev|       null|15.719866212657415|15.719866212657417|
|    min|Afghanistan|             11.86|             15.51|
|    max|   Zimbabwe|             84.49|             88.14|
+-------+-----------+------------------+------------------+



In [None]:

# Guardar la tabla resultante en la carpeta RefinedData
df_Crime_index_by_countries_2020.write.csv(RefinedOutput +"/RefData_Crime_index_by_countries_2020.csv", header=True)

////////////////////////////////////////////////////
Health_care_index_by_countries_2020.csv
////////////////////////////////////////////////////

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim



# Visualizar las primeras 10 líneas
df_Health_care_index_by_countries_2020.show(10)


+-----------+-----------------+----------------------+
|    Country|Health Care Index|Health Care Exp. Index|
+-----------+-----------------+----------------------+
|     Taiwan|            86.71|                159.66|
|South Korea|            81.97|                149.94|
|      Japan|            81.14|                148.24|
|    Denmark|             80.0|                147.47|
|     France|            79.99|                146.81|
|      Spain|            78.88|                145.24|
|    Austria|            78.73|                144.24|
|   Thailand|            77.95|                142.04|
|  Australia|            77.38|                140.97|
|    Finland|            75.79|                138.01|
+-----------+-----------------+----------------------+
only showing top 10 rows



In [None]:

# Forma del dataset: número de filas y columnas
row_count = df_Health_care_index_by_countries_2020.count()
column_count = len(df_Health_care_index_by_countries_2020.columns)
print(f"El dataset tiene {row_count} filas y {column_count} columnas.")


El dataset tiene 93 filas y 3 columnas.


In [None]:

# Información del dataset: columnas, contenidos no nulos y tipos de datos
df_Health_care_index_by_countries_2020.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Health Care Index: double (nullable = true)
 |-- Health Care Exp. Index: double (nullable = true)



In [None]:

# Verificar si existen duplicados en la columna 'Country'
has_duplicates = df_Health_care_index_by_countries_2020.select("Country").distinct().count() < df_Health_care_index_by_countries_2020.count()
print(f"¿Existen duplicados en la columna 'Country'? {has_duplicates}")


¿Existen duplicados en la columna 'Country'? False


In [None]:

# Estadísticas descriptivas
df_Health_care_index_by_countries_2020.describe().show()


+-------+-------+------------------+----------------------+
|summary|Country| Health Care Index|Health Care Exp. Index|
+-------+-------+------------------+----------------------+
|  count|     93|                93|                    93|
|   mean|   null| 63.41419354838709|     114.1974193548387|
| stddev|   null|10.317795657164439|     20.35005602509253|
|    min|Albania|             39.66|                 69.14|
|    max|Vietnam|             86.71|                159.66|
+-------+-------+------------------+----------------------+



In [None]:
from pyspark.sql.functions import col

# Renombrar todas las columnas para reemplazar espacios y puntos
df_Health_care_index_by_countries_2020 = df_Health_care_index_by_countries_2020.select(
    [col(f"`{c}`").alias(c.replace(" ", "_").replace(".", "_")) for c in df_Health_care_index_by_countries_2020.columns]
)

# Verificar el esquema actualizado
df_Health_care_index_by_countries_2020.printSchema()

# Mostrar las primeras líneas para confirmar
df_Health_care_index_by_countries_2020.show(10)

root
 |-- Country: string (nullable = true)
 |-- Health_Care_Index: double (nullable = true)
 |-- Health_Care_Exp__Index: double (nullable = true)

+-----------+-----------------+----------------------+
|    Country|Health_Care_Index|Health_Care_Exp__Index|
+-----------+-----------------+----------------------+
|     Taiwan|            86.71|                159.66|
|South Korea|            81.97|                149.94|
|      Japan|            81.14|                148.24|
|    Denmark|             80.0|                147.47|
|     France|            79.99|                146.81|
|      Spain|            78.88|                145.24|
|    Austria|            78.73|                144.24|
|   Thailand|            77.95|                142.04|
|  Australia|            77.38|                140.97|
|    Finland|            75.79|                138.01|
+-----------+-----------------+----------------------+
only showing top 10 rows



In [None]:

# Eliminar espacios en blanco en las columnas de texto
df_Health_care_index_by_countries_2020 = df_Health_care_index_by_countries_2020.select([trim(col(c)).alias(c) if df_Health_care_index_by_countries_2020.schema[c].dataType == "StringType" else col(c) for c in df_Health_care_index_by_countries_2020.columns])

# Información del dataset después de la limpieza
df_Health_care_index_by_countries_2020.printSchema()
print(f"Número de filas después de la limpieza: {df_Health_care_index_by_countries_2020.count()}")


root
 |-- Country: string (nullable = true)
 |-- Health_Care_Index: double (nullable = true)
 |-- Health_Care_Exp__Index: double (nullable = true)

Número de filas después de la limpieza: 93


In [None]:

# Guardar la tabla resultante en la carpeta RefinedData
df_Health_care_index_by_countries_2020.write.csv(RefinedOutput +"/RefData_Health_care_index_by_countries_2020.csv", header=True)

////////////////////////////////////////////////////
Properties_price_index_by_countries_2020.csv
////////////////////////////////////////////////////

In [None]:
from pyspark.sql.functions import col, trim

# Visualización de las primeras 10 líneas del archivo
df_Properties_price_index_by_countries_2020.show(10)


+-----------+---------------------+------------------------------+------------------------------------+-------------------------------+------------------------------------------+----------------------------------+-------------------+
|    Country|Price To Income Ratio|Gross Rental Yield City Centre|Gross Rental Yield Outside of Centre|Price To Rent Ratio City Centre|Price To Rent Ratio Outside Of City Centre|Mortgage As A Percentage Of Income|Affordability Index|
+-----------+---------------------+------------------------------+------------------------------------+-------------------------------+------------------------------------------+----------------------------------+-------------------+
|  Venezuela|               133.29|                          6.22|                                6.45|                          16.08|                                     15.49|                           3025.03|               0.03|
|      Syria|                60.83|                          2.7

In [None]:

# Forma del dataset (número de filas y columnas)
print(f"Shape: ({df_Properties_price_index_by_countries_2020.count()}, {len(df_Properties_price_index_by_countries_2020.columns)})")


Shape: (104, 8)


In [None]:

# Identificación de columnas, contenidos no nulos y tipo de dato
df_Properties_price_index_by_countries_2020.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Price To Income Ratio: double (nullable = true)
 |-- Gross Rental Yield City Centre: double (nullable = true)
 |-- Gross Rental Yield Outside of Centre: double (nullable = true)
 |-- Price To Rent Ratio City Centre: double (nullable = true)
 |-- Price To Rent Ratio Outside Of City Centre: double (nullable = true)
 |-- Mortgage As A Percentage Of Income: double (nullable = true)
 |-- Affordability Index: double (nullable = true)



In [None]:

# Verificar duplicados en la columna "Country"
has_duplicates = df_Properties_price_index_by_countries_2020.select("Country").distinct().count() < df_Properties_price_index_by_countries_2020.count()
print(f"Duplicados en la columna 'Country': {has_duplicates}")


Duplicados en la columna 'Country': False


In [None]:

# Estadísticas descriptivas de las columnas numéricas
df_Properties_price_index_by_countries_2020.describe().show()


+-------+-------+---------------------+------------------------------+------------------------------------+-------------------------------+------------------------------------------+----------------------------------+-------------------+
|summary|Country|Price_To_Income_Ratio|Gross_Rental_Yield_City_Centre|Gross_Rental_Yield_Outside_of_Centre|Price_To_Rent_Ratio_City_Centre|Price_To_Rent_Ratio_Outside_Of_City_Centre|Mortgage_As_A_Percentage_Of_Income|Affordability_Index|
+-------+-------+---------------------+------------------------------+------------------------------------+-------------------------------+------------------------------------------+----------------------------------+-------------------+
|  count|    103|                  103|                           103|                                 103|                            103|                                       103|                               103|                103|
|   mean|   null|   14.870097087378639|         

In [None]:

# Eliminar filas que contienen paréntesis en la columna "Country" (territorios)
df_Properties_price_index_by_countries_2020 = df_Properties_price_index_by_countries_2020.filter(~col("Country").rlike(r"\(|\)"))

# Eliminar espacios en los textos de todas las columnas de tipo string
df_Properties_price_index_by_countries_2020 = df_Properties_price_index_by_countries_2020.select(
    [trim(col(c)).alias(c.replace(" ", "_").replace(".", "_")) for c in df_Properties_price_index_by_countries_2020.columns]
)

# Convertir las columnas numéricas de string a double
numeric_columns = [
    "Price_To_Income_Ratio",
    "Gross_Rental_Yield_City_Centre",
    "Gross_Rental_Yield_Outside_of_Centre",
    "Price_To_Rent_Ratio_City_Centre",
    "Price_To_Rent_Ratio_Outside_Of_City_Centre",
    "Mortgage_As_A_Percentage_Of_Income",
    "Affordability_Index"
]

for col_name in numeric_columns:
    df_Properties_price_index_by_countries_2020 = df_Properties_price_index_by_countries_2020.withColumn(col_name, col(col_name).cast("double"))

# Información del dataset después de la limpieza y conversión
df_Properties_price_index_by_countries_2020.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Price_To_Income_Ratio: double (nullable = true)
 |-- Gross_Rental_Yield_City_Centre: double (nullable = true)
 |-- Gross_Rental_Yield_Outside_of_Centre: double (nullable = true)
 |-- Price_To_Rent_Ratio_City_Centre: double (nullable = true)
 |-- Price_To_Rent_Ratio_Outside_Of_City_Centre: double (nullable = true)
 |-- Mortgage_As_A_Percentage_Of_Income: double (nullable = true)
 |-- Affordability_Index: double (nullable = true)



In [None]:

# Verificación final: número de filas y columnas
print(f"Shape final: ({df_Properties_price_index_by_countries_2020.count()}, {len(df_Properties_price_index_by_countries_2020.columns)})")


Shape final: (103, 8)


In [None]:

# Guardar la tabla resultante en la carpeta RefinedData
df_Properties_price_index_by_countries_2020.write.csv(RefinedOutput +"/RefData_Properties_price_index_by_countries_2020.csv", header=True)


////////////////////////////////////////////////////
Pupulation_density_by_countries.csv
////////////////////////////////////////////////////

In [None]:
from pyspark.sql.functions import col, trim, regexp_replace, to_date, dayofmonth, month, year

# Visualización de las primeras 10 líneas del archivo
df_Pupulation_density_by_countries.show(10)


+----+--------------------------------+--------+--------+----------+----------------+----------------+------------------+--------------------+
|Rank|Country (or dependent territory)|Area km2|Area mi2|Population|Density pop./km2|Density pop./mi2|              Date|   Population source|
+----+--------------------------------+--------+--------+----------+----------------+----------------+------------------+--------------------+
|   –|                           Macau|   32.90|      13|  6,76,100|          20,550|          53,224|September 30, 2019|Official quarterl...|
|   1|                          Monaco|    2.02|    0.78|    38,300|          18,960|          49,106| December 31, 2018|   Official estimate|
|   2|                       Singapore|   722.5|     279| 57,03,600|           7,894|          20,445|      July 1, 2019|   Official estimate|
|   –|                       Hong Kong|   1,106|     427| 75,00,700|           6,782|          17,565| December 31, 2019|   Official estimate|

In [None]:

# Forma del dataset con el objetivo de ver cuántos países están en la tabla y cuántas columnas
print((df_Pupulation_density_by_countries.count(), len(df_Pupulation_density_by_countries.columns)))


(251, 9)


In [None]:

# Rename columns to remove periods
for column in df_Pupulation_density_by_countries.columns:
    if "." in column:
        new_column = column.replace(".", "")  # Remove periods from column name
        df_Pupulation_density_by_countries = df_Pupulation_density_by_countries.withColumnRenamed(column, new_column)


# Cantidad de valores nulos por columna
df_Pupulation_density_by_countries.select(
    [(col(c).isNull().cast("int").alias(c)) for c in df_Pupulation_density_by_countries.columns]
).agg(*[sum(col(c)).alias(c) for c in df_Pupulation_density_by_countries.columns]).show()


+----+--------------------------------+--------+--------+----------+---------------+---------------+----+-----------------+
|Rank|Country (or dependent territory)|Area km2|Area mi2|Population|Density pop/km2|Density pop/mi2|Date|Population source|
+----+--------------------------------+--------+--------+----------+---------------+---------------+----+-----------------+
|   0|                               0|       0|       0|         0|              0|              0|   0|                7|
+----+--------------------------------+--------+--------+----------+---------------+---------------+----+-----------------+



In [None]:

# Identificación de columnas, contenidos no nulos y tipo de dato
df_Pupulation_density_by_countries.printSchema()


root
 |-- Rank: string (nullable = true)
 |-- Country (or dependent territory): string (nullable = true)
 |-- Area km2: string (nullable = true)
 |-- Area mi2: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- Density pop/km2: string (nullable = true)
 |-- Density pop/mi2: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Population source: string (nullable = true)



In [None]:

# Verificar duplicados en la columna 'Country (or dependent territory)'
duplicated_countries = df_Pupulation_density_by_countries.select("Country (or dependent territory)").distinct().count() != df_Pupulation_density_by_countries.count()
print(f"¿Hay duplicados en 'Country (or dependent territory)'? {duplicated_countries}")


¿Hay duplicados en 'Country (or dependent territory)'? False


In [None]:

# Métricas del dataset
df_Pupulation_density_by_countries.describe().show()


+-------+-----------------+--------------------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|             Rank|Country (or dependent territory)|          Area km2|         Area mi2|       Population|   Density pop/km2|   Density pop/mi2|             Date| Population source|
+-------+-----------------+--------------------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|  count|              251|                             251|               251|              251|              251|               251|               251|              251|               244|
|   mean|             97.5|                            null|273.24413793103446|186.7694029850746|            300.0|143.34594142259414| 258.7576958525346|             null|              null|
| stddev|56.14712815451918|                  

In [None]:

# Eliminar filas con nulos en la columna 'Population source'
df_Pupulation_density_by_countries_sinNan = df_Pupulation_density_by_countries.filter(col("Population source").isNotNull())

# Filas después de eliminar nulos
print(f"Filas después de eliminar nulos: {df_Pupulation_density_by_countries_sinNan.count()}")


Filas después de eliminar nulos: 244


In [None]:

# Convertir fechas al formato "DD-MM-YYYY" y agregar columnas Day, Month y Year
df_Pupulation_density_by_countries_sinNan_fechas = (
    df_Pupulation_density_by_countries_sinNan
    .withColumn("Date", to_date(col("Date"), "MMMM d, yyyy"))
    .withColumn("Day", dayofmonth(col("Date")))
    .withColumn("Month", month(col("Date")))
    .withColumn("Year", year(col("Date")))
)

# Limpiar columnas numéricas (población, densidad)
columns_to_clean = ["Population", "Density pop/km2", "Density pop/mi2"]
for col_name in columns_to_clean:
    df_Pupulation_density_by_countries_sinNan_fechas = df_Pupulation_density_by_countries_sinNan_fechas.withColumn(
        col_name, regexp_replace(col(col_name), ",", "").cast("double")
    )

# Limpiar columnas de área
area_columns = ["Area km2", "Area mi2"]
for col_name in area_columns:
    df_Pupulation_density_by_countries_sinNan_fechas = df_Pupulation_density_by_countries_sinNan_fechas.withColumn(
        col_name, regexp_replace(col(col_name), ",", "").cast("double")
    )

# Renombrar la columna 'Country (or dependent territory)' a 'Country'
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas.withColumnRenamed(
    "Country (or dependent territory)", "Country"
)

# Reemplazar 'Uruguay[note 5]' por 'Uruguay'
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn(
    "Country", regexp_replace(col("Country"), "Uruguay\\[note 5\\]", "Uruguay")
)

# Eliminar filas con paréntesis en 'Country'
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.filter(
    ~col("Country").rlike(r"\(|\)")
)

# Eliminar espacios en todas las columnas tipo string
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.select(
    [trim(col(c)).alias(c) for c in df_Pupulation_density_by_countries_sinNan_fechas_float_country.columns]
)


In [None]:

# Métricas del DataFrame final
df_Pupulation_density_by_countries_sinNan_fechas_float_country.describe().show()


+-------+-----------------+-----------+-----------------+------------------+-------------------+-----------------+-----------------+----------+--------------------+------------------+------------------+------------------+
|summary|             Rank|    Country|         Area km2|          Area mi2|         Population|  Density pop/km2|  Density pop/mi2|      Date|   Population source|               Day|             Month|              Year|
+-------+-----------------+-----------+-----------------+------------------+-------------------+-----------------+-----------------+----------+--------------------+------------------+------------------+------------------+
|  count|              201|        201|              201|               201|                201|              201|              201|       201|                 201|               201|               201|               201|
|   mean|97.40932642487047|       null|665558.7306467661|256973.65149253732|3.794880363681592E7|437.255721393034

In [None]:

# Información del dataset después de la limpieza
df_Pupulation_density_by_countries_sinNan_fechas_float_country.printSchema()


root
 |-- Rank: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Area km2: string (nullable = true)
 |-- Area mi2: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- Density pop/km2: string (nullable = true)
 |-- Density pop/mi2: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Population source: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)



In [None]:
from pyspark.sql.functions import col, trim, regexp_replace, to_date, dayofmonth, month, year
from pyspark.sql.types import IntegerType, FloatType, DateType

# Rename columns to remove periods
for column in df_Pupulation_density_by_countries_sinNan_fechas_float_country.columns:
    if "." in column:
        new_column = column.replace(".", "")  # Remove periods from column name
        df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumnRenamed(column, new_column)

# First, perform the data cleaning and transformations from cell 22
# Including creating the 'Day', 'Month', and 'Year' columns

# ... (Code from cell 22 to create and clean the DataFrame) ...


# Then, cast columns to appropriate data types
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Area km2", col("Area km2").cast(FloatType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Area mi2", col("Area mi2").cast(FloatType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Population", col("Population").cast(IntegerType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Density pop/km2", col("Density pop/km2").cast(FloatType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Density pop/mi2", col("Density pop/mi2").cast(FloatType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))  # Assuming date format is yyyy-MM-dd
# Now you can safely cast 'Day', 'Month', and 'Year' to IntegerType
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Day", col("Day").cast(IntegerType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Month", col("Month").cast(IntegerType()))
df_Pupulation_density_by_countries_sinNan_fechas_float_country = df_Pupulation_density_by_countries_sinNan_fechas_float_country.withColumn("Year", col("Year").cast(IntegerType()))

# Cantidad de valores nulos por columna
df_Pupulation_density_by_countries_sinNan_fechas_float_country.select(
    [(col(c).isNull().cast("int").alias(c)) for c in df_Pupulation_density_by_countries_sinNan_fechas_float_country.columns]
).agg(*[sum(col(c)).alias(c) for c in df_Pupulation_density_by_countries_sinNan_fechas_float_country.columns]).show()

+----+-------+--------+--------+----------+---------------+---------------+----+-----------------+---+-----+----+
|Rank|Country|Area km2|Area mi2|Population|Density pop/km2|Density pop/mi2|Date|Population source|Day|Month|Year|
+----+-------+--------+--------+----------+---------------+---------------+----+-----------------+---+-----+----+
|   0|      0|       0|       0|        91|              0|              0|   0|                0|  0|    0|   0|
+----+-------+--------+--------+----------+---------------+---------------+----+-----------------+---+-----+----+



In [None]:
# Vuelvo a verificar información del dataset después del cambio de tipo de dato
df_Pupulation_density_by_countries_sinNan_fechas_float_country.printSchema()


root
 |-- Rank: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Area km2: float (nullable = true)
 |-- Area mi2: float (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Density pop/km2: float (nullable = true)
 |-- Density pop/mi2: float (nullable = true)
 |-- Date: date (nullable = true)
 |-- Population source: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [None]:

# Verificar duplicados en 'Country'
is_duplicated = df_Pupulation_density_by_countries_sinNan_fechas_float_country.select("Country").distinct().count() != df_Pupulation_density_by_countries_sinNan_fechas_float_country.count()
print(f"¿Hay duplicados en 'Country'? {is_duplicated}")


¿Hay duplicados en 'Country'? False


In [None]:

# Guardar la tabla final en la carpeta RefinedData
df_Pupulation_density_by_countries_sinNan_fechas_float_country.write.csv(RefinedOutput +"/RefData_Pupulation_density_by_countries.csv", header=True, mode="overwrite")


////////////////////////////////////////////////////
Quality_of_life_index_by_countries_2020.csv
////////////////////////////////////////////////////

In [6]:

# Visualización de las primeras 10 líneas del archivo
df_Quality_of_life_index_by_countries_2020.show(10)



+-----------+---------------------+----------------------+------------+-----------------+--------------------+------------------------------+--------------------------+---------------+-------------+
|    Country|Quality of Life Index|Purchasing Power Index|Safety Index|Health Care Index|Cost of Living Index|Property Price to Income Ratio|Traffic Commute Time Index|Pollution Index|Climate Index|
+-----------+---------------------+----------------------+------------+-----------------+--------------------+------------------------------+--------------------------+---------------+-------------+
|    Denmark|               192.67|                100.88|        74.9|             80.0|                83.0|                          7.45|                     28.85|          21.33|         81.8|
|Switzerland|               192.01|                119.53|        78.4|            72.44|               122.4|                          8.68|                     29.09|          22.39|        79.24|
|    

In [7]:
# Forma del dataset: Número de filas y columnas
num_rows = df_Quality_of_life_index_by_countries_2020.count()
num_cols = len(df_Quality_of_life_index_by_countries_2020.columns)
print(f"Número de filas: {num_rows}, Número de columnas: {num_cols}")



Número de filas: 80, Número de columnas: 10


In [8]:
# Identificación de columnas, contenidos no nulos y tipo de dato
df_Quality_of_life_index_by_countries_2020.printSchema()



root
 |-- Country: string (nullable = true)
 |-- Quality of Life Index: double (nullable = true)
 |-- Purchasing Power Index: double (nullable = true)
 |-- Safety Index: double (nullable = true)
 |-- Health Care Index: double (nullable = true)
 |-- Cost of Living Index: double (nullable = true)
 |-- Property Price to Income Ratio: double (nullable = true)
 |-- Traffic Commute Time Index: double (nullable = true)
 |-- Pollution Index: double (nullable = true)
 |-- Climate Index: double (nullable = true)



In [9]:
# Verificar duplicados en la columna 'Country'
country_duplicates = df_Quality_of_life_index_by_countries_2020.groupBy("Country").count().filter(col("count") > 1).count()
print(f"¿Hay duplicados en la columna 'Country'? {'Sí' if country_duplicates > 0 else 'No'}")



¿Hay duplicados en la columna 'Country'? No


In [10]:
# Métricas descriptivas de las columnas numéricas
df_Quality_of_life_index_by_countries_2020.describe().show()



+-------+---------+---------------------+----------------------+------------------+-----------------+--------------------+------------------------------+--------------------------+-----------------+-----------------+
|summary|  Country|Quality of Life Index|Purchasing Power Index|      Safety Index|Health Care Index|Cost of Living Index|Property Price to Income Ratio|Traffic Commute Time Index|  Pollution Index|    Climate Index|
+-------+---------+---------------------+----------------------+------------------+-----------------+--------------------+------------------------------+--------------------------+-----------------+-----------------+
|  count|       80|                   80|                    80|                80|               80|                  80|                            80|                        80|               80|               80|
|   mean|     null|   134.10037499999993|                59.751|         61.079125|          64.8405|   52.46124999999997|          

In [11]:
# Aplicar trim para eliminar espacios en blanco de las columnas de tipo texto
df_Quality_of_life_index_by_countries_2020 = df_Quality_of_life_index_by_countries_2020.select(
    [trim(col(c)).alias(c) if df_Quality_of_life_index_by_countries_2020.schema[c].dataType == "StringType" else col(c)
     for c in df_Quality_of_life_index_by_countries_2020.columns]
)


In [12]:

# Guardar el DataFrame limpio como un archivo CSV
df_Quality_of_life_index_by_countries_2020.write.csv(RefinedOutput +"/RefData_Quality_of_life_index_by_countries_2020.csv", header=True, mode="overwrite")