# utils.ipynb — port de `utils.R` a Python

Este notebook contiene equivalentes Python (pandas/numpy) de las utilidades definidas en `utils.R`:

- `Mean`, `Sd`, `wSum`, `Sum` (con manejo `na.rm=TRUE` y pesos)
- `by_check`
- `adj_fact` y `find_last` (para limpiar interacciones con factores constantes)

> Nota: `Sd()` replica `sqrt(Hmisc::wtd.var(..., normwt=FALSE, method='unbiased'))`, o sea varianza ponderada con denominador `(sum(w)-1)` cuando `normwt=False`.


In [None]:

import numpy as np
import pandas as pd


def _as_numpy(a) -> np.ndarray:
    '''Convert to 1D numpy array (float) preserving NaN.'''
    if isinstance(a, (pd.Series, pd.Index)):
        return a.to_numpy()
    return np.asarray(a)


def wtd_mean(x, weights=None, na_rm: bool = True) -> float:
    '''
    Replica Hmisc::wtd.mean / stats::weighted.mean con na.rm=TRUE:
    - filtra observaciones donde (x + w) no es NA si na_rm=True.
    - retorna sum(w * x) / sum(w).
    '''
    x = _as_numpy(x).astype(float)
    if weights is None:
        return float(np.nanmean(x) if na_rm else np.mean(x))

    w = _as_numpy(weights).astype(float)
    if na_rm:
        s = ~np.isnan(x + w)   # mismo criterio que R: !is.na(x + weights)
        x = x[s]
        w = w[s]

    sw = np.sum(w)
    return float(np.sum(w * x) / sw)  # puede dar nan si sw==0, como en R


def wtd_var(x, weights=None, normwt: bool = False, na_rm: bool = True, method: str = "unbiased") -> float:
    '''
    Replica Hmisc::wtd.var (caso principal usado por el paquete):
    - default: normwt=False, method='unbiased'
    - si normwt=True o method=='ML': usa lógica tipo stats::cov.wt
      (pesos normalizados a suma 1; unbiased divide por (1 - sum(w^2))).
    - si normwt=False y method=='unbiased': usa caso especial "frequency weights"
      var = sum(w*(x-xbar)^2) / (sum(w)-1)
    '''
    method = method.lower()
    if method not in {"unbiased", "ml"}:
        raise ValueError("method must be 'unbiased' or 'ML'")

    x = _as_numpy(x).astype(float)
    if weights is None or len(_as_numpy(weights)) == 0:
        if na_rm:
            x = x[~np.isnan(x)]
        # np.var con ddof=1 replica var() de R (unbiased)
        return float(np.var(x, ddof=1))

    w = _as_numpy(weights).astype(float)

    if na_rm:
        s = ~np.isnan(x + w)  # mismo criterio que R: !is.na(x + weights)
        x = x[s]
        w = w[s]

    if normwt:
        # igual que R: weights <- weights * length(x) / sum(weights)
        sw0 = np.sum(w)
        if sw0 != 0:
            w = w * (len(x) / sw0)

    # rama cov.wt (normwt==TRUE o method=='ML')
    if normwt or method == "ml":
        sw = np.sum(w)
        if sw == 0:
            return float("nan")
        w_norm = w / sw
        xbar = np.sum(w_norm * x)
        v_ml = np.sum(w_norm * (x - xbar) ** 2)  # ML covariance for 1D
        if method == "ml":
            return float(v_ml)
        # unbiased covariance for sampling weights
        denom = 1.0 - np.sum(w_norm ** 2)
        return float(v_ml / denom) if denom != 0 else float("inf")

    # caso especial: unbiased frequency weights (default en tu R)
    sw = np.sum(w)
    xbar = np.sum(w * x) / sw if sw != 0 else float("nan")
    return float(np.sum(w * (x - xbar) ** 2) / (sw - 1))  # puede dar inf si sw==1


