# In [1]:
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable, Tuple, Self
import time
import gc
import warnings
import os
import shutil
import pickle
from flaml import AutoML
import os
from glob import glob
import inspect
import logging
logger = logging.getLogger(__name__)        
import cloudpickle
import hashlib
import inspect
from typing import Any, Dict, Optional


# Module-level seed constant; no use of it is visible in this part of the file.
SEED = 42
def fallback_latest_notebook():
    """Return the most recently modified ``*.ipynb`` file in the working directory.

    Returns:
        The relative path of the notebook with the newest modification time,
        or ``None`` when the current directory contains no notebooks.
    """
    candidates = glob("*.ipynb")
    if candidates:
        # max() keeps the first of equally recent files, exactly like taking
        # the head of a stable descending sort.
        return max(candidates, key=os.path.getmtime)
    return None



# Silence every FutureWarning for the rest of the process.
warnings.filterwarnings('ignore', category=FutureWarning)


class InDiskCacheWrapper:
    """
    Wrap a pipeline step so that the results of ``execute`` are cached on disk.

    Results are pickled into ``<cache_dir>/<step.name>/<sha256>.pkl``. The hash
    covers the arguments passed to ``execute`` (with defaults applied) and the
    wrapped step's instance attributes, so changing either one causes a miss.
    """
    def __init__(self, step: "PipelineStep", cache_dir: str = ".cache", execute_params: Optional[Dict[str, Any]] = None):
        """
        Args:
            step: The step whose ``execute`` results are cached.
            cache_dir: Root cache directory; a sub-directory named after the
                step is created inside it.
            execute_params: Parameters of the wrapped ``execute`` method, as
                reported by ``get_execute_params``.
        """
        self.step = step
        self.cache_dir = os.path.join(cache_dir, step.name)
        # exist_ok avoids the check-then-create race when two wrappers for the
        # same step are built at the same time.
        os.makedirs(self.cache_dir, exist_ok=True)
        self._execute_params = execute_params or {}

    def execute(self, *args: Any, **kwargs: Any) -> Any:
        """
        Return the cached result for these inputs, computing and storing it on a miss.

        Arguments may be arbitrary objects, so they are serialized with
        cloudpickle before hashing.

        Returns:
            Whatever the wrapped step's ``execute`` returns.

        Raises:
            ValueError: If the arguments or the step attributes cannot be serialized.
        """
        # Bind args/kwargs to parameter names using the original signature and
        # fill in defaults so explicit and implicit defaults share one key.
        bound = inspect.signature(self.step.execute).bind(*args, **kwargs)
        bound.apply_defaults()

        # The step's constructor-time configuration is part of the key: if it
        # changes, the cache must miss.
        init_params = self.step.__dict__.copy()

        try:
            serialized = cloudpickle.dumps(bound.arguments)
            serialized += cloudpickle.dumps(init_params)
        except Exception as e:
            raise ValueError(f"Failed to serialize for cache: {e}") from e

        hash_key = hashlib.sha256(serialized).hexdigest()
        cache_file = os.path.join(self.cache_dir, f"{hash_key}.pkl")

        if os.path.exists(cache_file):
            print(f"Loading cached result for {self.step.name} from {cache_file}")
            # NOTE: unpickling executes arbitrary code; the cache directory
            # must only contain files written by trusted runs.
            with open(cache_file, "rb") as f:
                return pickle.load(f)

        print(f"Cache miss for {self.step.name}, executing step and saving result to {cache_file}")
        result = self.step.execute(*args, **kwargs)

        # Write to a temporary sibling and rename it into place so that an
        # interrupted or failed dump never leaves a truncated cache entry that
        # a later run would load as a hit.
        tmp_file = f"{cache_file}.{os.getpid()}.tmp"
        try:
            with open(tmp_file, "wb") as f:
                pickle.dump(result, f)
            os.replace(tmp_file, cache_file)
        finally:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
        return result

    def get_execute_params(self) -> Dict[str, Any]:
        """
        Get the parameters for the execute method of the wrapped step.
        """
        return self._execute_params

    @property
    def name(self) -> str:
        """
        Get the name of the step.
        """
        return self.step.name
    

class InMemoryCacheWrapper:
    """
    Wrap a pipeline step so that the results of ``execute`` are cached in memory.

    The cache is a class-level dictionary shared by every wrapper instance.
    Keys are derived from the wrapped step's class, its instance attributes
    and the arguments passed to ``execute``. Cached objects are returned by
    reference, so callers that mutate a result also mutate the cached value.
    """
    # Shared by all instances: hash key -> result of the wrapped execute().
    cache: Dict[str, Any] = {}

    def __init__(self, step: "PipelineStep", execute_params: Optional[Dict[str, Any]] = None):
        """
        Args:
            step: The step whose ``execute`` results are cached.
            execute_params: Parameters of the wrapped ``execute`` method, as
                reported by ``get_execute_params``.
        """
        self.step = step
        self._execute_params = execute_params or {}

    def execute(self, *args: Any, **kwargs: Any) -> Any:
        """
        Return the cached result for these inputs, computing and storing it on a miss.

        Returns:
            Whatever the wrapped step's ``execute`` returns.

        Raises:
            ValueError: If the arguments or the step attributes cannot be serialized.
        """
        # Bind args/kwargs to parameter names using the original signature.
        bound = inspect.signature(self.step.execute).bind(*args, **kwargs)
        bound.apply_defaults()

        # The step attributes are serialized separately instead of being merged
        # into bound.arguments: apply_defaults() rebuilds that mapping from the
        # signature's parameters only, so merged-in attributes were silently
        # dropped (or replaced a same-named argument) and never reached the key.
        init_params = self.step.__dict__.copy()

        # The cache is shared across all steps, so the step class is part of
        # the key to keep different step types with equal inputs apart.
        step_type = type(self.step)
        identity = f"{step_type.__module__}.{step_type.__qualname__}"

        try:
            serialized = identity.encode("utf-8")
            serialized += cloudpickle.dumps(bound.arguments)
            serialized += cloudpickle.dumps(init_params)
        except Exception as e:
            raise ValueError(f"Failed to serialize for cache: {e}") from e

        hash_key = hashlib.sha256(serialized).hexdigest()

        if hash_key in self.cache:
            print(f"Loading cached result for {self.step.name} from memory")
            return self.cache[hash_key]

        print(f"Cache miss for {self.step.name}, executing step and saving result in memory")
        result = self.step.execute(*args, **kwargs)
        self.cache[hash_key] = result
        return result

    def get_execute_params(self) -> Dict[str, Any]:
        """
        Get the parameters for the execute method of the wrapped step.
        """
        return self._execute_params

    @property
    def name(self) -> str:
        """
        Get the name of the step.
        """
        return self.step.name
    

class CachedPipelineMixin:
    """
    Mixin that lets a pipeline step opt into result caching.

    The host class must provide ``get_execute_params()``; its result is handed
    to the wrapper so the wrapper can report the same ``execute`` parameters
    as the step it wraps.
    """
    def in_disk_cache(self, cache_dir: str = ".cache") -> "InDiskCacheWrapper":
        """
        Wrap this step in an on-disk cache.

        Args:
            cache_dir (str): Directory where the cache will be stored.

        Returns:
            InDiskCacheWrapper: A new wrapper around this step (not the step itself).
        """
        return InDiskCacheWrapper(
            self,
            cache_dir=cache_dir,
            execute_params=self.get_execute_params(),
        )

    def in_memory_cache(self) -> "InMemoryCacheWrapper":
        """
        Wrap this step in an in-memory cache.

        Returns:
            InMemoryCacheWrapper: A new wrapper around this step (not the step itself).
        """
        return InMemoryCacheWrapper(self, execute_params=self.get_execute_params())
    

class PipelineStep(ABC, CachedPipelineMixin):
    """
    Base class every pipeline step derives from.

    Subclasses implement ``execute``. The remaining methods are conveniences
    that forward artifact operations to a pipeline instance.
    """
    def __init__(self, name: Optional[str] = None):
        """
        Create a step.

        Args:
            name (str): Label used for identification and logging. A missing
                or empty name falls back to the class name.
        """
        self._name = name if name else self.__class__.__name__

    @property
    def name(self):
        """Label of this step."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @abstractmethod
    def execute(self, *args: Any, **kwargs: Any) -> None:
        """
        Run the step.

        Concrete steps declare the inputs they need as named parameters;
        ``get_execute_params`` exposes those parameters to callers.
        """

    def get_execute_params(self):
        """Return the parameters of this step's ``execute`` method."""
        return inspect.signature(self.execute).parameters

    def save_artifact(self, pipeline: "Pipeline", artifact_name: str, artifact: Any) -> None:
        """
        Store an artifact in the given pipeline.

        Args:
            pipeline (Pipeline): Pipeline that receives the artifact.
            artifact_name (str): Key under which the artifact is stored.
            artifact (Any): Object to store.
        """
        pipeline.save_artifact(artifact_name, artifact)

    def get_artifact(self, pipeline: "Pipeline", artifact_name: str, default=None, raise_not_found=True) -> Any:
        """
        Fetch an artifact from the given pipeline.

        Args:
            pipeline (Pipeline): Pipeline to read from.
            artifact_name (str): Key of the artifact.
            default: Value forwarded as the pipeline's ``default``.
            raise_not_found (bool): Forwarded as the pipeline's ``raise_not_found``.

        Returns:
            Any: Whatever ``pipeline.get_artifact`` returns.
        """
        return pipeline.get_artifact(
            artifact_name,
            default=default,
            raise_not_found=raise_not_found,
        )

    def del_artifact(self, pipeline: "Pipeline", artifact_name: str, soft=True) -> None:
        """
        Remove an artifact from the given pipeline.

        Args:
            pipeline (Pipeline): Pipeline to remove the artifact from.
            artifact_name (str): Key of the artifact.
            soft (bool): Forwarded as the pipeline's ``soft`` flag.
        """
        pipeline.del_artifact(artifact_name, soft=soft)
    


