In [0]:
# ACTULZAIR DATOS DE TABLA DELTA EN UN PERIODO DE TIEMPO DISPONIBLE

from pyspark.sql.functions import col
from datetime import datetime
from delta.tables import DeltaTable


# Obtener la fecha actual para procesar (formato: yyyy-MM-dd)
fecha_procesada = datetime.now().strftime("%Y-%m-%d")

# Ruta de destino en Azure Blob Storage
ruta_destino = "abfss://data@storagepredictsell.blob.core.windows.net/cleaned_retail"

# Tabla Delta donde se almacenan los datos
tabla_delta = "default.cleaned_retail"

try:
    # Cargar los nuevos datos 
    df_nuevos_datos = spark.read.format("csv").option("header", True).load("dbfs:/mnt/storagepredictsell/data/cleaned_retail.csv")

    # Validar esquema
    df_nuevos_datos.printSchema()

    # Filtrar solo los datos de la fecha procesada en el nuevo DataFrame
    df_nuevos_datos = df_nuevos_datos.filter(col("Date") == fecha_procesada).dropDuplicates()

    # Validar que existen datos para la fecha procesada
    if df_nuevos_datos.count() > 0:
        # Usar Delta Merge para actualizar eficientemente
        tabla_delta_obj = DeltaTable.forName(spark, tabla_delta)
        tabla_delta_obj.alias("target").merge(
            df_nuevos_datos.alias("source"),
            "target.Date = source.Date"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        # Guardar la tabla Delta en el contenedor de Azure
        df_nuevos_datos.write.format("delta").mode("overwrite").save(ruta_destino)

        # Confirmar que los datos han sido actualizados correctamente
        print(f"Datos actualizados para la fecha {fecha_procesada}:")
        display(spark.sql(f"SELECT * FROM {tabla_delta} WHERE Date = '{fecha_procesada}'"))
    else:
        print(f"No se encontraron nuevos datos para la fecha {fecha_procesada}.")
except Exception as e:
    print(f"Error durante la actualización: {e}")


root
 |-- Transaction ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Price per Unit: string (nullable = true)
 |-- Total Amount: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)

No se encontraron nuevos datos para la fecha 2024-11-25.
