# Data Wrangling Completo: De Fuentes Dispares a Data Lake

## Objetivos de Aprendizaje
- Distinguir entre los procesos ETL y ELT, con sus trade-offs reales
- Ejecutar cada paso de data wrangling: Discovery, Structuring, Cleaning, Enriching, Validating, Publishing
- Integrar datos de multiples fuentes con esquemas distintos en un solo dataset
- Simular localmente el flujo que en AWS involucra S3, Glue Crawlers y Athena

## Prerequisitos
- `00_setup/02_spark_basics.ipynb`
- `02_etl_pipeline/01_etl_concepts.ipynb`

## Tiempo Estimado
90 minutos

## Modulo AWS Academy Relacionado
Modulo 6: Ingesting and Preparing Data
- ETL vs ELT comparison
- Data wrangling steps (Discovery, Structuring, Cleaning, Enriching, Validating, Publishing)
- Scenario: Support ticket ingestion from two systems

---

## Escenario de Negocio

Una empresa SaaS adquirio una startup. Ambas usan sistemas de tickets de soporte distintos.
Un analista de datos pidio al equipo de ingenieria de datos que integre los tickets de ambos
sistemas para analizar relaciones entre experiencias de soporte, volumenes de tickets y
renovaciones de contrato.

**Requisitos del analista:**
- Tickets tecnicos trabajados o cerrados por cliente en 2020
- Segmentacion por region de ventas
- Cada equipo regional solo debe ver sus propios datos

**Restricciones:**
- Sistema 1 (supp1): exporta JSON con ciertos campos
- Sistema 2 (supp2): exporta JSON con campos diferentes
- Data warehouse existente: tiene tabla de clientes con sales_group

Este es exactamente el escenario del Modulo 6 de AWS Academy. Lo implementaremos
completo con PySpark.

In [4]:
# =================================================================
# SETUP INICIAL
# =================================================================
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
import os
import shutil
from datetime import datetime

spark = SparkSession.builder \
    .appName("DataWrangling_M06") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Directorio base para el ejercicio
BASE_DIR = "/home/jovyan/data/m06_wrangling"
os.makedirs(BASE_DIR, exist_ok=True)

print(f"Spark {spark.version} listo")
print(f"Directorio de trabajo: {BASE_DIR}")

Spark 3.5.0 listo
Directorio de trabajo: /home/jovyan/data/m06_wrangling


---
# PASO 0: Generar Datos Simulados

En un entorno real, estos datos vendrian de exportaciones JSON de dos sistemas
de tickets distintos y de una consulta al data warehouse.

Aqui los generamos para que el ejercicio sea autocontenido.

Nota importante: los dos sistemas tienen esquemas DISTINTOS.
Esto es intencional y refleja la realidad cuando se adquiere una empresa.

In [5]:
# =================================================================
# Sistema de tickets 1 (supp1) - Sistema original de la empresa
# Campos: ticket_id, requestor_id, submitter, assignee, group,
#          subject, status, priority (texto), ticket_type,
#          create_date, updated_date, solved_date
# =================================================================
supp1_data = [
    {"ticket_id": 900862, "requestor_id": 1744899, "submitter": "agent_joe",
     "assignee": "agent_maria", "group": "tier2", 
     "subject": "Nightly batch failing", "status": "Closed",
     "priority": "Medium", "ticket_type": "Problem",
     "create_date": "2020-01-04 08:06:00", "updated_date": "2020-04-30 09:00:00",
     "solved_date": "2020-04-30 09:00:00"},
    {"ticket_id": 900863, "requestor_id": 1744005, "submitter": "agent_joe",
     "assignee": "agent_carlos", "group": "tier1",
     "subject": "Intermittent error message", "status": "Open",
     "priority": "Low", "ticket_type": "Problem",
     "create_date": "2020-01-06 11:14:00", "updated_date": None,
     "solved_date": None},
    {"ticket_id": 900870, "requestor_id": 1744899, "submitter": "self",
     "assignee": "agent_maria", "group": "tier1",
     "subject": "Cannot export PDF report", "status": "Solved",
     "priority": "High", "ticket_type": "Problem",
     "create_date": "2020-03-15 14:22:00", "updated_date": "2020-03-16 10:00:00",
     "solved_date": "2020-03-16 10:00:00"},
    {"ticket_id": 900871, "requestor_id": 2020411, "submitter": "self",
     "assignee": "agent_joe", "group": "tier1",
     "subject": "How to configure SSO?", "status": "Closed",
     "priority": "Low", "ticket_type": "Question",
     "create_date": "2020-05-20 09:30:00", "updated_date": "2020-05-21 11:00:00",
     "solved_date": "2020-05-21 11:00:00"},
    {"ticket_id": 900872, "requestor_id": 1744005, "submitter": "agent_carlos",
     "assignee": "agent_carlos", "group": "tier2",
     "subject": "Performance degradation after update", "status": "Open",
     "priority": "High", "ticket_type": "Incident",
     "create_date": "2020-07-10 16:45:00", "updated_date": "2020-07-12 08:00:00",
     "solved_date": None},
    # Ticket de 2019 - NO deberia incluirse en el resultado final
    {"ticket_id": 900850, "requestor_id": 1744899, "submitter": "self",
     "assignee": "agent_joe", "group": "tier1",
     "subject": "Login issue", "status": "Closed",
     "priority": "Medium", "ticket_type": "Problem",
     "create_date": "2019-11-20 10:00:00", "updated_date": "2019-11-21 15:00:00",
     "solved_date": "2019-11-21 15:00:00"}
]

