# Extracción y Procesamiento Diario
Procesa los datos de ventas del día anterior (tabla `raw_sales`), genera métricas diarias por proveedor y cliente, y calcula el importe total de cada cliente por proveedor.

In [ ]:
%run "../utils/config"

In [ ]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timedelta

# Configuration.
# get_batch_config is not defined in this notebook; presumably it is provided
# by the ../utils/config notebook executed via %run -- verify.
# NOTE(review): datetime.now() is naive (driver-local clock); confirm this
# matches the time zone of the `timestamp` column in raw_sales.
batch_config = get_batch_config()
# Target day = the calendar day before "now".
processing_date = (datetime.now() - timedelta(days=1)).date()

In [ ]:
# Read the sales rows whose `timestamp` falls on processing_date.
# The day is selected with a half-open range [day_start, day_end) on the raw
# column instead of comparing date_trunc('day', timestamp) to the date: a
# plain column-vs-literal comparison can be pushed down to the table scan
# (file / partition skipping), while a predicate on a computed expression
# generally cannot. The date literals are cast to timestamp explicitly so the
# day boundaries are midnight-to-midnight, as the date_trunc form implied.
day_start = lit(processing_date).cast('timestamp')
day_end = lit(processing_date + timedelta(days=1)).cast('timestamp')
df_sales = spark.read.table("raw_sales")\
    .where((col('timestamp') >= day_start) & (col('timestamp') < day_end))

In [ ]:
# Daily metrics per (day, provider, client): number of transactions plus the
# total and the mean of `amount`.
# NOTE: `sum` here must resolve to pyspark.sql.functions.sum (brought in by
# the star import), not the Python builtin.
sale_day = date_trunc('day', col('timestamp')).alias('date')
metric_columns = [
    count('*').alias('transaction_count'),
    sum('amount').alias('total_amount'),
    avg('amount').alias('avg_amount'),
]
df_daily_metrics = (
    df_sales
    .groupBy(sale_day, 'provider_id', 'client_id')
    .agg(*metric_columns)
)

In [ ]:
# Persist the daily metrics as the Delta table `daily_metrics`.
# The save mode and the partition columns both come from batch_config.
# NOTE(review): whether re-running the same day duplicates or replaces data
# depends on batch_config['write_mode'] -- confirm the intended behaviour.
daily_metrics_writer = df_daily_metrics.write.format('delta')
daily_metrics_writer = daily_metrics_writer.mode(batch_config['write_mode'])
daily_metrics_writer = daily_metrics_writer.partitionBy(
    *batch_config['partition_columns']
)
daily_metrics_writer.saveAsTable('daily_metrics')

In [ ]:
# Total amount per (provider, client), ordered by provider and then by
# descending total.
# NOTE(review): no limit or rank is applied, so every client of every provider
# is kept -- the "top" in the name only reflects the sort order.
client_totals = df_sales.groupBy('provider_id', 'client_id').agg(
    sum('amount').alias('total_amount')
)
df_top_clients = client_totals.orderBy(
    col('provider_id').asc(),
    col('total_amount').desc(),
)

In [ ]:
# Persist the per-provider client totals, replacing the previous contents of
# the Delta table `top_clients` on every run.
df_top_clients.write.saveAsTable(
    'top_clients',
    format='delta',
    mode='overwrite',
)

# Table maintenance: run OPTIMIZE and then VACUUM on each output table.
# NOTE(review): the configured value is interpolated verbatim after RETAIN;
# presumably it already carries its unit (e.g. "168 HOURS") -- verify against
# the config, since a bare number would not be valid VACUUM syntax.
retention_clause = batch_config['vacuum_retention']
for table_name in ('daily_metrics', 'top_clients'):
    maintenance_statements = (
        f"OPTIMIZE {table_name}",
        f"VACUUM {table_name} RETAIN {retention_clause}",
    )
    for statement in maintenance_statements:
        spark.sql(statement)