#Taller Evaluado 01
#Ingesta y Validación de Datos en Azure Databricks
**Objetivos:**
Practicar y reforzar lo aprendido en las sesiones 2 y 3, aplicando técnicas de ingesta de datos, validación de esquemas, manejo de errores y registro de auditoría, utilizando un dataset de origen real o simulado.
Al finalizar, seré capaz de: 
1. Seleccionar e implementar un mecanismo de ingesta adecuado (Auto Loader) para un dataset dado. 
2. Aplicar un esquema explícito (StructType) para controlar la estructura de los datos. 
3. Detectar y separar registros válidos e inválidos usando _rescued_data o badRecordsPath. 
4. Registrar los registros inconsistentes en una tabla Delta de auditoría usando saveAsTable(). 
5. Documentar el proceso y los hallazgos en un notebook bien estructurado.

### 1. Escoger un dataset de origen (puede ser un archivo CSV, JSON o Parquet disponible en un volumen de Unity Catalog o cargado manualmente).
Busqué DataSets en la página [KAGGLE](https://www.kaggle.com/) y obtuve el dataset de las transacciones de un E-Commerce correspondientes a los años 2010 y 2011. Debido a que el dataset original tenía 541,910 registros, lo ajusté aleatoriamente a solo 1,500 con fines prácticos para agilizar el procesamiento.
El enlace del DataSet en mención es: [E-Commerce Data](https://www.kaggle.com/datasets/carrie1/ecommerce-data).

### 2. Diseñar y ejecutar un flujo de ingesta hacia Databricks utilizando Auto Loader según el formato y el volumen del archivo. 
### 3. Definir un esquema explícito con StructType y aplicarlo durante la carga.

Procedo a crear el catálogo, volúmenes y directorio de carpetas.

![creación de directorio.png](./creación de directorio.png "creación de directorio.png")

In [0]:
#Defino el esquema con StrucType

from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType

compras_schema = (
    StructType()
    .add("InvoiceNo", IntegerType(), True)
    .add("StockCode", IntegerType(), True)
    .add("Description", StringType(), True)
    .add("Quantity", IntegerType(), True)
    .add("InvoiceDate", StringType(), True)
    .add("UnitPrice", DoubleType(), True)
    .add("CustomerID", StringType(), True)
    .add("Country", StringType(), True)
)

In [0]:
#Leo el archivo CSV

from pyspark.sql.functions import col, to_timestamp, when, coalesce

df_compras = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(compras_schema)
    .load("/Volumes/dmc_taller01/default/source/input/csv/")
    .withColumn("archivo_origen", col("_metadata.file_path"))
)

# Debido a que hay muchos formatos de fecha en el DataSet, defino una lista de formatos de fecha comunes:
date_formats = [
    "M/d/yyyy H:mm",    # Ej: 7/20/2011 11:28
    "M/d/yyyy H:mm:ss", # Si hubiera segundos
    "M/dd/yyyy H:mm",   # Ej: 10/30/2011 11:37
    "d/M/yyyy H:mm",
    "dd/M/yyyy H:mm"
]

# Realizo la conversión de la columna InvoiceDate usando coalesce y to_timestamp
df_converted = df_compras.withColumn(
    "InvoiceDate",
    coalesce(*[to_timestamp(col("InvoiceDate"), fmt) for fmt in date_formats])
)

# Verifico el resultado
df_converted.printSchema()
display(df_converted)


In [0]:
#Leo el archivo CSV
display(df_compras)

### 4. Configurar un badRecordsPath y capturar información de registros inválidos.

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType

compras_schema = (
    StructType()
    .add("InvoiceNo", IntegerType(), True)
    .add("StockCode", IntegerType(), True)
    .add("Description", StringType(), True)
    .add("Quantity", IntegerType(), True)
    .add("InvoiceDate", StringType(), True)
    .add("UnitPrice", DoubleType(), True)
    .add("CustomerID", StringType(), True)
    .add("Country", StringType(), True)
)

In [0]:
from pyspark.sql.functions import col

df_compras = (
    spark.read
    .format("csv")
    .schema(compras_schema)
    .option("columnNameOfCorruptRecord", "_rescued_data")
    .option("badRecordsPath", "/Volumes/dmc_taller01/default/source/taller_evaluado_01/bad_records/") # _rescued_data se almacena en un directorio
    .load("/Volumes/dmc_taller01/default/source/taller_evaluado_01/")
    .withColumn("archivo_origen", col("_metadata.file_path"))
)

from pyspark.sql.functions import col

df_compras = (
    spark.read
    .format("csv")
    .schema(compras_schema)
    .option("columnNameOfCorruptRecord", "_rescued_data")
    .option("badRecordsPath", "/Volumes/dmc_taller01/default/source/taller_evaluado_01/bad_records/") 
    .load("/Volumes/dmc_taller01/default/source/input/csv/")
    .withColumn("archivo_origen", col("_metadata.file_path"))
)

display(df_compras)

**Evidencia de creación de carpeta BAD_RECORDS:**

![creación archivo Bad_Records.png](./creación archivo Bad_Records.png "creación archivo Bad_Records.png")

### 5. Separar datos válidos e inválidos en tablas Delta diferentes.

In [0]:
#Lo realizo con TRY/EXCEPT. Considerando que la columa más relevante del DataSet es CustomerID, procedo con esa validación:

from pyspark.sql.functions import col

try:
    df_validos = df_compras.filter(col("CustomerID").isNotNull())
    df_invalidos = df_compras.filter(col("CustomerID").isNull())

    df_validos.write.format("delta").mode("overwrite").saveAsTable("dmc_taller01.default.compras_validadas")
    df_invalidos.write.format("delta").mode("overwrite").saveAsTable("dmc_taller01.default.compras_con_errores")

    print("Escritura realizada con éxito.")

except Exception as e:
    print(f"Error durante la escritura: {str(e)}")

In [0]:
%sql
-- Muestro la data resultante de la tabla compras_con_errores

select * from dmc_taller01.default.compras_con_errores

In [0]:
%sql
-- Muestro la data resultante de la tabla compras_validadas

select * from dmc_taller01.default.compras_validadas

**Evidencia de creación de Tablas Delta:**

![creación tablas Delta.png](./creación tablas Delta.png "creación tablas Delta.png")

### 6. Agregar una columna de auditoría con la ruta de archivo de origen (_metadata.file_path).

In [0]:
from pyspark.sql.functions import col

df_compras = (
    spark.read
    .format("csv")
    .schema(compras_schema)
    .option("columnNameOfCorruptRecord", "_rescued_data")
    .load("/Volumes/dmc_taller01/default/source/input/csv/")
    .withColumn("Auditoria", col("_metadata.file_path"))
)

In [0]:
display(df_compras)

### 7. Guardar los registros inválidos en una tabla Delta de auditoría usando saveAsTable().

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS dmc_taller01")
from pyspark.sql.functions import col

df_invalidos = df_compras.filter(col("CustomerID").isNull())
df_invalidos.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("dmc_taller01.default.compras_con_errores")

In [0]:
%sql
select * from dmc_taller01.default.compras_con_errores

### 8. Documentar las decisiones tomadas, las dificultades encontradas y los resultados obtenidos.

**Decisiones tomadas**
a. Se definió como columna clave de validez del registro al CustomerID en caso esté vacío o no.
b. Se determinó como Auto Loader la técnica más apta para la ingesta de la data.
c. Se redujo la cantidad de registros a la cantidad original, con fines didácticos.

**Dificultades encontradas**
a. El reto propio de indagar respecto a la herramienta Databricks, la cual he empezado a estudiar a profundidad

**Resultados Obtenidos**
a. Principalmente, aplicar los conocimientos adquiridos en las sesiones dictadas hasta el momento.