**Notebook 4-1-Fase 1** (v5)

Capa Silver (1): prepara datos para an√°lisis de correlaci√≥n

Dataset: **2023/01**

Objetivos del presente notebook:
- Depurar nulos, duplicados, valores no v√°lidos o inconsistentes
- Homogeneizar tipos de datos, unidades, formatos
- Tratar valores at√≠picos

Versiones:
- v1: c√≥digo traspasado desde la v2 del notebook 3
- v2: preparaci√≥n de datos, deja dataset acabado para estudio de la correlaci√≥n; el c√≥digo funciona, pero al ejecutar el notebook completo se para, seguramente por falta de memoria
- v3: divido el c√≥digo en funciones para liberar memoria entre ellas -> no funciona, error de memoria en phase_5_1_new_variables
- v4: vuelvo a v2, separo en fases
- v5: limpieza de v4; acabado

# Importaciones

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from scipy.stats import skew

from pyspark.sql import SparkSession, DataFrame

from pyspark.sql.functions import (
    sum as spark_sum,
    min as spark_min,
    max as spark_max,
    round as spark_round,
    pi as spark_pi,
    count, col, when, isnan, isnull, mean, stddev, desc, asc,
    year, month, dayofweek, dayofmonth, weekofyear, date_format, to_date,
    isnotnull, date_trunc, datediff, lit, percentile_approx, expr, broadcast,
    coalesce, avg, hour, unix_timestamp, sin, cos, udf, skewness
)

from pyspark.sql.types import (
    NumericType, StringType, DateType, TimestampType, DoubleType, BooleanType, IntegerType
)

from pyspark import StorageLevel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    StringIndexer, StandardScaler, MinMaxScaler, VectorAssembler, StringIndexerModel,
    OneHotEncoder
)

from datetime import datetime

from google.colab import drive

import os
import re
import json
import time
import sys
import gc

In [2]:
# enlace de autorizaci√≥n manual de acceso a Google Drive
'''
from google.colab import auth

print("Ve a este enlace para autorizar manualmente:")
auth.authenticate_user()
'''
print("Usar cuando se produzca alg√∫n error al montar Google Drive")

Usar cuando se produzca alg√∫n error al montar Google Drive


In [3]:
# monta Google Drive
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# crea la estructura de directorios

# configuraci√≥n de paths
PROJECT_ROOT = "/content/drive/MyDrive/taxi_project"
BRONZE_DIR = f"{PROJECT_ROOT}/bronze"
SILVER_DIR = f"{PROJECT_ROOT}/silver"
METADATA_DIR = f"{PROJECT_ROOT}/metadata"

# ruta de la capa Bronze
BRONZE_PATH = f"{BRONZE_DIR}/taxi_data"

# ruta de la capa Silver
SILVER_PATH = f"{SILVER_DIR}/taxi_data"

# crea directorios si no existen
paths = [BRONZE_DIR, SILVER_DIR, METADATA_DIR]

for path in paths:
    os.makedirs(path, exist_ok=True)

In [5]:
# setup para Spark en Google Colab

# instala Java si no est√°
!apt-get install -y openjdk-11-jdk-headless -qq > /dev/null

# fija JAVA_HOME
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

# asegura versi√≥n compatible de PySpark
!pip install -q pyspark==3.5.1

In [6]:
# configuraci√≥n optimizada de Spark para Colab
'''
spark.driver.memory: asigna GB al proceso driver (m√°quina local act√∫a como driver y ejecutor)

spark.driver.maxResultSize: l√≠mite en GB de resultados que pueden ser devueltos al driver desde los executors;
evita que el driver se quede sin memoria

spark.executor.memory: memoria de los ejecutores (en local es el mismo proceso, pero influye en el planificador)

spark.sql.adaptive.enabled: activa el Adaptative Query Execution (AQE), ajuste din√°mico del plan de ejecuci√≥n
en tiempo de ejecuci√≥n

spark.sql.adaptive.coalescePartitions.enabled: permite a AQE reducir el n√∫mero de particiones en tiempo
de ejecuci√≥n (mejor rendimiento si hay particiones vac√≠as o desbalanceadas)

spark.sql.execution.arrow.pyspark.enabled: activa el uso de Apache Arrow para optimizar la conversi√≥n
entre dataframes de Spark y Pandas
'''

spark = SparkSession.builder \
    .appName("NYC-Taxi-Ingesta") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.executor.memory", "2g") \
	  .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
	  .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print(f"‚úÖ Spark inicializado - Version: {spark.version}")

‚úÖ Spark inicializado - Version: 3.5.1


# **Acceso a los datos**

In [7]:
def get_bronze_dataset_paths(bronze_root):
    """
    Devuelve todas las rutas individuales de datasets en la capa Bronze.

    Args:
        bronze_root (str): Ruta ra√≠z de la capa Bronze.

    Returns:
        list: Lista de rutas individuales de datasets.
    """
    parquet_dirs = []
    for root, _, files in os.walk(bronze_root):
        if any(f.endswith(".parquet") for f in files):
            parquet_dirs.append(root)
    return sorted(parquet_dirs)

In [8]:
def extract_year_month_from_hive_partition(path):
    """
    Extrae a√±o y mes de la estructura de particiones de Hive.

    Args:
        path (str): Ruta del dataset con estructura ingestion_year=YYYY/ingestion_month=MM

    Returns:
        tuple: (a√±o, mes) o (None, None) si no se puede extraer
    """
    # busca patrones de partici√≥n de Hive en la ruta completa
    year_pattern = r'ingestion_year=(\d{4})'
    month_pattern = r'ingestion_month=(\d{1,2})'

    year_match = re.search(year_pattern, path)
    month_match = re.search(month_pattern, path)

    if year_match and month_match:
        year = int(year_match.group(1))
        month = int(month_match.group(1))

        # valida que el mes est√© en rango v√°lido
        if 1 <= month <= 12:
            return year, month

    return None, None

In [9]:
def format_date_info(year, month):
    """
    Formatea la informaci√≥n de fecha para mostrar.

    Args:
        year (int): A√±o
        month (int): Mes

    Returns:
        str: Fecha formateada
    """
    if year and month:
        try:
            # crea objeto datetime para obtener el nombre del mes
            date_obj = datetime(year, month, 1)
            month_names_es = {
                1: 'Enero', 2: 'Febrero', 3: 'Marzo', 4: 'Abril',
                5: 'Mayo', 6: 'Junio', 7: 'Julio', 8: 'Agosto',
                9: 'Septiembre', 10: 'Octubre', 11: 'Noviembre', 12: 'Diciembre'
            }
            return f"{year}-{month:02d} ({month_names_es[month]} {year})"
        except ValueError:
            return f"{year}-{month:02d}"
    return "Fecha no identificada"

In [10]:
# lista de datasets disponibles
bronze_datasets = get_bronze_dataset_paths(BRONZE_PATH)

print(f"Total de datasets encontrados: {len(bronze_datasets)}")

# muestra mes y a√±o de los datos de cada dataset
for i, path in enumerate(bronze_datasets):

    year_bronze, month_bronze = extract_year_month_from_hive_partition(path)
    date_info = format_date_info(year_bronze, month_bronze)

    print(f"{i:2d}: {date_info:<25}")

Total de datasets encontrados: 3
 0: 2023-01 (Enero 2023)     
 1: 2023-02 (Febrero 2023)   
 2: 2023-03 (Marzo 2023)     


# **Funciones auxiliares**

- **Carga y guardado de ficheros**

In [11]:
def save_parquet(df, ruta, modo="overwrite", compresion="snappy"):
    """
    Guarda un DataFrame en formato Parquet.

    Args:
        df: DataFrame de PySpark
        ruta: str, ruta donde guardar el archivo
        modo: str, modo de escritura ("overwrite", "append", "ignore", "error")
        compresion: str, tipo de compresi√≥n ("snappy", "gzip", "lzo", "brotli", "lz4", "zstd")

    Returns:
        True si se guard√≥ correctamente, False en caso contrario
    """
    try:
        print(f"üíæ Guardando DataFrame en {ruta}...")
        print(f"üìä Registros a guardar: {df.count()}")

        df.write \
          .mode(modo) \
          .option("compression", compresion) \
          .parquet(ruta)

        print(f"‚úÖ Dataframe guardado correctamente en {ruta}")

        return True

    except Exception as e:
        print(f"‚ùå Error al guardar: {str(e)}")

        return False