class Pipeline:
    """
    Run an ordered list of steps and store the artifacts they produce.

    Each step's ``execute`` parameters are filled from the stored artifacts by
    name, and the dictionary a step returns is stored back as artifacts.

    Storage has two modes, selected by ``optimize_arftifacts_memory``:
    when True, every artifact is pickled to ``artifacts_dir`` and only its
    path is kept in memory; when False, the objects themselves are kept in
    ``self.artifacts``.
    """
    def __init__(
        self,
        steps: Optional[List["PipelineStep"]] = None,
        optimize_arftifacts_memory: bool = True,
        artifacts_dir: str = "/tmp/",
    ):
        """
        Initialize the pipeline.

        Args:
            steps: Initial steps, executed in list order. Defaults to no steps.
            optimize_arftifacts_memory: Spill artifacts to disk instead of
                keeping them in memory.
            artifacts_dir: Directory that receives spilled artifacts.
        """
        self.steps: List["PipelineStep"] = steps if steps is not None else []
        self.artifacts: Dict[str, Any] = {}
        self.optimize_arftifacts_memory = optimize_arftifacts_memory
        self.artifacts_dir = artifacts_dir
        # Defined here because clear() resets it; run() does not update it.
        self.last_step = None

    def add_step(self, step: "PipelineStep", position: Optional[int] = None) -> None:
        """
        Add a new step to the pipeline.

        Args:
            step (PipelineStep): The step to add.
            position (Optional[int]): Position where to insert the step. If None, appends to the end.
        """
        if position is not None:
            self.steps.insert(position, step)
        else:
            self.steps.append(step)

    def _artifact_path(self, artifact_name: str) -> str:
        """Return the file path used for a spilled artifact."""
        return os.path.join(self.artifacts_dir, artifact_name)

    def _remove_artifact_file(self, artifact_name: str, stored: Any) -> None:
        """Delete the spilled file behind an artifact entry, if this pipeline wrote one."""
        if not self.optimize_arftifacts_memory:
            return
        # Only remove the exact path save_artifact() would have written, so an
        # in-memory string artifact is never mistaken for a file to delete.
        if not isinstance(stored, str) or stored != self._artifact_path(artifact_name):
            return
        try:
            os.remove(stored)
        except FileNotFoundError:
            pass

    def save_artifact(self, artifact_name: str, artifact: Any) -> None:
        """
        Store an artifact under the given name.

        Args:
            artifact_name (str): Name to identify the artifact.
            artifact (Any): The artifact to save.
        """
        if not self.optimize_arftifacts_memory:
            self.artifacts[artifact_name] = artifact
            return

        # Spill the artifact to disk so it is not held in memory.
        os.makedirs(self.artifacts_dir, exist_ok=True)
        artifact_path = self._artifact_path(artifact_name)
        with open(artifact_path, 'wb') as f:
            pickle.dump(artifact, f)
        self.artifacts[artifact_name] = artifact_path

    def get_artifact(self, artifact_name: str, default=None, raise_not_found=True) -> Any:
        """
        Retrieve a stored artifact.

        Args:
            artifact_name (str): Name of the artifact to retrieve.
            default: Value returned when the artifact is missing and
                ``raise_not_found`` is False.
            raise_not_found (bool): Raise instead of returning ``default``
                when the artifact is missing.

        Returns:
            Any: The requested artifact, or ``default``.

        Raises:
            KeyError: In-memory mode, artifact missing and ``raise_not_found`` is True.
            FileNotFoundError: On-disk mode, artifact missing and ``raise_not_found`` is True.
        """
        if not self.optimize_arftifacts_memory:
            # Membership test (not .get) so a stored None is still a hit and
            # default/raise_not_found behave the same way as in on-disk mode.
            if artifact_name in self.artifacts:
                return self.artifacts[artifact_name]
            if raise_not_found:
                raise KeyError(f"Artifact {artifact_name} not found in pipeline artifacts")
            return default

        artifact_path = self.artifacts.get(artifact_name)
        if artifact_path and os.path.exists(artifact_path):
            with open(artifact_path, 'rb') as f:
                return pickle.load(f)
        if raise_not_found:
            raise FileNotFoundError(f"Artifact {artifact_name} not found in {self.artifacts_dir}")
        return default

    def del_artifact(self, artifact_name: str, soft=True) -> None:
        """
        Delete a stored artifact and free its storage.

        In on-disk mode the spilled file is removed as well.

        Args:
            artifact_name (str): Name of the artifact to delete.
            soft (bool): If False, also force a garbage collection pass.

        Raises:
            KeyError: If no artifact with that name is stored.
        """
        stored = self.artifacts.pop(artifact_name)
        self._remove_artifact_file(artifact_name, stored)
        if not soft:
            # Force garbage collection if not soft delete
            gc.collect()

    def run(self, verbose: bool = True, last_step_callback: Optional[Callable] = None) -> None:
        """
        Execute all steps in sequence, storing the artifacts each one returns.

        Args:
            verbose: Print the name and duration of every step.
            last_step_callback: Accepted for interface compatibility; this
                implementation does not invoke it.
        """
        for step in self.steps:
            if verbose:
                print(f"Executing step: {step.name}")
            start_time = time.perf_counter()
            params = self.__fill_params_from_step(step)
            artifacts_to_save = step.execute(**params)
            if artifacts_to_save is None:
                artifacts_to_save = {}
            self.__save_step_artifacts(artifacts_to_save)
            end_time = time.perf_counter()
            if verbose:
                print(f"Step {step.name} completed in {end_time - start_time:.2f} seconds")

    def __fill_params_from_step(self, step: "PipelineStep") -> Dict[str, Any]:
        """
        Build the keyword arguments for a step's ``execute`` call.

        A parameter named ``pipeline`` receives this pipeline. Every other
        named parameter receives the artifact of the same name; parameters
        with a default fall back to it when the artifact is missing, while
        parameters without one make the lookup raise.
        """
        variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        params = {}
        for name, param in step.get_execute_params().items():
            if param.kind in variadic:
                # *args / **kwargs do not name an artifact.
                continue
            if name == 'pipeline':
                params[name] = self
            elif param.default is inspect.Parameter.empty:
                params[name] = self.get_artifact(name)
            else:
                params[name] = self.get_artifact(name, default=param.default, raise_not_found=False)
        return params

    def __save_step_artifacts(self, artifacts_to_save: Dict[str, Any]) -> None:
        """
        Save artifacts produced by a step to the pipeline.

        Args:
            artifacts_to_save (Dict[str, Any]): Artifacts to save.
        """
        for name, artifact in artifacts_to_save.items():
            self.save_artifact(name, artifact)

    def clear(self, collect_garbage: bool = False) -> None:
        """
        Drop all artifacts, removing spilled files in on-disk mode.

        Args:
            collect_garbage: Also force a garbage collection pass.
        """
        for artifact_name, stored in list(self.artifacts.items()):
            self._remove_artifact_file(artifact_name, stored)
        if collect_garbage:
            del self.artifacts
            gc.collect()
        self.artifacts = {}
        self.last_step = None


    

# In [2]:
import pandas as pd
import numpy as np
import lightgbm as lgb


class LoadDataFrameStep(PipelineStep):
    """
    Step that reads a parquet file and publishes it as the ``df`` artifact.
    """
    def __init__(self, path: str, name: Optional[str] = None):
        super().__init__(name)
        self.path = path

    def execute(self) -> None:
        """
        Read ``self.path`` and drop its ``periodo`` column.

        Returns:
            A dictionary with the loaded frame under the key ``"df"``.
        """
        loaded = pd.read_parquet(self.path)
        without_period = loaded.drop(columns=["periodo"])
        return {"df": without_period}


class CastDataTypesStep(PipelineStep):
    """Step that casts named columns of ``df`` to the configured dtypes."""
    def __init__(self, dtypes: Dict[str, str], name: Optional[str] = None):
        """
        Args:
            dtypes: Mapping of column name to the dtype it is cast to.
            name: Optional step name.
        """
        super().__init__(name)
        self.dtypes = dtypes

    def execute(self, df) -> Dict[str, Any]:
        """
        Cast each configured column in place and print a summary of the frame.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.
        """
        for col, dtype in self.dtypes.items():
            df[col] = df[col].astype(dtype)
        # info() prints the summary itself and returns None; wrapping it in
        # print() emitted an extra "None" line.
        df.info()
        return {"df": df}


class ChangeDataTypesStep(PipelineStep):
    """Step that converts every column of a given dtype to another dtype."""
    def __init__(self, dtypes: Dict[str, str], name: Optional[str] = None):
        """
        Args:
            dtypes: Mapping of source dtype to the dtype its columns are cast to.
            name: Optional step name.
        """
        super().__init__(name)
        self.dtypes = dtypes

    def execute(self, df) -> Dict[str, Any]:
        """
        Cast, in place, all columns matching each source dtype and print a summary.

        Mappings are applied in dictionary order, so a column converted by one
        entry can be picked up again by a later entry whose source dtype
        matches the new dtype.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.
        """
        for original_dtype, dtype in self.dtypes.items():
            for col in df.select_dtypes(include=[original_dtype]).columns:
                df[col] = df[col].astype(dtype)
        # info() prints the summary itself and returns None; wrapping it in
        # print() emitted an extra "None" line.
        df.info()
        return {"df": df}


class FilterFirstDateStep(PipelineStep):
    """Step that keeps only rows whose ``fecha`` is on or after a cut-off."""
    def __init__(self, first_date: str, name: Optional[str] = None):
        super().__init__(name)
        self.first_date = first_date

    def execute(self, df) -> None:
        """
        Filter ``df`` by ``fecha >= self.first_date`` and print the new shape.

        Returns:
            A dictionary with the filtered frame under the key ``"df"``.
        """
        on_or_after = df["fecha"] >= self.first_date
        kept = df[on_or_after]
        print(f"Filtered DataFrame shape: {kept.shape}")
        return {"df": kept}


class FeatureEngineeringLagStep(PipelineStep):
    """Step that adds lagged copies of columns per (product_id, customer_id) series."""
    def __init__(self, lags: List[int], columns: List, name: Optional[str] = None):
        super().__init__(name)
        self.lags = lags
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> dict:
        """
        Add one ``<column>_lag_<lag>`` column for every column/lag combination.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        series_keys = ['product_id', 'customer_id']
        # Rows must be in date order inside each series for shift() to look
        # back in time.
        ordered = df.sort_values(by=series_keys + ['fecha'])
        per_series = ordered.groupby(series_keys)

        combos = [(source, steps) for source in self.columns for steps in self.lags]
        for source, steps in combos:
            ordered[f"{source}_lag_{steps}"] = per_series[source].shift(steps)

        return {"df": ordered}



from scipy.stats import linregress


# 1. Media Móvil
class RollingMeanFeatureStep(PipelineStep):
    """Step that adds a rolling mean per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_rolling_mean_<window>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def windowed_average(values):
            # min_periods=1 lets the window produce a value from the first row on.
            return values.rolling(self.window, min_periods=1).mean()

        for source in self.columns:
            target = f'{source}_rolling_mean_{self.window}'
            ordered[target] = per_series[source].transform(windowed_average)
        return {"df": ordered}