# =================================================================
# Sistema de tickets 2 (supp2) - Sistema de la startup adquirida
# Campos DISTINTOS: issue_id, cust_num, description, status,
#                    priority (numerico 1-3), create_date,
#                    updated_date, closed_date
# Problemas intencionales:
#   - cust_num faltante en un registro
#   - caracteres basura en description
#   - formatos de fecha inconsistentes
#   - prioridad numerica vs texto
# =================================================================
supp2_data = [
    {"issue_id": 900865, "cust_num": None, "description": "Error message 808 <*% &#",
     "status": "Closed", "priority": 2,
     "create_date": "6/26/20 2:45:29", "updated_date": "6/27/20 21:31:48",
     "closed_date": "6/27/20 23:30:09"},
    {"issue_id": 900866, "cust_num": 2020411, "description": "Nullpointer exception on save",
     "status": "Open", "priority": 1,
     "create_date": "4/2/20 12:15", "updated_date": "4/5/20 11:15",
     "closed_date": None},
    {"issue_id": 900867, "cust_num": 2022010, "description": "Question about grades report",
     "status": "Open", "priority": 1,
     "create_date": "4/6/20 10:56", "updated_date": None,
     "closed_date": None},
    {"issue_id": 900868, "cust_num": 2022010, "description": "Dashboard loading slowly \t\n",
     "status": "Closed", "priority": 3,
     "create_date": "8/15/20 9:00", "updated_date": "8/16/20 14:30",
     "closed_date": "8/16/20 14:30"},
    # Duplicado intencional del issue 900866
    {"issue_id": 900866, "cust_num": 2020411, "description": "Nullpointer exception on save",
     "status": "Open", "priority": 1,
     "create_date": "4/2/20 12:15", "updated_date": "4/5/20 11:15",
     "closed_date": None},
    # Ticket de 2021 - NO deberia incluirse
    {"issue_id": 900900, "cust_num": 2020411, "description": "New feature request",
     "status": "Open", "priority": 3,
     "create_date": "1/15/21 8:00", "updated_date": None,
     "closed_date": None}
]

# =================================================================
# Tabla de clientes del data warehouse existente
# =================================================================
customers_data = [
    {"customer_id": 1744899, "cust_name": "Acme Corp", "primary_poc": "John Smith",
     "status": "Active", "sales_group": "Europe"},
    {"customer_id": 1744005, "cust_name": "TechStart Inc", "primary_poc": "Jane Doe",
     "status": "Active", "sales_group": "Europe"},
    {"customer_id": 2020411, "cust_name": "DataFlow LLC", "primary_poc": "Carlos Ruiz",
     "status": "Active", "sales_group": "USEast"},
    {"customer_id": 2022010, "cust_name": "CloudNine SAS", "primary_poc": "Marie Dupont",
     "status": "Active", "sales_group": "USWest"},
    {"customer_id": 2070555, "cust_name": "GlobalTech GmbH", "primary_poc": "Hans Mueller",
     "status": "Inactive", "sales_group": "Europe"}
]

# Guardar como archivos JSON (simulando exportaciones reales)
# Spark espera JSONL (un objeto por línea) o array JSON válido
# Usamos JSONL que es el formato más común para Spark
sources_dir = os.path.join(BASE_DIR, "sources")
os.makedirs(sources_dir, exist_ok=True)

# Guardar como JSONL (JSON Lines) - un objeto por línea
with open(os.path.join(sources_dir, "supp1_tickets.json"), "w") as f:
    for record in supp1_data:
        f.write(json.dumps(record) + "\n")

with open(os.path.join(sources_dir, "supp2_tickets.json"), "w") as f:
    for record in supp2_data:
        f.write(json.dumps(record) + "\n")

with open(os.path.join(sources_dir, "customers.json"), "w") as f:
    for record in customers_data:
        f.write(json.dumps(record) + "\n")

print("Archivos fuente generados (formato JSONL):")
for f in os.listdir(sources_dir):
    size = os.path.getsize(os.path.join(sources_dir, f))
    print(f"  {f} ({size} bytes)")

Archivos fuente generados (formato JSONL):
  customers.json (647 bytes)
  supp2_tickets.json (1240 bytes)
  supp1_tickets.json (1958 bytes)


---
# PASO 1: DISCOVERY (Descubrimiento)

## Que dice la presentacion AWS Academy:
- Identificar relaciones entre fuentes
- Identificar formatos de datos
- Determinar que datos necesitamos y como obtenerlos
- Determinar como organizar y controlar acceso
- Determinar herramientas necesarias

## Que hacemos en la practica:
Cargamos los JSON, inspeccionamos esquemas, identificamos
las diferencias entre fuentes y planificamos el mapeo.

In [6]:
# =================================================================
# 1.1 Cargar y examinar cada fuente
# =================================================================

# Cargar supp1 (sistema original)
df_supp1_raw = spark.read.json(os.path.join(sources_dir, "supp1_tickets.json"))

# Cargar supp2 (sistema de la startup)
df_supp2_raw = spark.read.json(os.path.join(sources_dir, "supp2_tickets.json"))

# Cargar clientes del data warehouse
df_customers = spark.read.json(os.path.join(sources_dir, "customers.json"))

print("=" * 60)
print("DISCOVERY: Esquema de supp1 (sistema original)")
print("=" * 60)
df_supp1_raw.printSchema()
print(f"Registros: {df_supp1_raw.count()}")
df_supp1_raw.show(3, truncate=False)

DISCOVERY: Esquema de supp1 (sistema original)
root
 |-- assignee: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- group: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- requestor_id: long (nullable = true)
 |-- solved_date: string (nullable = true)
 |-- status: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- ticket_id: long (nullable = true)
 |-- ticket_type: string (nullable = true)
 |-- updated_date: string (nullable = true)

Registros: 6
+------------+-------------------+-----+--------+------------+-------------------+------+--------------------------+---------+---------+-----------+-------------------+
|assignee    |create_date        |group|priority|requestor_id|solved_date        |status|subject                   |submitter|ticket_id|ticket_type|updated_date       |
+------------+-------------------+-----+--------+------------+-------------------+------+------------------