In [12]:
def load_parquet(spark, ruta, mostrar_info=True):
    """
    Lee un DataFrame en formato Parquet.

    Args:
        spark: sesi√≥n de Spark
        ruta: str, ruta del archivo Parquet a leer
        mostrar_info: str, True para mostrar informaci√≥n del DataFrame le√≠do

    Returns:
        DataFrame le√≠do o None si hay error
    """
    try:
        print(f"üìñ Leyendo DataFrame desde {ruta}...")

        df_leido = spark.read.parquet(ruta)

        if mostrar_info:
            filas = df_leido.count()
            columnas = len(df_leido.columns)
            print("‚úÖ Dataframe cargado correctamente")
            print(f"üìä Registros le√≠dos: {filas}")
            print(f"üìã Columnas: {columnas}")
            print(f"üè∑Ô∏è  Nombres de columnas: {df_leido.columns}")
        else:
            print("‚úÖ Dataframe cargado correctamente")

        return df_leido

    except Exception as e:
        print(f"‚ùå Error al leer el Dataframe: {str(e)}")
        return None

- **Carga de datos**

In [13]:
def basic_process_dataset(current_dataset_index, bronze_datasets):
  """
  Carga un dataset en Spark y realiza una exploraci√≥n b√°sica.

  Args:
      current_dataset_index (int): √çndice del dataset actual en la lista.
      bronze_datasets (list): Lista de rutas de datasets en la capa Bronze.

  Returns:
      tuple: la ruta del dataset, su nombre y el DataFrame cargado.
  """
  # comprueba que queden datasets pendientes de procesar
  if current_dataset_index < len(bronze_datasets):
    # ruta y nombre del dataset actual
    dataset_path = bronze_datasets[current_dataset_index]
    dataset_name = os.path.basename(dataset_path)

    print(f"=== Dataset {current_dataset_index}: {dataset_name} ===")

    # carga el dataset
    ds = spark.read.parquet(dataset_path)

    # exploraci√≥n b√°sica
    print(f"Total de registros: {ds.count():,}")
    print(f"\nRuta: {dataset_path}")

    print("\nEsquema:")
    ds.printSchema()

    print("\nPrimeras 5 filas:")
    ds.show(5)

    ## registros duplicados
    print("\n=== VERIFICACI√ìN DE DUPLICADOS ===")

    # total de registros
    total_registros = ds.count()
    # registros √∫nicos (eliminando duplicados)
    registros_unicos = ds.distinct().count()

    # determina si hay duplicados
    tiene_duplicados = total_registros > registros_unicos
    if tiene_duplicados:
        duplicados_count = total_registros - registros_unicos
        print(f"N√∫mero de registros duplicados: {duplicados_count}")
    else:
        print("No existen registros duplicados")

    ## registros con todos sus valores nulos
    print("\n=== REGISTROS CON TODOS SUS VALORES NULOS ===")

    # agrega una columna que cuenta las columnas no nulas por fila
    ds_con_conteo = ds.withColumn(
        "columnas_no_nulas",
        sum([when(~col(c).isNull(), 1).otherwise(0) for c in ds.columns])
    )

    # cuenta las filas donde todas las columnas son nulas (columnas_no_nulas = 0)
    registros_nulos_alt = ds_con_conteo.filter(col("columnas_no_nulas") == 0).count()
    print(f"Registros completamente nulos: {registros_nulos_alt}")

    ## valores nulos
    print("\n=== VALORES NULOS POR VARIABLE ===")

    # inicializa lista de resultados
    null_counts = []
    # recorre las columnas
    for col_name in ds.columns:

        # cuenta valores nulos de la columna actual
        null_count = ds.filter(col(col_name).isNull()).count()

        ## calcula el porcentaje de nulos como un float est√°ndar de Python
        # n√∫mero total de registros del Dataframe
        total_records = ds.count()
        if total_records > 0:
            # hay registros
            null_percentage = (null_count / total_records) * 100
        else:
            # no hay registros: evita divisi√≥n por cero
            null_percentage = 0.0

        # diccionario nombre columna, n√∫mero nulos y porcentaje nulos
        null_counts.append({
            'column': col_name,
            'null_count': null_count,
            # funci√≥n round incorporada de Python
            'null_percentage': __builtins__.round(null_percentage, 2)
        })

    # convierte la lista de resultados en Dataframe de Pandas
    null_ds = pd.DataFrame(null_counts)

    # muestra los resultados
    print(null_ds.to_string(index=False))

    # limpia la memoria
    del null_ds
    gc.collect()

    # devuelve ruta, nombre y dataframe
    return dataset_path, dataset_name, ds

  else:
    print("Ya est√°n procesados todos los datasets")

- **An√°lisis de datos**

In [14]:
'''
v2
'''
def analyze_date_ranges(ds, col_name, start_date, end_date, show_outside_dates=False):
    """
    Muestra el n√∫mero y porcentaje de registros fuera del rango de fechas,
    y opcionalmente el desglose diario antes y despu√©s.

    Args:
        ds: DataFrame de PySpark
        col_name: str, nombre de la columna de fecha
        start_date: str, fecha inicial (inclusive) en formato 'YYYY-MM-DD'
        end_date: str, fecha final (inclusive) en formato 'YYYY-MM-DD'
        show_outside_dates: bool, si True, muestra desgloses diarios fuera del rango
    """
    # comprueba que la columna sea de tipo timestamp
    ds = ds.withColumn(col_name, col(col_name).cast(TimestampType()))

    # crea una columna auxiliar con solo la fecha (sin la hora)
    ds_with_date = ds.withColumn("date_only", to_date(col(col_name)))

    # convierte las fechas de string a tipo date para comparaci√≥n consistente
    start_date_typed = lit(start_date).cast("date")
    end_date_typed = lit(end_date).cast("date")

    # filtra registros fuera del rango de fechas (sin considerar la hora)
    before_ds = ds_with_date.filter(col("date_only") < start_date_typed)
    after_ds = ds_with_date.filter(col("date_only") > end_date_typed)

    # cuenta registros
    total_count = ds.count()
    before_count = before_ds.count()
    after_count = after_ds.count()

    # calcula porcentajes
    before_pct = (before_count / total_count) * 100 if total_count > 0 else 0
    after_pct = (after_count / total_count) * 100 if total_count > 0 else 0

    # muestra resultados
    print(f"Total registros: {total_count:,}")
    print(f"Antes de {start_date}: {before_count:,} ({before_pct:.2f}%)")
    print(f"Despu√©s de {end_date}: {after_count:,} ({after_pct:.2f}%)")

    # comprueba si tiene que mostrar los desgloses diarios
    if show_outside_dates:

        # anteriores al rango
        if before_count > 0:
            print(f"\nDistribuci√≥n diaria antes de {start_date}:")

            (before_ds
             .groupBy("date_only")
             .agg(count("*").alias("registros"))
             .orderBy("date_only")
             # muestra hasta 30 d√≠as
             .show(30, truncate=False)
            )

        # posteriores al rango
        if after_count > 0:
            print(f"\nDistribuci√≥n diaria despu√©s de {end_date}:")

            (after_ds
             .groupBy("date_only")
             .agg(count("*").alias("registros"))
             .orderBy("date_only")
             # muestra hasta 30 d√≠as
             .show(30, truncate=False)
             )

In [15]:
def analyze_median_hourly_variation(ds, columnas):
    """
    Calcula y muestra la variaci√≥n de la mediana por hora para las columnas indicadas.
    Columna de fecha: tpep_pickup_datetime

    Args:
        ds: DataFrame de entrada
        columnas: lista de strings con los nombres de columnas a analizar
    """

    # a√±ade columna auxiliar con la hora
    ds_hour = ds.withColumn("hour_of_day", hour("tpep_pickup_datetime"))

    for col_name in columnas:
        print(f"Variaci√≥n de la mediana de '{col_name}':")

        # calcula el rango de variaci√≥n
        stats = ds_hour.groupBy('hour_of_day').agg(
            percentile_approx(col_name, 0.5).alias('mediana')
        )

        # estad√≠sticas de la variaci√≥n de medianas

        # calcula valores m√≠nimo, m√°ximo y promedio de la mediana
        stats_summary = stats.agg(
            spark_min("mediana").alias("mediana_min"),
            spark_max("mediana").alias("mediana_max"),
            avg("mediana").alias("mediana_promedio")
        )

        # a√±ade el rango de variaci√≥n
        stats_summary = stats_summary.withColumn(
            "rango_variacion", col("mediana_max") - col("mediana_min")
        ).withColumn(
            "factor_variacion_tanto_por_uno",
            (col("mediana_max") - col("mediana_min")) / col("mediana_promedio")
        )

        # muestra resultados
        stats_summary.show()