# 2. Máximo Móvil
class RollingMaxFeatureStep(PipelineStep):
    """Step that adds a rolling maximum per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_rolling_max_<window>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def windowed_peak(values):
            # min_periods=1 lets the window produce a value from the first row on.
            return values.rolling(self.window, min_periods=1).max()

        for source in self.columns:
            target = f'{source}_rolling_max_{self.window}'
            ordered[target] = per_series[source].transform(windowed_peak)
        return {"df": ordered}

# 3. Mínimo Móvil
class RollingMinFeatureStep(PipelineStep):
    """Step that adds a rolling minimum per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_rolling_min_<window>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def windowed_floor(values):
            # min_periods=1 lets the window produce a value from the first row on.
            return values.rolling(self.window, min_periods=1).min()

        for source in self.columns:
            target = f'{source}_rolling_min_{self.window}'
            ordered[target] = per_series[source].transform(windowed_floor)
        return {"df": ordered}

# 4. Desviación Estándar Móvil
class RollingStdFeatureStep(PipelineStep):
    """Step that adds a rolling standard deviation per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_rolling_std_<window>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def windowed_spread(values):
            return values.rolling(self.window, min_periods=1).std()

        for source in self.columns:
            target = f'{source}_rolling_std_{self.window}'
            ordered[target] = per_series[source].transform(windowed_spread)
        return {"df": ordered}

# 5. Media Móvil Exponencial
class ExponentialMovingAverageStep(PipelineStep):
    """Step that adds an exponential moving average per (product_id, customer_id) series."""
    def __init__(self, span: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.span = span
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_ema_<span>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def exponential_average(values):
            return values.ewm(span=self.span, adjust=False).mean()

        for source in self.columns:
            target = f'{source}_ema_{self.span}'
            ordered[target] = per_series[source].transform(exponential_average)
        return {"df": ordered}

# 6. Tendencia (Pendiente de regresión lineal)
import tqdm
class TrendFeatureStep(PipelineStep):
    """Step that adds a rolling linear-regression slope per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_trend_<window>`` for every configured column.

        Each value is the slope of a linear regression of the window's values
        against their positions 0..len-1.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])

        def window_slope(chunk):
            positions = np.arange(len(chunk))
            # Element 0 of linregress's result is the slope.
            return linregress(positions, chunk)[0]

        def rolling_slope(values):
            return values.rolling(self.window).apply(window_slope, raw=False)

        per_series = ordered.groupby(['product_id', 'customer_id'])
        for source in self.columns:
            target = f'{source}_trend_{self.window}'
            ordered[target] = per_series[source].transform(rolling_slope)
        return {"df": ordered}


# 8. Diferencia con período anterior
class DiffFeatureStep(PipelineStep):
    """Step that adds the difference to an earlier row per (product_id, customer_id) series."""
    def __init__(self, periods: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.periods = periods
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_diff_<periods>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        series_keys = ['product_id', 'customer_id']
        ordered = df.sort_values(by=series_keys + ['fecha'])
        per_series = ordered.groupby(series_keys)
        for source in self.columns:
            target = f'{source}_diff_{self.periods}'
            ordered[target] = per_series[source].diff(self.periods)
        return {"df": ordered}

# 9. Cambio porcentual
class PctChangeFeatureStep(PipelineStep):
    """Step that adds the percentage change to an earlier row per (product_id, customer_id) series."""
    def __init__(self, periods: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.periods = periods
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_pct_change_<periods>`` for every configured column.

        The values are used as they are: no offset is added to the base value
        before the change is computed.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        series_keys = ['product_id', 'customer_id']
        ordered = df.sort_values(by=series_keys + ['fecha'])
        per_series = ordered.groupby(series_keys)
        for source in self.columns:
            target = f'{source}_pct_change_{self.periods}'
            ordered[target] = per_series[source].pct_change(self.periods)
        return {"df": ordered}

# 10. Mediana Móvil
class RollingMedianFeatureStep(PipelineStep):
    """Step that adds a rolling median per (product_id, customer_id) series."""
    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<column>_rolling_median_<window>`` for every configured column.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        ordered = df.sort_values(by=['product_id', 'customer_id', 'fecha'])
        per_series = ordered.groupby(['product_id', 'customer_id'])

        def windowed_middle(values):
            # min_periods=1 lets the window produce a value from the first row on.
            return values.rolling(self.window, min_periods=1).median()

        for source in self.columns:
            target = f'{source}_rolling_median_{self.window}'
            ordered[target] = per_series[source].transform(windowed_middle)
        return {"df": ordered}
    


class CreateTotalCategoryStep(PipelineStep):
    """Step that adds, to each row, the total of a value column for its date and category."""
    def __init__(self, name: Optional[str] = None, cat: str = "cat1", tn: str = "tn"):
        super().__init__(name)
        self.cat = cat
        self.tn = tn

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``<tn>_<cat>_vendidas``: the sum of ``tn`` over each (fecha, cat) group.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        group_keys = ['fecha', self.cat]
        ordered = df.sort_values(group_keys)
        total_column = f"{self.tn}_{self.cat}_vendidas"
        group_totals = ordered.groupby(group_keys)[self.tn].transform('sum')
        ordered[total_column] = group_totals
        return {"df": ordered}


class CreateWeightByCustomerStep(PipelineStep):
    """Step that adds each customer's share of the total ``tn`` of its date."""
    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``customer_weight`` = ``tn_customer_vendidas / tn_total_vendidas``.

        Both helper totals are computed row-wise with ``transform`` and are
        kept in the returned frame.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        # Sorting is not needed for the sums; it only gives a stable row order.
        ordered = df.sort_values(['fecha', 'customer_id'])

        # Total tn of the row's (date, customer) pair.
        customer_totals = ordered.groupby(['fecha', 'customer_id'])['tn'].transform('sum')
        ordered['tn_customer_vendidas'] = customer_totals

        # Total tn of the row's date.
        date_totals = ordered.groupby('fecha')['tn'].transform('sum')
        ordered['tn_total_vendidas'] = date_totals

        ordered['customer_weight'] = (
            ordered['tn_customer_vendidas'] / ordered['tn_total_vendidas']
        )

        return {"df": ordered}


class CreateWeightByProductStep(PipelineStep):
    """Step that adds each product's share of the total ``tn`` of its date."""
    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Add ``product_weight`` = ``tn_product_vendidas / tn_total_product_vendidas``.

        Both helper totals are computed row-wise with ``transform`` and are
        kept in the returned frame.

        Returns:
            A dictionary with the sorted, extended frame under the key ``"df"``.
        """
        # Sorting is not needed for the sums; it only gives a stable row order.
        ordered = df.sort_values(['fecha', 'product_id'])

        # Total tn of the row's (date, product) pair.
        product_totals = ordered.groupby(['fecha', 'product_id'])['tn'].transform('sum')
        ordered['tn_product_vendidas'] = product_totals

        # Total tn of the row's date.
        date_totals = ordered.groupby('fecha')['tn'].transform('sum')
        ordered['tn_total_product_vendidas'] = date_totals

        ordered['product_weight'] = (
            ordered['tn_product_vendidas'] / ordered['tn_total_product_vendidas']
        )

        return {"df": ordered}
    

class CreateWeightMeanStep(PipelineStep):
    """Step that combines all weight-like columns into a single ``weight`` column."""
    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, df: pd.DataFrame) -> Dict:
        """
        Set ``weight`` to the row-wise mean of every column whose name
        contains ``"weight"``, shifted up by 0.5.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.

        Raises:
            ValueError: If no column name contains ``"weight"``.
        """
        weight_columns = list(filter(lambda label: 'weight' in label, df.columns))
        print(f"Weight columns found: {weight_columns}")
        if len(weight_columns) == 0:
            raise ValueError("No weight columns found in the DataFrame.")
        # The +0.5 offset keeps the combined weight away from zero.
        df["weight"] = df[weight_columns].mean(axis=1) + 0.5
        return {"df": df}

import tqdm

class FeatureEngineeringProductInteractionStep(PipelineStep):
    """Step that adds per-date total ``tn`` columns for the top-selling products."""

    def __init__(self, name: Optional[str] = None, top_n: int = 10):
        """
        Args:
            name: Optional step name.
            top_n: Number of top products (by ``tn`` on the latest date) that
                get a column. Defaults to 10, the value previously hard-coded.
        """
        super().__init__(name)
        self.top_n = top_n

    def execute(self, df) -> Dict[str, Any]:
        """
        Add one ``product_<product_id>_total_tn`` column per top product.

        The top products are the ``top_n`` products with the largest summed
        ``tn`` on the latest ``fecha``. Each new column holds, for the row's
        date, that product's ``tn`` summed over all customers.

        Returns:
            A dictionary with the merged frame under the key ``"df"``.
        """
        last_date = df["fecha"].max()
        last_month_df = df[df["fecha"] == last_date]
        top_products = (
            last_month_df.groupby("product_id")
            .aggregate({"tn": "sum"})
            .nlargest(self.top_n, "tn")
            .index.tolist()
        )
        # TODO: group by category instead and build one column per category,
        # both for the product and the customer aggregation.
        for product_id in tqdm.tqdm(top_products):
            column = f"product_{product_id}_total_tn"
            # Per-date total of this product over all customers, merged back on the date.
            product_df = df[df["product_id"] == product_id].groupby("fecha").aggregate({"tn": "sum"}).reset_index()
            product_df = product_df.rename(columns={"tn": column})
            product_df = product_df[["fecha", column]]
            df = df.merge(product_df, on="fecha", how="left")
        return {"df": df}



class FeatureEngineeringProductCatInteractionStep(PipelineStep):
    """Step that adds one column per category value holding that category's per-date total."""

    def __init__(self, cat="cat1", name: Optional[str] = None, tn="tn"):
        super().__init__(name)
        self.cat = cat
        self.tn = tn

    def execute(self, df) -> None:
        """
        Sum ``tn`` per (fecha, cat), pivot the categories into columns and
        left-merge the result onto ``df`` by ``fecha``.

        Returns:
            A dictionary with the merged frame under the key ``"df"``.
        """
        # Long format: one row per (date, category) with the summed value.
        totals_long = df.groupby(["fecha", self.cat]).agg({self.tn: "sum"}).reset_index()
        # Wide format: one row per date, one column per category value.
        totals_wide = totals_long.pivot(index="fecha", columns=self.cat, values=self.tn)
        totals_wide = totals_wide.reset_index()
        merged = df.merge(totals_wide, on="fecha", how="left")
        return {"df": merged}
        

class FeatureDivInteractionStep(PipelineStep):
    """Step that adds the ratio of configured column pairs."""
    def __init__(self, columns: List[Tuple[str, str]], name: Optional[str] = None):
        super().__init__(name)
        self.columns = columns

    def execute(self, df) -> None:
        """
        Add ``<col1>_div_<col2>`` for every configured (col1, col2) pair.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.
        """
        for numerator, denominator in self.columns:
            # The denominator is offset by 1e-6 so an exact zero does not divide by zero.
            ratio = df[numerator] / (df[denominator] + 1e-6)
            df[f"{numerator}_div_{denominator}"] = ratio
        return {"df": df}


class FeatureProdInteractionStep(PipelineStep):
    """Step that adds the product of configured column pairs."""
    def __init__(self, columns: List[Tuple[str, str]], name: Optional[str] = None):
        super().__init__(name)
        self.columns = columns

    def execute(self, df) -> None:
        """
        Add ``<col1>_prod_<col2>`` for every configured (col1, col2) pair.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.
        """
        for left, right in self.columns:
            product = df[left] * df[right]
            df[f"{left}_prod_{right}"] = product
        return {"df": df}

class DateRelatedFeaturesStep(PipelineStep):
    """Step that derives calendar columns from ``fecha``."""
    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, df) -> None:
        """
        Add ``year`` and ``mes`` (month) taken from the ``fecha`` column.

        Returns:
            A dictionary with the (mutated) input frame under the key ``"df"``.
        """
        date_parts = df["fecha"].dt
        df["year"] = date_parts.year
        df["mes"] = date_parts.month
        return {"df": df}

        

class SplitDataFrameStep(PipelineStep):
    """Step that splits ``df`` by date into train, eval, test and prediction frames."""
    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, df) -> None:
        """
        Split on the distinct ``fecha`` values, counted from the most recent:

        - ``kaggle_pred``: the latest date.
        - ``test``: the third-latest date (the second-latest is left unused).
        - ``eval_data``: the fourth-latest date.
        - ``train``: every date before the fourth-latest.

        Returns:
            A dictionary with the keys ``train``, ``eval_data``, ``test`` and
            ``kaggle_pred``.
        """
        distinct_dates = sorted(df["fecha"].unique())
        last_date = distinct_dates[-1]
        # A gap is left before the latest date because the forecast targets month + 2.
        last_test_date = distinct_dates[-3]
        last_train_date = distinct_dates[-4]

        fecha = df["fecha"]
        return {
            "train": df[fecha < last_train_date],
            "eval_data": df[fecha == last_train_date],
            "test": df[fecha == last_test_date],
            "kaggle_pred": df[fecha == last_date],
        }



