In [1]:
# Celda 1: imports
from __future__ import annotations
import warnings

warnings.filterwarnings("ignore")

from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Union
import numpy as np
import pandas as pd
import statsmodels.api as sm

# Modelos tabulares
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier

# Prophet opcional (quedará apagado por defecto)
try:
    from prophet import Prophet

    _HAS_PROPHET = True
except Exception:
    _HAS_PROPHET = False

In [2]:
import os

import pandas as pd

# Location of the consolidated Olist CSV. The original per-user absolute path
# is kept as the default so the cell behaves as before; set the OLIST_CSV_PATH
# environment variable to run the notebook on another machine without editing it.
OLIST_CSV_PATH = os.environ.get(
    "OLIST_CSV_PATH",
    "C:/Users/Usuario/Downloads/olist_consolidated_dataset.csv",
)

df = pd.read_csv(OLIST_CSV_PATH)

In [3]:
# Celda 2: utilidades
def coerce_datetime(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """Return a copy of ``df`` with the requested columns parsed as datetimes.

    Names in ``cols`` that are not columns of ``df`` are ignored. Values that
    cannot be parsed become ``NaT`` (``errors="coerce"``). The input frame is
    not modified.
    """
    result = df.copy()
    present = [name for name in cols if name in result.columns]
    for name in present:
        result[name] = pd.to_datetime(result[name], errors="coerce")
    return result


def safe_float(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """Return a copy of ``df`` with the requested columns converted to numbers.

    Names in ``cols`` that are not columns of ``df`` are ignored. Values that
    cannot be converted become ``NaN`` (``errors="coerce"``). The input frame
    is not modified.
    """
    result = df.copy()
    present = [name for name in cols if name in result.columns]
    for name in present:
        result[name] = pd.to_numeric(result[name], errors="coerce")
    return result

In [5]:
# Cell 3: main class
@dataclass
class ModeloMultifuncional:
    """Configuration and state for a model with a time-series mode and tabular modes.

    The first groups of fields are user-facing configuration. Fields whose
    names end in an underscore hold state and default to ``None``.
    """

    # columns
    # Timestamp column: parsed by _coerce_dates and used as the time key in _serie_cat.
    ts_col: str = "order_purchase_timestamp"
    # Category column used as the grouping key in _serie_cat.
    categoria_col: str = "product_category_name_english"
    # Column whose non-null values are counted per (category, period) in _serie_cat.
    item_col: str = "order_item_id"

    # time-series config
    # Resampling frequency handed to pd.Grouper in _serie_cat.
    freq: str = "D"
    # NOTE(review): presumably the (p, d, q) and (P, D, Q, s) orders of a
    # statsmodels ARIMA-family model — confirm against the fitting code.
    order: Tuple[int, int, int] = (1, 0, 1)
    seasonal_order: Tuple[int, int, int, int] = (0, 0, 0, 0)
    usar_prophet: bool = False  # OFF by default

    # tabular config
    # NOTE(review): presumably the seed for the random-forest models — confirm.
    random_state: int = 13

    # state
    modo_: Optional[str] = None  # "series_cat_15d" | "regression" | "binary"
    modelos_ts_: Optional[Dict[str, object]] = None
    historicos_: Optional[Dict[str, pd.DataFrame]] = None
    modelo_tab_: Optional[Union[RandomForestRegressor, RandomForestClassifier]] = None
    features_: Optional[List[str]] = None
    target_col_: Optional[str] = None

    # -------- utilidades internas --------
    def _coerce_dates(self, df: pd.DataFrame) -> pd.DataFrame:
        d = df.copy()
        d[self.ts_col] = pd.to_datetime(d[self.ts_col], errors="coerce")
        return d.dropna(subset=[self.ts_col])

    def _build_tabular(self, df: pd.DataFrame) -> pd.DataFrame:
        d = self._coerce_dates(df)
        num_cols = [
            "price",
            "freight_value",
            "product_weight_g",
            "product_length_cm",
            "product_height_cm",
            "product_width_cm",
            "product_name_lenght",
            "product_description_lenght",
            "product_photos_qty",
            "payment_value",
            "payment_installments",
            "review_score",
        ]
        d = safe_float(d, [c for c in num_cols if c in d.columns])

        if {"order_estimated_delivery_date", "order_delivered_customer_date"}.issubset(
            d.columns
        ):
            d["delivered_on_time"] = (
                pd.to_datetime(d["order_delivered_customer_date"], errors="coerce")
                <= pd.to_datetime(d["order_estimated_delivery_date"], errors="coerce")
            ).astype("int8")

        agg = {c: "sum" for c in num_cols if c in d.columns}
        agg.update(
            {
                "order_item_id": "count",
                "product_id": "nunique",
                "seller_id": "nunique",
                "delivered_on_time": (
                    "mean" if "delivered_on_time" in d.columns else "max"
                ),
            }
        )
        g = (
            d.groupby("order_id", dropna=False)
            .agg(agg)
            .rename(
                columns={
                    "order_item_id": "n_items",
                    "product_id": "n_products",
                    "seller_id": "n_sellers",
                    "delivered_on_time": "on_time_rate",
                }
            )
        )
        first = d.groupby("order_id")[
            [
                "payment_type",
                "product_category_name_english",
                "customer_state",
                "seller_state",
                "order_status",
            ]
        ].first()
        X = g.join(first, how="left").reset_index()

        if {"order_approved_at", "order_purchase_timestamp"}.issubset(d.columns):
            t = d.groupby("order_id")[
                [
                    "order_purchase_timestamp",
                    "order_approved_at",
                    "order_delivered_carrier_date",
                    "order_delivered_customer_date",
                    "order_estimated_delivery_date",
                ]
            ].first()

            def add(a, b, name):
                if a in t and b in t:
                    X[name] = (t[a] - t[b]).dt.total_seconds() / 86400.0

            add("order_approved_at", "order_purchase_timestamp", "t_approve_delay")
            add("order_delivered_carrier_date", "order_approved_at", "t_carrier_delay")
            add(
                "order_delivered_customer_date",
                "order_delivered_carrier_date",
                "t_transit",
            )
            add(
                "order_estimated_delivery_date",
                "order_purchase_timestamp",
                "t_est_window",
            )
        return X

    def _serie_cat(self, df: pd.DataFrame) -> pd.DataFrame:
        d = self._coerce_dates(df)
        if self.item_col not in d.columns:
            d[self.item_col] = 1.0
        g = (
            d.groupby(
                [self.categoria_col, pd.Grouper(key=self.ts_col, freq=self.freq)],
                dropna=False,
            )[self.item_col]
            .count()
            .rename("y")
            .reset_index()
            .rename(columns={self.ts_col: "ds"})
            .sort_values([self.categoria_col, "ds"])
        )
        return g