In [16]:
'''
v2 - contadores separados para ceros, negativos y nulos
'''
def counting_columns_values(ds, columnas, contar_zeros=False, contar_negativos=False, contar_nulos=False):
    """
    Contabiliza los valores a cero, negativos y nulos de las columnas indicadas.

    Args:
        ds: Dataframe de Spark
        columnas: lista de columnas a contar
        contar_zeros: bool, contar valores a cero
        contar_negativos: bool, contar valores negativos
        contar_nulos: bool, contar valores nulos

    Returns:
        None
    """
    # comprueba que se haya solicitado contabilizar algo
    if not (contar_zeros or contar_negativos or contar_nulos):
        print("No se ha seleccionado ning√∫n tipo de contabilizaci√≥n.")
        return

    # inicia lista con las agregaciones a ejecutar sobre el dataframe
    aggs = []

    # recorre las columnas
    for c in columnas:

        # condiciones a evaluar
        if contar_zeros:
            #condiciones.append(col(c) == 0)
            aggs.append(count(when(col(c) == 0, lit(1))).alias(f"{c}_zeros"))
        if contar_negativos:
            #condiciones.append(col(c) < 0)
            aggs.append(count(when(col(c) < 0, lit(1))).alias(f"{c}_negativos"))
        if contar_nulos:
            #condiciones.append(col(c).isNull())
            aggs.append(count(when(col(c).isNull(), lit(1))).alias(f"{c}_nulos"))

    # agregaci√≥n que cuenta todas las filas del dataset, sin condiciones
    aggs.append(count(lit(1)).alias("total_rows"))

    # ejecuta todas las agregaciones, recoge el resultado y guarda la primera (y √∫nica) fila de resultados
    resultado = ds.agg(*aggs).collect()[0]

    # n√∫mero total de filas
    total_rows = resultado["total_rows"]

    # comprueba que el dataframe tenga registros
    if total_rows == 0:
        print("El DataFrame no tiene ning√∫n registro. No se pueden calcular porcentajes.")
        return

    # recorre las columnas
    for c in columnas:

        # comprueba si la columna actual est√° en 'resultado' para contar ceros, negativos o nulos
        if any(k in resultado for k in (f"{c}_zeros", f"{c}_negativos", f"{c}_nulos")):

            print(f"Variable '{c}':")

            # muestra contadores de la variable actual
            if contar_zeros:
                val = resultado[f"{c}_zeros"]
                print(f"   Valores a cero: {val:,} ({val/total_rows*100:.2f}%)")
            if contar_negativos:
                val = resultado[f"{c}_negativos"]
                print(f"   Valores negativos: {val:,} ({val/total_rows*100:.2f}%)")
            if contar_nulos:
                val = resultado[f"{c}_nulos"]
                print(f"   Valores nulos: {val:,} ({val/total_rows*100:.2f}%)")

    # limpia la memoria
    del resultado
    gc.collect()

- **Transformaci√≥n de datos**

In [17]:
def mode_row_spark(ds, variable):
    """
    Devuelve la moda de una variable de un dataframe de Spark.

    Args:
        ds: Dataframe de Spark
        variable: str, nombre de la variable

    Returns:
        str, valor de la moda
    """
    # comprueba el tipo de la variable
    col_type = dict(ds.dtypes)[variable]

    if col_type in ['int', 'bigint', 'float', 'double', 'decimal']:
        # variables num√©ricas: filtra los ceros antes de calcular la moda
        mode_row = (
            ds
            .filter(col(variable) > 0)
            .groupBy(variable)
            .agg(count("*").alias("count"))
            .orderBy(col("count").desc())
            .first()
        )
    else:
        # variables categ√≥ricas: calcula la moda directamente
        mode_row = (
            ds
            .groupBy(variable)
            .agg(count("*").alias("count"))
            .orderBy(col("count").desc())
            .first()
        )

    if mode_row:
        return mode_row[variable]
    else:
        # variables num√©ricas: todos los valores son cero, negativos o nulos
        # variables categ√≥ricas: todos los valores son nulos o columna vac√≠a
        return None

In [18]:
'''
v2 - acepta recibir variables acabadas en '_imputed'
'''
def impute_column_with_mode(ds, col_name, imputar_ceros=True, imputar_nulos=True):
    """
    Sustituye los valores a cero o nulos de la columna con su moda.

    Args:
        ds: Dataframe de Spark
        col_name: str, nombre de la columna
        imputar_ceros: bool, True para imputar los ceros
        imputar_nulos: bool, True para imputar los nulos

    Returns:
        Dataframe con una nueva columna (col_name_imputed) con la imputaci√≥n
            Nota: si col_name ya acababa en _imputed, no a√±ade _imputed de nuevo al nombre de la variable
    """
    # comprueba par√°metros recibidos
    if not imputar_ceros and not imputar_nulos:
        print("No se ha solicitado imputar ni ceros ni nulos.")
        return ds

    # calcula la moda
    moda = mode_row_spark(ds, col_name)
    print(f"Moda de {col_name}: {moda}")

    # cuenta registros antes de imputar
    if imputar_ceros:
        ceros_antes = ds.filter(col(col_name) == 0).count()
        print(f"Registros con ceros antes de la imputaci√≥n: {ceros_antes:,}")
    if imputar_nulos:
        nulos_antes = ds.filter(col(col_name).isNull()).count()
        print(f"Registros con nulos antes de la imputaci√≥n: {nulos_antes:,}")

    # condici√≥n de imputaci√≥n
    condiciones = []
    if imputar_ceros:
        condiciones.append(col(col_name) == 0)
    if imputar_nulos:
        condiciones.append(col(col_name).isNull())

    condicion_total = condiciones[0]
    for c in condiciones[1:]:
        condicion_total = condicion_total | c

    ## imputaci√≥n

    # comprueba si el nombre de la variable acaba ya en _imputed, si es as√≠ guarda el nombre sin _imputed
    col_base = col_name[:-8] if col_name.endswith("_imputed") else col_name
    col_nueva = f"{col_base}_imputed"

    ds_imputado = ds.withColumn(
        col_nueva,
        when(condicion_total, lit(moda)).otherwise(col(col_name))
    )

    # cuenta registros despu√©s de imputar
    if imputar_ceros:
        ceros_despues = ds_imputado.filter(col(col_nueva) == 0).count()
        print(f"Registros con ceros despu√©s de la imputaci√≥n: {ceros_despues:,}")
    if imputar_nulos:
        nulos_despues = ds_imputado.filter(col(col_nueva).isNull()).count()
        print(f"Registros con nulos despu√©s de la imputaci√≥n: {nulos_despues:,}")

    return ds_imputado