class CustomMetric:
    """
    Callable evaluation metric: absolute error of per-product totals,
    relative to the absolute per-product label totals.
    """
    def __init__(self, df_eval, product_id_col='product_id', scaler=None):
        self.scaler = scaler
        self.df_eval = df_eval
        self.product_id_col = product_id_col

    def __call__(self, preds, train_data):
        """
        Compute the metric for one evaluation round.

        Args:
            preds: Predictions, one per row of ``df_eval``.
            train_data: Object whose ``get_label()`` returns the true values.

        Returns:
            The tuple ``('custom_error', error, False)``. The trailing False
            marks the value as an error, i.e. lower is better.
        """
        labels = train_data.get_label()
        frame = self.df_eval.copy()
        frame['preds'] = preds
        frame['labels'] = labels

        if self.scaler:
            # Bring both series back to the original scale before aggregating.
            for column in ('preds', 'labels'):
                frame[column] = self.scaler.inverse_transform(frame[[column]])

        # Totals per product.
        totals = frame.groupby(self.product_id_col).agg({'labels': 'sum', 'preds': 'sum'})

        absolute_gap = np.abs(totals['labels'] - totals['preds'])
        absolute_actual = np.abs(totals['labels'])
        error = np.sum(absolute_gap) / np.sum(absolute_actual)

        return 'custom_error', error, False


    
class CustomMetricAutoML:
    """
    Callable validation loss: absolute error of per-product totals, relative
    to the absolute per-product label totals.
    """
    def __init__(self, df_eval, product_id_col='product_id', scaler=None):
        """
        Args:
            df_eval: Stored on the instance; not read by ``__call__``.
            product_id_col: Column of ``X_val`` that identifies the product.
            scaler: Optional object whose ``inverse_transform`` maps
                predictions and labels back to the original scale.
        """
        self.df_eval = df_eval
        self.product_id_col = product_id_col
        self.scaler = scaler

    def __call__(self, X_val, y_val, estimator, *args, **kwargs):
        """
        Compute the loss of ``estimator`` on the validation data.

        Args:
            X_val: Validation features; must contain ``product_id_col``.
            y_val: Validation labels aligned with ``X_val``.
            estimator: Fitted model exposing ``predict``.
            *args, **kwargs: Ignored.

        Returns:
            The tuple ``(error, {"total_error": error})``.
        """
        df_temp = X_val.copy()
        df_temp['preds'] = estimator.predict(X_val)
        df_temp['labels'] = y_val

        if self.scaler:
            df_temp['preds'] = self.scaler.inverse_transform(df_temp[['preds']])
            df_temp['labels'] = self.scaler.inverse_transform(df_temp[['labels']])

        # Totals per product.
        por_producto = df_temp.groupby(self.product_id_col).agg({'labels': 'sum', 'preds': 'sum'})

        # Normalise by the absolute label totals, as CustomMetric does. With a
        # plain sum, negative totals could cancel out or flip the sign of the
        # loss, rewarding worse models under minimisation.
        error = np.sum(np.abs(por_producto['labels'] - por_producto['preds'])) / np.sum(np.abs(por_producto['labels']))

        return error, {"total_error": error}

from sklearn.preprocessing import RobustScaler, MinMaxScaler


class TrainScalerDataStep(PipelineStep):
    """Fit one scaler on the numeric feature columns and another on the target.

    Args:
        scaler: Scaler class (not an instance); it is instantiated twice, once
            for the features and once for the ``target`` column.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, scaler = RobustScaler, name: Optional[str] = None):
        super().__init__(name)
        self.scaler = scaler

    def execute(self, train) -> Dict[str, Any]:
        """Fit both scalers on ``train``.

        Returns:
            A dict with the fitted feature scaler (``scaler``), the fitted
            target scaler (``scaler_target``) and the list of scaled feature
            columns (``columns_to_scale``).
        """
        # Only int32 / float32 columns are candidates; the time columns and the
        # target itself are never part of the feature scaler.
        never_scaled = ("periodo", 'fecha', 'target')
        numeric_columns = train.select_dtypes(include=['int32', 'float32']).columns.tolist()
        columns_to_scale = [name for name in numeric_columns if name not in never_scaled]

        feature_scaler = self.scaler().fit(train[columns_to_scale])
        target_scaler = self.scaler().fit(train[["target"]])

        return {
            "scaler": feature_scaler,
            "scaler_target": target_scaler,
            "columns_to_scale": columns_to_scale
        }

 
   
class PrepareXYStep(PipelineStep):
    """Split the train / eval / test / kaggle frames into feature and target sets."""

    def __init__(self, name: Optional[str] = None):
        super().__init__(name)

    def execute(self, train, eval_data, test, kaggle_pred) -> Dict[str, Any]:
        """Build every X / y combination used by the training and prediction steps.

        The feature list is every column of ``train`` except ``fecha`` and
        ``target``.

        Returns:
            A dict with:
            ``X_train`` / ``y_train``: train + eval rows;
            ``X_train_alone`` / ``y_train_alone``: train rows only;
            ``X_eval`` / ``y_eval``: eval rows;
            ``X_test`` / ``y_test``: test rows;
            ``X_train_final`` / ``y_train_final``: train + eval + test rows;
            ``X_kaggle``: features of ``kaggle_pred``.
        """
        target = 'target'
        features = [name for name in train.columns if name not in ('fecha', target)]

        train_X, train_y = train[features], train[target]
        eval_X, eval_y = eval_data[features], eval_data[target]
        test_X, test_y = test[features], test[target]

        return {
            "X_train": pd.concat([train_X, eval_X]),
            "y_train": pd.concat([train_y, eval_y]),
            "X_train_alone": train_X,
            "y_train_alone": train_y,
            "X_eval": eval_X,
            "y_eval": eval_y,
            "X_test": test_X,
            "y_test": test_y,
            "X_train_final": pd.concat([train_X, eval_X, test_X]),
            "y_train_final": pd.concat([train_y, eval_y, test_y]),
            "X_kaggle": kaggle_pred[features]
        }
        

from xgboost import XGBRegressor

class TrainModelXGBStep(PipelineStep):
    """Train an ``XGBRegressor`` on artifacts stored in the pipeline.

    Args:
        params: Keyword arguments forwarded to ``XGBRegressor``. When empty or
            omitted, a built-in default configuration is used.
        train_eval_sets: Maps the roles ``X_train``, ``y_train``, ``X_eval`` and
            ``y_eval`` to the artifact names to read. When empty or omitted,
            each role maps to the artifact of the same name.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, params: Optional[Dict] = None, train_eval_sets: Optional[Dict[str, str]] = None, name: Optional[str] = None):
        super().__init__(name)
        if not params:
            params = {
                'learning_rate': 0.05,
                'max_depth': 6,
                'n_estimators': 800,
                'subsample': 0.8,
                'colsample_bytree': 0.8,
                'reg_alpha': 1,
                'reg_lambda': 5,
                'random_state': 42,
                "enable_categorical": True,
            }
        if not train_eval_sets:
            train_eval_sets = {
                "X_train": "X_train",
                "y_train": "y_train",
                "X_eval": "X_eval",
                "y_eval": "y_eval",
            }
        self.params = params
        self.train_eval_sets = train_eval_sets

    def execute(self, pipeline: Pipeline, scaler=None, scaler_target=None):
        """Fit the regressor and return it as the ``model`` artifact.

        Args:
            pipeline: Pipeline from which the training/eval artifacts are read.
            scaler: Optional fitted feature scaler; its ``feature_names_in_``
                columns are transformed on copies of the feature frames.
            scaler_target: Optional fitted target scaler applied to the targets.

        Returns:
            ``{"model": fitted XGBRegressor}``.
        """
        X_train = pipeline.get_artifact(self.train_eval_sets["X_train"])
        y_train = pipeline.get_artifact(self.train_eval_sets["y_train"])
        X_eval = pipeline.get_artifact(self.train_eval_sets["X_eval"])
        y_eval = pipeline.get_artifact(self.train_eval_sets["y_eval"])

        if scaler is not None:
            # Work on copies: writing the scaled values into the frames held by
            # the pipeline would leave already-scaled artifacts behind for any
            # later step that scales them again.
            scaled_cols = scaler.feature_names_in_
            X_train = X_train.copy()
            X_eval = X_eval.copy()
            X_train[scaled_cols] = scaler.transform(X_train[scaled_cols])
            X_eval[scaled_cols] = scaler.transform(X_eval[scaled_cols])

        if scaler_target is not None:
            # Target scaling is independent from feature scaling, so a missing
            # target scaler no longer fails with an AttributeError.
            y_train = pd.Series(
                scaler_target.transform(y_train.values.reshape(-1, 1)).flatten(),
                index=y_train.index,
            )
            y_eval = pd.Series(
                scaler_target.transform(y_eval.values.reshape(-1, 1)).flatten(),
                index=y_eval.index,
            )

        model = XGBRegressor(**self.params)
        model.fit(X_train, y_train, eval_set=[(X_eval, y_eval)])
        return {"model": model}


