<img src="http://spark.apache.org/images/spark-logo.png"/>

In [56]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [57]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Task 1 - Spark session

In this exercise, you will create and initialize the Spark session needed to load the dataframe and operate on it.

### Task 1.1: Creating the spark session and context

In [59]:
# Create (or reuse) the Spark context. Calling SparkContext() directly raises
# "ValueError: Cannot run multiple SparkContexts at once" when a context
# already exists, for example when this cell is re-run.
sc = SparkContext.getOrCreate()

# Create the Spark session
spark = SparkSession \
    .builder \
    .appName("Introduction to dataframes") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1.2: Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.

In [60]:
spark

## Task 2 - Load the data in a Spark dataframe
In this section, you will first read the CSV file into a Spark DataFrame called sdf.

This DataFrame includes 198,900 observations on 43 variables:

In [61]:
# Upload the file 'Building_Permits.csv' to Google Colab

from google.colab import files
uploaded = files.upload()

Saving Building_Permits.csv to Building_Permits (1).csv


In [112]:
# Read the file using Spark's `read` API.
# 'Building_Permits.csv' must already be uploaded to the Colab content folder.

sdf = spark.read.csv('Building_Permits.csv', header=True, inferSchema=True)

In [113]:
sdf.show(5)

+-------------+-----------+----------------------+--------------------+-----+---+-------------+--------------------+-----------+-------------+----+-----------+--------------------+--------------+-------------------+----------+-----------+--------------+--------------------------------+-----------------------+--------------------------+--------------------------+-----------------------------+----------------+----------------------+--------------+------------+-------------------+--------------+-----------------+--------------+--------+---------------+--------------------------+--------------------------------------+--------------------------+--------------------------------------+-----------+-------------------+-----------------------------------+-------+--------------------+-------------+
|Permit Number|Permit Type|Permit Type Definition|Permit Creation Date|Block|Lot|Street Number|Street Number Suffix|Street Name|Street Suffix|Unit|Unit Suffix|         Description|Current Status|Curren

## Count the number of rows in the dataframe.

In [114]:
print(sdf.count())

198900


## Count the number of columns in the dataframe.

In [115]:
print(len(sdf.columns))

43


## Return the column labels of the dataframe.

In [116]:
print(sdf.columns)

['Permit Number', 'Permit Type', 'Permit Type Definition', 'Permit Creation Date', 'Block', 'Lot', 'Street Number', 'Street Number Suffix', 'Street Name', 'Street Suffix', 'Unit', 'Unit Suffix', 'Description', 'Current Status', 'Current Status Date', 'Filed Date', 'Issued Date', 'Completed Date', 'First Construction Document Date', 'Structural Notification', 'Number of Existing Stories', 'Number of Proposed Stories', 'Voluntary Soft-Story Retrofit', 'Fire Only Permit', 'Permit Expiration Date', 'Estimated Cost', 'Revised Cost', 'Existing Use', 'Existing Units', 'Proposed Use', 'Proposed Units', 'Plansets', 'TIDF Compliance', 'Existing Construction Type', 'Existing Construction Type Description', 'Proposed Construction Type', 'Proposed Construction Type Description', 'Site Permit', 'Supervisor District', 'Neighborhoods - Analysis Boundaries', 'Zipcode', 'Location', 'Record ID']


## Show the summary statistics of the dataset.

In [117]:
sdf.describe().show(truncate=False)

+-------+---------------------+------------------+--------------------------------+--------------------+------------------+-----------------+------------------+--------------------+-----------+-------------+-----------------+------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-------------------------------------+--------------------+--------------------------------------------------------------------------+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------+---------------------------------------------------------------+--------------------------+--------------------------+-----------------------------+-------------------

## Show the schema of the dataframe.

In [118]:
sdf.printSchema()

root
 |-- Permit Number: string (nullable = true)
 |-- Permit Type: integer (nullable = true)
 |-- Permit Type Definition: string (nullable = true)
 |-- Permit Creation Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Lot: string (nullable = true)
 |-- Street Number: integer (nullable = true)
 |-- Street Number Suffix: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Street Suffix: string (nullable = true)
 |-- Unit: integer (nullable = true)
 |-- Unit Suffix: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Current Status: string (nullable = true)
 |-- Current Status Date: string (nullable = true)
 |-- Filed Date: string (nullable = true)
 |-- Issued Date: string (nullable = true)
 |-- Completed Date: string (nullable = true)
 |-- First Construction Document Date: string (nullable = true)
 |-- Structural Notification: string (nullable = true)
 |-- Number of Existing Stories: string (nullable = true)
 |-- Number of

## Verify that each variable has the correct data type. If not, correct it.

In [119]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, StringType

# Cast 'Plansets' to integer
sdf = sdf.withColumn('Plansets', col('Plansets').cast(IntegerType()))

## Count how many missing values each variable has.

In [120]:
from pyspark.sql.functions import col, count, when, isnull, mean, median, stddev
missing_counts = sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns])
missing_counts.show(vertical=True, truncate=False)

-RECORD 0----------------------------------------
 Permit Number                          | 0      
 Permit Type                            | 0      
 Permit Type Definition                 | 0      
 Permit Creation Date                   | 0      
 Block                                  | 0      
 Lot                                    | 0      
 Street Number                          | 0      
 Street Number Suffix                   | 196684 
 Street Name                            | 0      
 Street Suffix                          | 2768   
 Unit                                   | 169421 
 Unit Suffix                            | 196939 
 Description                            | 290    
 Current Status                         | 0      
 Current Status Date                    | 0      
 Filed Date                             | 1      
 Issued Date                            | 14812  
 Completed Date                         | 101033 
 First Construction Document Date       | 15223  


## Handle the missing values of each variable. Apply row deletion and value replacement (mean value or most frequent category).

### Create a function that uses a threshold to drop columns with too many nulls.

In [121]:
from pyspark.sql.functions import count, when, isnull, col
from pyspark.sql.types import NumericType, StringType

def eliminar_columnas_con_nulos(sdf, umbral_eliminacion=0.5):
    """
    Drop the columns whose fraction of null values exceeds the given threshold.

    Parameters:
    - sdf: Spark DataFrame
    - umbral_eliminacion: Fraction (0-1) of nulls above which a column is dropped

    Returns:
    - DataFrame without the dropped columns
    - List of dropped column names
    """

    # 1. Count the total number of rows
    total_filas = sdf.count()

    # 2. Count the null values per column
    missing_counts = sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns])
    missing_dict = missing_counts.first().asDict()

    # 3. Identify the columns to drop
    columnas_a_eliminar = [
        col_name for col_name, missing_count in missing_dict.items()
        if (missing_count / total_filas) > umbral_eliminacion
    ]

    # 4. Report the affected columns
    print(f"\nNull value analysis (drop threshold: {umbral_eliminacion*100}%)")
    print(f"Total rows in the dataset: {total_filas}")
    print(f"\nColumns to drop ({len(columnas_a_eliminar)}):")

    for col_name in columnas_a_eliminar:
        porcentaje_nulos = (missing_dict[col_name] / total_filas) * 100
        print(f"- {col_name}: {missing_dict[col_name]} nulls ({porcentaje_nulos:.1f}%)")

    # 5. Drop the columns
    if columnas_a_eliminar:
        sdf_limpio = sdf.drop(*columnas_a_eliminar)
        print("\nColumns dropped successfully")
    else:
        sdf_limpio = sdf
        print("\nNo columns were dropped (none exceeded the threshold)")

    # 6. Print the final summary
    print(f"\nFinal summary:")
    print(f"- Original columns: {len(sdf.columns)}")
    print(f"- Dropped columns: {len(columnas_a_eliminar)}")
    print(f"- Remaining columns: {len(sdf_limpio.columns)}")

    return sdf_limpio, columnas_a_eliminar

# Example usage:
# df_limpio, columnas_eliminadas = eliminar_columnas_con_nulos(sdf, umbral_eliminacion=0.7)

In [122]:
df_limpio, columnas_eliminadas = eliminar_columnas_con_nulos(sdf, umbral_eliminacion=0.8)


Null value analysis (drop threshold: 80.0%)
Total rows in the dataset: 198900

Columns to drop (8):
- Street Number Suffix: 196684 nulls (98.9%)
- Unit: 169421 nulls (85.2%)
- Unit Suffix: 196939 nulls (99.0%)
- Structural Notification: 190415 nulls (95.7%)
- Voluntary Soft-Story Retrofit: 197179 nulls (99.1%)
- Fire Only Permit: 179724 nulls (90.4%)
- TIDF Compliance: 196962 nulls (99.0%)
- Site Permit: 191769 nulls (96.4%)

Columns dropped successfully

Final summary:
- Original columns: 43
- Dropped columns: 8
- Remaining columns: 35
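
The selection rule can be checked by hand with a few of the null counts reported in this notebook. The following is a minimal plain-Python sketch, not part of the original workflow: `columns_over_threshold` is a hypothetical helper that applies the same "null fraction strictly greater than the threshold" test to a dictionary instead of a Spark DataFrame.

```python
# Null counts copied from the notebook output above; only four columns are
# included to keep the sketch short.
TOTAL_ROWS = 198900

null_counts = {
    "Street Number Suffix": 196684,
    "Unit": 169421,
    "Street Suffix": 2768,
    "Completed Date": 101033,
}


def columns_over_threshold(counts, total_rows, threshold):
    """Return the columns whose null fraction is strictly greater than threshold.

    Hypothetical helper mirroring the rule used by eliminar_columnas_con_nulos.
    """
    return [name for name, n in counts.items() if n / total_rows > threshold]


dropped_at_80 = columns_over_threshold(null_counts, TOTAL_ROWS, 0.8)
dropped_at_50 = columns_over_threshold(null_counts, TOTAL_ROWS, 0.5)

print(dropped_at_80)  # ['Street Number Suffix', 'Unit']
print(dropped_at_50)  # ['Street Number Suffix', 'Unit', 'Completed Date']
```

With the function's default threshold of 0.5, 'Completed Date' (about 50.8% nulls) would also be dropped, which is why the threshold deserves an explicit choice rather than the default.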


### Verify the dataframe without the 8 columns dropped by the previous function.

In [99]:
print(len(df_limpio.columns))

35


In [123]:
from pyspark.sql.functions import col, count, when, isnull, mean, median, stddev
missing_counts = df_limpio.select([count(when(isnull(c), c)).alias(c) for c in df_limpio.columns])
missing_counts.show(vertical=True, truncate=False)

-RECORD 0----------------------------------------
 Permit Number                          | 0      
 Permit Type                            | 0      
 Permit Type Definition                 | 0      
 Permit Creation Date                   | 0      
 Block                                  | 0      
 Lot                                    | 0      
 Street Number                          | 0      
 Street Name                            | 0      
 Street Suffix                          | 2768   
 Description                            | 290    
 Current Status                         | 0      
 Current Status Date                    | 0      
 Filed Date                             | 1      
 Issued Date                            | 14812  
 Completed Date                         | 101033 
 First Construction Document Date       | 15223  
 Number of Existing Stories             | 43760  
 Number of Proposed Stories             | 43407  
 Permit Expiration Date                 | 53075  


### Function to fill the remaining columns with the mean (numeric) or the most frequent value (categorical).

In [124]:
from pyspark.sql.functions import mean, count, when, isnull, col
from pyspark.sql.types import NumericType, StringType

def auto_impute_missing(sdf, missing_percentage=0.3):
    """
    Impute every column of `sdf` that has missing values.

    Parameters:
    - sdf: Spark DataFrame
    - missing_percentage: Fraction (0-1) above which a column counts as having
      "many" missing values. The threshold filter below is currently disabled,
      so every column with at least one null is imputed.

    Returns:
    - DataFrame with imputed values
    - Dictionary with the values used for imputation
    """

    # 1. Compute the absolute threshold
    total_rows = sdf.count()
    missing_threshold = missing_percentage * total_rows

    # 2. Identify the columns with missing values
    missing_counts = sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns])
    missing_dict = missing_counts.first().asDict()

    high_missing_cols = [
        col_name for col_name, missing_count in missing_dict.items()
        #if missing_count > missing_threshold and missing_count > 0
        if missing_count > 0
    ]

    print(f"\nColumns with missing values ({len(high_missing_cols)}):")
    for column in high_missing_cols:
        print(f"- {column}: {missing_dict[column]} missing ({missing_dict[column]/total_rows*100:.1f}%)")

    # 3. Compute the imputation values
    impute_values = {}

    for column_name in high_missing_cols:
        try:
            col_type = sdf.schema[column_name].dataType

            # Numeric columns: use the mean
            if isinstance(col_type, NumericType):
                col_mean = sdf.agg(mean(column_name)).collect()[0][0]
                impute_values[column_name] = col_mean

            # Categorical columns: use the most frequent value
            elif isinstance(col_type, StringType):
                try:
                    mode_df = (sdf.filter(col(column_name).isNotNull())
                               .groupBy(column_name)
                               .count()
                               .orderBy('count', ascending=False))

                    if mode_df.count() > 0:
                        mode_value = mode_df.first()[0]
                        impute_values[column_name] = mode_value
                    else:
                        impute_values[column_name] = "Unknown"
                except Exception:
                    impute_values[column_name] = "Unknown"

            # Other types (date, boolean, etc.)
            else:
                impute_values[column_name] = None
                print(f"  [!] Column '{column_name}' of type {col_type} - no automatic imputation applied")

        except Exception as e:
            print(f"  [!] Error processing column '{column_name}': {str(e)}")
            impute_values[column_name] = None

    # Keep only the columns with a valid imputation value
    valid_impute = {k: v for k, v in impute_values.items() if v is not None}

    # 4. Apply the imputation
    if valid_impute:
        print("\nImputation values to apply:")
        for col_name, val in valid_impute.items():
            print(f"- {col_name}: {val}")

        return sdf.fillna(valid_impute), valid_impute
    else:
        print("\nNo imputation was applied to any column")
        return sdf, {}

# Using the function
sdf_filled, imputation_values = auto_impute_missing(df_limpio, missing_percentage=0.1)

# Show the results
print("\nImputation summary:")
for col_name, val in imputation_values.items():  # col_name avoids shadowing the imported col()
    null_count = sdf_filled.filter(col(col_name).isNull()).count()
    print(f"- {col_name}: {null_count} missing after imputation (value used: {val})")


Columns with missing values (25):
- Street Suffix: 2768 missing (1.4%)
- Description: 290 missing (0.1%)
- Filed Date: 1 missing (0.0%)
- Issued Date: 14812 missing (7.4%)
- Completed Date: 101033 missing (50.8%)
- First Construction Document Date: 15223 missing (7.7%)
- Number of Existing Stories: 43760 missing (22.0%)
- Number of Proposed Stories: 43407 missing (21.8%)
- Permit Expiration Date: 53075 missing (26.7%)
- Estimated Cost: 38794 missing (19.5%)
- Revised Cost: 6434 missing (3.2%)
- Existing Use: 41334 missing (20.8%)
- Existing Units: 51505 missing (25.9%)
- Proposed Use: 42584 missing (21.4%)
- Proposed Units: 50877 missing (25.6%)
- Plansets: 38093 missing (19.2%)
- Existing Construction Type: 44315 missing (22.3%)
- Existing Construction Type Description: 43853 missing (22.0%)
- Proposed Construction Type: 43457 missing (21.8%)
- Proposed Construction Type Description: 43352 missing (21.8%)
- Supervisor District: 2701 f
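
The imputation rule (mean for numeric columns, most frequent value for string columns) can be illustrated without Spark. This is a minimal plain-Python sketch on toy data; `impute_column` is a hypothetical helper, not part of the notebook, and it works on a single list of values rather than a DataFrame.

```python
from collections import Counter
from statistics import mean


def impute_column(values):
    """Fill None entries: mean for numeric data, most frequent value otherwise.

    Hypothetical helper; returns the filled list and the fill value used.
    """
    present = [v for v in values if v is not None]
    if not present:
        # Nothing to learn a fill value from; leave the column untouched.
        return list(values), None
    if all(isinstance(v, (int, float)) for v in present):
        fill = mean(present)
    else:
        fill = Counter(present).most_common(1)[0][0]
    return [fill if v is None else v for v in values], fill


# Toy data, not taken from the permits dataset.
stories, stories_fill = impute_column([2, None, 4, 3, None])
suffix, suffix_fill = impute_column(["St", "Av", None, "St"])

print(stories, stories_fill)
print(suffix, suffix_fill)
```

One design point worth keeping in mind: a column stored as strings is always filled with its most frequent value, so date columns that were read as strings are treated as categories.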

### Final verification of the results.

In [125]:
from pyspark.sql.functions import col, count, when, isnull, mean, median, stddev
missing_counts = sdf_filled.select([count(when(isnull(c), c)).alias(c) for c in sdf_filled.columns])
missing_counts.show(vertical=True, truncate=False)

-RECORD 0-------------------------------------
 Permit Number                          | 0   
 Permit Type                            | 0   
 Permit Type Definition                 | 0   
 Permit Creation Date                   | 0   
 Block                                  | 0   
 Lot                                    | 0   
 Street Number                          | 0   
 Street Name                            | 0   
 Street Suffix                          | 0   
 Description                            | 0   
 Current Status                         | 0   
 Current Status Date                    | 0   
 Filed Date                             | 0   
 Issued Date                            | 0   
 Completed Date                         | 0   
 First Construction Document Date       | 0   
 Number of Existing Stories             | 0   
 Number of Proposed Stories             | 0   
 Permit Expiration Date                 | 0   
 Estimated Cost                         | 0   
 Revised Cost

## State how many rows of data remain after the previous activity.

In [126]:
print(sdf_filled.count())

198900


## Determine the maximum value of the ‘Number of Existing Stories’ column.

In [127]:
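# NOTE: printSchema() above reports this column as string, so 'max' compares
# values lexicographically rather than numerically.
max_stories = sdf_filled.agg({'Number of Existing Stories': 'max'}).collect()[0][0]
print(f"\nMaximum value of 'Number of Existing Stories': {max_stories}")


Maximum value of 'Number of Existing Stories': issued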
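
The result `issued` is a status string, not a number of stories. The schema printed earlier lists 'Number of Existing Stories' as a string column, and the maximum of a set of strings is the one that sorts last, not the largest number. The plain-Python sketch below reproduces the effect on toy values; `to_float` is a hypothetical helper. In Spark the usual equivalent is to cast the column to a numeric type before aggregating.

```python
# Toy values; 'issued' stands in for a status string that ended up in a
# numeric-looking column.
raw_values = ["3", "12", "2", "issued"]

# String comparison is lexicographic, and letters sort after digits.
string_max = max(raw_values)


def to_float(text):
    """Return the value as a float, or None when it is not a number."""
    try:
        return float(text)
    except ValueError:
        return None


# Keep only the values that parse as numbers, then take the numeric maximum.
numeric_values = [v for v in map(to_float, raw_values) if v is not None]
numeric_max = max(numeric_values)

print(string_max)   # issued
print(numeric_max)  # 12.0
```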


## Determine the minimum value of the ‘Estimated Cost’ column.

In [128]:
min_cost = sdf_filled.agg({'Estimated Cost': 'min'}).collect()[0][0]
print(f"Minimum value of 'Estimated Cost': {min_cost}")

Minimum value of 'Estimated Cost': 01/01/2015
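
A date appearing in a cost column suggests that some rows were shifted while the CSV was parsed. One possible cause, offered here only as an assumption about this file, is a quoted free-text field that contains a line break. The plain-Python sketch below uses toy data to show how splitting on line boundaries breaks such a record while a quote-aware parser keeps it intact. If this turns out to be the cause, Spark's CSV reader has a `multiLine` option that may be worth trying when reading the file.

```python
import csv
import io

# Toy CSV: the quoted Description of the first record contains a line break.
text = (
    "Permit Number,Description,Estimated Cost\n"
    '1001,"replace roof\nsee attached plans",5000\n'
    "1002,new window,750\n"
)

# Naive line splitting treats the embedded newline as a record boundary,
# so one record becomes two fragments with values in the wrong columns.
naive_rows = [line.split(",") for line in text.splitlines()[1:]]

# A quote-aware parser keeps the multi-line field inside a single record.
parsed_rows = list(csv.reader(io.StringIO(text)))[1:]

print(len(naive_rows))                  # 3
print(len(parsed_rows))                 # 2
print([row[2] for row in parsed_rows])  # ['5000', '750']
```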


## Determine the mean of ‘Existing Units’.

In [129]:
mean_units = sdf_filled.agg({'Existing Units': 'mean'}).collect()[0][0]
print(f"Mean of 'Existing Units': {mean_units:.2f}")

Mean of 'Existing Units': 467.11


## Determine the median of ‘Plansets’.

In [130]:
# approxQuantile with a relative error of 0.01 returns an approximate median
median_plansets = sdf_filled.approxQuantile('Plansets', [0.5], 0.01)[0]
print(f"Median of 'Plansets': {median_plansets}")

Median of 'Plansets': 2.0
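
The third argument of `approxQuantile` is a relative error, so the returned value is an approximate median. The Spark documentation states the guarantee in terms of ranks: for N rows, probability p and error err, the rank of the returned value lies between floor((p - err) * N) and ceil((p + err) * N). The sketch below only evaluates that bound for this dataset; `rank_bounds` is a hypothetical helper, and exact fractions are used to avoid floating-point rounding at the boundaries.

```python
from fractions import Fraction
from math import ceil, floor


def rank_bounds(n_rows, probability, relative_error):
    """Rank interval allowed by the documented approxQuantile guarantee.

    Hypothetical helper; pass Fractions to keep the arithmetic exact.
    """
    low = floor((probability - relative_error) * n_rows)
    high = ceil((probability + relative_error) * n_rows)
    return low, high


# 198900 rows, median (p = 0.5), relative error 0.01 as in the cell above.
low, high = rank_bounds(198900, Fraction(1, 2), Fraction(1, 100))
print(low, high)  # 97461 101439

# With a relative error of 0 the rank is pinned to the exact median position.
print(rank_bounds(198900, Fraction(1, 2), Fraction(0)))  # (99450, 99450)
```

Setting the relative error to 0 requests the exact quantile, which the documentation warns can be expensive on large data.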


## Determine the standard deviation of ‘Estimated Cost’. Interpret the result.

In [131]:
stddev_cost = sdf_filled.agg({'Estimated Cost': 'stddev'}).collect()[0][0]
print(f"\nStandard deviation of 'Estimated Cost': {stddev_cost:.2f}")
print("Interpretation: it measures how widely the estimated costs spread around the mean.")
print(f"A high value ({stddev_cost:.2f}) suggests that costs vary significantly between projects.")


Standard deviation of 'Estimated Cost': 3272566.48
Interpretation: it measures how widely the estimated costs spread around the mean.
A high value (3272566.48) suggests that costs vary significantly between projects.
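
Whether a standard deviation is "high" depends on the scale of the data, so it helps to compare it with the mean. The plain-Python sketch below uses hypothetical costs (not values from the dataset) to show two points: Spark's `stddev` aggregate is the sample standard deviation (it divides by n - 1), and the coefficient of variation expresses the spread relative to the mean.

```python
from statistics import mean, pstdev, stdev

# Hypothetical project costs; one large project dominates the spread.
costs = [1_000, 2_000, 3_000, 4_000, 90_000]

sample_sd = stdev(costs)       # sample standard deviation, divides by n - 1
population_sd = pstdev(costs)  # population standard deviation, divides by n

# Coefficient of variation: spread relative to the mean (unitless).
cv = sample_sd / mean(costs)

print(round(sample_sd, 2), round(population_sd, 2), round(cv, 2))
```

A coefficient of variation above 1 means the spread is larger than the mean itself, which is typical of cost data where a few very large projects sit alongside many small ones.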


## Compute the correlation matrix of the numeric variables in the dataframe.

In [133]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

numeric_cols = [c for c, dtype in sdf_filled.dtypes if dtype in ['int', 'double', 'float']]

print("\nNumeric columns for the correlation matrix:")
print(numeric_cols)

if len(numeric_cols) >= 2:  # At least 2 numeric columns are needed
    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
    df_vector = assembler.transform(sdf_filled).select("features")

    matrix = Correlation.corr(df_vector, "features").collect()[0][0]
    print("\nCorrelation matrix:")
    print(matrix.toArray())

else:
    print("\nNot enough numeric columns to compute a correlation.")



Numeric columns for the correlation matrix:
['Permit Type', 'Street Number', 'Plansets']

Correlation matrix:
[[ 1.00000000e+00 -2.28088096e-03 -8.22764556e-03]
 [-2.28088096e-03  1.00000000e+00 -2.05544647e-05]
 [-8.22764556e-03 -2.05544647e-05  1.00000000e+00]]


The three variables are essentially uncorrelated: every off-diagonal coefficient has an absolute value below 0.01, so changes in one variable are not accompanied by systematic linear changes in the others. This can be useful if, for example, you are building a predictive model and want to avoid multicollinearity among the independent variables.
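
A coefficient near zero only rules out a linear relationship. The plain-Python sketch below computes the Pearson coefficient by hand on toy data; `pearson` is a hypothetical helper written for illustration. It shows a perfectly linear pair with a coefficient of 1 and a pair where one variable is fully determined by the other, yet the coefficient is 0.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


xs = [-2, -1, 0, 1, 2]
linear = [2 * x + 1 for x in xs]  # exact linear relationship
squared = [x * x for x in xs]     # fully determined by x, but not linear

print(pearson(xs, linear))
print(pearson(xs, squared))  # 0.0
```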

In [134]:
spark.stop()