# Práctica 1 — Dataset 1: Incidentes policiales de San Francisco (Bronze ➜ Silver)

**Asignatura:** PMDV (Curso 2025-2026)  
**Objetivo:** Leer el CSV de incidentes desde la capa **bronze** (MinIO), limpiar/transformar con **PySpark DataFrames** y persistir en **silver** en formato **Parquet**.

Este notebook sigue *paso a paso* el enunciado de la práctica (Dataset 1).

## Requisitos previos (manual)

1. Descarga desde **Moodle**: `sf_police_incidents_2018_to_2025_1208.zip`
2. Descomprime el ZIP y localiza el fichero:
   - `Police_Department_Incident_Reports__2018_to_Present_20251208.csv`
3. Sube **el CSV** al bucket **bronze** de MinIO, dentro de la carpeta/prefijo:

   - `sf_police_incidents/`

4. Verifica que puedes leer desde Spark con `s3a://bronze/...` y escribir en `s3a://silver/...`.

> Importante: la lectura debe hacerse **a nivel de carpeta**, no apuntando al CSV individual.

In [None]:
# ============================================================
# 0) Imports + SparkSession
# ============================================================
from pyspark.sql import SparkSession, functions as F, types as T
import os, re, unicodedata

spark = SparkSession.builder.getOrCreate()

# Recomendación: fija la zona horaria de la sesión (evita sorpresas con timestamps)
spark.conf.set("spark.sql.session.timeZone", "UTC")

print("Spark version:", spark.version)

In [None]:
# ============================================================
# 0.1) Configuración de acceso a MinIO (S3A)
# ============================================================
# Si tu entorno ya viene configurado (por ejemplo via spark-defaults.conf),
# esta celda puede ser redundante. Aun así, es útil como "recordatorio" y para local.
#
# Ajusta estos valores si tu despliegue usa otros nombres/puertos.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
MINIO_USE_SSL = os.getenv("MINIO_USE_SSL", "false").lower() == "true"

hconf = spark._jsc.hadoopConfiguration()
hconf.set("fs.s3a.endpoint", MINIO_ENDPOINT)
hconf.set("fs.s3a.access.key", MINIO_ACCESS_KEY)
hconf.set("fs.s3a.secret.key", MINIO_SECRET_KEY)
hconf.set("fs.s3a.path.style.access", "true")
hconf.set("fs.s3a.connection.ssl.enabled", "true" if MINIO_USE_SSL else "false")
hconf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

print("MINIO_ENDPOINT =", MINIO_ENDPOINT)
print("MINIO_ACCESS_KEY =", MINIO_ACCESS_KEY)

## 1) Lectura desde Bronze (sin inferir esquema)

- Se lee el dataset desde la carpeta en **bronze**.
- No se infiere el esquema (`inferSchema = false`), por lo que inicialmente las columnas vendrán como `string`.

In [None]:
BRONZE_PATH = "s3a://bronze/sf_police_incidents/"

df_raw = (
    spark.read
        .option("header", True)
        .option("inferSchema", "false")
        .csv(BRONZE_PATH)
)

df_raw.printSchema()
df_raw.show(5, truncate=False)

## 2) Selección de columnas requeridas

Seleccionamos **solo** las columnas pedidas en el enunciado.

In [None]:
required_cols = [
    "Incident ID",
    "Incident Number",
    "Row ID",
    "Incident Datetime",
    "Report Datetime",
    "Incident Code",
    "Incident Category",
    "Incident Subcategory",
    "Incident Description",
    "Report Type Code",
    "Resolution",
    "Police District",
    "Analysis Neighborhood",
    "Latitude",
    "Longitude",
]

df_selected = df_raw.select(*required_cols)

df_selected.printSchema()
df_selected.show(5, truncate=False)

## 3) Enfoque alternativo: eliminar columnas que no necesitas (sin usar `select`)