class TrainModelLGBStep(PipelineStep):
    """Train a LightGBM booster with ``lgb.train`` and the custom product-level metric.

    Args:
        params: Parameter dict passed to ``lgb.train``. When empty or omitted,
            a built-in default configuration is used.
        train_eval_sets: Maps the roles ``X_train``, ``y_train``, ``X_eval``,
            ``y_eval`` and ``eval_data`` to the artifact names to read. When
            empty or omitted, each role maps to the artifact of the same name.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, params: Optional[Dict] = None, train_eval_sets: Optional[Dict[str, str]] = None, name: Optional[str] = None):
        super().__init__(name)
        if not params:
            params = {
                "objective": "regression",
                "boosting_type": "gbdt",
                "num_leaves": 31,
                "learning_rate": 0.05,
                "feature_fraction": 0.9,
                "bagging_fraction": 0.8,
                "bagging_freq": 5,
                "n_estimators": 1000,
                "verbose": -1
            }
        if not train_eval_sets:
            train_eval_sets = {
                "X_train": "X_train",
                "y_train": "y_train",
                "X_eval": "X_eval",
                "y_eval": "y_eval",
                "eval_data": "eval_data",
            }
        self.params = params
        self.train_eval_sets = train_eval_sets

    def execute(self, pipeline: Pipeline, scaler=None, scaler_target=None) -> Dict[str, Any]:
        """Fit the booster and return it as the ``model`` artifact.

        Args:
            pipeline: Pipeline from which the training/eval artifacts are read.
            scaler: Optional fitted feature scaler; its ``feature_names_in_``
                columns are transformed on copies of the feature frames.
            scaler_target: Optional fitted target scaler applied to the targets
                and handed to the custom metric for inverse scaling.

        Returns:
            ``{"model": trained booster}``.
        """
        X_train = pipeline.get_artifact(self.train_eval_sets["X_train"])
        y_train = pipeline.get_artifact(self.train_eval_sets["y_train"])
        X_eval = pipeline.get_artifact(self.train_eval_sets["X_eval"])
        y_eval = pipeline.get_artifact(self.train_eval_sets["y_eval"])
        df_eval = pipeline.get_artifact(self.train_eval_sets["eval_data"])

        cat_features = [col for col in X_train.columns if X_train[col].dtype.name == 'category']

        if scaler is not None:
            # Work on copies: writing the scaled values into the frames held by
            # the pipeline would leave already-scaled artifacts behind for any
            # later step that scales them again.
            scaled_cols = scaler.feature_names_in_
            X_train = X_train.copy()
            X_eval = X_eval.copy()
            X_train[scaled_cols] = scaler.transform(X_train[scaled_cols])
            X_eval[scaled_cols] = scaler.transform(X_eval[scaled_cols])

        if scaler_target is not None:
            # Target scaling is independent from feature scaling, so a missing
            # target scaler no longer fails with an AttributeError.
            y_train = pd.Series(
                scaler_target.transform(y_train.values.reshape(-1, 1)).flatten(),
                index=y_train.index,
            )
            y_eval = pd.Series(
                scaler_target.transform(y_eval.values.reshape(-1, 1)).flatten(),
                index=y_eval.index,
            )

        # An optional 'weight' column is used as the per-row sample weight.
        weight = X_train['weight'] if 'weight' in X_train.columns else None
        weight_eval = X_eval['weight'] if 'weight' in X_eval.columns else None
        train_dataset = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_features, weight=weight)
        eval_dataset = lgb.Dataset(X_eval, label=y_eval, reference=train_dataset, categorical_feature=cat_features, weight=weight_eval)
        custom_metric = CustomMetric(df_eval, product_id_col='product_id', scaler=scaler_target)

        # Early stopping is not enabled; only periodic evaluation logging.
        callbacks = [
            lgb.log_evaluation(100),
        ]
        model = lgb.train(
            self.params,
            train_dataset,
            valid_sets=[eval_dataset],
            feval=custom_metric,
            callbacks=callbacks,
        )
        return {"model": model}




# CustomAutoMLSplitter implements the KFold-style interface so it can be used by AutoML.
# It produces a single split: the rows of the last month are held out for evaluation
# and every earlier row is used for training.
# For now the number of splits is hard-coded to 1.

from sklearn.model_selection import KFold
class CustomAutoMLSplitter:
    """KFold-style splitter that yields one time-based split.

    The rows whose ``fecha`` equals the maximum ``fecha`` form the held-out
    set; the rows with an earlier ``fecha`` form the training set.
    """

    def __init__(self, *args, **kwargs):
        # Arguments are accepted for interface compatibility and ignored.
        self.n_splits = 1  # always a single split

    def split(self, X, y=None, groups=None):
        """Yield ``(train_indices, test_indices)`` once.

        Both elements are arrays of *positional* row indices (suitable for
        ``iloc``), so the split is correct regardless of the index labels
        carried by ``X``.
        """
        fechas = X['fecha']
        last_month = fechas.max()

        is_held_out = (fechas == last_month).to_numpy()
        is_train = (fechas < last_month).to_numpy()

        yield is_train.nonzero()[0], is_held_out.nonzero()[0]

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the number of splits (always 1)."""
        return self.n_splits