In [7]:
print("=" * 60)
print("DISCOVERY: Esquema de supp2 (startup adquirida)")
print("=" * 60)
df_supp2_raw.printSchema()
print(f"Registros: {df_supp2_raw.count()}")
df_supp2_raw.show(3, truncate=False)

DISCOVERY: Esquema de supp2 (startup adquirida)
root
 |-- closed_date: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- cust_num: long (nullable = true)
 |-- description: string (nullable = true)
 |-- issue_id: long (nullable = true)
 |-- priority: long (nullable = true)
 |-- status: string (nullable = true)
 |-- updated_date: string (nullable = true)

Registros: 6
+----------------+---------------+--------+-----------------------------+--------+--------+------+----------------+
|closed_date     |create_date    |cust_num|description                  |issue_id|priority|status|updated_date    |
+----------------+---------------+--------+-----------------------------+--------+--------+------+----------------+
|6/27/20 23:30:09|6/26/20 2:45:29|NULL    |Error message 808 <*% &#     |900865  |2       |Closed|6/27/20 21:31:48|
|NULL            |4/2/20 12:15   |2020411 |Nullpointer exception on save|900866  |1       |Open  |4/5/20 11:15    |
|NULL            |4/6/20 10:

In [8]:
print("=" * 60)
print("DISCOVERY: Esquema de clientes (data warehouse)")
print("=" * 60)
df_customers.printSchema()
df_customers.show(truncate=False)

DISCOVERY: Esquema de clientes (data warehouse)
root
 |-- cust_name: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- primary_poc: string (nullable = true)
 |-- sales_group: string (nullable = true)
 |-- status: string (nullable = true)

+---------------+-----------+------------+-----------+--------+
|cust_name      |customer_id|primary_poc |sales_group|status  |
+---------------+-----------+------------+-----------+--------+
|Acme Corp      |1744899    |John Smith  |Europe     |Active  |
|TechStart Inc  |1744005    |Jane Doe    |Europe     |Active  |
|DataFlow LLC   |2020411    |Carlos Ruiz |USEast     |Active  |
|CloudNine SAS  |2022010    |Marie Dupont|USWest     |Active  |
|GlobalTech GmbH|2070555    |Hans Mueller|Europe     |Inactive|
+---------------+-----------+------------+-----------+--------+



In [9]:
# =================================================================
# 1.2 Analisis de diferencias entre fuentes
# =================================================================
# Este analisis es lo que un ingeniero de datos hace ANTES de escribir
# codigo de transformacion. Documenta las decisiones.

print("=" * 60)
print("DISCOVERY: Mapeo de campos identificado")
print("=" * 60)
print()
print("Relaciones entre campos:")
print("  supp1.requestor_id = supp2.cust_num = customers.customer_id")
print("  supp1.ticket_id ~ supp2.issue_id (campo ID unico)")
print("  supp1.subject ~ supp2.description (texto del ticket)")
print("  supp1.solved_date ~ supp2.closed_date")
print()
print("Diferencias criticas:")
print("  PRIORIDAD: supp1 usa texto (High/Medium/Low)")
print("             supp2 usa numeros (1=High, 2=Medium, 3=Low)")
print("  FECHAS:    supp1 usa formato ISO (2020-01-04 08:06:00)")
print("             supp2 usa formato US corto (6/26/20 2:45:29)")
print("  TIPO:      supp1 tiene ticket_type")
print("             supp2 NO tiene ticket_type")
print("  CAMPOS EXTRA: supp1 tiene submitter, assignee, group")
print("                supp2 no los tiene")
print()
print("Filtros necesarios:")
print("  - Solo tickets donde create_date o updated_date sea en 2020")
print("  - Acceso segmentado por sales_group del cliente")
print()
print("Herramientas disponibles:")
print("  - PySpark para transformaciones (equivale a AWS Glue Job)")
print("  - Sistema de archivos local (equivale a S3)")
print("  - Spark SQL para queries (equivale a Athena)")

DISCOVERY: Mapeo de campos identificado

Relaciones entre campos:
  supp1.requestor_id = supp2.cust_num = customers.customer_id
  supp1.ticket_id ~ supp2.issue_id (campo ID unico)
  supp1.subject ~ supp2.description (texto del ticket)
  supp1.solved_date ~ supp2.closed_date

Diferencias criticas:
  PRIORIDAD: supp1 usa texto (High/Medium/Low)
             supp2 usa numeros (1=High, 2=Medium, 3=Low)
  FECHAS:    supp1 usa formato ISO (2020-01-04 08:06:00)
             supp2 usa formato US corto (6/26/20 2:45:29)
  TIPO:      supp1 tiene ticket_type
             supp2 NO tiene ticket_type
  CAMPOS EXTRA: supp1 tiene submitter, assignee, group
                supp2 no los tiene

Filtros necesarios:
  - Solo tickets donde create_date o updated_date sea en 2020
  - Acceso segmentado por sales_group del cliente

Herramientas disponibles:
  - PySpark para transformaciones (equivale a AWS Glue Job)
  - Sistema de archivos local (equivale a S3)
  - Spark SQL para queries (equivale a Athena)


### Reflexion de Discovery

**Pregunta para el estudiante:** Observa que supp2 no tiene campo `ticket_type`.
Hay dos opciones: (a) clasificar manualmente cada ticket de supp2, o
(b) dejarlo como NULL y que el analista lo clasifique despues.

En un enfoque ETL, elegirias (a). En un enfoque ELT, elegirias (b).
Para este ejercicio usaremos un enfoque hibrido: clasificaremos basandonos
en palabras clave del campo description.

---
# PASO 2: STRUCTURING (Estructuracion)

