In [None]:
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import gc
import os # Importado a nivel global ya que se usa en varias funciones para manejo de archivos
from pathlib import Path # Importado a nivel global

# --- Definición de Constantes ---

# Rutas de archivos Parquet
SILVER_VENTAS_PATH = '../data/silver_ventas_establecimiento.parquet'
GOLD_VENTAS_WEEKLY_PATH = '../data/gold_ventas_semanales.parquet'

# Lista de tipos de establecimiento a excluir en el filtrado
TIPOS_A_EXCLUIR = ['Evento','Eventos','Catering, colectividades y Otros','Parque Temático',
                   'Z015','Sociedades','Particular','Z023','Z013','Alimentacion tradicional',
                   'Cash, distribuidor', 'Z020', 'Z016']

# Columnas a eliminar durante el procesamiento
COLUMNS_TO_REMOVE = ['uom', 'material_name', 'tipo', 'region', 'promo_id', 'promo_flag', 'promo_type', 'promo_mechanics']

# Prefijos de materiales a incluir.
MATERIALS_TO_INCLUDE = ('ED', 'FD', 'DL', 'VI', 'VD', 'BD')


# --- Funciones de Transformación de Datos ---

def filter_sales_by_not_type(table: pa.Table, types: list[str]) -> pa.Table:
    """
    Filtra una tabla PyArrow para excluir filas según una lista de 'tipos'.

    Utiliza una conexión DuckDB temporal en memoria para realizar la operación SQL.

    Args:
        table: Tabla PyArrow de entrada.
        types: Lista de strings con los 'tipos' a excluir.

    Returns:
        Una nueva tabla PyArrow con las filas filtradas.
    """
    # Se crea una conexión temporal a DuckDB para esta operación.
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        # Construye la parte de la consulta SQL para la cláusula IN.
        # Consideración: Para mayor seguridad contra inyección SQL (aunque 'types' aquí es interno),
        # se podrían explorar consultas parametrizadas si la complejidad no aumenta significativamente.
        types_sql = ", ".join([f"'{t}'" for t in types])
        
        query = f"""
            SELECT *
            FROM input_table
            WHERE tipo NOT IN ({types_sql})
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        # Asegura que la conexión se cierre siempre.
        con.close()
    return result_table


def filter_sales_date(table: pa.Table, date_from: str, date_to: str) -> pa.Table:
    """
    Filtra ventas por rango de fechas utilizando la columna 'week'.

    Crea una conexión DuckDB temporal en memoria.

    Args:
        table: Tabla PyArrow de entrada (debe contener la columna 'week').
        date_from: Fecha de inicio del filtro (formato YYYY-MM-DD).
        date_to: Fecha de fin del filtro (formato YYYY-MM-DD).

    Returns:
        Una nueva tabla PyArrow con las filas filtradas por fecha.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        query = f"""
            SELECT *
            FROM input_table
            WHERE week BETWEEN '{date_from}' AND '{date_to}'
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table


def promoid_to_boolean(table: pa.Table) -> pa.Table:
    """
    Crea una columna binaria 'has_promo' (1 o 0) basada en la existencia de 'promo_id'.

    Args:
        table: Tabla PyArrow de entrada (debe contener la columna 'promo_id').

    Returns:
        Tabla PyArrow con la nueva columna 'has_promo'.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        query = f"""
            SELECT *,
                CASE
                    WHEN promo_id IS NOT NULL THEN 1
                    ELSE 0
                END AS has_promo
            FROM input_table
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table

def remove_columns(table: pa.Table, columns_to_remove: list[str]) -> pa.Table:
    """
    Elimina columnas especificadas de una tabla PyArrow.

    Args:
        table: Tabla PyArrow de entrada.
        columns_to_remove: Lista de nombres de columnas a eliminar.

    Returns:
        Tabla PyArrow sin las columnas especificadas.
    Raises:
        ValueError: Si se intentan eliminar todas las columnas.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        all_columns = table.column_names
        columns_to_keep = [col for col in all_columns if col not in columns_to_remove]
        
        if not columns_to_keep:
            # No se puede tener una tabla sin columnas.
            raise ValueError("No se pueden eliminar todas las columnas de la tabla.")
        
        # Asegura que los nombres de las columnas estén entrecomillados por si tienen caracteres especiales.
        select_clause = ', '.join([f'"{col}"' for col in columns_to_keep])
        
        query = f"""
            SELECT {select_clause}
            FROM input_table
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table

def covid_flag(table: pa.Table) -> pa.Table:
    """
    Crea una columna binaria 'is_covid_period' para indicar si la fecha 'calday' cae dentro del período COVID.

    El período COVID se define entre '2020-03-01' y '2022-04-30'.

    Args:
        table: Tabla PyArrow de entrada (debe contener la columna 'calday').

    Returns:
        Tabla PyArrow con la nueva columna 'is_covid_period'.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        query = f"""
            SELECT *,
                CASE
                    WHEN calday BETWEEN '2020-03-01' AND '2022-04-30' THEN 1
                    ELSE 0
                END AS is_covid_period
            FROM input_table
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table


def filter_by_string_in_column(table: pa.Table, column_name: str, string_to_filter: str) -> pa.Table:
    """
    Filtra filas basándose en si una columna contiene una subcadena específica.

    Args:
        table: Tabla PyArrow de entrada.
        column_name: Nombre de la columna donde buscar.
        string_to_filter: Subcadena a buscar (usando LIKE '%subcadena%').

    Returns:
        Tabla PyArrow con las filas donde la columna especificada contiene la subcadena.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        # El uso de f-string directamente en la consulta SQL para valores de usuario
        # podría ser un riesgo de seguridad si 'string_to_filter' o 'column_name'
        # provinieran de fuentes no confiables. Aquí se asumen controlados.
        query = f"""
            SELECT *
            FROM input_table
            WHERE "{column_name}" LIKE '%{string_to_filter}%'
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table


def group_by_week(table: pa.Table) -> pa.Table:
    """
    Agrupa los datos por 'establecimiento', 'material' y semana (derivada de 'calday').

    Calcula sumas y máximos para métricas relevantes.

    Args:
        table: Tabla PyArrow de entrada (con 'calday', 'establecimiento', 'material', 'has_promo', 'volume_ap', 'is_covid_period').

    Returns:
        Tabla PyArrow con datos agregados semanalmente.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        query = """
            SELECT 
                establecimiento,
                material,
                DATE_TRUNC('week', calday) AS week,
                MAX(has_promo) AS has_promo, -- 1 si alguna fila del grupo tuvo promoción
                SUM(volume_ap) AS weekly_volume,
                MAX(is_covid_period) AS is_covid_period -- 1 si alguna fila del grupo está en período COVID
            FROM input_table
            GROUP BY 
                establecimiento,
                material,
                DATE_TRUNC('week', calday)
            ORDER BY 
                establecimiento,
                material,
                week
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table

def filter_by_min_weeks(table: pa.Table, min_weeks: int) -> pa.Table:
    """
    Filtra combinaciones de 'establecimiento'-'material' que tengan menos de 'min_weeks' semanas de datos.

    Args:
        table: Tabla PyArrow de entrada (con 'establecimiento', 'material', 'week').
        min_weeks: Número mínimo de semanas de datos requeridas.

    Returns:
        Tabla PyArrow filtrada.
    """
    con = duckdb.connect()
    try:
        con.register('input_table', table)
        
        query = f"""
            WITH series_counts AS (
                SELECT 
                    establecimiento, 
                    material,
                    COUNT(DISTINCT week) as week_count
                FROM input_table
                GROUP BY establecimiento, material
            )
            SELECT t.*
            FROM input_table t
            JOIN series_counts s
                ON t.establecimiento = s.establecimiento 
                AND t.material = s.material
            WHERE s.week_count >= {min_weeks}
        """
        result_table = con.sql(query).fetch_arrow_table()
    finally:
        con.close()
    return result_table


def fill_time_series_gaps(table: pa.Table) -> pa.Table:
    """
    Rellena huecos en series temporales para cada combinación 'establecimiento'-'material'.

    Genera filas para semanas faltantes entre la fecha mínima y máxima de cada serie.
    Procesa los datos en lotes para gestionar el uso de memoria.

    Args:
        table: Tabla PyArrow de entrada (con 'week', 'establecimiento', 'material', y métricas).

    Returns:
        Tabla PyArrow con series temporales continuas, rellenando semanas faltantes.
        Las métricas para semanas nuevas se rellenan con 0 o valores por defecto.
    """
    con = duckdb.connect()
    # Límite de memoria para las operaciones de DuckDB en esta función.
    con.execute("PRAGMA memory_limit='4GB'")
    
    try:
        con.register('input_table', table)
        
        combinations = con.execute("""
            SELECT DISTINCT establecimiento, material 
            FROM input_table
            ORDER BY establecimiento, material
        """).fetchall()
        
        print(f"Procesando {len(combinations)} combinaciones únicas de establecimiento-material en lotes para rellenar huecos.")
        
        batch_size = 500  # Ajustar según memoria disponible y tamaño de datos.
        all_results = []
        
        for i in range(0, len(combinations), batch_size):
            batch = combinations[i:i+batch_size]
            batch_conditions_list = []
            
            for estab, mat in batch:
                # Escapar apóstrofes en los nombres por si acaso.
                estab_escaped = estab.replace("'", "''")
                mat_escaped = mat.replace("'", "''")
                batch_conditions_list.append(f"(establecimiento = '{estab_escaped}' AND material = '{mat_escaped}')")
            
            where_clause_batch = " OR ".join(batch_conditions_list)
            
            batch_query = f"""
                WITH 
                date_ranges AS (
                    SELECT 
                        establecimiento,
                        material,
                        MIN(week) AS min_week,
                        MAX(week) AS max_week
                    FROM input_table
                    WHERE {where_clause_batch}
                    GROUP BY establecimiento, material
                ),
                all_weeks AS (
                    SELECT 
                        d.establecimiento,
                        d.material,
                        calendar_value::DATE AS week
                    FROM date_ranges d,
                    LATERAL UNNEST(
                        GENERATE_SERIES(
                            d.min_week, 
                            d.max_week, 
                            INTERVAL '1 week'
                        )
                    ) AS t(calendar_value)
                )
                SELECT 
                    a.establecimiento,
                    a.material,
                    a.week,
                    COALESCE(o.has_promo, 0) AS has_promo,
                    COALESCE(o.weekly_volume, 0) AS weekly_volume,
                    COALESCE(o.is_covid_period, 
                        CASE 
                            WHEN a.week BETWEEN '2020-03-01' AND '2022-04-30' THEN 1
                            ELSE 0
                        END
                    ) AS is_covid_period
                FROM all_weeks a
                LEFT JOIN input_table o
                    ON a.establecimiento = o.establecimiento
                    AND a.material = o.material
                    AND a.week = o.week
                ORDER BY 
                    a.establecimiento,
                    a.material,
                    a.week
            """
            
            print(f"Rellenando huecos: Lote {i//batch_size + 1}/{(len(combinations) + batch_size -1)//batch_size}, " +
                  f"items {i+1}-{min(i+batch_size, len(combinations))}")
            
            batch_result = con.execute(batch_query).fetch_arrow_table()
            all_results.append(batch_result)
            
            # Limpieza de memoria explícita entre lotes.
            con.unregister('input_table') # Deregistra antes de cerrar para liberar referencias de DuckDB
            con.close()
            gc.collect() # Python GC
            con = duckdb.connect()
            con.execute("PRAGMA memory_limit='4GB'")
            con.register('input_table', table) # Re-registra la tabla original para el siguiente lote
            
        if not all_results:
            return pa.Table.from_arrays([], names=table.schema.names, schema=table.schema) # Devolver tabla vacía con schema

        result_table = pa.concat_tables(all_results) if len(all_results) > 1 else all_results[0]
        
        return result_table
    finally:
        con.close()

def sort_series_by_volume(table: pa.Table) -> pa.Table:
    """
    Ordena las series temporales por el volumen total de cada combinación 'establecimiento'-'material'.

    Args:
        table: Tabla PyArrow de entrada (con 'weekly_volume', 'establecimiento', 'material', 'week').

    Returns:
        Tabla PyArrow ordenada. Las series con mayor volumen total aparecen primero.
        Dentro de cada serie, se mantiene el orden por semana.
    """
    con = duckdb.connect()
    con.execute("PRAGMA memory_limit='4GB'")
    
    try:
        con.register('input_table', table)
        
        # Para tablas muy grandes, se procesa en lotes basados en combinaciones ordenadas por volumen.
        if table.num_rows > 1000000: # Umbral para procesamiento en lotes.
            print("Tabla grande detectada para ordenamiento, procesando en lotes...")
            totals_df = con.execute("""
                SELECT 
                    establecimiento,
                    material,
                    SUM(weekly_volume) AS total_volume
                FROM input_table
                GROUP BY establecimiento, material
                ORDER BY total_volume DESC
            """).fetchdf() # Usar fetchdf para facilitar el manejo de lotes de combinaciones.
            
            batch_size = 500  # Número de combinaciones establecimiento-material por lote.
            all_results = []
            
            for i in range(0, len(totals_df), batch_size):
                batch_df = totals_df.iloc[i:i+batch_size]
                batch_conditions_list = []
                for _, row in batch_df.iterrows():
                    estab_escaped = row['establecimiento'].replace("'", "''")
                    mat_escaped = row['material'].replace("'", "''")
                    batch_conditions_list.append(f"(establecimiento = '{estab_escaped}' AND material = '{mat_escaped}')")
                
                if not batch_conditions_list: continue # Si el lote está vacío
                
                where_clause_batch = " OR ".join(batch_conditions_list)
                
                # La cláusula ORDER BY en la subconsulta series_totals_batch no es estrictamente necesaria
                # para la corrección, pero se mantiene por si ayuda al optimizador en algunos casos.
                # El orden final lo da el ORDER BY de la consulta principal.
                batch_query = f"""
                    WITH series_totals_batch AS (
                        SELECT 
                            establecimiento,
                            material,
                            SUM(weekly_volume) AS total_volume
                        FROM input_table
                        WHERE {where_clause_batch}
                        GROUP BY establecimiento, material
                    )
                    SELECT t.*
                    FROM input_table t
                    JOIN series_totals_batch s
                        ON t.establecimiento = s.establecimiento 
                        AND t.material = s.material
                    ORDER BY 
                        s.total_volume DESC,
                        t.establecimiento,
                        t.material,
                        t.week
                """
                
                print(f"Ordenando por volumen: Lote {i//batch_size + 1}/{(len(totals_df) + batch_size - 1)//batch_size}")
                batch_result = con.execute(batch_query).fetch_arrow_table()
                all_results.append(batch_result)
                
                # Limpieza de memoria explícita entre lotes.
                con.unregister('input_table')
                con.close()
                gc.collect()
                con = duckdb.connect()
                con.execute("PRAGMA memory_limit='4GB'")
                con.register('input_table', table)

            if not all_results: # Si no hay resultados (ej. tabla de entrada vacía)
                 return pa.Table.from_arrays([], names=table.schema.names, schema=table.schema)

            result_table = pa.concat_tables(all_results) if len(all_results) > 1 else all_results[0]

        else:
            # Procesamiento estándar para tablas más pequeñas.
            query = """
                WITH series_totals AS (
                    SELECT 
                        establecimiento,
                        material,
                        SUM(weekly_volume) AS total_volume
                    FROM input_table
                    GROUP BY establecimiento, material
                )
                SELECT t.*
                FROM input_table t
                JOIN series_totals s
                    ON t.establecimiento = s.establecimiento 
                    AND t.material = s.material
                ORDER BY 
                    s.total_volume DESC,
                    t.establecimiento,
                    t.material,
                    t.week
            """
            result_table = con.execute(query).fetch_arrow_table()
    finally:
        con.close()
    
    gc.collect()
    return result_table

def create_nested_series_format(table: pa.Table, output_path: str = None) -> pa.Table:
    """
    Transforma datos de series temporales a un formato anidado.

    Cada fila representa una combinación 'establecimiento'-'material', con sus datos
    temporales (semana, volumen, has_promo, is_covid_period) en una lista de diccionarios
    dentro de una columna 'series'.

    Args:
        table: Tabla PyArrow de entrada (datos semanales, ej: salida de 'group_by_week').
        output_path: Ruta opcional para guardar el resultado en Parquet.

    Returns:
        Tabla PyArrow en formato anidado.
    """
    con = duckdb.connect()
    con.execute("PRAGMA memory_limit='4GB'")
    
    try:
        con.register('input_table', table)
        
        # Procesamiento en lotes para tablas grandes para evitar problemas de memoria al crear estructuras anidadas.
        if table.num_rows > 1000000: # Umbral para procesamiento en lotes
            print("Tabla grande detectada para formato anidado, procesando en lotes...")
            
            combinations = con.execute("""
                SELECT 
                    establecimiento, 
                    material,
                    SUM(weekly_volume) AS total_volume -- Usado para ordenar lotes
                FROM input_table
                GROUP BY establecimiento, material
                ORDER BY total_volume DESC, establecimiento, material
            """).fetchall()
            
            batch_size = 500 # Número de combinaciones por lote
            all_results = []
            
            for i in range(0, len(combinations), batch_size):
                batch = combinations[i:i+batch_size]
                batch_conditions_list = []
                for estab, mat, _ in batch: # Ignorar total_volume aquí, solo para ordenación de lotes
                    estab_escaped = estab.replace("'", "''")
                    mat_escaped = mat.replace("'", "''")
                    batch_conditions_list.append(f"(establecimiento = '{estab_escaped}' AND material = '{mat_escaped}')")

                if not batch_conditions_list: continue

                where_clause_batch = " OR ".join(batch_conditions_list)
                
                batch_query = f"""
                    SELECT 
                        establecimiento,
                        material,
                        LIST(STRUCT_PACK(
                            ds := week::VARCHAR, 
                            y := weekly_volume,
                            has_promo := has_promo,
                            is_covid_period := is_covid_period
                        ) ORDER BY week) AS series, -- Asegura el orden dentro de la lista anidada
                        COUNT(*) AS num_points,
                        SUM(weekly_volume) AS total_volume,
                        AVG(weekly_volume) AS avg_weekly_volume
                    FROM input_table
                    WHERE {where_clause_batch}
                    GROUP BY establecimiento, material
                    ORDER BY SUM(weekly_volume) DESC, establecimiento, material
                """
                
                print(f"Creando series anidadas: Lote {i//batch_size + 1}/{(len(combinations) + batch_size - 1)//batch_size}")
                batch_result = con.execute(batch_query).fetch_arrow_table()
                all_results.append(batch_result)
                
                # Limpieza de memoria explícita
                con.unregister('input_table')
                con.close()
                gc.collect()
                con = duckdb.connect()
                con.execute("PRAGMA memory_limit='4GB'")
                con.register('input_table', table)

            if not all_results:
                 return pa.Table.from_schema( # Esquema esperado para la tabla anidada
                    pa.schema([
                        ('establecimiento', pa.string()), ('material', pa.string()),
                        ('series', pa.list_(pa.struct([
                            ('ds', pa.string()), ('y', pa.float64()), # Ajustar tipos si es necesario
                            ('has_promo', pa.int64()), ('is_covid_period', pa.int64())
                        ]))),
                        ('num_points', pa.int64()), ('total_volume', pa.float64()), ('avg_weekly_volume', pa.float64())
                    ])
                 )
            result_table = pa.concat_tables(all_results) if len(all_results) > 1 else all_results[0]

        else:
            # Procesamiento estándar para tablas más pequeñas.
            query = """
                SELECT 
                    establecimiento,
                    material,
                    LIST(STRUCT_PACK(
                        ds := week::VARCHAR, 
                        y := weekly_volume,
                        has_promo := has_promo,
                        is_covid_period := is_covid_period
                    ) ORDER BY week) AS series, -- Asegura orden dentro de la lista
                    COUNT(*) AS num_points,
                    SUM(weekly_volume) AS total_volume,
                    AVG(weekly_volume) AS avg_weekly_volume
                FROM input_table
                GROUP BY establecimiento, material
                ORDER BY SUM(weekly_volume) DESC, establecimiento, material
            """
            result_table = con.execute(query).fetch_arrow_table()
        
        if output_path:
            print(f"\nGuardando formato de series anidadas en: {output_path}")
            # Escritura en fragmentos (chunks) para tablas de resultados grandes
            if len(result_table) > 100000: # Umbral para escritura en fragmentos
                print("Tabla de resultado anidada grande, escribiendo en fragmentos...")
                temp_dir = Path(output_path).parent / f"temp_nested_{Path(output_path).stem}"
                os.makedirs(temp_dir, exist_ok=True)
                
                chunk_size = 50000 # Número de series (filas) por fragmento
                num_chunks = (len(result_table) + chunk_size - 1) // chunk_size
                
                for i_chunk in range(num_chunks):
                    start_idx = i_chunk * chunk_size
                    end_idx = min((i_chunk + 1) * chunk_size, len(result_table))
                    chunk = result_table.slice(start_idx, end_idx - start_idx)
                    chunk_path = temp_dir / f"chunk_{i_chunk}.parquet"
                    pq.write_table(chunk, chunk_path, compression='snappy')
                    print(f"  - Guardado fragmento {i_chunk+1}/{num_chunks} en {chunk_path}")
                    del chunk
                    gc.collect()
                
                print(f"Fusionando {num_chunks} fragmentos en el archivo final...")
                chunk_files = sorted(temp_dir.glob("chunk_*.parquet"))
                # Leer todos los fragmentos y concatenarlos antes de escribir el archivo final.
                # Esto puede ser intensivo en memoria si el total es muy grande.
                # Alternativa: Usar DuckDB para leer múltiples Parquet y escribir uno nuevo si es más eficiente.
                tables_to_merge = [pq.read_table(cf) for cf in chunk_files]
                merged_table = pa.concat_tables(tables_to_merge)
                pq.write_table(merged_table, output_path, compression='snappy')
                
                # Limpieza de archivos temporales
                for cf in chunk_files:
                    os.remove(cf)
                os.rmdir(temp_dir)
                print(f"Fragmentos fusionados y archivos temporales eliminados.")
            else:
                pq.write_table(result_table, output_path, compression='snappy') # Compresión por defecto, considerar 'ZSTD' para mejor ratio.
            
            print(f"Guardadas {len(result_table):,} series en {output_path}")
            
            # Muestra de la estructura anidada
            # Necesario registrar la tabla resultante para consultarla con SQL
            con.register('result_table_nested', result_table)
            sample = con.execute("""
                SELECT 
                    establecimiento, material, num_points, series[1:3] AS sample_points
                FROM result_table_nested 
                LIMIT 1
            """).fetchall()
            
            if sample:
                print("\nMuestra de la estructura anidada:")
                print(f"Serie para {sample[0][0]}-{sample[0][1]} tiene {sample[0][2]} puntos.")
                print(f"Primeros puntos de muestra: {sample[0][3]}")
    finally:
        con.close()
    
    gc.collect()
    return result_table


def list_materials_from_parquet(file_path: str) -> list[str]:
    """
    Obtiene una lista de materiales únicos desde un archivo Parquet.

    Args:
        file_path: Ruta al archivo Parquet.

    Returns:
        Lista de strings con los nombres de materiales únicos.
    """
    con = duckdb.connect()
    try:
        # Asegurar que la ruta se pasa como string a DuckDB
        result = con.execute(f"SELECT DISTINCT material FROM read_parquet('{str(file_path)}')").fetchdf()['material'].tolist()
    finally:
        con.close()
    return result


def filter_by_materials(table: pa.Table) -> pa.Table:
    """
    Filtra la tabla para incluir solo filas cuyos materiales comiencen con los prefijos definidos en MATERIALS_TO_INCLUDE.
    """
    con = duckdb.connect()
    try:
        # Obtiene todos los materiales distintos del archivo Parquet silver de referencia.
        # Esta operación puede ser costosa si el archivo es grande y se llama repetidamente.
        # Considerar obtener esta lista una vez si el pipeline lo permite.
        distinct_materials_in_silver = list_materials_from_parquet(SILVER_VENTAS_PATH)
        
        # Filtra los materiales basándose en los prefijos definidos globalmente.
        materials_to_actually_include = [
            material for material in distinct_materials_in_silver 
            if material.startswith(MATERIALS_TO_INCLUDE) # MATERIALS_TO_INCLUDE es una tupla de prefijos
        ]

        if not materials_to_actually_include:
            print("Advertencia: No se encontraron materiales que coincidan con los prefijos. Se devolverá una tabla vacía.")
            # Devolver una tabla vacía con el mismo esquema que la tabla de entrada.
            return table.slice(0, 0)

        con.register('input_table', table)
        # Construcción de la cláusula IN. Ver comentario en filter_sales_by_not_type sobre parametrización.
        materials_sql = ", ".join([f"'{m.replace(\"'\", \"''\")}'" for m in materials_to_actually_include]) # Escapar apóstrofes
        
        query = f"""
            SELECT *
            FROM input_table
            WHERE material IN ({materials_sql})
        """
        result_table = con.sql(query).fetch_arrow_table()
        
        return result_table
    finally:
        con.close()


# --- Función Principal de Procesamiento ---

def process_data(initial_table: pa.Table, processing_functions: list, 
               show_intermediate: bool = False,
               save_result: bool = False,
               output_path: str = None, # Cambiado a str, pero Path es más robusto
               output_compression: str = 'snappy',
               memory_limit_duckdb: str = '8GB') -> pa.Table: # Renombrado para claridad
    """
    Aplica una secuencia de funciones de procesamiento a una tabla PyArrow.

    Args:
        initial_table: Tabla PyArrow inicial.
        processing_functions: Lista de funciones a aplicar. Cada elemento puede ser:
                              - Una referencia a función (que toma la tabla como primer arg).
                              - Una tupla (función, lista_args_adicionales).
                              - Una tupla (función, lista_args_adicionales, dict_kwargs_adicionales).
        show_intermediate: Si es True, imprime información y muestra de tablas intermedias.
        save_result: Si es True, guarda la tabla final en formato Parquet.
        output_path: Ruta para guardar el archivo Parquet (requerido si save_result=True).
                     Se recomienda usar pathlib.Path.
        output_compression: Algoritmo de compresión para Parquet (ej: 'snappy', 'gzip', 'zstd').
        memory_limit_duckdb: Límite de memoria para DuckDB (ej: '8GB').

    Returns:
        La tabla PyArrow final después de todas las transformaciones.
    """
    current_table = initial_table
    
    # Conexión DuckDB para mostrar resultados intermedios si se solicita.
    # Se gestiona cuidadosamente para liberar memoria.
    con_display = None
    if show_intermediate:
        con_display = duckdb.connect()
        con_display.execute(f"PRAGMA memory_limit='{memory_limit_duckdb}'")
    
    try:
        for i, func_spec in enumerate(processing_functions):
            function_to_apply = None
            args = []
            kwargs = {}

            if callable(func_spec):
                function_to_apply = func_spec
            elif isinstance(func_spec, tuple) and len(func_spec) >= 1 and callable(func_spec[0]):
                function_to_apply = func_spec[0]
                if len(func_spec) > 1:
                    args = func_spec[1] # Asumimos que es una lista o tupla de argumentos
                if len(func_spec) > 2:
                    kwargs = func_spec[2] # Asumimos que es un diccionario de kwargs
            else:
                raise ValueError(f"Especificación de función inválida en la posición {i} del pipeline: {func_spec}")
            
            function_name = function_to_apply.__name__
            if show_intermediate:
                print(f"\n--- Paso {i+1}: Aplicando '{function_name}' ---")
                print(f"Filas antes: {len(current_table):,}")
            
            current_table = function_to_apply(current_table, *args, **kwargs)
            
            # Forzar recolección de basura después de cada paso para liberar memoria.
            # Esto puede tener un impacto en el rendimiento, pero es útil en entornos con memoria limitada.
            gc.collect()
            
            if show_intermediate and con_display:
                print(f"Filas después de '{function_name}': {len(current_table):,}")
                try:
                    con_display.register('current_intermediate_table', current_table)
                    print(f"Muestra después de '{function_name}' (primeras 5 filas):")
                    con_display.sql("SELECT * FROM current_intermediate_table LIMIT 5").show()
                    # Des-registrar la tabla para liberar la referencia en DuckDB
                    con_display.unregister('current_intermediate_table')
                except Exception as e:
                    print(f"Error al mostrar tabla intermedia para '{function_name}': {e}")
                    # Intentar resetear la conexión si hay problemas
                    if con_display: con_display.close()
                    con_display = duckdb.connect()
                    con_display.execute(f"PRAGMA memory_limit='{memory_limit_duckdb}'")

        if save_result:
            if output_path is None:
                raise ValueError("Se debe especificar 'output_path' cuando 'save_result' es True.")
            
            output_path_obj = Path(output_path) # Convertir a Path para manejo robusto
            print(f"\nGuardando resultado final en: {output_path_obj}")

            # Escritura en fragmentos (chunks) para tablas muy grandes para evitar OOM.
            # Este umbral es arbitrario y puede ajustarse.
            if len(current_table) > 1000000: 
                print(f"Tabla final grande ({len(current_table):,} filas), usando escritura en fragmentos...")
                
                temp_dir_final = output_path_obj.parent / f"temp_final_{output_path_obj.stem}"
                os.makedirs(temp_dir_final, exist_ok=True)
                
                chunk_size = 250000  # Número de filas por fragmento.
                num_chunks = (len(current_table) + chunk_size - 1) // chunk_size
                
                for i_chunk in range(num_chunks):
                    start_idx = i_chunk * chunk_size
                    end_idx = min((i_chunk + 1) * chunk_size, len(current_table))
                    chunk = current_table.slice(start_idx, end_idx - start_idx)
                    chunk_path = temp_dir_final / f"final_chunk_{i_chunk}.parquet"
                    pq.write_table(chunk, chunk_path, compression=output_compression)
                    print(f"  - Guardado fragmento {i_chunk+1}/{num_chunks} en {chunk_path}")
                    del chunk # Liberar memoria del fragmento explícitamente
                    gc.collect()
                
                print(f"Fusionando {num_chunks} fragmentos en el archivo final: {output_path_obj}...")
                # Este proceso de fusión puede ser intensivo en memoria.
                # Una alternativa podría ser usar DuckDB para leer los Parquets y escribir uno nuevo:
                # con_merge = duckdb.connect()
                # con_merge.execute(f"COPY (SELECT * FROM read_parquet('{str(temp_dir_final)}/*.parquet')) TO '{str(output_path_obj)}' (FORMAT PARQUET, COMPRESSION '{output_compression}')")
                # con_merge.close()
                # La implementación actual es puramente PyArrow:
                chunk_files = sorted(temp_dir_final.glob("final_chunk_*.parquet"))
                tables_to_merge = [pq.read_table(cf) for cf in chunk_files]
                merged_table = pa.concat_tables(tables_to_merge)
                pq.write_table(merged_table, output_path_obj, compression=output_compression)
                del tables_to_merge, merged_table # Liberar memoria
                gc.collect()

                # Limpieza de archivos y directorio temporal
                for cf in chunk_files:
                    os.remove(cf)
                os.rmdir(temp_dir_final)
                print(f"Fragmentos fusionados y directorio temporal '{temp_dir_final}' eliminado.")
            else:
                # Escritura estándar para tablas más pequeñas.
                pq.write_table(current_table, output_path_obj, compression=output_compression)
            
            print(f"Guardadas {len(current_table):,} filas en {output_path_obj}")
        
        return current_table
    
    finally:
        if con_display:
            con_display.close()
        gc.collect() # Recolección de basura final.

In [None]:
con = duckdb.connect()

# 1. Cargar los datos iniciales desde un archivo Parquet a una tabla PyArrow
print(f"Cargando datos iniciales desde: {SILVER_VENTAS_PATH}")
# Lee el archivo Parquet y carga su contenido en una tabla de PyArrow
initial_table = con.sql(f"SELECT * FROM read_parquet('{str(SILVER_VENTAS_PATH)}')").fetch_arrow_table()
print(f"Filas iniciales cargadas: {len(initial_table):,}")

# Registrar la tabla PyArrow en DuckDB para poder consultarla mediante SQL
con.register('initial_table_arrow', initial_table) # Se cambió el nombre para evitar posible colisión con tablas SQL

# Mostrar las primeras 5 filas de la tabla cargada para verificación
print("\nTabla Silver Inicial (primeras 5 filas, desde PyArrow registrada):")
con.sql("SELECT * FROM initial_table_arrow LIMIT 5").show()

# 2. Definición del pipeline de procesamiento de datos
processing_pipeline = [
    (filter_sales_by_not_type, [TIPOS_A_EXCLUIR]), # Filtrar ventas por tipo (excluyendo ciertos tipos)
    promoid_to_boolean,                           # Convertir promo_id a un valor booleano
    (remove_columns, [COLUMNS_TO_REMOVE]),        # Eliminar columnas no necesarias
    covid_flag,                                   # Añadir una bandera/indicador de COVID
    (filter_by_string_in_column, ['establecimiento', '81']), # Filtrar por cadena en la columna 'establecimiento'
    filter_by_materials,                          # Filtrar por materiales específicos
    group_by_week,                                # Agrupar los datos por semana
    fill_time_series_gaps,                        # Rellenar huecos en la serie temporal
    (filter_by_min_weeks, [12]),                  # Filtrar series con un mínimo de semanas de datos
    sort_series_by_volume                         # Ordenar las series por volumen
]

# 3. Ejecutar el pipeline de procesamiento
print("\nIniciando el procesamiento de datos a través del pipeline...")
final_table = process_data(
    input_table=initial_table, # La tabla PyArrow de entrada
    pipeline_steps=processing_pipeline,
    show_intermediate=True,    # Mostrar resultados intermedios (útil para depuración)
    save_result=True,          # Guardar la tabla resultante
    output_path=GOLD_VENTAS_WEEKLY_PATH # Ruta para guardar la tabla procesada (capa Gold)
)

print(f"Procesamiento completado. Tabla final generada con {len(final_table):,} filas.")
if GOLD_VENTAS_WEEKLY_PATH and save_result: # save_result también debería ser True
    print(f"Tabla final guardada en: {GOLD_VENTAS_WEEKLY_PATH}")

con.close()


In [None]:


con = duckdb.connect()

# 1. Load the initial data into a PyArrow Table
print(f"Loading initial data from: {SILVER_VENTAS_PATH}")
initial_table = con.sql(f"SELECT * FROM read_parquet('{SILVER_VENTAS_PATH}')").fetch_arrow_table()
print(f"Initial rows: {len(initial_table):,}")

# Register the table for querying
con.register('initial_table', initial_table)
# Show first 5 rows
print("\nInitial Silver Table (first 5 rows):")
con.sql("SELECT * FROM initial_table LIMIT 5").show()

processing_pipeline = [
        (filter_sales_by_not_type, [TIPOS_A_EXCLUIR]),
        promoid_to_boolean,
        (remove_columns, [COLUMNS_TO_REMOVE]),
        covid_flag,
        (filter_by_string_in_column, ['establecimiento', '81']),
        filter_by_materials,
        group_by_week,
        fill_time_series_gaps,
        (filter_by_min_weeks, [12]),
        sort_series_by_volume
    ]

# Process the data through all steps
final_table = process_data(
    initial_table, 
    processing_pipeline, 
    show_intermediate=True, 
    save_result=True, 
    output_path=GOLD_VENTAS_WEEKLY_PATH)



Loading initial data from: ../data/silver_ventas_establecimiento.parquet
Initial rows: 67,918,456

Initial Silver Table (first 5 rows):
┌─────────────────┬──────────┬─────────────────────┬──────────────────────┬───────────┬──────────────┬─────────────────┐
│ establecimiento │ material │       calday        │       promo_id       │ volume_ap │ cantidad_umb │      tipo       │
│     varchar     │ varchar  │      timestamp      │       varchar        │  double   │    double    │     varchar     │
├─────────────────┼──────────┼─────────────────────┼──────────────────────┼───────────┼──────────────┼─────────────────┤
│ 8100240876      │ TB8      │ 2024-11-26 00:00:00 │ NULL                 │       8.0 │          1.0 │ Bar Cervecería  │
│ 8100032055      │ PI13     │ 2024-11-26 00:00:00 │ NULL                 │      7.92 │          1.0 │ Restaurante     │
│ 8100258434      │ FL13SPN  │ 2024-11-26 00:00:00 │ NULL                 │      23.1 │          2.0 │ Bar Cervecería  │
│ 8100036860     