class TrainModelAutoMLStep(PipelineStep):
    """Search for a model with FLAML ``AutoML`` using the custom product-level metric.

    Args:
        train_eval_sets: Maps the roles ``X_train``, ``y_train``, ``X_eval``,
            ``y_eval`` and ``eval_data`` to the artifact names to read. When
            empty or omitted, each role maps to the artifact of the same name.
        time_budget: Value passed to ``AutoML.fit`` as ``time_budget``.
        products_proportion: When below 1.0, only a random sample of that
            proportion of the products in ``X_train`` is used.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, train_eval_sets: Optional[Dict[str, str]] = None, time_budget: int = 100, products_proportion: float = 1.0, name: Optional[str] = None):
        super().__init__(name)
        self.time_budget = time_budget
        if not train_eval_sets:
            train_eval_sets = {
                "X_train": "X_train",
                "y_train": "y_train",
                "X_eval": "X_eval",
                "y_eval": "y_eval",
                "eval_data": "eval_data",
            }
        self.train_eval_sets = train_eval_sets
        self.products_proportion = products_proportion

    def execute(self, pipeline: Pipeline, scaler=None, scaler_target=None) -> Dict[str, Any]:
        """Run the AutoML search.

        Args:
            pipeline: Pipeline from which the training/eval artifacts are read.
            scaler: Optional fitted feature scaler; its ``feature_names_in_``
                columns are transformed on copies of the feature frames.
            scaler_target: Optional fitted target scaler applied to the targets
                and handed to the custom metric for inverse scaling.

        Returns:
            ``{"automl_best_model": best underlying estimator, "automl": AutoML object}``.
        """
        X_train = pipeline.get_artifact(self.train_eval_sets["X_train"])
        y_train = pipeline.get_artifact(self.train_eval_sets["y_train"])
        X_eval = pipeline.get_artifact(self.train_eval_sets["X_eval"])
        y_eval = pipeline.get_artifact(self.train_eval_sets["y_eval"])
        df_eval = pipeline.get_artifact(self.train_eval_sets["eval_data"])

        # To speed things up, optionally keep only a random sample of products.
        # NOTE(review): the sample is drawn from NumPy's global random state,
        # so it is only reproducible if that state is seeded by the caller.
        if self.products_proportion < 1.0:
            unique_products = X_train['product_id'].unique()
            sample_size = int(len(unique_products) * self.products_proportion)
            sampled_products = np.random.choice(unique_products, size=sample_size, replace=False)

            # Positional boolean masks keep X and y in lockstep even when the
            # index labels are not unique (label-based y[X.index] would not).
            train_mask = X_train['product_id'].isin(sampled_products).to_numpy()
            eval_mask = X_eval['product_id'].isin(sampled_products).to_numpy()
            X_train = X_train[train_mask]
            y_train = y_train[train_mask]
            X_eval = X_eval[eval_mask]
            y_eval = y_eval[eval_mask]
            df_eval = df_eval[df_eval['product_id'].isin(sampled_products)]

        if scaler is not None:
            # Work on copies: writing the scaled values into the frames held by
            # the pipeline would leave already-scaled artifacts behind for any
            # later step that scales them again.
            scaled_cols = scaler.feature_names_in_
            X_train = X_train.copy()
            X_eval = X_eval.copy()
            X_train[scaled_cols] = scaler.transform(X_train[scaled_cols])
            X_eval[scaled_cols] = scaler.transform(X_eval[scaled_cols])

        if scaler_target is not None:
            # Target scaling is independent from feature scaling, so a missing
            # target scaler no longer fails with an AttributeError.
            y_train = pd.Series(
                scaler_target.transform(y_train.values.reshape(-1, 1)).flatten(),
                index=y_train.index,
            )
            y_eval = pd.Series(
                scaler_target.transform(y_eval.values.reshape(-1, 1)).flatten(),
                index=y_eval.index,
            )

        automl = AutoML()
        metric = CustomMetricAutoML(df_eval, product_id_col='product_id', scaler=scaler_target)
        automl_params = {
            "time_budget": self.time_budget,
            "task": "regression",
            "metric": metric,
            "eval_method": "holdout",
            "estimator_list": ["lgbm"],
        }

        automl.fit(
            X_train=X_train,
            y_train=y_train,
            X_val=X_eval,
            y_val=y_eval,
            **automl_params
        )

        # Expose the underlying estimator of the best model next to the
        # AutoML object itself.
        automl_ml_best_model = automl.model.estimator
        return {
            "automl_best_model": automl_ml_best_model,
            "automl": automl
        }



class ReTrainAutoMLBestModelStep(PipelineStep):
    """Retrain the best AutoML model with ``lgb.train`` and the custom metric.

    Args:
        train_eval_sets: Maps the roles ``X_train``, ``y_train``, ``X_eval``,
            ``y_eval`` and ``eval_data`` to the artifact names to read. When
            empty or omitted, each role maps to the artifact of the same name.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, train_eval_sets: Optional[Dict[str, str]] = None, name: Optional[str] = None):
        super().__init__(name)
        if not train_eval_sets:
            train_eval_sets = {
                "X_train": "X_train",
                "y_train": "y_train",
                "X_eval": "X_eval",
                "y_eval": "y_eval",
                "eval_data": "eval_data",
            }
        self.train_eval_sets = train_eval_sets

    def execute(self, pipeline: Pipeline, automl_best_model, scaler=None, scaler_target=None) -> Dict[str, Any]:
        """Retrain using the hyper-parameters of ``automl_best_model``.

        Args:
            pipeline: Pipeline from which the training/eval artifacts are read.
            automl_best_model: Estimator whose ``get_params()`` are reused; it
                must be an ``lgb.LGBMRegressor``.
            scaler: Optional fitted feature scaler; its ``feature_names_in_``
                columns are transformed on copies of the feature frames.
            scaler_target: Optional fitted target scaler applied to the targets
                and handed to the custom metric for inverse scaling.

        Returns:
            ``{"model": trained booster}``.

        Raises:
            ValueError: If ``automl_best_model`` is not an ``lgb.LGBMRegressor``.
        """
        # Validate first so an unsupported model fails before any data work.
        if not isinstance(automl_best_model, lgb.LGBMRegressor):
            raise ValueError("The model is not a valid LightGBM model.")

        X_train = pipeline.get_artifact(self.train_eval_sets["X_train"])
        y_train = pipeline.get_artifact(self.train_eval_sets["y_train"])
        X_eval = pipeline.get_artifact(self.train_eval_sets["X_eval"])
        y_eval = pipeline.get_artifact(self.train_eval_sets["y_eval"])
        df_eval = pipeline.get_artifact(self.train_eval_sets["eval_data"])

        if scaler is not None:
            # Work on copies: writing the scaled values into the frames held by
            # the pipeline would leave already-scaled artifacts behind for any
            # later step that scales them again.
            scaled_cols = scaler.feature_names_in_
            X_train = X_train.copy()
            X_eval = X_eval.copy()
            X_train[scaled_cols] = scaler.transform(X_train[scaled_cols])
            X_eval[scaled_cols] = scaler.transform(X_eval[scaled_cols])

        if scaler_target is not None:
            # Target scaling is independent from feature scaling, so a missing
            # target scaler no longer fails with an AttributeError.
            y_train = pd.Series(
                scaler_target.transform(y_train.values.reshape(-1, 1)).flatten(),
                index=y_train.index,
            )
            y_eval = pd.Series(
                scaler_target.transform(y_eval.values.reshape(-1, 1)).flatten(),
                index=y_eval.index,
            )

        categorical_features = [col for col in X_train.columns if X_train[col].dtype.name == 'category']
        train_dataset = lgb.Dataset(X_train, label=y_train, categorical_feature=categorical_features)
        eval_dataset = lgb.Dataset(X_eval, label=y_eval, reference=train_dataset, categorical_feature=categorical_features)
        custom_metric = CustomMetric(df_eval, product_id_col='product_id', scaler=scaler_target)

        # Early stopping is not enabled; only periodic evaluation logging.
        callbacks = [
            lgb.log_evaluation(100),
        ]
        # NOTE(review): get_params() returns the sklearn-style parameters; if
        # they include an iteration count, confirm how LightGBM resolves it
        # against num_boost_round.
        model = lgb.train(
            automl_best_model.get_params(),
            train_dataset,
            num_boost_round=1000,
            valid_sets=[eval_dataset],
            feval=custom_metric,
            callbacks=callbacks
        )
        return {"model": model}



class PredictStep(PipelineStep):
    """Predict on a stored feature set and return a tidy predictions frame.

    Args:
        predict_set: Name of the artifact holding the features to predict on.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, predict_set: str, name: Optional[str] = None):
        super().__init__(name)
        self.predict_set = predict_set

    def execute(self, pipeline: Pipeline, model, scaler=None, scaler_target=None) -> Dict[str, Any]:
        """Run ``model.predict`` on the configured artifact.

        Args:
            pipeline: Pipeline from which the feature artifact is read.
            model: Object exposing ``predict``.
            scaler: Optional fitted feature scaler; its ``feature_names_in_``
                columns are transformed on a copy of the features.
            scaler_target: Optional fitted target scaler; when given, the
                predictions are inverse-transformed back to the target scale.

        Returns:
            ``{"predictions": DataFrame}`` with columns ``predictions``,
            ``product_id`` and ``customer_id``, indexed like the feature set.
            Predictions are returned as produced (no clipping is applied).
        """
        X_predict = pipeline.get_artifact(self.predict_set)

        model_input = X_predict
        if scaler is not None:
            # Scale a copy: the stored artifact must stay unscaled so that it
            # can be predicted on again and so that the id columns reported
            # below are the original values.
            scaled_cols = scaler.feature_names_in_
            model_input = X_predict.copy()
            model_input[scaled_cols] = scaler.transform(model_input[scaled_cols])

        predictions = model.predict(model_input)
        if scaler_target is not None:
            predictions = scaler_target.inverse_transform(predictions.reshape(-1, 1)).flatten()

        # One row per input row, carrying the identifiers needed downstream.
        predictions = pd.DataFrame(predictions, columns=["predictions"], index=X_predict.index)
        predictions["product_id"] = X_predict["product_id"]
        predictions["customer_id"] = X_predict["customer_id"]
        return {"predictions": predictions}



class EvaluatePredictionsSteps(PipelineStep):
    """Compare predictions with actual values at product level.

    Args:
        y_actual_df: Name of the artifact holding the actual values; it must
            provide ``product_id`` and ``target`` columns.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, y_actual_df: str, name: Optional[str] = None):
        super().__init__(name)
        self.y_actual_df = y_actual_df

    def execute(self, pipeline: Pipeline, predictions) -> Dict[str, Any]:
        """Compute ``sum(|real - pred|) / sum(real)`` over per-product totals.

        Actual and predicted totals are aligned on ``product_id``. A product
        present on only one side contributes 0 on the other side, and a note
        is printed when that happens.

        Returns:
            ``{"eval_df": per-product frame, "total_error": float}``.
        """
        y_actual = pipeline.get_artifact(self.y_actual_df)

        product_actual = y_actual.groupby("product_id")["target"].sum()
        product_pred = predictions.groupby("product_id")["predictions"].sum()

        # Align on product_id instead of pairing the two series by position,
        # which silently mismatches rows when the product sets differ.
        unmatched = product_actual.index.symmetric_difference(product_pred.index)
        if len(unmatched) > 0:
            print(f"Warning: {len(unmatched)} product_id(s) present in only one of actuals/predictions; missing side treated as 0.")

        eval_df = (
            pd.concat(
                [product_actual.rename("tn_real"), product_pred.rename("tn_pred")],
                axis=1,
            )
            .fillna(0.0)
            .rename_axis("product_id")
            .reset_index()
        )

        total_error = np.sum(np.abs(eval_df['tn_real'] - eval_df['tn_pred'])) / np.sum(eval_df['tn_real'])
        print(f"Error en test: {total_error:.4f}")
        print("\nTop 5 productos con mayor error absoluto:")
        eval_df['error_absoluto'] = np.abs(eval_df['tn_real'] - eval_df['tn_pred'])
        print(eval_df.sort_values('error_absoluto', ascending=False).head())
        return {
            "eval_df": eval_df,
            "total_error": total_error
        }



class PlotFeatureImportanceStep(PipelineStep):
    """Pipeline step that draws the feature-importance chart of a trained model."""

    def execute(self, model) -> None:
        """Hand ``model`` to ``lgb.plot_importance``; no artifacts are produced."""
        lgb.plot_importance(model)
        return None



class KaggleSubmissionStep(PipelineStep):
    """Aggregate row-level predictions into the per-product submission table."""

    def execute(self, predictions) -> Dict[str, Any]:
        """Sum ``predictions`` per ``product_id``.

        Returns:
            ``{"submission": DataFrame}`` with columns ``product_id`` and ``tn``.
        """
        totals_per_product = predictions.groupby("product_id")["predictions"].sum()
        submission = totals_per_product.reset_index(name="tn")
        return {"submission": submission}