## Que dice la presentacion:
- Parsear archivos fuente
- Mapear campos de fuente a destino
- Organizar almacenamiento
- Gestionar tamano de archivos

## Que hacemos:
Renombramos campos de supp2 para que coincidan con supp1,
convertimos la prioridad numerica a texto, y parseamos fechas.

In [None]:
# =================================================================
# 2.1 Estructurar supp1: seleccionar campos relevantes
# =================================================================
# De supp1 tomamos los campos que necesitamos para el analisis.
# Los campos submitter, assignee, group no son necesarios
# para el requerimiento del analista.

df_supp1_structured = df_supp1_raw.select(
    F.col("ticket_id").cast("long"),
    F.col("requestor_id").cast("long").alias("customer_id"),
    F.col("subject"),
    F.col("status"),
    F.col("priority"),
    F.col("ticket_type"),
    F.to_timestamp("create_date", "yyyy-MM-dd HH:mm:ss").alias("create_date"),
    F.to_timestamp("updated_date", "yyyy-MM-dd HH:mm:ss").alias("updated_date"),
    F.to_timestamp("solved_date", "yyyy-MM-dd HH:mm:ss").alias("solved_date")
).withColumn("source_system", F.lit("supp1"))

print("supp1 estructurado:")
df_supp1_structured.printSchema()
df_supp1_structured.show(3, truncate=False)

In [None]:
# =================================================================
# 2.2 Estructurar supp2: renombrar campos y convertir tipos
# =================================================================
# Este paso implementa el MAPEO DE CAMPOS que se describe
# en la slide 26-27 de la presentacion.
#
# Mapeo:
#   issue_id    -> ticket_id
#   cust_num    -> customer_id
#   description -> subject
#   closed_date -> solved_date
#   priority 1  -> "High", 2 -> "Medium", 3 -> "Low"

# Mapeo de prioridad numerica a texto
priority_map = F.when(F.col("priority") == 1, "High") \
    .when(F.col("priority") == 2, "Medium") \
    .when(F.col("priority") == 3, "Low") \
    .otherwise("Unknown")

# Clasificacion automatica de ticket_type basada en palabras clave
# Esto es una decision de ingenieria: en lugar de dejar NULL,
# inferimos el tipo a partir del contenido.
ticket_type_inferred = F.when(
    F.lower(F.col("description")).rlike("question|how to|help with"), "Question"
).when(
    F.lower(F.col("description")).rlike("error|exception|failing|slow"), "Problem"
).otherwise("Problem")  # Default conservador

# Parseo de fechas con formato US corto (M/d/yy H:mm:ss o M/d/yy H:mm)
# PySpark no maneja bien formatos ambiguos, asi que usamos coalesce
# para probar multiples formatos.
def parse_supp2_date(col_name):
    return F.coalesce(
        F.to_timestamp(F.col(col_name), "M/d/yy H:mm:ss"),
        F.to_timestamp(F.col(col_name), "M/d/yy H:mm")
    )

df_supp2_structured = df_supp2_raw.select(
    F.col("issue_id").cast("long").alias("ticket_id"),
    F.col("cust_num").cast("long").alias("customer_id"),
    F.col("description").alias("subject"),
    F.col("status"),
    priority_map.alias("priority"),
    ticket_type_inferred.alias("ticket_type"),
    parse_supp2_date("create_date").alias("create_date"),
    parse_supp2_date("updated_date").alias("updated_date"),
    parse_supp2_date("closed_date").alias("solved_date")
).withColumn("source_system", F.lit("supp2"))

print("supp2 estructurado (con campos renombrados y tipos convertidos):")
df_supp2_structured.printSchema()
df_supp2_structured.show(truncate=False)

In [None]:
# =================================================================
# 2.3 Verificar que ambos DataFrames tienen el mismo esquema
# =================================================================
# Esto es critico antes de combinarlos.

print("Columnas supp1:", df_supp1_structured.columns)
print("Columnas supp2:", df_supp2_structured.columns)
print()

# Verificar que los esquemas coinciden
schemas_match = (df_supp1_structured.schema == df_supp2_structured.schema)
print(f"Esquemas identicos: {schemas_match}")

if not schemas_match:
    print("\nDiferencias encontradas:")
    for f1, f2 in zip(df_supp1_structured.schema.fields, df_supp2_structured.schema.fields):
        if f1 != f2:
            print(f"  {f1.name}: {f1.dataType} vs {f2.dataType}")

---
# PASO 3: CLEANING (Limpieza)

## Que dice la presentacion:
- Eliminar columnas no deseadas, duplicados, valores basura
- Rellenar campos obligatorios vacios
- Validar/modificar tipos de datos
- Identificar y corregir outliers

## Que hacemos:
Limpiamos cada fuente por separado (como recomienda la presentacion),
porque cada fuente tiene sus propios problemas de calidad.

In [None]:
# =================================================================
# 3.1 Diagnostico de calidad de datos
# =================================================================
# Antes de limpiar, necesitamos saber QUE limpiar.

def diagnostico_calidad(df, nombre):
    """Genera un reporte de calidad de datos para un DataFrame."""
    print(f"\n{'=' * 50}")
    print(f"DIAGNOSTICO DE CALIDAD: {nombre}")
    print(f"{'=' * 50}")
    print(f"Total registros: {df.count()}")
    
    # Contar nulos por columna
    print("\nNulos por columna:")
    nulos = df.select([
        F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ])
    for row in nulos.collect():
        for col_name in df.columns:
            val = row[col_name]
            if val > 0:
                print(f"  {col_name}: {val} nulos")
    
    # Contar duplicados por campo ID
    id_col = "ticket_id"
    total = df.count()
    distintos = df.select(id_col).distinct().count()
    if total != distintos:
        print(f"\nDUPLICADOS: {total - distintos} registros duplicados por {id_col}")
    else:
        print(f"\nSin duplicados por {id_col}")