In [19]:
'''
v7 - acepta recibir variables acabadas en '_imputed'
'''
def impute_columns_with_median(
    ds: DataFrame, columnas: list, claves_agrupacion: list, valores_a_imputar: str
    ) -> DataFrame:
    """
    Sustituye los valores a cero, negativos o nulos de las columnas con su mediana hist√≥rica
    seg√∫n las claves de agrupaci√≥n proporcionadas y seg√∫n los valores a imputar indicados.

    Args:
        ds: Dataframe de Spark
        columnas: lista de columnas a imputar
        claves_agrupacion: lista de columnas para agrupar
            Ej: ['PULocationID', 'DOLocationID'] o ['PULocationID', 'DOLocationID', 'pickup_hour']
        valores_a_imputar: string que especifica qu√© valores imputar
            Valores aceptados: 'ceros', 'negativos', 'ambos' (ceros y negativos), 'nulos'

    Returns:
        Dataframe con nuevas columnas (col_name_imputed) con las imputaciones
            Nota: si col_name ya acababa en _imputed, a√±ade nueva variable acabada en _imputed_v2
    """
    # validaci√≥n de par√°metros
    if valores_a_imputar not in ["ceros", "negativos", "ambos", "nulos"]:
        raise ValueError("El valor de 'valores_a_imputar' debe ser 'ceros', 'negativos' o 'ambos'.")

    # verifica si necesitamos crear la columna de hora
    necesita_hora = "pickup_hour" in claves_agrupacion

    # dataframe con el resultado de todas las imputaciones
    ds_resultado = ds

    # crea la columna pickup_hour si es necesaria y no existe
    if necesita_hora and "pickup_hour" not in ds_resultado.columns:
        ds_resultado = ds_resultado.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))

    def get_condition_to_impute(col_name):
        """
        Define la condici√≥n para identificar valores a imputar
        """
        if valores_a_imputar == "ceros":
            return col(col_name) == 0
        elif valores_a_imputar == "negativos":
            return col(col_name) < 0
        elif valores_a_imputar == "ambos":
            return col(col_name) <= 0
        else:
            # 'nulos'
            return col(col_name).isNull()

    def get_valid_condition(col_name):
        """
        Define la condici√≥n para identificar valores v√°lidos (opuesta a la de imputaci√≥n)
        """
        if valores_a_imputar == "ceros":
            return col(col_name) != 0
        elif valores_a_imputar == "negativos":
            return col(col_name) >= 0
        elif valores_a_imputar == "ambos":
            return col(col_name) > 0
        else:
            # 'nulos'
            return col(col_name).isNotNull()

    # recorre las columnas
    for col_name in columnas:
        print(f"\nImputaci√≥n de {col_name}")

        # crea la columna imputada como copia de la original
        col_base = col_name[:-8] if col_name.endswith("_imputed") else col_name
        col_nueva = f"{col_base}_imputed_v2" if col_name.endswith("_imputed") else f"{col_name}_imputed"

        # condici√≥n para imputar
        condicion_imputar = get_condition_to_impute(col_name)

        # cuenta registros a imputar
        total_a_imputar = ds_resultado.filter(condicion_imputar).count()

        print(f"Registros con {col_name} a imputar ({valores_a_imputar}): {total_a_imputar}")

        # comprueba si hay registros a imputar
        if total_a_imputar == 0:
            # no hay registros a imputar

            print("No hay registros que imputar.")

            ds_resultado = ds_resultado.withColumn(col_nueva, col(col_name))

            # pasa a la siguiente columna
            continue

        ## c√°lculo de la mediana

        # s√≥lo los valores v√°lidos de distancia son √∫tiles para el c√°lculo de la mediana
        ds_validos = ds_resultado.filter(get_valid_condition(col_name))

        # calcula la mediana por par de zonas origen-destino
        medianas = (
            ds_validos
            .groupBy(*claves_agrupacion)
            .agg(expr(f"percentile_approx({col_name}, 0.5)").alias("col_mediana"))
        )

        # mantiene en memoria para reutilizaci√≥n (Spark ejecuta el .groupBy cada vez que se use 'medianas')
        medianas.cache()

        # calcula la mediana global para casos sin mediana por zonas
        mediana_global = ds_validos.select(
            expr(f"percentile_approx({col_name}, 0.5)").alias("mediana_global")
        ).collect()[0]["mediana_global"]

        ## une las medianas al dataset original

        # grupos √∫nicos encontrados
        num_grupos_unicos = medianas.count()

        # utiliza broadcast para optimizar la uni√≥n si es peque√±o (eval√∫a primero el tama√±o)
        if num_grupos_unicos < 10_000:
            # uni√≥n con broadcast
            ds_con_medianas = ds_resultado.join(
                broadcast(medianas),
                on=claves_agrupacion,
                how="left"
            )
        else:
            # uni√≥n normal
            ds_con_medianas = ds_resultado.join(
                medianas,
                on=claves_agrupacion,
                how="left"
            )

        ## imputa valores con la mediana correspondiente

        col_flag = f"{col_name}_fue_imputada"
        col_flag_global = f"{col_name}_usada_media_global"

        # crea columnas auxiliares
        ds_imputado = ds_con_medianas.withColumn(
            col_nueva,
            when(condicion_imputar,
                   coalesce(
                       # mediana de zona
                       col("col_mediana"),
                       # mediana global si no est√° disponible la de zona
                       lit(mediana_global)
                   )
            ).otherwise(col(col_name))
        ).withColumn(
            # flag para indicar si el valor ha sido imputado
            col_flag,
            when(condicion_imputar, 1).otherwise(0)
        ).withColumn(
            # flag para indicar si se ha imputado con la mediana global
            col_flag_global,
            when(condicion_imputar & col("col_mediana").isNull(), 1).otherwise(0)
        )

        # contadores de imputaciones
        contadores = ds_imputado.select(
            spark_sum(col_flag).alias("imputaciones_realizadas"),
            spark_sum(col_flag_global).alias("imputaciones_con_mediana_global")
        ).collect()[0]

        imputaciones_realizadas = contadores["imputaciones_realizadas"]
        imputaciones_globales = contadores["imputaciones_con_mediana_global"]

        # muestra un resumen
        print(f"Imputaciones realizadas: {imputaciones_realizadas}")
        print(f"Imputaciones usando mediana global: {imputaciones_globales}")
        print(f"Tasa de √©xito: {imputaciones_realizadas/total_a_imputar:.2%}")

        ## reconstruye el dataframe final

        # mantiene las columnas originales
        columnas_originales = [col for col in ds_resultado.columns]

        ds_resultado = ds_imputado.select(
            *columnas_originales,
            # a√±ade la columna imputada
            col(col_nueva)
        )

        # limpia la memoria
        del mediana_global
        del contadores

    # limpia la memoria
    gc.collect()

    return ds_resultado

# **1) Dataset: Enero de 2023 (2023-01)**

**1.1) Carga del dataset**

In [20]:
# Enero de 2023: dataset 0
dataset_index = 0

In [21]:
# procesa el dataset actual
dataset_path, dataset_name, ds = basic_process_dataset(
    dataset_index, bronze_datasets
)

=== Dataset 0: ingestion_month=1 ===
Total de registros: 3,066,766

Ruta: /content/drive/MyDrive/taxi_project/bronze/taxi_data/ingestion_year=2023/ingestion_month=1

Esquema:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

**1.2) Datos fuera del rango de fechas del dataset**

En el estudio de la capa Bronze vimos que hab√≠a fechas de inicio y de final de trayecto fuera del periodo de este dataset, es decir, anteriores al 1 y posteriores al 31 de Enero de 2023. Vamos a analizar estos casos.

In [22]:
col_fecha_inicio = "tpep_pickup_datetime"

start_date = '2023-01-01'
end_date = '2023-01-31'

analyze_date_ranges(ds, col_fecha_inicio, start_date, end_date, show_outside_dates=True)

Total registros: 3,066,766
Antes de 2023-01-01: 38 (0.00%)
Despu√©s de 2023-01-31: 10 (0.00%)

Distribuci√≥n diaria antes de 2023-01-01:
+----------+---------+
|date_only |registros|
+----------+---------+
|2008-12-31|2        |
|2022-10-24|4        |
|2022-10-25|7        |
|2022-12-31|25       |
+----------+---------+


Distribuci√≥n diaria despu√©s de 2023-01-31:
+----------+---------+
|date_only |registros|
+----------+---------+
|2023-02-01|10       |
+----------+---------+



In [23]:
col_fecha_final = "tpep_dropoff_datetime"

start_date = '2023-01-01'
end_date = '2023-01-31'

analyze_date_ranges(ds, col_fecha_final, start_date, end_date, show_outside_dates=True)

Total registros: 3,066,766
Antes de 2023-01-01: 25 (0.00%)
Despu√©s de 2023-01-31: 617 (0.02%)

Distribuci√≥n diaria antes de 2023-01-01:
+----------+---------+
|date_only |registros|
+----------+---------+
|2009-01-01|2        |
|2022-10-24|4        |
|2022-10-25|7        |
|2022-12-31|12       |
+----------+---------+


Distribuci√≥n diaria despu√©s de 2023-01-31:
+----------+---------+
|date_only |registros|
+----------+---------+
|2023-02-01|615      |
|2023-02-02|2        |
+----------+---------+



Existen 39 viajes que comenzaron antes del 31/12/2022, y 75 viajes que comenzaron el 31/12/2022.

Por otra parte, tenemos 39 viajes que acabaron antes del 31/12/2022, y 36 que finalizaron el mismo 31/12/2022.

Los 39 viajes que comenzaron y acabaron antes del 31/12 son trayectos en su totalidad fuera del periodo de este dataset. Y es razonable pensar que hay 36 viajes que comenzaron y finalizaron el 31/12, y 39 (75 menos 36) que comenzaron ese d√≠a y finalizaron el 01/01/2023.