class SaveExperimentStep(PipelineStep):
    """Persist the outputs of an experiment under ``experiments/<exp_name>``.

    Args:
        exp_name: Experiment name; used for the directory and file names.
        save_dataframes: When true, every remaining DataFrame artifact is also
            written as a CSV file.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, exp_name: str, save_dataframes=False, name: Optional[str] = None):
        super().__init__(name)
        self.exp_name = exp_name
        self.save_dataframes = save_dataframes

    def execute(self, pipeline: Pipeline) -> None:
        """Write the model, test error, submission and a notebook copy to disk.

        Reads the ``model``, ``total_error`` and ``submission`` artifacts and
        removes ``submission`` from the pipeline after saving it.
        """
        # Create the experiment directory.
        exp_dir = f"experiments/{self.exp_name}"
        os.makedirs(exp_dir, exist_ok=True)

        # Save the model as a pickle file.
        model = pipeline.get_artifact("model")
        with open(os.path.join(exp_dir, "model.pkl"), "wb") as f:
            pickle.dump(model, f)

        # Save the total test error.
        total_error = pipeline.get_artifact("total_error")
        with open(os.path.join(exp_dir, "total_error.txt"), "w") as f:
            f.write(str(total_error))

        # Save the submission file; the error is embedded in the file name.
        submission = pipeline.get_artifact("submission")
        submission.to_csv(os.path.join(exp_dir, f"submission_{self.exp_name}_{total_error:.4f}.csv"), index=False)

        # Only the submission is removed from the artifacts; the model and the
        # error stay available to later steps.
        pipeline.del_artifact("submission")

        # Save the remaining DataFrame artifacts as CSV files.
        if self.save_dataframes:
            for artifact_name, artifact in pipeline.artifacts.items():
                if isinstance(artifact, pd.DataFrame):
                    artifact.to_csv(os.path.join(exp_dir, f"{artifact_name}.csv"), index=False)

        # Save a copy of the most recently modified notebook, if there is one.
        # fallback_latest_notebook() returns None when no *.ipynb file exists,
        # and shutil.copy would fail on that value.
        notebook_path = fallback_latest_notebook()
        if notebook_path is None:
            print("No notebook (*.ipynb) found in the working directory; skipping notebook copy.")
        else:
            shutil.copy(notebook_path, os.path.join(exp_dir, f"notebook_{self.exp_name}.ipynb"))



class FilterProductsIDStep(PipelineStep):
    """Keep only the rows whose ``product_id`` appears in a reference file.

    Args:
        product_file: Path of a tab-separated file with a ``product_id`` column.
        dfs: Names of the DataFrame artifacts to filter. Defaults to ``["df"]``.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, product_file = "product_id_apredecir201912.txt", dfs: Optional[List[str]] = None, name: Optional[str] = None):
        super().__init__(name)
        self.file = product_file
        # A fresh list per instance avoids sharing one mutable default.
        self.dfs = ["df"] if dfs is None else dfs

    def execute(self, pipeline: Pipeline) -> Dict[str, Any]:
        """Filter every configured artifact and return them keyed by artifact name."""
        converted_dfs = {}
        if not self.dfs:
            return converted_dfs

        # The reference file does not change between artifacts: read it once.
        product_ids = pd.read_csv(self.file, sep="\t")["product_id"].tolist()

        for df_key in self.dfs:
            df = pipeline.get_artifact(df_key)
            df = df[df["product_id"].isin(product_ids)]
            converted_dfs[df_key] = df
            print(f"Filtered DataFrame {df_key} shape: {df.shape}")
        return converted_dfs
    

class FilterProductForTestingStep(PipelineStep):
    """Shrink the dataset to a limited number of products for quick test runs.

    Args:
        total_products_ids: Maximum number of distinct products to keep.
        name: Optional step name forwarded to ``PipelineStep``.
        random: When true, the kept products are drawn with
            ``np.random.choice``; otherwise the first distinct products (in
            order of appearance) are kept.
    """

    def __init__(self, total_products_ids: int = 100, name: Optional[str] = None, random=True):
        super().__init__(name)
        self.total_products_ids = total_products_ids
        self.random = random

    def execute(self, df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """Return ``{"df": filtered frame}``; the frame is untouched when it
        already has at most ``total_products_ids`` distinct products."""
        limit = self.total_products_ids
        distinct_products = df['product_id'].unique()

        if len(distinct_products) > limit:
            kept = (
                np.random.choice(distinct_products, size=limit, replace=False)
                if self.random
                else distinct_products[:limit]
            )
            df = df[df['product_id'].isin(kept)]

        print(f"Filtered DataFrame shape: {df.shape}")
        return {"df": df}
    

from typing import List, Dict, Union, Optional, Type
from sklearn.preprocessing import RobustScaler, MinMaxScaler, StandardScaler

from sklearn.base import BaseEstimator, TransformerMixin
import tqdm


class CustomStandarScaler:
    """Standardise one column separately for each ``(product_id, customer_id)`` pair.

    ``fit`` stores the per-pair mean and standard deviation of ``column``;
    ``transform`` adds ``<column>_scaled``; ``inverse_transform`` maps
    ``<column>_scaled`` back to the original scale.

    Both ``transform`` and ``inverse_transform`` return the result of a left
    merge against the fitted statistics.
    """

    _KEYS = ['product_id', 'customer_id']

    def __init__(self, column: str):
        self.column = column
        self.scaler_data = None

    def _stat_columns(self):
        """Return the names of the temporary (mean, std) columns."""
        return f'{self.column}_mean_scaler', f'{self.column}_std_scaler'

    def fit(self, df: pd.DataFrame):
        """Compute the per-pair mean and std of ``column`` and return ``self``."""
        mean_col, std_col = self._stat_columns()
        agg = df.groupby(self._KEYS)[self.column].agg(['mean', 'std']).rename(
            columns={'mean': mean_col, 'std': std_col})
        self.scaler_data = agg
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with an added ``<column>_scaled`` column.

        Rows whose scaled value is undefined (NaN), e.g. because the pair has
        no usable std or was not seen during ``fit``, get 0.

        Raises:
            ValueError: If the scaler has not been fitted.
        """
        if self.scaler_data is None:
            raise ValueError("Scaler has not been fitted yet.")

        mean_col, std_col = self._stat_columns()
        scaled_col = f'{self.column}_scaled'

        # Attach the per-pair statistics as temporary columns.
        df = df.merge(self.scaler_data, on=self._KEYS, how='left')
        df[scaled_col] = ((df[self.column] - df[mean_col]) / df[std_col]).fillna(0)
        # Drop the temporary columns again.
        return df.drop(columns=[mean_col, std_col], errors='ignore')

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fit on ``df`` and return its transformed version."""
        return self.fit(df).transform(df)

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with ``<column>_scaled`` mapped back to the original scale.

        Pairs without a defined std (NaN) are treated as having std 0, so
        their values map back to the pair mean. This mirrors ``transform``,
        which maps such rows to 0. Pairs not seen during ``fit`` have no mean
        and yield NaN.

        Raises:
            ValueError: If the scaler has not been fitted.
        """
        if self.scaler_data is None:
            raise ValueError("Scaler has not been fitted yet.")

        mean_col, std_col = self._stat_columns()
        scaled_col = f'{self.column}_scaled'

        # Attach the per-pair statistics as temporary columns.
        df = df.merge(self.scaler_data, on=self._KEYS, how='left')
        std = df[std_col].fillna(0)
        df[scaled_col] = df[scaled_col] * std + df[mean_col]
        # Drop the temporary columns again.
        return df.drop(columns=[mean_col, std_col], errors='ignore')
    
    
class ScaleFeatureStep(PipelineStep):
    """Fit a per-column scaler on the dataset and apply it.

    Args:
        column: Column handed to the scaler class as ``column``.
        scaler: Scaler class (not an instance) to instantiate.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, column: str, scaler=CustomStandarScaler, name: Optional[str] = None):
        super().__init__(name)
        self.column = column
        self.scaler_cls = scaler

    def execute(self, df: pd.DataFrame) -> Dict:
        """Return the scaled frame as ``df`` and the fitted scaler as ``custom_scaler``."""
        fitted_scaler = self.scaler_cls(column=self.column)
        scaled_frame = fitted_scaler.fit_transform(df)
        return {"df": scaled_frame, "custom_scaler": fitted_scaler}
    
class InverseScalePredictionsStep(PipelineStep):
    """Map predictions and test targets back to the original scale."""

    def execute(self, custom_scaler, predictions, y_test, test) -> Dict:
        """Inverse scale the predictions and the test target with ``custom_scaler``.

        Args:
            custom_scaler: Fitted scaler whose ``inverse_transform`` operates
                on a ``<column>_scaled`` column plus ``product_id`` and
                ``customer_id``.
            predictions: Frame with ``predictions``, ``product_id`` and
                ``customer_id`` columns.
            y_test: Not used by this step.
            test: Frame with ``target``, ``product_id`` and ``customer_id``.

        Returns:
            ``{"predictions": unscaled predictions, "y_test_unscale": unscaled targets}``;
            both frames keep the index of their input.
        """
        # The scaler works on "<column>_scaled"; derive the name from the
        # scaler instead of assuming "tn" (kept as the fallback).
        scaled_col = f"{getattr(custom_scaler, 'column', 'tn')}_scaled"

        # Temporarily rename the value column so it matches what the scaler expects.
        predictions_index = predictions.index
        predictions = predictions.rename(columns={"predictions": scaled_col})
        predictions = custom_scaler.inverse_transform(predictions).rename(columns={scaled_col: "predictions"})
        # Restore the input index, exactly as done for the targets below.
        predictions.index = predictions_index

        y_test_scaled = test[["target", "product_id", "customer_id"]]
        y_test_scaled = y_test_scaled.rename(columns={"target": scaled_col})
        y_test_scaled = custom_scaler.inverse_transform(y_test_scaled)
        y_test_scaled = y_test_scaled.rename(columns={scaled_col: "target"})
        y_test_scaled.index = test.index
        return {
            "predictions": predictions,
            "y_test_unscale": y_test_scaled
        }
        
class CreateTargetColumStep(PipelineStep):
    """Create the ``target`` column from a future value of another column.

    Args:
        name: Optional step name forwarded to ``PipelineStep``.
        target_col: Column whose future value becomes the target.
    """

    def __init__(self, name: Optional[str] = None, target_col: str = 'tn'):
        super().__init__(name)
        self.target_col = target_col

    def execute(self, df: pd.DataFrame) -> Dict:
        """Set ``target`` to the value of ``target_col`` two rows ahead.

        Rows are ordered by ``product_id``, ``customer_id`` and ``fecha``, and
        the shift is applied within each ``(product_id, customer_id)`` group,
        so the last two rows of every group get NaN. An existing ``target``
        column is replaced. The input frame is not modified.

        Returns:
            ``{"df": sorted frame with the new target column}``.
        """
        # Drop any previous target on a new object rather than in place, so
        # the caller's frame is left untouched.
        df = df.drop(columns=["target"], errors='ignore')
        df = df.sort_values(['product_id', 'customer_id', 'fecha'])

        df['target'] = df.groupby(['product_id', 'customer_id'])[self.target_col].shift(-2)

        return {"df": df}

class SaveDataFrameStep(PipelineStep):
    def __init__(self, df_name: str, file_name: str, ext = "pickle", name: Optional[str] = None):
        super().__init__(name)
        self.df_name = df_name
        self.file_name = file_name
        self.ext = ext

    def execute(self, pipeline: Pipeline) -> None:
        df = pipeline.get_artifact(self.df_name)
        if self.ext == "pickle":
            df.to_pickle(self.file_name)
        elif self.ext == "parquet":
            df.to_parquet(f"{self.file_name}.parquet", index=False)
        elif self.ext == "csv":
            df.to_csv(f"{self.file_name}.csv", index=False)
        else:
            raise ValueError(f"Unsupported file extension: {self.ext}")
        