diagnostico_calidad(df_supp1_structured, "supp1")
diagnostico_calidad(df_supp2_structured, "supp2")

In [None]:
# =================================================================
# 3.2 Limpiar supp1
# =================================================================
# Problemas identificados:
#   - Tiene un ticket de 2019 que no entra en el filtro de 2020
#   - Campos solved_date y updated_date pueden ser nulos (normal)

# Filtrar solo tickets de 2020 (create_date O updated_date en 2020)
df_supp1_clean = df_supp1_structured.filter(
    (F.year("create_date") == 2020) | (F.year("updated_date") == 2020)
)

print(f"supp1 antes del filtro de fecha: {df_supp1_structured.count()} registros")
print(f"supp1 despues del filtro 2020:   {df_supp1_clean.count()} registros")
df_supp1_clean.show(truncate=False)

In [None]:
# =================================================================
# 3.3 Limpiar supp2
# =================================================================
# Problemas identificados:
#   - Duplicado del issue 900866
#   - customer_id nulo en issue 900865
#   - Caracteres basura en subject ("<*% &#")
#   - Caracteres de control en subject ("\t\n")
#   - Ticket de 2021 que no entra en el filtro

import re

df_supp2_clean = df_supp2_structured \
    .dropDuplicates(["ticket_id"]) \
    .filter(
        (F.year("create_date") == 2020) | (F.year("updated_date") == 2020)
    ) \
    .withColumn(
        "customer_id",
        F.when(F.col("customer_id").isNull(), F.lit(9999999))
         .otherwise(F.col("customer_id"))
    ) \
    .withColumn(
        "subject",
        F.regexp_replace(F.col("subject"), r"[<>*%&#\t\n\r]", "")
    ) \
    .withColumn(
        "subject",
        F.trim(F.col("subject"))
    )

print(f"supp2 antes de limpieza: {df_supp2_structured.count()} registros")
print(f"supp2 despues de limpieza: {df_supp2_clean.count()} registros")
print()
print("Registros limpios:")
df_supp2_clean.show(truncate=False)

In [None]:
# =================================================================
# 3.4 Log de limpieza
# =================================================================
# En un pipeline real, registrarias estas metricas en un sistema
# de monitoreo (CloudWatch, Datadog, etc.).

print("RESUMEN DE LIMPIEZA:")
print(f"  supp1: {df_supp1_structured.count()} -> {df_supp1_clean.count()} "
      f"(eliminados: {df_supp1_structured.count() - df_supp1_clean.count()})")
print(f"  supp2: {df_supp2_structured.count()} -> {df_supp2_clean.count()} "
      f"(eliminados: {df_supp2_structured.count() - df_supp2_clean.count()})")
print(f"  Motivos: filtro de fecha 2020, duplicados, datos fuera de rango")

---
# PASO 4: ENRICHING (Enriquecimiento)

## Que dice la presentacion:
- Combinar datos de fuentes limpias en un solo dataset
- Agregar valores adicionales para soportar el analisis

## Que hacemos:
Combinamos supp1 + supp2 (UNION) y enriquecemos con la region
de ventas (JOIN con clientes).

In [None]:
# =================================================================
# 4.1 Combinar ambas fuentes (UNION)
# =================================================================
# Esto equivale a "append rows from supp1 to the bottom of supp2"
# como describe la slide 37.

df_combined = df_supp1_clean.unionByName(df_supp2_clean)

print(f"Registros supp1: {df_supp1_clean.count()}")
print(f"Registros supp2: {df_supp2_clean.count()}")
print(f"Registros combinados: {df_combined.count()}")
print()
df_combined.orderBy("ticket_id").show(truncate=False)

In [None]:
# =================================================================
# 4.2 Enriquecer con la region de ventas (JOIN con clientes)
# =================================================================
# La presentacion describe esto como:
# "query the sales system to get the sales owner by customer_id
#  and use that to add the sales_region column"

df_enriched = df_combined.join(
    df_customers.select("customer_id", "cust_name", "sales_group"),
    on="customer_id",
    how="left"  # LEFT JOIN porque customer_id 9999999 no existira
).withColumnRenamed("sales_group", "sales_region")

print("Dataset enriquecido con region de ventas:")
df_enriched.select(
    "ticket_id", "customer_id", "cust_name", "subject", 
    "priority", "ticket_type", "sales_region", "source_system"
).orderBy("ticket_id").show(truncate=False)

---
# PASO 5: VALIDATING (Validacion)

## Que dice la presentacion:
- Contar filas esperadas
- Verificar consistencia
- Verificar formatos y tipos de datos
- Verificar duplicados (post-merge)
- Verificar PII
- Verificar outliers

## Que hacemos:
Ejecutamos una bateria de validaciones sobre el dataset enriquecido.

In [None]:
# =================================================================
# 5.1 Validacion de conteo de filas
# =================================================================
# La slide 42 muestra exactamente esta verificacion:
# "Count total rows and check against rows from individual systems"

count_supp1 = df_supp1_clean.count()
count_supp2 = df_supp2_clean.count()
count_total = df_enriched.count()

print("VALIDACION 1: Conteo de filas")
print(f"  supp1: {count_supp1}")
print(f"  supp2: {count_supp2}")
print(f"  Esperado: {count_supp1 + count_supp2}")
print(f"  Real:     {count_total}")

if count_total == count_supp1 + count_supp2:
    print("  RESULTADO: OK - Conteos coinciden")
else:
    print("  RESULTADO: ERROR - Conteos no coinciden")

In [None]:
# =================================================================
# 5.2 Validacion de duplicados post-merge
# =================================================================
# Despues de combinar, podrian existir ticket_ids duplicados
# entre los dos sistemas (el mismo numero en sistemas distintos).