Por otra parte, hay 30 viajes que comenzaron el 01/02/2023, 1.845 que acabaron en esa fecha, y 6 que acabaron el 02/02. En este caso, tendremos 1.815 (1.845 menos 30) viajes que acabaron el 01/02 pero no empezaron ese d√≠a.

En consecuencia, tenemos que los 39 viajes anteriores al 31/12/2022 y los 75 viajes que comenzaron el 31/12 no deber√≠an pertenecer a este dataset, referido s√≥lo a viajes de Enero de 2023, y que los 30 viajes que comenzaron el 01/02/2023 y los 6 que finalizaron el 02/02/2023 tampoco deber√≠an pertenecer a este dataset.

Vamos a eliminar estos registros.

In [24]:
total_original = ds.count()

# fechas a eliminar
fechas_eliminar_inicio = ['2008-12-31', '2022-10-24', '2022-10-25', '2022-12-31', '2023-02-01']
fechas_eliminar_final = ['2023-02-02']

# convierte a string formato YYYY-MM-DD para comparaci√≥n consistente
ds_filtered_dates = ds.withColumn("fecha_inicio_str", date_format(to_date(col(col_fecha_inicio)), "yyyy-MM-dd")) \
                      .withColumn("fecha_final_str", date_format(to_date(col(col_fecha_final)), "yyyy-MM-dd")) \
                      .filter(~col("fecha_inicio_str").isin(fechas_eliminar_inicio)) \
                      .filter(~col("fecha_final_str").isin(fechas_eliminar_final)) \
                      .drop("fecha_inicio_str", "fecha_final_str")

# contabilizaci√≥n final
total_limpio = ds_filtered_dates.count()
diferencia = total_original - total_limpio

# muestra resultados
print(f"Total de registros (con fechas fuera de rango): {total_original:,}")
print(f"Total de registros (sin fechas fuera de rango): {total_limpio:,}")
print(f"Registros eliminados: {diferencia:,} ({(diferencia / total_original) * 100:.2f}%)")

Total de registros (con fechas fuera de rango): 3,066,766
Total de registros (sin fechas fuera de rango): 3,066,716
Registros eliminados: 50 (0.00%)


Comprobaci√≥n:

In [25]:
analyze_date_ranges(ds_filtered_dates, col_fecha_inicio, start_date, end_date, show_outside_dates=True)

Total registros: 3,066,716
Antes de 2023-01-01: 0 (0.00%)
Despu√©s de 2023-01-31: 0 (0.00%)


In [26]:
analyze_date_ranges(ds_filtered_dates, col_fecha_final, start_date, end_date, show_outside_dates=True)

Total registros: 3,066,716
Antes de 2023-01-01: 0 (0.00%)
Despu√©s de 2023-01-31: 605 (0.02%)

Distribuci√≥n diaria despu√©s de 2023-01-31:
+----------+---------+
|date_only |registros|
+----------+---------+
|2023-02-01|605      |
+----------+---------+



No limpiamos todav√≠a la memoria de 'ds' porque usaremos este dataset m√°s adelante.

# **2) Valores a cero y negativos**

Tenemos variables con valores a cero y/o negativos:

- passenger_count: no deber√≠a tener cero pasajeros
- trip_distance: no deber√≠a tener una distancia igual a cero
- payment_type: el tipo de pago podr√≠a ser 0 (es un c√≥digo, no un valor num√©rico)
- fare_amount: importe base de la tarifa del viaje, no deber√≠a valer cero ni tener valores negativos
- extra: cargos adicionales, puede valer cero pero no tener valores negativos
- mta_tax: impuesto obligatorio, no deber√≠a tener valores a cero ni tener valores negativos
- tip_amount: propina, puede tener valores a cero pero no tener valores negativos
- tolls_amount, peajes, puede tener valores a cero pero no tener valores negativos
- improvement_surcharge: recargos por mejoras, puede tener valores a cero pero no tener valores negativos
- total_amount: pago total del cliente, no deber√≠a tener valores a cero ni tener valores negativos
- congestion_surcharge: recargo por congesti√≥n de tr√°fico, puede tener valores a cero pero no tener valores negativos
- airport_fee, tarifa del aeropuerto, puede tener valores a cero pero no tener valores negativos

**2.1) passenger_count**

Algo menos del 2% de valores a cero, dado que esta variable es el n√∫mero de pasajeros no deber√≠a tener valores a cero.

Los valores a cero son menos del 2%, podr√≠amos eliminar estos registros pero tambi√©n podemos mantenerlos imputando por alg√∫n otro valor.

passenger_count es una variable discreta, sus valores son n√∫meros enteros, por lo que vamos a imputar los valores a cero por la moda.

In [27]:
col_name = 'passenger_count'

ds_clean = impute_column_with_mode(ds_filtered_dates, col_name, imputar_ceros=True, imputar_nulos=False)

Moda de passenger_count: 1.0
Registros con ceros antes de la imputaci√≥n: 51,164
Registros con ceros despu√©s de la imputaci√≥n: 0


In [28]:
# comprobaci√≥n
ds_clean.filter(col(col_name)==0).select(col_name, f"{col_name}_imputed").show()

+---------------+-----------------------+
|passenger_count|passenger_count_imputed|
+---------------+-----------------------+
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
|            0.0|                    1.0|
+---------------+-----------------

In [29]:
# limpia la memoria
if 'ds_filtered_dates' in locals():
    ds_filtered_dates.unpersist()

**2.2) trip_distance**

1,5% de valores a cero. La distancia del viaje no puede valer cero.

Podr√≠amos eliminar estos registros, pero en su lugar vamos a calcular para cada valor igual a cero la mediana de las distancias origen menos destino.

In [30]:
col_name = "trip_distance"

In [31]:
columnas = [col_name]

analyze_median_hourly_variation(ds_clean, columnas)

Variaci√≥n de la mediana de 'trip_distance':
+-----------+-----------+------------------+---------------+------------------------------+
|mediana_min|mediana_max|  mediana_promedio|rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+------------------+---------------+------------------------------+
|       1.62|        2.7|1.9395833333333332|           1.08|            0.5568206229860366|
+-----------+-----------+------------------+---------------+------------------------------+



Tenemos un factor de variaci√≥n del 55,7% sobre el trayecto promedio del viaje en funci√≥n de la hora. Es una variaci√≥n grande, por lo que debemos tener en cuenta, adem√°s del origen y destino del viaje, la hora a la que se ha iniciado.

In [32]:
# zona de origen, zona de destino y hora de inicio del viaje
claves_agrupacion = ["PULocationID", "DOLocationID", "pickup_hour"]

valores_a_imputar = "ceros"

ds_clean = impute_columns_with_median(ds_clean, columnas, claves_agrupacion, valores_a_imputar)


Imputaci√≥n de trip_distance
Registros con trip_distance a imputar (ceros): 45856
Imputaciones realizadas: 45856
Imputaciones usando mediana global: 5296
Tasa de √©xito: 100.00%


In [33]:
# comprobaci√≥n
columnas = [col_name, f"{col_name}_imputed"]

counting_columns_values(ds_clean, columnas, contar_zeros=True)

Variable 'trip_distance':
   Valores a cero: 45,856 (1.50%)
Variable 'trip_distance_imputed':
   Valores a cero: 0 (0.00%)


**2.3) variables de recargos en el precio**

Son cargos adicionales que no deber√≠an tener valores negativos.

- extra: menos del 1% de valores negativos.

- tip_amount: menos del 1% de valores negativos.

- tolls_amount: menos del 1% de valores negativos.

- improvement_surcharge: menos del 1% de valores negativos.

- congestion_surcharge: menos del 1% de valores negativos.

- airport_fee: menos del 1% de valores negativos.

Estas diferentes variables que hacen referencia a recargos en el precio del viaje y que contienen valores negativos suelen depender de las zonas de origen y destino del viaje. Por ello, podemos imputar estos valores negativos por la mediana del grupo formado la zona de origen y la de destino.

Vamos a ver a cu√°les de estas variables les afecta tambi√©n la hora del trayecto.

In [34]:
columnas = [
    "extra",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "airport_fee"
]

analyze_median_hourly_variation(ds_clean, columnas)

Variaci√≥n de la mediana de 'extra':
+-----------+-----------+------------------+---------------+------------------------------+
|mediana_min|mediana_max|  mediana_promedio|rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+------------------+---------------+------------------------------+
|        0.0|        2.5|0.8333333333333334|            2.5|                           3.0|
+-----------+-----------+------------------+---------------+------------------------------+