En el paso 2 hemos usado `select(...)` para quedarnos con las columnas necesarias.

Una alternativa equivalente es **eliminar** las columnas sobrantes con `drop(...)`.
Esto es útil cuando tienes claro qué columnas **no** quieres (por ejemplo, una lista larga)
o cuando quieres mantener el orden original del dataset, pero sin campos innecesarios.

Documentación oficial (Spark 4.0.1):  
- `pyspark.sql.DataFrame.drop`: https://spark.apache.org/docs/4.0.1/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.drop.html

In [None]:
cols_to_drop = [c for c in df_raw.columns if c not in required_cols]

df_alt = df_raw.drop(*cols_to_drop)

print("Mismas columnas (como conjunto):", set(df_alt.columns) == set(df_selected.columns))
print("Nº columnas df_selected:", len(df_selected.columns))
print("Nº columnas df_alt     :", len(df_alt.columns))

df_alt.show(5, truncate=False)

## 4) Renombrado a `snake_case`

Reglas:
- minúsculas
- palabras separadas por `_`
- sin espacios ni caracteres especiales

Ejemplo: `Incident Datetime` → `incident_datetime`

In [None]:
def to_snake_case(name: str) -> str:
    # Normaliza acentos y caracteres raros
    name = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
    # Sustituye no-alfanumérico por espacio
    name = re.sub(r"[^0-9a-zA-Z]+", " ", name)
    # Separa camel case (por si acaso) y colapsa espacios
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", name)
    name = re.sub(r"\s+", " ", name).strip().lower()
    # Une con underscore
    return name.replace(" ", "_")

snake_cols = [to_snake_case(c) for c in df_selected.columns]
df = df_selected.toDF(*snake_cols)

df.printSchema()
df.show(5, truncate=False)

## 5) Convertir `incident_datetime` y `report_datetime` a `timestamp`

Como el CSV puede venir con formatos distintos (dependiendo de la exportación),
usamos un parseo “robusto” intentando varios patrones y quedándonos con el primero que funcione.

In [None]:
def parse_timestamp(col):
    # Intenta varios formatos comunes (ISO 8601, con/sin milisegundos, con/sin TZ)
    return F.coalesce(
        F.to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ss.SSSX"),
        F.to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ss.SSS"),
        F.to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ssX"),
        F.to_timestamp(col, "yyyy-MM-dd'T'HH:mm:ss"),
        F.to_timestamp(col, "yyyy-MM-dd HH:mm:ss"),
        F.to_timestamp(col)  # fallback: formato por defecto
    )

df = (
    df.withColumn("incident_datetime", parse_timestamp(F.col("incident_datetime")))
      .withColumn("report_datetime", parse_timestamp(F.col("report_datetime")))
)

df.printSchema()
df.show(5, truncate=False)

## 6) Convertir el resto de columnas numéricas a tipos adecuados

Convertimos identificadores y coordenadas a tipos numéricos soportados por Spark.

In [None]:
df = (
    df.withColumn("incident_id", F.col("incident_id").cast(T.LongType()))
      .withColumn("incident_number", F.col("incident_number").cast(T.LongType()))
      .withColumn("row_id", F.col("row_id").cast(T.LongType()))
      .withColumn("incident_code", F.col("incident_code").cast(T.IntegerType()))
      .withColumn("latitude", F.col("latitude").cast(T.DoubleType()))
      .withColumn("longitude", F.col("longitude").cast(T.DoubleType()))
)

df.printSchema()
df.show(5, truncate=False)

## 7) Crear `reporting_delay_minutes`

Diferencia en minutos entre `report_datetime` e `incident_datetime`.

In [None]:
df = df.withColumn(
    "reporting_delay_minutes",
    ((F.col("report_datetime").cast("long") - F.col("incident_datetime").cast("long")) / 60).cast("int")
)

df.select("incident_datetime", "report_datetime", "reporting_delay_minutes").show(10, truncate=False)