def Mean(obj: str, df: pd.DataFrame, w: str = "weight_XX") -> float:
    '''Equivalente a Mean() en utils.R.'''
    return wtd_mean(df[obj], df[w], na_rm=True)


def Sd(obj: str, df: pd.DataFrame, w: str = "weight_XX") -> float:
    '''Equivalente a Sd() en utils.R: sqrt(Hmisc::wtd.var(...)).'''
    return float(np.sqrt(wtd_var(df[obj], df[w], normwt=False, na_rm=True, method="unbiased")))


def wSum(df: pd.DataFrame, w: str = "weight_XX") -> float:
    '''Equivalente a wSum() en utils.R: sum(w, na.rm=TRUE).'''
    return float(np.nansum(_as_numpy(df[w]).astype(float)))


def Sum(obj: str, df: pd.DataFrame, w: str = "weight_XX") -> float:
    '''
    Equivalente a Sum() en utils.R: t(x) %*% w tras remover NA en x o w.
    (En el paquete, típicamente `weight_XX` ya viene sin NA porque se setean a 0.)
    '''
    x = _as_numpy(df[obj]).astype(float)
    ww = _as_numpy(df[w]).astype(float)
    s = ~np.isnan(x + ww)  # mismo criterio que R: !is.na(x + weights)
    if not np.any(s):
        return 0.0
    return float(np.sum(x[s] * ww[s]))


def by_check(df: pd.DataFrame, ID: str, by: str) -> bool:
    '''
    Replica by_check() de utils.R:
    - crea codificación numérica del factor(by)
    - calcula sd dentro de cada ID
    - retorna mean(sd_ID) == 0 (sin ignorar NA en el mean)
    '''
    codes, _ = pd.factorize(df[by], sort=False)  # as.numeric(factor())
    temp_G = pd.Series(codes.astype(float), index=df.index)

    # sd por ID con ddof=1, igual que sd() de R
    sd_by_id = temp_G.groupby(df[ID]).apply(lambda s: float(np.nanstd(_as_numpy(s), ddof=1)))

    # mapear de vuelta a filas y promediar como en R (mean sin na.rm)
    sd_ID = df[ID].map(sd_by_id)
    m = sd_ID.mean(skipna=False)
    return bool(m == 0)


def find_last(s: str, char: str) -> int:
    '''
    Replica find_last() de utils.R:
    - retorna posición 1-indexed de la última ocurrencia de `char`
    - retorna 0 si no existe
    '''
    idx = s.rfind(char)
    return 0 if idx < 0 else idx + 1


def adj_fact(formula_str: str, df: pd.DataFrame, check: bool) -> str:
    '''
    Replica adj_fact() de utils.R:
    - Si check=True:
      * separa base hasta (último '+') incluido + un espacio si lo hay
      * separa el bloque final de interacción 'v1 * v2 * ...'
      * elimina variables con 1 solo nivel en df (excluyendo NA)
      * reconstituye la string.
    '''
    if not check:
        return formula_str

    last_plus_0 = formula_str.rfind("+")
    if last_plus_0 < 0:
        return formula_str

    # R asume "+ " (más un espacio) al cortar
    base_end = last_plus_0 + 1
    rest_start = last_plus_0 + 1

    if last_plus_0 + 1 < len(formula_str) and formula_str[last_plus_0 + 1] == " ":
        base_end = last_plus_0 + 2  # incluye "+ "
        rest_start = last_plus_0 + 2

    base = formula_str[:base_end]
    rest = formula_str[rest_start:]

    vars_ = rest.split(" * ")  # split literal como strsplit(..., fixed=TRUE)

    kept = []
    for v in vars_:
        v = v.strip()
        if v == "":
            continue
        if v not in df.columns:
            raise KeyError(f"adj_fact: variable '{v}' no está en df.columns")
        n_levels = pd.Series(df[v]).dropna().nunique()
        if n_levels > 1:
            kept.append(v)

    if len(kept) == 0:
        return base.rstrip()

    return base + " * ".join(kept)