Variaci√≥n de la mediana de 'tip_amount':
+-----------+-----------+------------------+---------------+------------------------------+
|mediana_min|mediana_max|  mediana_promedio|rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+------------------+---------------+------------------------------+
|       2.02|        3.0|2.6704166666666667|           0.98|            0.3669839288500546|
+-----------+-----------+------------------+---------------+------------------------------+


- extra: factor de variaci√≥n sobre la mediana promedio del 300%, la hora influye de forma muy fuerte sobre esta variable.

- tip_amount: variaci√≥n del 36,7%, influencia moderada de la hora.

- tolls_amount: no hay variaci√≥n, la hora no influye.

- improvement_surcharge: no hay variaci√≥n con la hora.

- congestion_surcharge: no hay variaci√≥n con la hora.

- airport_fee: no hay variaci√≥n con la hora.

In [35]:
# imputaci√≥n de columnas con influencia de la hora
columnas_a_imputar = [
    "extra",
    "tip_amount"
]

claves_agrupacion = ["PULocationID", "DOLocationID", "pickup_hour"]
valores_a_imputar = "negativos"

ds_clean = impute_columns_with_median(ds_clean, columnas_a_imputar, claves_agrupacion, valores_a_imputar)


Imputaci√≥n de extra
Registros con extra a imputar (negativos): 12407
Imputaciones realizadas: 12407
Imputaciones usando mediana global: 60
Tasa de √©xito: 100.00%

Imputaci√≥n de tip_amount
Registros con tip_amount a imputar (negativos): 225
Imputaciones realizadas: 225
Imputaciones usando mediana global: 0
Tasa de √©xito: 100.00%


In [36]:
# comprobaci√≥n
columnas_a_imputar = [
    "extra_imputed",
    "tip_amount_imputed"
]

counting_columns_values(ds_clean, columnas_a_imputar, contar_negativos=True)

Variable 'extra_imputed':
   Valores negativos: 0 (0.00%)
Variable 'tip_amount_imputed':
   Valores negativos: 0 (0.00%)


In [37]:
# imputaci√≥n de columnas sin influencia de la hora
columnas_a_imputar = [
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "airport_fee"
]

claves_agrupacion = ["PULocationID", "DOLocationID"]
valores_a_imputar = "negativos"

ds_clean = impute_columns_with_median(ds_clean, columnas_a_imputar, claves_agrupacion, valores_a_imputar)


Imputaci√≥n de tolls_amount
Registros con tolls_amount a imputar (negativos): 1377
Imputaciones realizadas: 1377
Imputaciones usando mediana global: 0
Tasa de √©xito: 100.00%

Imputaci√≥n de improvement_surcharge
Registros con improvement_surcharge a imputar (negativos): 25153
Imputaciones realizadas: 25153
Imputaciones usando mediana global: 11
Tasa de √©xito: 100.00%

Imputaci√≥n de congestion_surcharge
Registros con congestion_surcharge a imputar (negativos): 19718
Imputaciones realizadas: 19718
Imputaciones usando mediana global: 3
Tasa de √©xito: 100.00%

Imputaci√≥n de airport_fee
Registros con airport_fee a imputar (negativos): 3607
Imputaciones realizadas: 3607
Imputaciones usando mediana global: 3
Tasa de √©xito: 100.00%


In [38]:
# comprobaci√≥n
columnas_a_imputar = [
    "tolls_amount_imputed",
    "improvement_surcharge_imputed",
    "congestion_surcharge_imputed",
    "airport_fee_imputed"
]

counting_columns_values(ds_clean, columnas_a_imputar, contar_negativos=True)

Variable 'tolls_amount_imputed':
   Valores negativos: 0 (0.00%)
Variable 'improvement_surcharge_imputed':
   Valores negativos: 0 (0.00%)
Variable 'congestion_surcharge_imputed':
   Valores negativos: 0 (0.00%)
Variable 'airport_fee_imputed':
   Valores negativos: 0 (0.00%)


**2.4) mta_tax**

Menos del 1% de valores a cero y menos del 1% de valores negativos. Es el impuesto obligatorio, no deber√≠a tener valores a cero ni tener valores negativos.

Hemos visto que los valores de mta_tax est√°n concentrados en +0,5 (98,41%). Por tanto, vamos a sustituir los valores a cero y negativos por +0,5

In [39]:
col_name = "mta_tax"

ds_clean = ds_clean.withColumn(
    f"{col_name}_imputed",
    when(col(col_name) <= 0, 0.5).otherwise(col(col_name))
)

In [40]:
# comprobaci√≥n
columnas = [col_name, f"{col_name}_imputed"]

counting_columns_values(ds_clean, columnas, contar_zeros=True, contar_negativos=True)

Variable 'mta_tax':
   Valores a cero: 23,415 (0.76%)
   Valores negativos: 24,501 (0.80%)
Variable 'mta_tax_imputed':
   Valores a cero: 0 (0.00%)
   Valores negativos: 0 (0.00%)


**2.5) fare_amount**

Menos del 1% de valores a cero y menos del 1% de valores negativos. Es el importe base de la tarifa del viaje, no deber√≠a valer cero ni tener valores negativos.

Como en las variables anteriores, vamos a estudiar la dependencia con la hora, e imputar con el grupo zona de origen, zona de destino y en su caso la hora.

In [41]:
col_name = "fare_amount"

columnas_a_imputar = [col_name]

analyze_median_hourly_variation(ds_clean, columnas_a_imputar)

Variaci√≥n de la mediana de 'fare_amount':
+-----------+-----------+------------------+------------------+------------------------------+
|mediana_min|mediana_max|  mediana_promedio|   rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+------------------+------------------+------------------------------+
|       12.1|       14.9|13.041250000000003|2.8000000000000007|           0.21470334515479728|
+-----------+-----------+------------------+------------------+------------------------------+



Tenemos una variaci√≥n moderada con la hora (21,5%). Vamos a considerar la hora al realizar la imputaci√≥n.

In [42]:
# imputaci√≥n de columnas con influencia de la hora
claves_agrupacion = ["PULocationID", "DOLocationID", "pickup_hour"]
valores_a_imputar = "ambos"

ds_clean = impute_columns_with_median(ds_clean, columnas_a_imputar, claves_agrupacion, valores_a_imputar)


Imputaci√≥n de fare_amount
Registros con fare_amount a imputar (ambos): 26158
Imputaciones realizadas: 26158
Imputaciones usando mediana global: 142
Tasa de √©xito: 100.00%


In [43]:
# comprobaci√≥n
columnas = [col_name, f"{col_name}_imputed"]

counting_columns_values(ds_clean, columnas, contar_zeros=True, contar_negativos=True)

Variable 'fare_amount':
   Valores a cero: 1,109 (0.04%)
   Valores negativos: 25,049 (0.82%)
Variable 'fare_amount_imputed':
   Valores a cero: 0 (0.00%)
   Valores negativos: 0 (0.00%)


**2.6) total_amount**

Menos del 1% de valores a cero y menos del 1% de valores negativos. Pago total del cliente, no deber√≠a tener valores a cero ni tener valores negativos.

Al ser el pago total del cliente, vamos a imputar los valores a cero y negativos por la suma de las variables que componen el precio total del viaje: fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, congestion_surcharge y airport_fee.

Hay que tener en cuenta que en estas variables hemos imputado valores anteriormente, para el c√°lculo de total_amount debemos usar las variables imputadas.

In [44]:
# reconstruye el importe total del trayecto en la nueva variable total_reconstructed
ds_reconstructed = ds_clean.withColumn(
    "total_reconstructed",
    coalesce(
        col("fare_amount_imputed") +
        coalesce(col("extra_imputed"), lit(0)) +
        coalesce(col("mta_tax_imputed"), lit(0)) +
        coalesce(col("tip_amount_imputed"), lit(0)) +
        coalesce(col("tolls_amount_imputed"), lit(0)) +
        coalesce(col("improvement_surcharge_imputed"), lit(0)) +
        coalesce(col("congestion_surcharge_imputed"), lit(0)) +
        coalesce(col("airport_fee_imputed"), lit(0)),
        col("total_amount")
    )
)