## 8) Crear `delay_bucket` (categoría por rango de minutos)

Rangos requeridos (valores **string**):
- `<0`
- `0_10`
- `10_60`
- `60_1440`
- `>1440`

In [None]:
delay = F.col("reporting_delay_minutes")

df = df.withColumn(
    "delay_bucket",
    F.when(delay.isNull(), F.lit(None).cast("string"))
     .when(delay < 0, F.lit("<0"))
     .when((delay >= 0) & (delay < 10), F.lit("0_10"))
     .when((delay >= 10) & (delay < 60), F.lit("10_60"))
     .when((delay >= 60) & (delay <= 1440), F.lit("60_1440"))
     .otherwise(F.lit(">1440"))
)

df.groupBy("delay_bucket").count().orderBy("delay_bucket").show(truncate=False)

## 9) Escritura en Silver (Parquet)

Persistimos el DataFrame final en:

- `silver/sf_police_incidents/`

usando formato **Parquet**.

In [None]:
SILVER_PATH = "s3a://silver/sf_police_incidents/"

(
    df.write
      .mode("overwrite")
      .parquet(SILVER_PATH)
)

print("Escritura completada en:", SILVER_PATH)

# (Opcional) Lectura de verificación
df_check = spark.read.parquet(SILVER_PATH)
print("Filas en silver:", df_check.count())
df_check.printSchema()

## 10) Preguntas analíticas

### i) ¿Cuántas filas tienen valores nulos en `latitude` o `longitude`?

In [None]:
null_latlon_rows = df.filter(F.col("latitude").isNull() | F.col("longitude").isNull()).count()
print("Filas con latitude o longitude nulos:", null_latlon_rows)

**Respuesta (completa tras ejecutar):**  
- **Resultado:** `TODO: sustituye por el número impreso arriba`  
- **Comentario:** estas filas no tienen geolocalización completa, por lo que (según el análisis posterior) pueden requerir filtrado,
  imputación o tratarse como “sin localización” en visualizaciones/mapas.

### ii) ¿Cuántos `incident_id` aparecen más de una vez en el dataset?

Ten en cuenta que un mismo incidente puede aparecer en varias filas.

In [None]:
duplicated_incident_ids = (
    df.groupBy("incident_id")
      .count()
      .filter(F.col("count") > 1)
      .count()
)

print("Número de incident_id que aparecen más de una vez:", duplicated_incident_ids)

**Respuesta (completa tras ejecutar):**  
- **Resultado:** `TODO: sustituye por el número impreso arriba`  
- **Comentario:** esto indica cuántos IDs de incidente tienen varias filas (por ejemplo, por desgloses o múltiples registros asociados).
  En análisis posteriores, conviene decidir si el “evento” es `incident_id` (agregando) o si se trabaja a nivel de fila.

### iii) ¿Cuántos incidentes tienen un retraso entre 10 y 60 minutos en su procesamiento?

Usa exclusivamente la columna `delay_bucket`.

In [None]:
# Opción A: contar filas cuyo bucket sea 10_60
rows_10_60 = df.filter(F.col("delay_bucket") == "10_60").count()

# Opción B (recomendable si quieres "incidentes" únicos): contar incident_id distintos en ese bucket
incidents_10_60 = (
    df.filter(F.col("delay_bucket") == "10_60")
      .select("incident_id")
      .distinct()
      .count()
)

print("Filas con delay_bucket == '10_60':", rows_10_60)
print("Incident_id distintos con delay_bucket == '10_60':", incidents_10_60)

**Respuesta (completa tras ejecutar):**  
- **Resultado (según interpretación):**
  - Filas: `TODO`
  - Incidentes únicos (`incident_id` distintos): `TODO`
- **Comentario:** el enunciado habla de “incidentes”, así que suele tener sentido reportar **IDs distintos**.
  En cualquier caso, el filtrado se hace usando únicamente `delay_bucket`, tal y como se pide.