class RollingSkewFeatureStep(PipelineStep):
    """Add rolling-skewness features per ``(product_id, customer_id)`` series.

    Args:
        window: Size of the rolling window, in rows.
        columns: Columns for which a ``<col>_rolling_skew_<window>`` feature is added.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """Return ``{"df": frame}`` sorted by product, customer and date with the new columns."""
        series_keys = ['product_id', 'customer_id']

        def _rolling_skew(values):
            # Skewness over the trailing window of one product/customer series.
            return values.rolling(self.window, min_periods=1).skew()

        df = df.sort_values(by=series_keys + ['fecha'])
        grouped = df.groupby(series_keys)
        for source_col in self.columns:
            feature_name = f'{source_col}_rolling_skew_{self.window}'
            df[feature_name] = grouped[source_col].transform(_rolling_skew)
        return {"df": df}
    
class RollingZscoreFeatureStep(PipelineStep):
    """Add rolling z-score features per ``(product_id, customer_id)`` series.

    Args:
        window: Size of the rolling window, in rows.
        columns: Columns for which a ``<col>_rolling_zscore_<window>`` feature is added.
        name: Optional step name forwarded to ``PipelineStep``.
    """

    def __init__(self, window: int, columns: List[str], name: Optional[str] = None):
        super().__init__(name)
        self.window = window
        self.columns = columns

    def execute(self, df: pd.DataFrame) -> Dict:
        """Return ``{"df": frame}`` sorted by product, customer and date with the new columns.

        Each feature is ``(value - rolling mean) / (rolling std + 1e-6)``; the
        small constant keeps the division defined when the rolling std is 0.
        """
        series_keys = ['product_id', 'customer_id']

        def _window_mean(values):
            return values.rolling(self.window, min_periods=1).mean()

        def _window_std(values):
            return values.rolling(self.window, min_periods=1).std()

        df = df.sort_values(by=series_keys + ['fecha'])
        grouped = df.groupby(series_keys)
        for source_col in self.columns:
            rolling_mean = grouped[source_col].transform(_window_mean)
            rolling_std = grouped[source_col].transform(_window_std)
            centered = df[source_col] - rolling_mean
            df[f'{source_col}_rolling_zscore_{self.window}'] = centered / (rolling_std + 1e-6)
        return {"df": df}
    

class ReduceMemoryUsageStep(PipelineStep):
    """Downcast numeric columns to the narrowest dtype whose range fits the data."""

    def execute(self, df):
        """Downcast the columns of ``df`` in place and report the memory saved.

        Float columns are tried as float16 then float32; integer columns as
        int8, int16 then int32. A dtype is chosen when the column's min and max
        lie strictly inside its representable range; columns that fit none are
        left unchanged. Only the value range is checked.
        NOTE(review): narrower float types carry less precision; confirm that
        this is acceptable for the affected columns.

        Returns:
            ``{"df": df}`` with the downcast columns.
        """
        bytes_per_mb = 1024**2
        float_candidates = (np.float16, np.float32)
        int_candidates = (np.int8, np.int16, np.int32)

        initial_mem_usage = df.memory_usage().sum() / bytes_per_mb

        for col in df.columns:
            column = df[col]
            if not pd.api.types.is_numeric_dtype(column):
                continue

            c_min = column.min()
            c_max = column.max()

            if pd.api.types.is_float_dtype(column):
                candidates = [(dtype, np.finfo(dtype)) for dtype in float_candidates]
            elif pd.api.types.is_integer_dtype(column):
                candidates = [(dtype, np.iinfo(dtype)) for dtype in int_candidates]
            else:
                # Numeric but neither float nor integer: leave untouched.
                continue

            # Candidates are ordered from narrowest to widest; take the first fit.
            for dtype, limits in candidates:
                if c_min > limits.min and c_max < limits.max:
                    df[col] = column.astype(dtype)
                    break

        final_mem_usage = df.memory_usage().sum() / bytes_per_mb
        print('--- Memory usage before: {:.2f} MB'.format(initial_mem_usage))
        print('--- Memory usage after: {:.2f} MB'.format(final_mem_usage))
        print('--- Decreased memory usage by {:.1f}%\n'.format(100 * (initial_mem_usage - final_mem_usage) / initial_mem_usage))
        return {"df": df}


In [None]:
import datetime

# Experiment definition: load the intermediate dataset, build features,
# train/evaluate a LightGBM model, refit it and produce the Kaggle submission.
# Repeated step families are generated inline; the resulting step order is
# significant and must not be rearranged.
pipeline = Pipeline(
    steps=[
        LoadDataFrameStep(path="df_intermedio.parquet"),
        # FilterProductForTestingStep(total_products_ids=2, random=True),  # for faster test runs
        # Filter here so the dataset is smaller and can be run locally.
        FilterProductsIDStep(dfs=["df"]),
        ScaleFeatureStep(column="tn", scaler=CustomStandarScaler),
        CreateTargetColumStep(target_col="tn_scaled"),
        DateRelatedFeaturesStep(),
        CastDataTypesStep(
            dtypes=dict(
                product_id="uint32",
                customer_id="uint32",
                mes="uint16",
                year="uint16",
                brand="category",
                cat1="category",
                cat2="category",
                cat3="category",
            ),
        ),
        ReduceMemoryUsageStep(),

        # Lags, then rolling mean/max/min for windows 3 and 9.
        FeatureEngineeringLagStep(
            lags=[1, 2, 3, 5, 11, 23],
            columns=["tn_scaled", "cust_request_qty", "stock_final"],
        ),
        *(
            step_cls(window=window, columns=["tn_scaled", "cust_request_qty", "stock_final"])
            for window in (3, 9)
            for step_cls in (RollingMeanFeatureStep, RollingMaxFeatureStep, RollingMinFeatureStep)
        ),
        ReduceMemoryUsageStep(),

        # Rolling std and skew for windows 3, 6 and 12.
        *(
            RollingStdFeatureStep(window=window, columns=["tn", "cust_request_qty"])
            for window in (3, 6, 12)
        ),
        *(
            RollingSkewFeatureStep(window=window, columns=["tn", "cust_request_qty"])
            for window in (3, 6, 12)
        ),
        ReduceMemoryUsageStep(),

        # Rolling z-score for windows 3, 6 and 12.
        *(
            RollingZscoreFeatureStep(window=window, columns=["tn", "cust_request_qty"])
            for window in (3, 6, 12)
        ),
        ReduceMemoryUsageStep(),

        # Differences, then product/category interactions.
        *(
            DiffFeatureStep(periods=periods, columns=["tn_scaled", "cust_request_qty", "stock_final"])
            for periods in (1, 2, 3, 4, 5, 11)
        ),
        *(
            FeatureEngineeringProductCatInteractionStep(cat=cat, tn="tn_scaled")
            for cat in ("cat1", "cat2", "cat3")
        ),
        ReduceMemoryUsageStep(),

        # Per-category totals using the step's default `tn` argument.
        *(
            CreateTotalCategoryStep(cat=cat)
            for cat in ("cat1", "cat2", "cat3", "brand", "customer_id", "product_id")
        ),
        ReduceMemoryUsageStep(),

        # Per-category totals over tn_scaled, then over stock_final.
        *(
            CreateTotalCategoryStep(cat=cat, tn="tn_scaled")
            for cat in ("cat1", "cat2", "cat3", "brand", "customer_id", "product_id")
        ),
        *(
            CreateTotalCategoryStep(cat=cat, tn="stock_final")
            for cat in ("cat1", "cat2", "cat3")
        ),
        ReduceMemoryUsageStep(),

        FeatureDivInteractionStep(
            columns=[
                ("tn", total_col)
                for total_col in (
                    "tn_cat1_vendidas",
                    "tn_cat2_vendidas",
                    "tn_cat3_vendidas",
                    "tn_brand_vendidas",
                )
            ],
        ),
        ReduceMemoryUsageStep(),

        FeatureProdInteractionStep(columns=[("tn_scaled", "cust_request_qty")]),
        CreateWeightByCustomerStep(),
        CreateWeightByProductStep(),
        ReduceMemoryUsageStep(),

        SaveDataFrameStep(df_name="df", file_name="df_fe_big.pickle"),

        # Split, train on the train/eval sets and score on the test set.
        SplitDataFrameStep(),
        FilterProductsIDStep(dfs=["test", "eval_data", "kaggle_pred"]),
        PrepareXYStep(),
        TrainModelLGBStep(
            train_eval_sets=dict(
                X_train="X_train",
                y_train="y_train",
                X_eval="X_eval",
                y_eval="y_eval",
                eval_data="eval_data",
            ),
        ),
        PredictStep(predict_set="X_test"),
        InverseScalePredictionsStep(),
        EvaluatePredictionsSteps(y_actual_df="y_test_unscale"),
        PlotFeatureImportanceStep(),

        # Second training run on the *_final sets, evaluated against test,
        # followed by the Kaggle prediction.
        TrainModelLGBStep(
            train_eval_sets=dict(
                X_train="X_train_final",
                y_train="y_train_final",
                X_eval="X_test",
                y_eval="y_test",
                eval_data="test",
            ),
        ),
        PredictStep(predict_set="X_kaggle"),
        InverseScalePredictionsStep(),
        # Original author's note: the inverse of the prediction is still pending.
        KaggleSubmissionStep(),
        SaveExperimentStep(
            exp_name=f"{datetime.datetime.now():%Y%m%d_%H%M}_exp_lgb_scaled_target_no_feature_df",
            save_dataframes=False,
        ),
    ],
    optimize_arftifacts_memory=True,
)
pipeline.run(verbose=True)
    

Executing step: LoadDataFrameStep
Step LoadDataFrameStep completed in 2.28 seconds
Executing step: FilterProductsIDStep
Filtered DataFrame df shape: (11101445, 13)
Step FilterProductsIDStep completed in 2.73 seconds
Executing step: ScaleFeatureStep
Step ScaleFeatureStep completed in 2.58 seconds
Executing step: CreateTargetColumStep
Step CreateTargetColumStep completed in 2.69 seconds
Executing step: DateRelatedFeaturesStep
Step DateRelatedFeaturesStep completed in 2.80 seconds
Executing step: CastDataTypesStep
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11101445 entries, 0 to 11101444
Data columns (total 17 columns):
 #   Column                 Dtype    
---  ------                 -----    
 0   product_id             uint32   
 1   fecha                  period[M]
 2   customer_id            uint32   
 3   plan_precios_cuidados  float32  
 4   cust_request_qty       int32    
 5   cust_request_tn        float32  
 6   tn                     float32  
 7   stock_final          