print("VALIDACION 2: Duplicados de ticket_id post-merge")
duplicados = df_enriched.groupBy("ticket_id") \
    .agg(F.count("*").alias("apariciones")) \
    .filter(F.col("apariciones") > 1)

num_dups = duplicados.count()
if num_dups > 0:
    print(f"  ADVERTENCIA: {num_dups} ticket_ids duplicados entre sistemas")
    duplicados.show()
    # Mostrar detalle de los duplicados
    dup_ids = [row.ticket_id for row in duplicados.collect()]
    df_enriched.filter(F.col("ticket_id").isin(dup_ids)) \
        .select("ticket_id", "source_system", "subject", "customer_id") \
        .show(truncate=False)
    print("  NOTA: Son tickets de DIFERENTES sistemas, no son duplicados reales.")
    print("  Accion: No requiere correccion si source_system es distinto.")
else:
    print("  OK - Sin duplicados")

In [None]:
# =================================================================
# 5.3 Validacion de nulos en campos criticos
# =================================================================
print("VALIDACION 3: Nulos en campos criticos")
campos_criticos = ["ticket_id", "customer_id", "subject", "status", 
                   "priority", "create_date"]

for campo in campos_criticos:
    nulos = df_enriched.filter(F.col(campo).isNull()).count()
    estado = "OK" if nulos == 0 else f"ADVERTENCIA: {nulos} nulos"
    print(f"  {campo}: {estado}")

# Verificar registros sin region (customer_id placeholder)
sin_region = df_enriched.filter(F.col("sales_region").isNull()).count()
print(f"  sales_region: {'OK' if sin_region == 0 else f'ADVERTENCIA: {sin_region} sin region'}")

if sin_region > 0:
    print("  Detalle de registros sin region:")
    df_enriched.filter(F.col("sales_region").isNull()) \
        .select("ticket_id", "customer_id", "subject") \
        .show(truncate=False)

In [None]:
# =================================================================
# 5.4 Validacion de consistencia de valores
# =================================================================
print("VALIDACION 4: Consistencia de valores categoricos")
print()
print("Valores de priority:")
df_enriched.groupBy("priority").count().orderBy("priority").show()

print("Valores de status:")
df_enriched.groupBy("status").count().orderBy("status").show()

print("Valores de ticket_type:")
df_enriched.groupBy("ticket_type").count().orderBy("ticket_type").show()

print("Distribucion por source_system:")
df_enriched.groupBy("source_system").count().show()

In [None]:
# =================================================================
# 5.5 Correccion post-validacion
# =================================================================
# Basandonos en las validaciones, asignamos "Unknown" a sales_region
# para registros sin region (customer_id placeholder).

df_validated = df_enriched.withColumn(
    "sales_region",
    F.coalesce(F.col("sales_region"), F.lit("Unassigned"))
).withColumn(
    "cust_name",
    F.coalesce(F.col("cust_name"), F.lit("Unknown Customer"))
)

# Verificar que ya no hay nulos en sales_region
sin_region = df_validated.filter(F.col("sales_region").isNull()).count()
print(f"Registros sin region despues de correccion: {sin_region}")
print()
print("Dataset validado final:")
df_validated.select(
    "ticket_id", "customer_id", "cust_name", "subject",
    "status", "priority", "ticket_type", "sales_region", "source_system"
).orderBy("sales_region", "ticket_id").show(truncate=False)

---
# PASO 6: PUBLISHING (Publicacion)

## Que dice la presentacion:
- Mover datos a almacenamiento permanente
- Aplicar tecnicas de gestion de archivos (formatos, compresion, organizacion)
- Aplicar controles de acceso
- Guardar metadatos
- Configurar frecuencia de actualizacion

## Que hacemos:
Particionamos por sales_region (equivale a crear carpetas por region en S3),
guardamos en formato Parquet con compresion Snappy, y registramos como
tabla SQL para consultar con Spark SQL (equivale a Glue Crawler + Athena).

## Equivalencia con AWS:
| Accion local | Equivalente AWS |
|---|---|
| Escribir Parquet particionado | S3 con prefijos por region |
| Compresion Snappy | Configuracion de formato en S3 |
| Registro como tabla SQL | AWS Glue Crawler + Data Catalog |
| Consultas con Spark SQL | Amazon Athena |

In [None]:
# =================================================================
# 6.1 Publicar datos particionados por region
# =================================================================
# Esto es equivalente a lo que la presentacion describe en la slide 47:
# "group the dataset by sales_region and export each region's rows"
# "upload each file to the bucket or folder associated with the region"

# Seleccionar columnas finales (sin campos internos)
df_final = df_validated.select(
    "ticket_id", "customer_id", "cust_name", "subject",
    "status", "priority", "ticket_type",
    "create_date", "updated_date", "solved_date",
    "sales_region", "source_system"
)

# Ruta de publicacion (equivale a s3://cs-tickets-2020/)
publish_path = os.path.join(BASE_DIR, "published", "cs-tickets-2020")

# Limpiar si existe de una ejecucion anterior
if os.path.exists(publish_path):
    shutil.rmtree(publish_path)

# Escribir particionado por sales_region en formato Parquet
# Esto crea carpetas: sales_region=Europe/, sales_region=USEast/, etc.
df_final.write \
    .mode("overwrite") \
    .partitionBy("sales_region") \
    .parquet(publish_path)

print("Datos publicados en:")
print(f"  {publish_path}")
print()

# Mostrar estructura de carpetas creada
print("Estructura de carpetas (equivale a prefijos de S3):")
for root, dirs, files in os.walk(publish_path):
    level = root.replace(publish_path, '').count(os.sep)
    indent = '  ' * level
    folder_name = os.path.basename(root)
    if files:
        total_size = sum(os.path.getsize(os.path.join(root, f)) for f in files)
        print(f"{indent}{folder_name}/ ({len(files)} archivo(s), {total_size:,} bytes)")
    else:
        print(f"{indent}{folder_name}/")