In [45]:
# compara el total original y el reconstruido
comparison = ds_reconstructed.select(

    # total de registros
    count("*").alias("total"),

    # coincidencia entre el original y el reconstruido
    spark_sum(when((col("total_amount") > 0) &
                 (expr("abs(total_amount - total_reconstructed)") < 0.01), 1)
          .otherwise(0)).alias("matches_perfectos"),

    # cuenta registros con discrepancia superior a 0.01
    spark_sum(when(expr("abs(total_amount - total_reconstructed)") > 0.01, 1)
              .otherwise(0)).alias("registros_con_discrepancia"),

    # promedio de las diferencias (solo en registros con discrepancia)
    avg(when(expr("abs(total_amount - total_reconstructed)") > 0.01,
             expr("abs(total_amount - total_reconstructed)"))).alias("diferencia_en_discrepancias"),

    # porcentaje de discrepancias sobre el total
    (
        spark_sum(when(expr("abs(total_amount - total_reconstructed)") > 0.01, 1)
                  .otherwise(0)) * 100.0 / count("*")
    ).alias("porcentaje_de_discrepancias")

)

# muestra la comparaci√≥n obtenida
comparison.show()

+-------+-----------------+--------------------------+---------------------------+---------------------------+
|  total|matches_perfectos|registros_con_discrepancia|diferencia_en_discrepancias|porcentaje_de_discrepancias|
+-------+-----------------+--------------------------+---------------------------+---------------------------+
|3066716|          2188578|                    878138|          3.696118423300164|         28.634474141068164|
+-------+-----------------+--------------------------+---------------------------+---------------------------+



Menos de un 30% de discrepancias entre el importe total y la suma de los cargos que conforman el importe total, aunque la discrepancia media no llega a $4,00

In [46]:
# aplica la imputaci√≥n a los valores cero o negativos en la nueva variable total_amount_imputed
ds_imputed = ds_reconstructed.withColumn(
    "total_amount_imputed",
    when(col("total_amount") <= 0, col("total_reconstructed"))
    .otherwise(col("total_amount"))
)

In [47]:
# comprobaci√≥n
col_name = "total_amount"

columnas = [col_name, f"{col_name}_imputed"]

counting_columns_values(ds_imputed, columnas, contar_zeros=True, contar_negativos=True)

Variable 'total_amount':
   Valores a cero: 567 (0.02%)
   Valores negativos: 25,204 (0.82%)
Variable 'total_amount_imputed':
   Valores a cero: 0 (0.00%)
   Valores negativos: 0 (0.00%)


In [48]:
# limpia la memoria
if 'ds_clean' in locals():
    ds_clean.unpersist()
if 'ds_reconstructed' in locals():
    ds_reconstructed.unpersist()

# **3) Valores nulos**

Vimos que no hay registros con todos sus valores nulos, pero que ten√≠amos 5 variables con el mismo n√∫mero de valores nulos, lo que puede indicar que est√°n todos los valores nulos en los mismos registros. Vamos a comprobarlo.

In [49]:
ds_imputed.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'ingestion_timestamp',
 'source_file',
 'passenger_count_imputed',
 'pickup_hour',
 'trip_distance_imputed',
 'extra_imputed',
 'tip_amount_imputed',
 'tolls_amount_imputed',
 'improvement_surcharge_imputed',
 'congestion_surcharge_imputed',
 'airport_fee_imputed',
 'mta_tax_imputed',
 'fare_amount_imputed',
 'total_reconstructed',
 'total_amount_imputed']

In [50]:
# variables con valores nulos
columnas_a_verificar = [
    'passenger_count_imputed',
    'RatecodeID',
    'store_and_fwd_flag',
    'congestion_surcharge_imputed',
    'airport_fee_imputed'
]

# contabiliza registros donde las 5 columnas son todas nulas
condicion_todas_nulas = None
for c in columnas_a_verificar:
    if condicion_todas_nulas is None:
        condicion_todas_nulas = col(c).isNull()
    else:
        condicion_todas_nulas &= col(c).isNull()
registros_todas_nulas = ds_imputed.filter(condicion_todas_nulas).count()

print(f"Registros donde las 5 columnas son TODAS nulas: {registros_todas_nulas}")

# contabiliza registros donde al menos una columna es nula
condicion_alguna_nula = None
for c in columnas_a_verificar:
    if condicion_alguna_nula is None:
        condicion_alguna_nula = col(c).isNull()
    else:
        condicion_alguna_nula |= col(c).isNull()
registros_alguna_nula = ds_imputed.filter(condicion_alguna_nula).count()

print(f"Registros donde AL MENOS una columna es nula: {registros_alguna_nula}")

# verificaci√≥n
if registros_todas_nulas == registros_alguna_nula:
    print("\n‚úÖ CONFIRMADO: Los valores nulos est√°n en los MISMOS registros")
    print(f"   Hay {registros_todas_nulas} registros donde las 5 columnas son nulas simult√°neamente")
else:
    print("\n‚ùå Los valores nulos NO est√°n en los mismos registros")
    print(f"   - Registros con todas nulas: {registros_todas_nulas}")
    print(f"   - Registros con al menos una nula: {registros_alguna_nula}")

Registros donde las 5 columnas son TODAS nulas: 71743
Registros donde AL MENOS una columna es nula: 71743

‚úÖ CONFIRMADO: Los valores nulos est√°n en los MISMOS registros
   Hay 71743 registros donde las 5 columnas son nulas simult√°neamente


In [51]:
# porcentaje de valores nulos
nulos = ds_imputed.select(
    (count(when(col("passenger_count_imputed").isNull(), True)) / count("*") * 100).alias("porcentaje_nulos")
)

nulos.show()

+------------------+
|  porcentaje_nulos|
+------------------+
|2.3394080182188373|
+------------------+



El porcentaje de registros con las 5 variables nulas es inferior al 3%. Podr√≠amos eliminar todos estos registros, pero para no perder informaci√≥n de otras variables s√≠ v√°lidas de dichos registros, vamos a imputar con la media o la mediana, seg√∫n el caso.

**3.1) passenger_count**

Variable num√©rica discreta: imputamos con la moda.

In [52]:
col_name = "passenger_count_imputed"

ds_imputed = impute_column_with_mode(ds_imputed, col_name, imputar_ceros=False, imputar_nulos=True)

Moda de passenger_count_imputed: 1.0
Registros con nulos antes de la imputaci√≥n: 71,743
Registros con nulos despu√©s de la imputaci√≥n: 0


**3.2) RatecodeID**

Variable num√©rica discreta: imputamos con la moda.

In [53]:
col_name = "RatecodeID"

ds_imputed = impute_column_with_mode(ds_imputed, col_name, imputar_ceros=False, imputar_nulos=True)

Moda de RatecodeID: 1.0
Registros con nulos antes de la imputaci√≥n: 71,743
Registros con nulos despu√©s de la imputaci√≥n: 0


**3.3) store_and_fwd_flag**

Variable categ√≥rica: imputamos con la moda.

In [54]:
col_name = "store_and_fwd_flag"

ds_imputed = impute_column_with_mode(ds_imputed, col_name, imputar_ceros=False, imputar_nulos=True)

Moda de store_and_fwd_flag: N
Registros con nulos antes de la imputaci√≥n: 71,743
Registros con nulos despu√©s de la imputaci√≥n: 0


**3.4) congestion_surcharge, airport_fee**

Variables continuas: imputamos con la mediana.

In [55]:
columnas_a_imputar = [
    "congestion_surcharge_imputed",
    "airport_fee_imputed"
]

Comprobamos la variaci√≥n con la hora.

In [56]:
analyze_median_hourly_variation(ds_imputed, columnas_a_imputar)

Variaci√≥n de la mediana de 'congestion_surcharge_imputed':
+-----------+-----------+----------------+---------------+------------------------------+
|mediana_min|mediana_max|mediana_promedio|rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+----------------+---------------+------------------------------+
|        2.5|        2.5|             2.5|            0.0|                           0.0|
+-----------+-----------+----------------+---------------+------------------------------+

Variaci√≥n de la mediana de 'airport_fee_imputed':
+-----------+-----------+----------------+---------------+------------------------------+
|mediana_min|mediana_max|mediana_promedio|rango_variacion|factor_variacion_tanto_por_uno|
+-----------+-----------+----------------+---------------+------------------------------+
|        0.0|        0.0|             0.0|            0.0|                          NULL|
+-----------+-----------+----------------+---------------+--------------------

Ninguna de las dos variables tiene influencia horaria.

In [57]:
claves_agrupacion = ["PULocationID", "DOLocationID"]
valores_a_imputar = "nulos"

ds_imputed = impute_columns_with_median(ds_imputed, columnas_a_imputar, claves_agrupacion, valores_a_imputar)