In [None]:
# =================================================================
# 6.2 Registrar como tabla SQL (equivale a Glue Crawler + Catalog)
# =================================================================
# En AWS, el Glue Crawler escanea los archivos en S3, infiere el
# esquema y lo registra en el Data Catalog. Athena usa ese catalogo
# para ejecutar queries SQL directamente sobre los archivos.
#
# Localmente, hacemos lo equivalente con Spark SQL.

df_published = spark.read.parquet(publish_path)
df_published.createOrReplaceTempView("cs_tickets_2020")

print("Tabla 'cs_tickets_2020' registrada para SQL")
print()
print("Esquema (equivale a lo que Glue Crawler descubriria):")
df_published.printSchema()

In [None]:
# =================================================================
# 6.3 Consultas SQL (equivale a Amazon Athena)
# =================================================================
# Estas son las consultas que el analista de datos ejecutaria
# usando Athena sobre los datos en S3.

# Consulta 1: Resumen por region
print("CONSULTA 1: Tickets por region y prioridad")
print("(Equivalente Athena: SELECT ... FROM cs_tickets_2020 WHERE ...)")
print()
spark.sql("""
    SELECT 
        sales_region,
        priority,
        COUNT(*) AS num_tickets,
        SUM(CASE WHEN status IN ('Closed', 'Solved') THEN 1 ELSE 0 END) AS resueltos,
        SUM(CASE WHEN status = 'Open' THEN 1 ELSE 0 END) AS abiertos
    FROM cs_tickets_2020
    GROUP BY sales_region, priority
    ORDER BY sales_region, priority
""").show()

In [None]:
# Consulta 2: Tickets por cliente (lo que pidio el analista)
print("CONSULTA 2: Tickets por cliente con tipos de issues")
spark.sql("""
    SELECT 
        customer_id,
        cust_name,
        sales_region,
        COUNT(*) AS total_tickets,
        SUM(CASE WHEN ticket_type = 'Problem' THEN 1 ELSE 0 END) AS problems,
        SUM(CASE WHEN ticket_type = 'Question' THEN 1 ELSE 0 END) AS questions,
        SUM(CASE WHEN ticket_type = 'Incident' THEN 1 ELSE 0 END) AS incidents
    FROM cs_tickets_2020
    GROUP BY customer_id, cust_name, sales_region
    ORDER BY total_tickets DESC
""").show(truncate=False)

In [None]:
# Consulta 3: Simular acceso por region
# En AWS, esto se lograria con bucket policies + IAM groups.
# Aqui simulamos el filtro que aplicaria cada equipo regional.

region = "Europe"  # Cambiar a "USEast" o "USWest" para probar

print(f"CONSULTA 3: Vista del equipo de ventas '{region}'")
print(f"(En AWS: solo verian s3://cs-tickets-2020/{region.lower()}/)")
print()
spark.sql(f"""
    SELECT ticket_id, cust_name, subject, status, priority, ticket_type
    FROM cs_tickets_2020
    WHERE sales_region = '{region}'
    ORDER BY create_date DESC
""").show(truncate=False)

---
# EJERCICIOS PRACTICOS

### Ejercicio 1: Pregunta tipo examen (slide 54-56 de la presentacion)

Un ingeniero de datos debe proporcionar datos para un nuevo reporte de ventas.
El reporte combinara datos de ventas de 4 productos distintos, rastreados en
4 sistemas diferentes. Los datos resultantes deben segmentarse por region,
y cada dataset regional solo debe estar disponible para los equipos de ventas
de esa region.

Que deberia hacer PRIMERO el ingeniero de datos?

A) Crear un bucket S3 con carpetas para cada producto  
B) Exportar datos de cada sistema y combinarlos en un solo archivo  
C) Identificar las relaciones entre campos de cada fuente  
D) Reemplazar valores nulos en cada sistema con ceros  

In [None]:
# Escribe tu respuesta y justificacion:
respuesta = "___"  # A, B, C o D
justificacion = "___"


In [None]:
# SOLUCION:
print("Respuesta correcta: C")
print()
print("Justificacion:")
print("C es correcto porque identificar relaciones entre campos")
print("es una tarea de DISCOVERY, que es el PRIMER paso del")
print("proceso de data wrangling.")
print()
print("Por que las otras son incorrectas:")
print("A) Crear el bucket seria STRUCTURING/PUBLISHING (paso posterior).")
print("   Ademas, las carpetas deberian ser por REGION, no por producto.")
print("B) Exportar y combinar seria ENRICHING, pero sin discovery")
print("   no sabes como mapear los campos entre sistemas.")
print("D) Reemplazar nulos seria CLEANING, pero sin discovery")
print("   no sabes si reemplazar con ceros es apropiado.")

### Ejercicio 2: Agregar una nueva fuente

Imagina que llega un tercer sistema de tickets (supp3) con el siguiente esquema:

```
case_number (int), account_id (int), title (string),
case_status (string), urgency (string: "critical"/"normal"/"minor"),
opened_at (string: "YYYY/MM/DD"), resolved_at (string: "YYYY/MM/DD")
```

Escribe el codigo PySpark para:
1. Mapear los campos al esquema unificado
2. Convertir urgency a la escala High/Medium/Low
3. Parsear las fechas
4. Combinarlo con el dataset existente

In [None]:
# Datos de ejemplo de supp3
supp3_data = [
    (500001, 2020411, "API rate limiting issue", "Resolved", "critical", "2020/03/10", "2020/03/12"),
    (500002, 1744899, "Billing question", "Open", "minor", "2020/06/01", None),
    (500003, 2022010, "Integration failing after update", "Resolved", "normal", "2020/09/15", "2020/09/17"),
]