Imputaci√≥n de congestion_surcharge_imputed
Registros con congestion_surcharge_imputed a imputar (nulos): 71743
Imputaciones realizadas: 71743
Imputaciones usando mediana global: 1039
Tasa de √©xito: 100.00%

Imputaci√≥n de airport_fee_imputed
Registros con airport_fee_imputed a imputar (nulos): 71743
Imputaciones realizadas: 71743
Imputaciones usando mediana global: 1039
Tasa de √©xito: 100.00%


**3.5) Comprobaci√≥n**

Verificamos que no queden valores nulos en el dataset resultante.

In [58]:
print("=== VALORES NULOS POR VARIABLE ===")

total_records = ds_imputed.count()

if total_records == 0:
    print("El dataframe est√° vac√≠o.")
else:
    # suma nulos por columna en un solo paso
    agg_exprs = [
        spark_sum(col(c).isNull().cast("int")).alias(c)
        for c in ds_imputed.columns
    ]
    null_counts_row = ds_imputed.agg(*agg_exprs).collect()[0]

    null_counts = []
    for col_name in ds_imputed.columns:
        null_count = null_counts_row[col_name]
        null_percentage = (null_count / total_records) * 100

        # diccionario nombre columna, n√∫mero nulos y porcentaje nulos
        null_counts.append({
            'column': col_name,
            'null_count': null_count,
            'null_percentage': round(null_percentage, 2)
        })

    # convierte la lista de resultados en Dataframe de Pandas
    null_ds = pd.DataFrame(null_counts)

    # muestra los resultados
    print(null_ds.to_string(index=False))

=== VALORES NULOS POR VARIABLE ===
                         column  null_count  null_percentage
                       VendorID           0             0.00
           tpep_pickup_datetime           0             0.00
          tpep_dropoff_datetime           0             0.00
                passenger_count       71743             2.34
                  trip_distance           0             0.00
                     RatecodeID       71743             2.34
             store_and_fwd_flag       71743             2.34
                   PULocationID           0             0.00
                   DOLocationID           0             0.00
                   payment_type           0             0.00
                    fare_amount           0             0.00
                          extra           0             0.00
                        mta_tax           0             0.00
                     tip_amount           0             0.00
                   tolls_amount           0       

Valores nulos eliminados de las variables: passenger_count_imputed, RatecodeID_imputed, store_and_fwd_flag_imputed, congestion_surcharge_imputed_v2 y airport_fee_imputed_v2.

# **4) Valores extremos**

In [59]:
ds_imputed.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'ingestion_timestamp',
 'source_file',
 'passenger_count_imputed',
 'pickup_hour',
 'trip_distance_imputed',
 'extra_imputed',
 'tip_amount_imputed',
 'tolls_amount_imputed',
 'improvement_surcharge_imputed',
 'congestion_surcharge_imputed',
 'airport_fee_imputed',
 'mta_tax_imputed',
 'fare_amount_imputed',
 'total_reconstructed',
 'total_amount_imputed',
 'RatecodeID_imputed',
 'store_and_fwd_flag_imputed',
 'congestion_surcharge_imputed_v2',
 'airport_fee_imputed_v2']

Variables en las que hemos encontrado outliers:

- passenger_count
- trip_distance
- RatecodeID
- payment_type
- fare_amount
- extra
- mta_tax
- tip_amount
- tolls_amount
- improvement_surcharge
- total_amount
- congestion_surcharge
- airport_fee

RatecodeID y payment_type son c√≥digos de tarifa y de pago; sus valores extremos son v√°lidos. En el caso de RatecodeID hay un valor, 99, muy separado del resto, pero al ser c√≥digos puede corresponder a un c√≥digo v√°lido.

Para el resto de variables los outliers son valores v√°lidos.

El caso de mta_tax es diferente: en principio sus outliers son v√°lidos, pero tiene tres valores extremos muy por encima del resto: 53,16 cuando el valor inmediatamente anterior es 4,0. mta_tax es el impuesto obligatorio, no parece que 53,16 vaya a ser un valor v√°lido de impuesto. Vamos a estudiarlo por separado.

**4.1) mta_tax**

In [60]:
ds_imputed.filter(col("mta_tax") == 53.16) \
    .select(
        "mta_tax_imputed",
        "total_amount_imputed",
        "fare_amount_imputed",
        "extra_imputed",
        "tip_amount_imputed",
        "tolls_amount_imputed",
        "improvement_surcharge_imputed",
        "congestion_surcharge_imputed_v2",
        "airport_fee_imputed_v2"
    ).show(truncate=False)

+---------------+--------------------+-------------------+-------------+------------------+--------------------+-----------------------------+-------------------------------+----------------------+
|mta_tax_imputed|total_amount_imputed|fare_amount_imputed|extra_imputed|tip_amount_imputed|tolls_amount_imputed|improvement_surcharge_imputed|congestion_surcharge_imputed_v2|airport_fee_imputed_v2|
+---------------+--------------------+-------------------+-------------+------------------+--------------------+-----------------------------+-------------------------------+----------------------+
|53.16          |142.87              |86.01              |3.7          |0.0               |0.0                 |0.0                          |2.5                            |0.0                   |
+---------------+--------------------+-------------------+-------------+------------------+--------------------+-----------------------------+-------------------------------+----------------------+



En los 3 casos, la suma de los diferentes cargos excluyendo mta_tax es de 92,21; un impuesto de 53,16 parece exesivo, apunta a un error. Vamos a confirmar estos datos en el dataset original.

In [61]:
ds.filter(col("mta_tax") == 53.16) \
    .select(
        "mta_tax",
        "total_amount",
        "fare_amount",
        "extra",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "congestion_surcharge",
        "airport_fee"
    ).show(truncate=False)

+-------+------------+-----------+-----+----------+------------+---------------------+--------------------+-----------+
|mta_tax|total_amount|fare_amount|extra|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|airport_fee|
+-------+------------+-----------+-----+----------+------------+---------------------+--------------------+-----------+
|53.16  |142.87      |86.01      |3.7  |0.0       |0.0         |0.0                  |2.5                 |0.0        |
+-------+------------+-----------+-----+----------+------------+---------------------+--------------------+-----------+



Los datos de los tres registros provienen del dataset original, no han sido modificados durante este an√°lisis de datos.

El que mta_tax tenga un valor aparentemente err√≥neo y que total_amount tenga un valor muy superior a la suma del resto de cargos excluido mta_tax (142,87 frente a 92,21) confirma que estos registros tienen valores err√≥neos.

Por ello, vamos a eliminar estos 3 registros.

In [62]:
ds_filtered_outliers = ds_imputed.filter(col("mta_tax") != 53.16)

# comprobaci√≥n
ds_filtered_outliers.filter(col("mta_tax") == 53.16) \
    .select(
        "mta_tax_imputed"
    ).show(truncate=False)

+---------------+
|mta_tax_imputed|
+---------------+
+---------------+



In [63]:
# limpia la memoria
if 'ds_imputed' in locals():
    ds_imputed.unpersist()

# **Guarda los datos**

In [64]:
dir_dataset = f"{SILVER_DIR}/dataset_202301_filtered_outliers_251109"

In [65]:
# guarda dataframe en formato Parquet
save_parquet(ds_filtered_outliers, dir_dataset)

üíæ Guardando DataFrame en /content/drive/MyDrive/taxi_project/silver/dataset_202301_filtered_outliers_251109...
üìä Registros a guardar: 3066715
‚úÖ Dataframe guardado correctamente en /content/drive/MyDrive/taxi_project/silver/dataset_202301_filtered_outliers_251109


True

In [66]:
# limpia la memoria
ds.unpersist()
ds_filtered_outliers.unpersist()

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, ingestion_timestamp: timestamp, source_file: string, passenger_count_imputed: double, pickup_hour: int, trip_distance_imputed: double, extra_imputed: double, tip_amount_imputed: double, tolls_amount_imputed: double, improvement_surcharge_imputed: double, congestion_surcharge_imputed: double, airport_fee_imputed: double, mta_tax_imputed: double, fare_amount_imputed: double, total_reconstructed: double, total_amount_imputed: double, RatecodeID_imputed: double, store_and_fwd_flag_imputed: string, congestion_surcharge_imputed_v2: doub

In [67]:
# cierra Spark
spark.stop()
print("üîå Sesi√≥n Spark cerrada")

üîå Sesi√≥n Spark cerrada