df_supp3_raw = spark.createDataFrame(
    supp3_data,
    ["case_number", "account_id", "title", "case_status", 
     "urgency", "opened_at", "resolved_at"]
)

# TODO: Tu codigo aqui
# 1. Mapear campos al esquema unificado
# 2. Convertir urgency: critical->High, normal->Medium, minor->Low
# 3. Parsear fechas con formato YYYY/MM/DD
# 4. Combinar con df_validated usando unionByName


In [None]:
# SOLUCION Ejercicio 2

# Mapeo de urgency a priority
urgency_map = F.when(F.col("urgency") == "critical", "High") \
    .when(F.col("urgency") == "normal", "Medium") \
    .when(F.col("urgency") == "minor", "Low") \
    .otherwise("Unknown")

# Inferir ticket_type
type_inferred = F.when(
    F.lower(F.col("title")).rlike("question|billing|how"), "Question"
).otherwise("Problem")

# Mapeo de status: supp3 usa "Resolved" en lugar de "Solved" o "Closed"
status_map = F.when(F.col("case_status") == "Resolved", "Closed") \
    .otherwise(F.col("case_status"))

df_supp3_structured = df_supp3_raw.select(
    F.col("case_number").cast("long").alias("ticket_id"),
    F.col("account_id").cast("long").alias("customer_id"),
    F.col("title").alias("subject"),
    status_map.alias("status"),
    urgency_map.alias("priority"),
    type_inferred.alias("ticket_type"),
    F.to_timestamp("opened_at", "yyyy/MM/dd").alias("create_date"),
    F.lit(None).cast("timestamp").alias("updated_date"),
    F.to_timestamp("resolved_at", "yyyy/MM/dd").alias("solved_date")
).withColumn("source_system", F.lit("supp3"))

# Enriquecer con clientes
df_supp3_enriched = df_supp3_structured.join(
    df_customers.select("customer_id", "cust_name", "sales_group"),
    on="customer_id",
    how="left"
).withColumnRenamed("sales_group", "sales_region")

print("supp3 estructurado y enriquecido:")
df_supp3_enriched.select(
    "ticket_id", "customer_id", "cust_name", "subject",
    "priority", "ticket_type", "sales_region"
).show(truncate=False)

# Combinar con el dataset existente
# Nota: necesitamos asegurar que las columnas coincidan
df_all = df_validated.unionByName(df_supp3_enriched)
print(f"\nTotal de tickets combinados (3 fuentes): {df_all.count()}")

### Ejercicio 3: Reflexion ETL vs ELT

Analiza el pipeline que acabamos de construir y responde:

1. Este pipeline sigue un enfoque ETL o ELT? Justifica.
2. Que partes moveriamos a un enfoque ELT si quisieras mayor flexibilidad?
3. Que ventajas tendria cargar primero los datos crudos al data lake?

In [None]:
# Escribe tus respuestas:
respuesta_1 = "___"
respuesta_2 = "___"
respuesta_3 = "___"


In [None]:
# SOLUCION Ejercicio 3

print("1. ENFOQUE DEL PIPELINE")
print("   Este pipeline sigue un enfoque ETL: transformamos ANTES")
print("   de cargar al almacenamiento final. Toda la limpieza,")
print("   estructuracion y enriquecimiento ocurre antes de escribir")
print("   los archivos Parquet particionados.")
print()
print("2. CONVERSION A ELT")
print("   En un enfoque ELT:")
print("   - Cargariamos los JSON crudos directamente a S3 (raw zone)")
print("   - Las transformaciones se harian con Athena o Glue Jobs")
print("     sobre los datos ya almacenados")
print("   - El analista tendria acceso a los datos crudos")
print("     para hacer sus propias exploraciones")
print()
print("3. VENTAJAS DE CARGAR DATOS CRUDOS")
print("   - Si el analista cambia de opinion sobre que campos")
print("     necesita, los datos originales estan disponibles")
print("   - Cambios en las transformaciones se aplican a datos")
print("     historicos (no se pierden los campos truncados)")
print("   - Otros equipos pueden usar los mismos datos crudos")
print("     para otros propositos sin depender del ingeniero")

---
# RESUMEN FINAL

## Pasos de Data Wrangling ejecutados

| Paso | Actividad Local | Equivalente AWS |
|------|----------------|------------------|
| Discovery | Cargar JSON, comparar esquemas, documentar mapeos | Consultar fuentes, analizar con Glue DataBrew |
| Structuring | Renombrar campos, convertir tipos, parsear fechas | Glue Job con DynamicFrame mappings |
| Cleaning | Eliminar duplicados, rellenar nulos, limpiar texto | Glue Job transforms, DataBrew recipes |
| Enriching | UNION de fuentes + LEFT JOIN con clientes | Glue Job con joins, Athena CTAS |
| Validating | Conteos, consistencia, verificacion de nulos | Glue Data Quality, Great Expectations |
| Publishing | Parquet particionado + registro SQL | S3 + Glue Crawler + Data Catalog + Athena |

## Conexion directa con el Modulo 6 de AWS Academy
- El escenario de tickets de soporte es identico al de la presentacion
- Los 6 pasos de wrangling se implementaron en codigo ejecutable
- Las consultas SQL finales simulan lo que Athena haria sobre S3
- El particionamiento por region simula los prefijos S3 con bucket policies

## Siguiente Paso
- Modulo 7 (AWS Academy): Ingesting by Batch or by Stream
- Lab asociado: Querying Data by Using Athena
- En nuestros labs: `03_batch_processing/` y `04_streaming_simulation/`

In [None]:
# Limpieza
if os.path.exists(BASE_DIR):
    shutil.rmtree(BASE_DIR)
    print("Archivos temporales eliminados")

spark.catalog.dropTempView("cs_tickets_2020")
print("Tablas temporales eliminadas")