In [None]:
def npiv_default(
    Y: Iterable[float],
    X: np.ndarray,
    W: np.ndarray,
    X_eval: Optional[np.ndarray] = None,
    X_grid: Optional[np.ndarray] = None,
    alpha: float = 0.05,
    basis: str = "tensor",                 # options: "tensor", "additive", "glp"
    boot_num: int = 99,
    check_is_fullrank: bool = False,
    deriv_index: int = 1,
    deriv_order: int = 1,
    grid_num: int = 50,
    J_x_degree: int = 3,
    J_x_segments: Optional[int] = None,
    K_w_degree: int = 4,
    K_w_segments: Optional[int] = None,
    K_w_smooth: int = 2,
    knots: str = "uniform",                # or "quantiles"
    progress: bool = True,
    ucb_h: bool = True,
    ucb_deriv: bool = True,
    W_max: Optional[float] = None,
    W_min: Optional[float] = None,
    X_min: Optional[float] = None,
    X_max: Optional[float] = None,
    *,
    npiv_est: Optional[Callable[..., Dict[str, Any]]] = None,
    **kwargs: Any,
) -> Dict[str, Any]:
    """
    Python port of `npiv.default`. This is a thin wrapper around a user-supplied
    estimation routine `npiv_est(...)` that should implement the actual NPIV fit.

    Parameters
    ----------
    Y, X, W : arrays
        Data for outcome Y, regressors X, instruments W.
    X_eval, X_grid : arrays, optional
        Evaluation points and grid (if your estimator supports them).
    alpha, basis, boot_num, ... : hyperparameters
        Passed through to `npiv_est`.
    npiv_est : callable
        Function implementing the estimator. Must accept named arguments used below
        and return a dict-like object with results. Example signature:

            def npiv_est(Y, X, W, X_eval=None, X_grid=None, alpha=0.05, basis="tensor",
                         boot_num=99, check_is_fullrank=False, deriv_index=1, deriv_order=1,
                         grid_num=50, J_x_degree=3, J_x_segments=None, K_w_degree=4,
                         K_w_segments=None, K_w_smooth=2, knots="uniform", progress=True,
                         ucb_h=True, ucb_deriv=True, W_max=None, W_min=None, X_min=None, X_max=None, **kwargs) -> dict

    Returns
    -------
    est : dict
        The estimator's output, augmented with metadata:
        - 'call'  : string describing the call
        - 'ptm'   : elapsed time in seconds (float)
        - repeats of key hyperparameters for convenience (alpha, basis, ...)

    Notes
    -----
    - This function does not implement NPIV itself; provide `npiv_est`.
    - Naming is snake_case in Python; it corresponds to the R names one-to-one.
    """
    if npiv_est is None:
        raise NotImplementedError(
            "Please provide an `npiv_est` callable (the actual estimator). "
            "This wrapper mirrors R's npiv.default and attaches metadata."
        )

    t0 = time.perf_counter()

    # Call the user-provided estimator
    est = npiv_est(
        Y=Y,
        X=X,
        W=W,
        X_eval=X_eval,
        X_grid=X_grid,
        alpha=alpha,
        basis=basis,
        boot_num=boot_num,
        check_is_fullrank=check_is_fullrank,
        deriv_index=deriv_index,
        deriv_order=deriv_order,
        grid_num=grid_num,
        J_x_degree=J_x_degree,
        J_x_segments=J_x_segments,
        K_w_degree=K_w_degree,
        K_w_segments=K_w_segments,
        K_w_smooth=K_w_smooth,
        knots=knots,
        progress=progress,
        ucb_h=ucb_h,
        ucb_deriv=ucb_deriv,
        W_max=W_max,
        W_min=W_min,
        X_min=X_min,
        X_max=X_max,
        **kwargs,
    )

    t1 = time.perf_counter()

    # Augment with metadata (parity with R code)
    call_str = (
        "npiv_default(Y, X, W, "
        f"X_eval={X_eval is not None}, X_grid={X_grid is not None}, "
        f"alpha={alpha}, basis='{basis}', boot_num={boot_num}, "
        f"check_is_fullrank={check_is_fullrank}, deriv_index={deriv_index}, deriv_order={deriv_order}, "
        f"grid_num={grid_num}, J_x_degree={J_x_degree}, J_x_segments={J_x_segments}, "
        f"K_w_degree={K_w_degree}, K_w_segments={K_w_segments}, K_w_smooth={K_w_smooth}, "
        f"knots='{knots}', progress={progress}, ucb_h={ucb_h}, ucb_deriv={ucb_deriv}, "
        f"W_max={W_max}, W_min={W_min}, X_min={X_min}, X_max={X_max})"
    )

    # Ensure dict-like result
    if not isinstance(est, dict):
        # Allow objects with __dict__
        try:
            est = dict(est)
        except Exception:
            est = {"result": est}

    est.update(
        {
            "call": call_str,
            "ptm": (t1 - t0),  # seconds
            "alpha": alpha,
            "basis": basis,
            "boot_num": boot_num,
            "check_is_fullrank": check_is_fullrank,
            "grid_num": grid_num,
            "knots": knots,
            "progress": progress,
            "W_max": W_max,
            "W_min": W_min,
            "X_min": X_min,
            "X_max": X_max,
        }
    )

    # In R they set class(est) <- "npiv"; in Python just annotate:
    est.setdefault("_class", "npiv")
    return est

In [8]:

# --------- main function ---------

def npiv_est(
    Y: Iterable[float],
    X: np.ndarray,
    W: np.ndarray,
    X_eval: Optional[np.ndarray] = None,
    X_grid: Optional[np.ndarray] = None,     # passed to chooser if needed
    alpha: float = 0.05,
    basis: str = "tensor",                   # {"tensor","additive","glp"}
    boot_num: int = 99,
    check_is_fullrank: bool = False,
    deriv_index: int = 1,
    deriv_order: int = 1,
    grid_num: int = 50,
    J_x_degree: int = 3,
    J_x_segments: Optional[int] = None,
    K_w_degree: int = 4,
    K_w_segments: Optional[int] = None,
    K_w_smooth: int = 2,
    knots: str = "uniform",                  # {"uniform","quantiles"}
    progress: bool = True,
    ucb_h: bool = True,
    ucb_deriv: bool = True,
    W_max: Optional[float] = None,
    W_min: Optional[float] = None,
    X_max: Optional[float] = None,
    X_min: Optional[float] = None,
    *,
    prodspline: Callable[..., np.ndarray],
    npiv_choose_J: Optional[Callable[..., Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """
    Python port of npivEst. Requires:
      - prodspline(...) -> ndarray   (basis builder)
      - npiv_choose_J(...) -> dict   (only if data-driven J/K selection is needed)
    """
    basis = _match_arg(basis, ["tensor", "additive", "glp"], "basis")
    knots = _match_arg(knots, ["uniform", "quantiles"], "knots")

    if Y is None: raise ValueError("must provide Y")
    if X is None: raise ValueError("must provide X")
    if W is None: raise ValueError("must provide W")
    if K_w_degree < 0: raise ValueError("K.w.degree must be a non-negative integer")
    if J_x_degree < 0: raise ValueError("J.x.degree must be a non-negative integer")
    if K_w_segments is not None and K_w_segments <= 0: raise ValueError("K.w.segments must be positive")
    if J_x_segments is not None and J_x_segments <= 0: raise ValueError("J.x.segments must be positive")

    Y = _as_colvec(Y)
    X = np.asarray(X, dtype=float)
    W = np.asarray(W, dtype=float)

    if check_is_fullrank:
        if not _is_fullrank(Y): raise ValueError("Y is not of full column rank")
        if not _is_fullrank(X): raise ValueError("X is not of full column rank")
        if not _is_fullrank(W): raise ValueError("W is not of full column rank")

    # -------- regression vs IV; decide data-driven selection ----------
    data_driven = False
    if _identical(X, W):
        # regression: if J unspecified -> data-driven
        if J_x_segments is None:
            data_driven = True
        K_w_degree = J_x_degree
        # if J is specified, match degrees/segments; otherwise chooser will set
        if J_x_segments is not None:
            K_w_segments = J_x_segments
        K_w_smooth = 0
    else:
        # IV: if either J or K missing -> data-driven
        if K_w_segments is None or J_x_segments is None:
            data_driven = True

    if data_driven:
        if npiv_choose_J is None:
            raise ValueError("Data-driven selection requested but `npiv_choose_J` was not provided.")
        test1 = npiv_choose_J(
            Y=Y, X=X, W=W, X_grid=X_grid,
            J_x_degree=J_x_degree, K_w_degree=K_w_degree, K_w_smooth=K_w_smooth,
            knots=knots, basis=basis,
            X_min=X_min, X_max=X_max, W_min=W_min, W_max=W_max,
            grid_num=grid_num, boot_num=boot_num,
            check_is_fullrank=check_is_fullrank, progress=progress,
            prodspline=prodspline,  # forwarded to internals
            dimbs=None,             # the chooser itself will pass its own if needed
        )
        K_w_segments = int(test1["K.w.seg"])
        J_x_segments = int(test1["J.x.seg"])
        J_x_segments_set = test1["J.x.segments.set"]
        K_w_segments_set = test1["K.w.segments.set"]
    else:
        J_x_segments_set = K_w_segments_set = None

    if K_w_degree + (K_w_segments or 0) < J_x_degree + (J_x_segments or 0):
        raise ValueError("K.w.degree+K.w.segments must be >= J.x.degree+J.x.segments")

    # -------- Build W basis B.w --------
    if K_w_degree == 0:
        B_w = np.ones((W.shape[0], 1))
    else:
        K_W = np.column_stack([
            np.repeat(K_w_degree, W.shape[1]),
            np.repeat(K_w_segments, W.shape[1]),
        ])
        B_w = prodspline(x=W, K=K_W, knots=knots, basis=basis, x_min=W_min, x_max=W_max)
        if basis != "tensor":
            B_w = np.column_stack([np.ones((W.shape[0], 1)), B_w])

    # -------- Build X basis Psi.x (+ derivative, and eval versions) --------
    if J_x_degree == 0:
        Psi_x = np.ones((X.shape[0], 1))
        Psi_x_eval = Psi_x if X_eval is None else np.ones((X_eval.shape[0], 1))
        Psi_x_deriv = np.zeros_like(Psi_x)
        Psi_x_deriv_eval = Psi_x_deriv if X_eval is None else np.zeros_like(Psi_x_eval)
    else:
        K_X = np.column_stack([
            np.repeat(J_x_degree, X.shape[1]),
            np.repeat(J_x_segments, X.shape[1]),
        ])
        Psi_x = prodspline(x=X, K=K_X, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
        Psi_x_deriv = prodspline(
            x=X, K=K_X, knots=knots, basis=basis,
            deriv_index=deriv_index, deriv=deriv_order,
            x_min=X_min, x_max=X_max
        )
        if X_eval is None:
            Psi_x_eval = Psi_x
            Psi_x_deriv_eval = Psi_x_deriv
        else:
            Psi_x_eval = prodspline(x=X, xeval=X_eval, K=K_X, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
            Psi_x_deriv_eval = prodspline(
                x=X, xeval=X_eval, K=K_X, knots=knots, basis=basis,
                deriv_index=deriv_index, deriv=deriv_order,
                x_min=X_min, x_max=X_max
            )
        if basis != "tensor":
            Psi_x = np.column_stack([np.ones((Psi_x.shape[0], 1)), Psi_x])
            Psi_x_eval = np.column_stack([np.ones((Psi_x_eval.shape[0], 1)), Psi_x_eval])
            Psi_x_deriv = np.column_stack([np.zeros((Psi_x_deriv.shape[0], 1)), Psi_x_deriv])
            Psi_x_deriv_eval = np.column_stack([np.zeros((Psi_x_deriv_eval.shape[0], 1)), Psi_x_deriv_eval])

    # -------- 2SLS algebra (robust to large bases via pinv) --------
    # tmp = (Psi' B (B'B)^-1 B')^+ Psi' B (B'B)^-1 B'
    BtB_inv = _ginv(B_w.T @ B_w)
    PsiTPB = Psi_x.T @ B_w @ BtB_inv @ B_w.T
    tmp = _ginv(PsiTPB @ Psi_x) @ PsiTPB
    beta = tmp @ Y  # (J x 1)

    # -------- fitted function and derivative at eval data --------
    h = Psi_x_eval @ beta
    h_deriv = Psi_x_deriv_eval @ beta

    # -------- asymptotic SEs --------
    U_hat = Y - (Psi_x @ beta)                # n x 1
    Wgt = (tmp.T * U_hat.ravel()[:, None])    # n x J
    D = Wgt.T @ Wgt                           # J x J

    asy_se = np.sqrt(np.abs(np.sum((Psi_x_eval @ D) * Psi_x_eval, axis=1)))          # (neval,)
    asy_se_deriv = np.sqrt(np.abs(np.sum((Psi_x_deriv_eval @ D) * Psi_x_deriv_eval, axis=1)))

    # -------- UCBs (two regimes) --------
    h_lower = h_upper = cv = None
    h_lower_deriv = h_upper_deriv = cv_deriv = None

    if ucb_h or ucb_deriv:
        rng = np.random.default_rng(12345)  # fixed to mimic with_preserve_seed behavior
        n = X.shape[0]
        boot_Z_h = None
        boot_Z_d = None

        if data_driven:
            # Build per-J bootstrap using the chooser’s grid
            J_set = J_x_segments_set
            K_set = K_w_segments_set
            if J_set is None or K_set is None:
                raise ValueError("Data-driven UCB requires J.x.segments.set and K.w.segments.set.")

            # Optional truncation like in R
            if len(J_set) > 2:
                cutoff = max(J_x_segments, max(J_set[:-2]))
                J_boot = J_set[J_set <= cutoff]
                K_boot = K_set[: len(J_boot)]
            else:
                J_boot, K_boot = J_set, K_set

            if ucb_h: boot_Z_h = np.full((boot_num, len(J_boot)), np.nan)
            if ucb_deriv: boot_Z_d = np.full((boot_num, len(J_boot)), np.nan)

            # Pre-generate the same bootstrap draws used for every J
            Zmat = rng.standard_normal(size=(boot_num, n))

            for ii in range(len(J_boot)):
                J_seg = int(J_boot[ii])
                K_seg = int(K_boot[ii])

                # W basis at this J
                if K_w_degree == 0:
                    B_w_J = np.ones((W.shape[0], 1))
                else:
                    K_WJ = np.column_stack([
                        np.repeat(K_w_degree, W.shape[1]),
                        np.repeat(K_seg, W.shape[1]),
                    ])
                    B_w_J = prodspline(x=W, K=K_WJ, knots=knots, basis=basis, x_min=W_min, x_max=W_max)
                    if basis != "tensor":
                        B_w_J = np.column_stack([np.ones((W.shape[0], 1)), B_w_J])

                # X bases at this J
                if J_x_degree == 0:
                    Psi_x_J = np.ones((X.shape[0], 1))
                    Psi_x_J_eval = Psi_x_J if X_eval is None else np.ones((X_eval.shape[0], 1))
                    Psi_x_J_deriv_eval = np.zeros_like(Psi_x_J_eval)
                else:
                    K_XJ = np.column_stack([
                        np.repeat(J_x_degree, X.shape[1]),
                        np.repeat(J_seg, X.shape[1]),
                    ])
                    Psi_x_J = prodspline(x=X, K=K_XJ, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
                    Psi_x_J_eval = Psi_x_J if X_eval is None else prodspline(
                        x=X, xeval=X_eval, K=K_XJ, knots=knots, basis=basis, x_min=X_min, x_max=X_max
                    )
                    if ucb_deriv:
                        Psi_x_J_deriv_eval = prodspline(
                            x=X, xeval=X_eval if X_eval is not None else None,
                            K=K_XJ, knots=knots, basis=basis,
                            deriv_index=deriv_index, deriv=deriv_order,
                            x_min=X_min, x_max=X_max
                        )
                    if basis != "tensor":
                        Psi_x_J = np.column_stack([np.ones((Psi_x_J.shape[0], 1)), Psi_x_J])
                        Psi_x_J_eval = np.column_stack([np.ones((Psi_x_J_eval.shape[0], 1)), Psi_x_J_eval])
                        if ucb_deriv:
                            Psi_x_J_deriv_eval = np.column_stack([np.zeros((Psi_x_J_deriv_eval.shape[0], 1)), Psi_x_J_deriv_eval])

                # Recompute tmp_J & residuals at this J
                BtB_inv_J = _ginv(B_w_J.T @ B_w_J)
                P_B_J = B_w_J @ BtB_inv_J @ B_w_J.T
                PsiTP_J = Psi_x_J.T @ P_B_J
                tmp_J = _ginv(PsiTP_J @ Psi_x_J) @ PsiTP_J
                beta_J = tmp_J @ Y
                U_J = Y - (Psi_x_J @ beta_J)

                # Asy SEs for norming
                Wgt_J = (tmp_J.T * U_J.ravel()[:, None])
                D_J = Wgt_J.T @ Wgt_J
                if ucb_h:
                    asy_se_J = np.sqrt(np.abs(np.sum((Psi_x_J_eval @ D_J) * Psi_x_J_eval, axis=1)))
                if ucb_deriv:
                    asy_se_J_deriv = np.sqrt(np.abs(np.sum((Psi_x_J_deriv_eval @ D_J) * Psi_x_J_deriv_eval, axis=1)))

                # Bootstrap sup t at this J (reusing Zmat rows)
                for b in range(boot_num):
                    z = Zmat[b]
                    if ucb_h:
                        num = Psi_x_J_eval @ (tmp_J @ (U_J.ravel() * z).reshape(-1, 1))
                        boot_Z_h[b, ii] = np.max(np.abs(num.ravel() / _nzd(asy_se_J)))
                    if ucb_deriv:
                        num_d = Psi_x_J_deriv_eval @ (tmp_J @ (U_J.ravel() * z).reshape(-1, 1))
                        boot_Z_d[b, ii] = np.max(np.abs(num_d.ravel() / _nzd(asy_se_J_deriv)))

            # Critical values (max over J grid, then quantile, then inflate)
            if ucb_h:
                Z_boot = np.nanmax(boot_Z_h, axis=1)
                z_star = float(np.quantile(Z_boot, 1 - alpha, method="linear"))
                cv = z_star + max(0.0, np.log(np.log(float(test1["J.tilde"])))) * float(test1["theta.star"])
            if ucb_deriv:
                Z_boot_d = np.nanmax(boot_Z_d, axis=1)
                z_star_d = float(np.quantile(Z_boot_d, 1 - alpha, method="linear"))
                cv_deriv = z_star_d + max(0.0, np.log(np.log(float(test1["J.tilde"])))) * float(test1["theta.star"])

            # restore selected J/K
            J_x_segments = int(test1["J.x.seg"])
            K_w_segments = int(test1["K.w.seg"])

        else:
            # Fixed J/K: Chen & Christensen (2018)
            Z_h = np.empty(boot_num) if ucb_h else None
            Z_d = np.empty(boot_num) if ucb_deriv else None
            for b in range(boot_num):
                z = rng.standard_normal(X.shape[0])
                if ucb_h:
                    num = Psi_x_eval @ (tmp @ (U_hat.ravel() * z).reshape(-1, 1))
                    Z_h[b] = np.max(np.abs(num.ravel() / _nzd(asy_se)))
                if ucb_deriv:
                    num_d = Psi_x_deriv_eval @ (tmp @ (U_hat.ravel() * z).reshape(-1, 1))
                    Z_d[b] = np.max(np.abs(num_d.ravel() / _nzd(asy_se_deriv)))
            if ucb_h: cv = float(np.quantile(Z_h, 1 - alpha, method="linear"))
            if ucb_deriv: cv_deriv = float(np.quantile(Z_d, 1 - alpha, method="linear"))

        # Apply UCBs
        if ucb_h:
            asy_se_col = asy_se.reshape(-1, 1)
            h_lower = h - cv * asy_se_col
            h_upper = h + cv * asy_se_col
        if ucb_deriv:
            asy_se_d_col = asy_se_deriv.reshape(-1, 1)
            h_lower_deriv = h_deriv - cv_deriv * asy_se_d_col
            h_upper_deriv = h_deriv + cv_deriv * asy_se_d_col
    # end UCB block

    # -------- pack & return (R-style names) --------
    return {
        "J.x.degree": J_x_degree,
        "J.x.segments": J_x_segments,
        "K.w.degree": K_w_degree,
        "K.w.segments": K_w_segments,
        "asy.se": asy_se,
        "beta": beta,
        "cv.deriv": (None if not ucb_deriv else cv_deriv),
        "cv": (None if not ucb_h else cv),
        "deriv.asy.se": asy_se_deriv,
        "deriv.index": deriv_index,
        "deriv.order": deriv_order,
        "deriv": h_deriv,
        "h.lower.deriv": h_lower_deriv,
        "h.lower": h_lower,
        "h.upper.deriv": h_upper_deriv,
        "h.upper": h_upper,
        "h": h,
        "nevalobs": (None if X_eval is None else X_eval.shape[0]),
        "nobs": X.shape[0],
        "ndim": X.shape[1],
        "residuals": (Y - (Psi_x @ beta)).ravel(),
        "trainiseval": (X_eval is None),
        "Y": Y.ravel(),
        "X": X,
        "X.eval": (X if X_eval is None else X_eval),
        "W": W,
    }


In [None]:
data("Engel95", package = "npiv")
Engel95 <- Engel95[order(Engel95$logexp),] 
attach(Engel95)
logexp.eval <- seq(4.5,6.5,length=100)


food_engel <- npiv(food, logexp, logwages, X.eval = logexp.eval)

In [1]:
# (C) Python port of Jeffrey S. Racine's glp.model.matrix (R, 2011)
# Author: ChatGPT — direct translation with NumPy
#
# Notes:
# - X must be a list of 2D NumPy arrays (n x d_j), one per variable.
# - Each X[j] must have its first column equal to 1 (intercept / degree 0).
# - This reproduces the R logic, including the construction of the 'z' set
#   of multi-indices and the generalized tensor product.
# - The original R code calls a compiled C routine; here we compute the
#   same result in pure NumPy.
#
# Dependencies: numpy

import numpy as np


def _index_swap(index_input: np.ndarray) -> np.ndarray:
    """
    Invert a permutation (R's 'index.swap').
    index_input: permutation indices (0-based) describing sort order.
    Returns an inverse permutation such that out[index_input[i]] = i.
    """
    index_input = np.asarray(index_input, dtype=int)
    inv = np.empty_like(index_input)
    inv[index_input] = np.arange(index_input.size, dtype=int)
    return inv


def _two_dimen(d1, d2, d2p, nd1, pd12, d1sets):
    """
    Port of the R helper two.dimen (uses 1-based logic for nd1-like arrays).
    Conventions:
      - d1, d2 are scalars (ncol of the first and second blocks).
      - nd1 is a 1-based-like vector of length d1 (index 1..d1 meaningful).
      - d1sets is an integer matrix whose rows are index 'sets' built so far.
      - Some entries of the sets can be 0 (intercept for that dimension).
      - Returns dict with keys: d12, nd1 (updated), d2p, sets.
    """
    # Ensure nd1 is a Python list/array with 1-based semantics for readability.
    nd1 = np.asarray(nd1, dtype=int)

    if d2 == 1:
        ret = {
            "d12": pd12,
            "nd1": nd1.copy(),
            "d2p": d2,
            "sets": np.hstack([d1sets, np.zeros((d1sets.shape[0], 1), dtype=int)]),
        }
        return ret

    # Start building the combined sets
    d12 = d2
    rows0 = min(d1sets.shape[0], d2)
    # Initial block: zeros for previous dims, and 1..d2 for the new dim (R used 1:d2 here)
    right_col = np.arange(1, d2 + 1, dtype=int).reshape(-1, 1)[:rows0]
    d2sets = np.hstack([np.zeros((rows0, d1sets.shape[1]), dtype=int), right_col])

    # If d1 > d2, add blocks
    if d1 - d2 > 0:
        for i in range(1, d1 - d2 + 1):  # i = 1 .. (d1 - d2)
            d12 += d2 * nd1[i - 1]  # nd1 is 1-based in original R; shift by -1
            oneSet = d1sets
            if d1sets.shape[1] > 1 and d2p > 0:
                oneSet = oneSet[oneSet[:, -1] != d2p, :]

            sel = oneSet[np.sum(oneSet, axis=1) == i, :]
            if sel.size == 0 or nd1[i - 1] == 0:
                continue

            # Replicate each selected row 'd2' times
            rep_rows = np.repeat(sel, repeats=d2, axis=0)
            # Append 0:(d2-1) with each value repeated nd1[i]
            last_col = np.repeat(np.arange(0, d2, dtype=int), nd1[i - 1]).reshape(-1, 1)
            d2sets = np.vstack([d2sets, np.hstack([rep_rows, last_col])])

    # Final blocks for i = 1..d2
    for i in range(1, d2 + 1):
        d12 += i * nd1[d1 - i]  # nd1[d1 - i] (since nd1 is 1-based in R)
        if nd1[d1 - i] > 0:
            oneSet = d1sets
            if d1sets.shape[1] > 1 and d2p > 0:
                oneSet = oneSet[oneSet[:, -1] != d2p, :]

            sel = oneSet[np.sum(oneSet, axis=1) == (d1 - i + 1), :]
            if sel.size == 0:
                continue

            # Replicate each selected row 'i' times
            rep_rows = np.repeat(sel, repeats=i, axis=0)
            # Append 0:(i-1), each repeated nd1[d1-i]
            last_col = np.repeat(np.arange(0, i, dtype=int), nd1[d1 - i]).reshape(-1, 1)
            d2sets = np.vstack([d2sets, np.hstack([rep_rows, last_col])])

    # Update nd: compute nd2 from nd1
    nd2 = nd1.copy()
    if d1 > 1:
        for j in range(1, d1):  # j = 1..(d1-1) in R
            acc = 0
            i_lo = max(0, j - d2 + 1)  # R uses max(0, j - d2 + 1)
            # sum nd1[i] for i = j..(max(0,...))
            # In R this loop is ascending i=j:max(0, j-d2+1)
            # Implement ascending with special handling of i==0 term (=1)
            for i in range(j, i_lo - 1, -1):  # descending gives same sum; we just add all
                if i > 0:
                    acc += nd1[i]
                else:
                    acc += 1  # nd1[0] is treated as 1 in the R code
            nd2[j] = acc

    if d2 > 1:
        # nd2[d1] (last) starts at nd1[d1] and add nd1[(d1-d2+1):(d1-1)]
        acc = nd1[d1]
        for i in range(d1 - d2 + 1, d1):
            acc += nd1[i]
        nd2[d1] = acc
    else:
        nd2[d1] = nd1[d1]

    return {"d12": d12, "nd1": nd2, "d2p": d2, "sets": d2sets}


def _construct_tensor_prod(dimen_input):
    """
    Port of R's construct.tensor.prod.
    Returns the 'sets' matrix (called 'z' later) with columns aligned
    to the original dimension order.
    """
    dimen_input = np.asarray(dimen_input, dtype=int)
    order_desc = np.argsort(dimen_input)[::-1]  # indices sorting by decreasing dim
    dimen = dimen_input[order_desc]
    k = dimen.size

    # nd1: length dimen[0] (R is 1-based; we keep an extra slot at front for clarity)
    # We'll store nd1 as length (d1 + 1), where index 1..d1 are active.
    d1 = int(dimen[0])
    nd1 = np.ones(d1 + 1, dtype=int)
    nd1[d1] = 0  # R: nd1[d1] <- 0

    ncol_bs = d1
    sets = np.arange(1, d1 + 1, dtype=int).reshape(-1, 1)  # R: 1:d1 as a column
    d2p = 0

    if k > 1:
        for i in range(1, k):
            out = _two_dimen(d1, int(dimen[i]), d2p, nd1[1:], ncol_bs, sets)
            nd1 = np.concatenate([[0], out["nd1"]])  # keep 1-based convention
            ncol_bs = out["d12"]
            sets = out["sets"]
            d2p = out["d2p"]

        # R: ncol.bs <- dim.rt$d12 + k - 1; then append k-1 rows (pure terms)
        ncol_bs = out["d12"] + k - 1
        for i in range(1, k):
            one_row = np.zeros((1, sets.shape[1]), dtype=int)
            one_row[0, i - 1] = dimen[i - 1]
            sets = np.vstack([sets, one_row])

    # Reorder columns back to the original variable order
    inv = _index_swap(order_desc)
    sets = sets[:, inv]
    return sets


def glp_model_matrix(X):
    """
    Generalized Local Polynomial model matrix (Python/NumPy port).

    Parameters
    ----------
    X : list of np.ndarray
        List of model matrices [X1, X2, ..., Xk], each of shape (n, d_j).
        The first column of each Xj must be ones (intercept).

    Returns
    -------
    B : np.ndarray
        The generalized polynomial model matrix of shape (n, q),
        where q is determined by the algorithm (typically much
        smaller than the full tensor-product dimension).
    """
    if not isinstance(X, (list, tuple)) or len(X) == 0:
        raise ValueError("X must be a non-empty list of 2D numpy arrays.")

    # Validate shapes and get dimensions
    n = X[0].shape[0]
    for i, Xi in enumerate(X):
        if Xi.ndim != 2:
            raise ValueError(f"X[{i}] must be 2D.")
        if Xi.shape[0] != n:
            raise ValueError("All X[j] must have the same number of rows.")

    d = np.array([Xi.shape[1] for Xi in X], dtype=int)
    k = len(X)

    # Build 'z' (called 'sets' in the helpers).
    z = _construct_tensor_prod(d)

    # Match R's reordering to expand.grid: order rows by last->first columns
    # (R: z <- z[do.call('order', as.list(data.frame(z[, NCOL(z):1]))), ])
    if z.shape[1] > 1:
        # np.lexsort uses last key as primary; give it columns from first to last,
        # so we reverse to get last-to-first primary ordering.
        sort_keys = tuple(z[:, j] for j in range(z.shape[1] - 1, -1, -1))
        ord_rows = np.lexsort(sort_keys)
        z = z[ord_rows, :]

    # Build the result: for each row r of z, multiply the selected columns across variables.
    # Map set value v -> column index:
    #   v == 0  -> use column 0  (intercept)
    #   v >= 1  -> use column (v - 1)
    n_terms = z.shape[0]
    B = np.ones((n, n_terms), dtype=float)

    # Precompute per-variable column blocks selected by z
    for j in range(k):
        zj = z[:, j]
        cols_j = np.where(zj <= 0, 0, zj - 1)  # vector of column indices for X[j]
        # Gather columns for all terms at once: shape (n, n_terms)
        block = X[j][:, cols_j]
        B *= block

    return B


In [2]:
import warnings
from dataclasses import dataclass
from typing import Iterable, Optional, Union

import numpy as np
from scipy.interpolate import BSpline


# ---------- internal helpers ----------

def _make_breaks(nbreak: int, x_min: float, x_max: float, knots: Optional[Iterable[float]]):
    """
    Returns the 'breakpoints' vector of length nbreak (including the two boundaries).
    If `knots` is provided, it must be length nbreak and sorted (R code enforces len == nbreak).
    """
    if knots is not None:
        b = np.asarray(knots, dtype=float)
        if b.size != nbreak:
            raise ValueError("`knots` length must equal `nbreak`.")
        if np.any(np.diff(b) < 0):
            b = np.sort(b)
        return b
    # equally spaced breakpoints, including endpoints
    return np.linspace(x_min, x_max, nbreak)


def _augmented_knots(breaks: np.ndarray, degree: int):
    """
    Construct the open knot vector with endpoint multiplicity p+1 and interior breaks with multiplicity 1.
    """
    p = int(degree)
    if breaks.ndim != 1 or breaks.size < 2:
        raise ValueError("breaks must be a 1D array with at least two values.")
    left = np.repeat(breaks[0], p + 1)
    right = np.repeat(breaks[-1], p + 1)
    interior = breaks[1:-1]
    t = np.concatenate([left, interior, right]).astype(float)
    # n_basis = len(t) - p - 1  == nbreak + degree - 1
    return t


def _bspline_design_matrix(x: np.ndarray, t: np.ndarray, degree: int, deriv_order: int) -> np.ndarray:
    """
    Evaluate all B-spline basis functions (or their `deriv_order` derivatives) at x.
    Returns matrix shape (len(x), n_basis).
    """
    p = int(degree)
    n_basis = len(t) - p - 1
    if n_basis <= 0:
        raise ValueError("Invalid knot configuration -> non-positive number of basis functions.")

    # Build basis splines with identity coefficients
    # (one BSpline per column; keep extrapolate=False here; caller decides how to handle outside)
    eye = np.eye(n_basis)
    col_spl = []
    for i in range(n_basis):
        spl = BSpline(t, eye[i], p, extrapolate=False)
        if deriv_order > 0:
            spl = spl.derivative(deriv_order)
        col_spl.append(spl)

    # Evaluate
    X = np.empty((x.size, n_basis), dtype=float)
    for j, spl in enumerate(col_spl):
        X[:, j] = spl(x)
    return X


def _taylor_extrapolate(
    x_out: np.ndarray,
    pivot: float,
    t: np.ndarray,
    degree: int,
    deriv: int
) -> np.ndarray:
    """
    For points outside support, reproduce the R code's Taylor expansion around the boundary pivot.
    Returns a matrix with shape (len(x_out), n_basis) corresponding to the `deriv`-th derivative values.
    """
    p = int(degree)
    n_basis = len(t) - p - 1

    if deriv > p + 1:  # R: deriv <= degree + 1 checked earlier; this case would be zero anyway
        return np.zeros((x_out.size, n_basis), dtype=float)

    # Number of terms in the expansion: q = 0..(p-deriv)
    L = p - deriv + 1
    if L <= 0:
        # deriv == degree+1 => everything is zero (outside and inside)
        return np.zeros((x_out.size, n_basis), dtype=float)

    # Build T: rows correspond to orders `deriv, deriv+1, ..., degree` evaluated at pivot
    orders = list(range(deriv, p + 1))
    T = np.vstack([_bspline_design_matrix(np.array([pivot], float), t, degree, s)[0, :] for s in orders])

    # Divide row r by (r-1)! (gamma(r)) so rows correspond to f^{(deriv+q)}(pivot) / q!
    # where q = 0..(L-1), and gamma(q+1) = q!
    gamma = np.array([np.math.factorial(q) for q in range(L)], dtype=float)
    T_scaled = T / gamma[:, None]

    # Build polynomial features: [1, (x-pivot), (x-pivot)^2, ..., (x-pivot)^(L-1)]
    dx = (x_out - pivot).reshape(-1, 1)
    # (broadcasted) powers per column q
    poly = np.hstack([np.ones_like(dx)] + [dx ** q for q in range(1, L)])

    return poly @ T_scaled  # shape (#x_out, n_basis)


# ---------- public API (Python port) ----------

def bs_des(
    x: Union[np.ndarray, Iterable[float]],
    degree: int = 3,
    nbreak: int = 2,
    deriv: Union[int, Iterable[int]] = 0,
    x_min: Optional[float] = None,
    x_max: Optional[float] = None,
    knots: Optional[Iterable[float]] = None,
):
    """
    Python port of R's bs.des (basis/derivative evaluator).
    - x: points to evaluate (1D array)
    - degree: spline degree p >= 1
    - nbreak: number of breakpoints (including the two boundaries)
    - deriv: either a scalar derivative order or a vector of per-x derivative orders
    - x_min, x_max: boundaries (used when `knots` is None)
    - knots: optional breakpoints (length must equal nbreak)

    Returns: (n, nbreak + degree - 1) matrix
    """
    x = np.asarray(x, dtype=float).ravel()
    n = x.size

    if degree <= 0:
        raise ValueError("degree must be a positive integer")
    if nbreak <= 1:
        raise ValueError("nbreak must be at least 2")

    # bounds
    if x_min is None:
        x_min = float(np.min(x))
    if x_max is None:
        x_max = float(np.max(x))
    if x_min >= x_max:
        raise ValueError("x_min must be less than x_max")

    # breaks and augmented knot vector
    breaks = _make_breaks(nbreak, x_min, x_max, knots)
    t = _augmented_knots(breaks, degree)

    # deriv vector per x
    if np.isscalar(deriv):
        deriv_vec = np.full(n, int(deriv), dtype=int)
    else:
        deriv_vec = np.asarray(deriv, dtype=int).ravel()
        if deriv_vec.size != n:
            raise ValueError("`deriv` must be a scalar or a vector with the same length as `x`.")

    if np.any(deriv_vec < 0):
        raise ValueError("deriv must be non-negative")
    if np.any(deriv_vec > degree + 1):
        raise ValueError("deriv must be <= degree + 1")

    # Evaluate by grouping unique derivative orders for efficiency
    B = np.empty((n, len(t) - degree - 1), dtype=float)
    for d_ord in np.unique(deriv_vec):
        idx = np.where(deriv_vec == d_ord)[0]
        if idx.size == 0:
            continue
        B[idx, :] = _bspline_design_matrix(x[idx], t, degree, int(d_ord))

    return B


def gsl_bs(
    x: Union[np.ndarray, Iterable[float]],
    degree: int = 3,
    nbreak: int = 2,
    deriv: int = 0,
    x_min: Optional[float] = None,
    x_max: Optional[float] = None,
    intercept: bool = False,
    knots: Optional[Iterable[float]] = None,
):
    """
    Python port of R's gsl.bs.default.
    Returns the generalized spline basis (or derivative basis) matrix.

    Notes:
    - If `x` lies outside the boundary knots, we reproduce the R code's
      Taylor expansion using derivatives at the boundary to avoid ill-conditioning.
    - If `intercept=False` (R's default), the first column is dropped.
    - Attributes from the R object are returned alongside the matrix.
    """
    x = np.asarray(x, dtype=float).ravel()

    if degree <= 0:
        raise ValueError("degree must be a positive integer")
    if deriv < 0:
        raise ValueError("deriv must be non-negative")
    if deriv > degree + 1:
        raise ValueError("deriv must be <= degree + 1")
    if nbreak <= 1:
        raise ValueError("nbreak must be at least 2")

    # reconcile nbreak with provided knots (R resets nbreak and warns)
    if knots is not None:
        knots = np.asarray(knots, dtype=float).ravel()
        if knots.size != nbreak:
            nbreak = int(knots.size)
            warnings.warn(f"nbreak and knots vector do not agree: resetting nbreak to {nbreak}", RuntimeWarning)

    # bounds: when knots are provided, outside check uses min/max(knots)
    # otherwise, x_min/x_max come from data if not provided
    if x_min is None:
        x_min = float(np.min(x))
        ol = np.zeros_like(x, dtype=bool)
    else:
        ol = x < x_min

    if x_max is None:
        x_max = float(np.max(x))
        or_ = np.zeros_like(x, dtype=bool)
    else:
        or_ = x > x_max

    if knots is not None:
        kmin, kmax = float(np.min(knots)), float(np.max(knots))
        ol = ol | (x < kmin)
        or_ = or_ | (x > kmax)

    outside = ol | or_
    inside = ~outside

    # Build breaks & augmented knots once
    breaks = _make_breaks(nbreak, x_min, x_max, knots)
    t = _augmented_knots(breaks, degree)
    ncol = len(t) - degree - 1

    # Evaluate inside points directly
    if np.any(outside):
        warnings.warn("some 'x' values beyond boundary knots may cause ill-conditioned bases", RuntimeWarning)

    # Initialize result
    B = np.zeros((x.size, ncol), dtype=float)

    # Inside points: evaluate derivative `deriv` normally
    if np.any(inside):
        B[inside, :] = bs_des(
            x[inside],
            degree=degree,
            nbreak=nbreak,
            deriv=int(deriv),
            x_min=x_min,
            x_max=x_max,
            knots=knots,
        )

    # Outside-left: Taylor around pivot = x_min (or min(knots) if provided)
    if np.any(ol) and (degree + 1 > deriv):
        pivot = breaks[0]  # left boundary
        B[ol, :] = _taylor_extrapolate(x[ol], pivot, t, degree, deriv)

    # Outside-right: Taylor around pivot = x_max (or max(knots) if provided)
    if np.any(or_) and (degree + 1 > deriv):
        pivot = breaks[-1]  # right boundary
        B[or_, :] = _taylor_extrapolate(x[or_], pivot, t, degree, deriv)

    # Drop the first column if intercept == False (R default)
    if not intercept:
        B = B[:, 1:].copy()

    # Return matrix + attributes for parity with R object
    attrs = {
        "degree": degree,
        "nbreak": nbreak,
        "deriv": deriv,
        "x.min": x_min,
        "x.max": x_max,
        "intercept": intercept,
        "knots": None if knots is None else np.asarray(knots, dtype=float),
    }
    return B, attrs


@dataclass
class GslBS:
    """
    Small convenience wrapper mimicking R's object + predict method.
    """
    degree: int = 3
    nbreak: int = 2
    deriv: int = 0
    intercept: bool = False
    knots: Optional[np.ndarray] = None
    x_min: Optional[float] = None
    x_max: Optional[float] = None

    def fit_transform(self, x: Union[np.ndarray, Iterable[float]]) -> np.ndarray:
        B, attrs = gsl_bs(
            x,
            degree=self.degree,
            nbreak=self.nbreak,
            deriv=self.deriv,
            x_min=self.x_min,
            x_max=self.x_max,
            intercept=self.intercept,
            knots=self.knots,
        )
        # persist attributes as in R
        self.x_min = attrs["x.min"]
        self.x_max = attrs["x.max"]
        self.knots = attrs["knots"]
        return B

    def transform(self, newx: Union[np.ndarray, Iterable[float]]) -> np.ndarray:
        # Predict using stored attributes (like predict.gsl.bs)
        B, _ = gsl_bs(
            newx,
            degree=self.degree,
            nbreak=self.nbreak if self.knots is None else len(self.knots),
            deriv=self.deriv,
            x_min=self.x_min,
            x_max=self.x_max,
            intercept=self.intercept,
            knots=self.knots,
        )
        return B


# ---------- quick example ----------

if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n = 1000
    degree = 3
    nbreak = 5

    x = rng.random(n)  # U[0,1]
    # Fit + transform
    B, attrs = gsl_bs(x, degree=degree, nbreak=nbreak)  # default deriv=0, intercept=False
    print("B shape:", B.shape)  # (n, nbreak + degree - 1 - 1) because intercept=False

    # Predict on new data using the class
    model = GslBS(degree=degree, nbreak=nbreak, intercept=False)
    B_train = model.fit_transform(x)
    x_new = np.linspace(-0.2, 1.2, 200)  # includes outside points
    B_new = model.transform(x_new)
    print("B_train:", B_train.shape, "B_new:", B_new.shape)


B shape: (1000, 6)




AttributeError: module 'numpy' has no attribute 'math'

In [None]:
import time
import numpy as np
from typing import List, Optional, Iterable, Dict, Any, Callable


def tensor_prod_model_matrix(X: List[np.ndarray]) -> np.ndarray:
    """
    Python port of mgcv::tensor.prod.model.matrix.

    Given a list of design matrices X = [X1, X2, ... , Xm] with the same number
    of rows n and columns d_j, returns an (n x prod(d_j)) matrix whose i-th row is
    the Kronecker product X1[i, :] ⊗ X2[i, :] ⊗ ... ⊗ Xm[i, :].

    Parameters
    ----------
    X : list of 2D numpy arrays
        Each Xi has shape (n, d_i). All must have the same n.

    Returns
    -------
    T : 2D numpy array
        Shape (n, prod(d_i)). Column order matches the iterative
        Khatri–Rao construction (outermost loop over later Xi).
    """
    if not isinstance(X, (list, tuple)) or len(X) == 0:
        raise ValueError("X must be a non-empty list of 2D NumPy arrays.")

    n = X[0].shape[0]
    for i, Xi in enumerate(X):
        if Xi.ndim != 2:
            raise ValueError(f"X[{i}] must be 2D.")
        if Xi.shape[0] != n:
            raise ValueError("All X[j] must have the same number of rows.")

    # Build via iterative Khatri–Rao (column-wise row Kronecker).
    T = np.ones((n, 1), dtype=float)
    for Xi in X:
        # For each column c of Xi, multiply T element-wise by Xi[:, c] and stack
        T = np.hstack([T * Xi[:, [c]] for c in range(Xi.shape[1])])

    return T




In [6]:
import numpy as np
import pandas as pd
from typing import Any, Dict, Optional, Union, Mapping
from patsy import dmatrix


def _split_npiv_formula(formula: str) -> Dict[str, str]:
    """
    Split 'Y ~ X | W' into lhs, rhs_x, rhs_w (strings).
    """
    if "~" not in formula:
        raise ValueError("Formula must contain '~', e.g. 'y ~ x1 + x2 | z1 + z2'.")
    lhs, rhs = formula.split("~", 1)
    lhs = lhs.strip()
    if "|" not in rhs:
        raise ValueError("Formula must contain '|' to separate X and W: 'y ~ X | W'.")
    rhs_x, rhs_w = rhs.split("|", 1)
    rhs_x, rhs_w = rhs_x.strip(), rhs_w.strip()
    if not lhs or not rhs_x or not rhs_w:
        raise ValueError("Parsed formula parts are empty; check your formula string.")
    return {"lhs": lhs, "rhs_x": rhs_x, "rhs_w": rhs_w}


def _apply_subset(df: pd.DataFrame, subset: Optional[Union[str, np.ndarray, pd.Series]]) -> pd.DataFrame:
    """
    Apply a subset (string query or boolean index/array). Returns a view.
    """
    if subset is None:
        return df
    if isinstance(subset, str):
        return df.query(subset)
    # boolean mask / indexer
    return df.loc[subset]


def npiv_formula(
    formula: str,
    data: Optional[Union[pd.DataFrame, Mapping[str, Any]]] = None,
    newdata: Optional[Union[pd.DataFrame, Mapping[str, Any]]] = None,
    subset: Optional[Union[str, np.ndarray, pd.Series]] = None,
    na_action: str = "na.omit",  # supports "na.omit" (drop) or "na.raise"
    *,
    npiv_default_fn=None,  # inject your previously ported npiv_default
    **kwargs: Any,
) -> Dict[str, Any]:
    """
    Python port of R's `npiv.formula`.

    Parameters
    ----------
    formula : str
        'Y ~ x-terms | w-terms' (use standard patsy syntax for terms).
    data : DataFrame or mapping
        Training data.
    newdata : DataFrame or mapping, optional
        If provided, builds X_eval using the X part of the formula.
    subset : str or boolean index, optional
        Row filter (pandas .query string or boolean index).
    na_action : {"na.omit","na.raise"}
        Drop rows with missing values in any of the used variables, or raise.
    npiv_default_fn : callable
        The Python port of `npiv.default` you already have (named here to avoid shadowing).

    Returns
    -------
    est : dict
        Result of `npiv_default_fn(...)` augmented with 'call' and 'formula'.

    Notes
    -----
    - Intercepts on RHS are removed (like `[,-1]` in R).
    - We align Y, X, and W on the intersection of valid (non-NA) rows.
    """
    if npiv_default_fn is None:
        raise ValueError("Please pass your `npiv_default` function as `npiv_default_fn=...`.")

    parts = _split_npiv_formula(formula)
    lhs, rhs_x, rhs_w = parts["lhs"], parts["rhs_x"], parts["rhs_w"]

    if data is None:
        data = {}
    if not isinstance(data, pd.DataFrame):
        data = pd.DataFrame(data)

    # subset first
    df = _apply_subset(data, subset)

    # NA handling: use patsy with NA_action
    if na_action in ("na.omit", "omit", "drop"):
        patsy_na = "drop"
    elif na_action in ("na.raise", "raise"):
        patsy_na = "raise"
    else:
        raise ValueError("na_action must be 'na.omit' (drop) or 'na.raise'.")

    # Build Y, X, W as DataFrames (keep indices to align)
    # Y: evaluate expression on the RHS side of dmatrix ('0 + expr' -> no intercept)
    y_df = dmatrix(f"0 + {lhs}", df, return_type="dataframe", NA_action=patsy_na)
    # X, W: drop intercept (R used [,-1]); '0 +' ensures no implicit intercept
    X_df = dmatrix(f"0 + {rhs_x}", df, return_type="dataframe", NA_action=patsy_na)
    W_df = dmatrix(f"0 + {rhs_w}", df, return_type="dataframe", NA_action=patsy_na)

    # Align on common index (intersection of rows that survived NA drop per part)
    idx = y_df.index.intersection(X_df.index).intersection(W_df.index)
    if len(idx) == 0:
        raise ValueError("No rows left after NA handling / subset alignment.")

    y = np.asarray(y_df.loc[idx]).ravel()
    X = np.asarray(X_df.loc[idx])
    W = np.asarray(W_df.loc[idx])

    # Optional evaluation data for X
    X_eval = None
    if newdata is not None:
        nd = newdata if isinstance(newdata, pd.DataFrame) else pd.DataFrame(newdata)
        X_eval_df = dmatrix(f"0 + {rhs_x}", nd, return_type="dataframe", NA_action=patsy_na)
        X_eval = np.asarray(X_eval_df)

    # Call your npiv_default
    est = npiv_default_fn(
        Y=y,
        X=X,
        W=W,
        X_eval=X_eval,
        **kwargs,
    )

    # Add call & formula (parity with R)
    est["call"] = f"npiv_formula(formula='{formula}', data=..., newdata={'provided' if newdata is not None else 'None'})"
    est["formula"] = formula
    est.setdefault("_class", "npiv")
    return est


def fitted_npiv(obj: Dict[str, Any]) -> np.ndarray:
    """
    Python analogue of `fitted.npiv`: returns object$h.
    """
    if "h" not in obj:
        raise KeyError("Expected key 'h' in npiv result (fitted values).")
    return np.asarray(obj["h"])


def residuals_npiv(obj: Dict[str, Any]) -> np.ndarray:
    """
    Python analogue of `residuals.npiv`: returns object$residuals.
    """
    if "residuals" not in obj:
        raise KeyError("Expected key 'residuals' in npiv result.")
    return np.asarray(obj["residuals"])


In [None]:
import numpy as np
from typing import Optional, Iterable, Dict, Any, Callable, Sequence

try:
    from tqdm import trange
except Exception:  # tqdm is optional
    def trange(n, **kwargs):
        return range(n)


# -------- small helpers --------

def _match_arg(val: str, choices: Iterable[str], name: str) -> str:
    choices = list(choices)
    if val not in choices:
        raise ValueError(f"{name} must be one of {choices}, got {val!r}.")
    return val

def _ginv(A: np.ndarray) -> np.ndarray:
    return np.linalg.pinv(A, rcond=1e-12)

def _is_fullrank(M: np.ndarray) -> bool:
    if M.ndim == 1:
        M = M[:, None]
    return np.linalg.matrix_rank(M) == M.shape[1]

def _as_colvec(y) -> np.ndarray:
    y = np.asarray(y, dtype=float).ravel()
    return y.reshape(-1, 1)

def _nzd(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    return np.where(np.abs(x) < eps, eps, x)


# -------- main function --------

def npiv_J(
    Y,
    X: np.ndarray,
    W: np.ndarray,
    X_grid: Optional[np.ndarray] = None,
    J_x_degree: int = 3,
    K_w_degree: int = 4,
    J_x_segments_set: Optional[Sequence[int]] = None,   # default: [1,3,7,15,31,63]
    K_w_segments_set: Optional[Sequence[int]] = None,   # default: [1,3,7,15,31,63]
    knots: str = "uniform",                             # {"uniform","quantiles"}
    basis: str = "tensor",                              # {"tensor","additive","glp"}
    X_min: Optional[float] = None,
    X_max: Optional[float] = None,
    W_min: Optional[float] = None,
    W_max: Optional[float] = None,
    grid_num: int = 50,
    boot_num: int = 99,
    alpha: float = 0.5,
    check_is_fullrank: bool = False,
    progress: bool = True,
    *,
    prodspline: Callable[..., np.ndarray],
    dimbs: Callable[..., int],
) -> Dict[str, Any]:
    """
    Python port of R's npivJ (Lepski selection via sup-t comparisons).

    You must inject:
      - prodspline(x, K, knots, basis, x_min, x_max, xeval=None) -> ndarray
        (return basis WITHOUT leading intercept; this function adds it when needed)
      - dimbs(basis, degree, segments) -> int  (dimension of X basis)

    Returns dict with keys: J.tilde, J.hat, J.hat.n, J.x.seg, K.w.seg, theta.star
    """
    basis = _match_arg(basis, ["tensor", "additive", "glp"], "basis")
    knots = _match_arg(knots, ["uniform", "quantiles"], "knots")

    if Y is None: raise ValueError("must provide Y")
    if X is None: raise ValueError("must provide X")
    if W is None: raise ValueError("must provide W")
    if K_w_degree < 0: raise ValueError("K_w_degree must be non-negative")
    if J_x_degree < 0: raise ValueError("J_x_degree must be non-negative")
    if not (0 < alpha < 1): raise ValueError("alpha must lie in (0,1)")

    Y = _as_colvec(Y)
    X = np.asarray(X, dtype=float)
    W = np.asarray(W, dtype=float)

    if check_is_fullrank:
        if not _is_fullrank(Y): raise ValueError("Y is not of full column rank")
        if not _is_fullrank(X): raise ValueError("X is not of full column rank")
        if not _is_fullrank(W): raise ValueError("W is not of full column rank")

    n, p = X.shape

    # If no grid given, make a simple column-wise grid (not a Cartesian product).
    if X_grid is None:
        X_grid = np.empty((grid_num, p))
        for j in range(p):
            col = X[:, j]
            X_grid[:, j] = np.linspace(np.min(col), np.max(col), grid_num)

    # Default segment sets: c(2,4,8,16,32,64) - 1  ->  [1,3,7,15,31,63]
    if J_x_segments_set is None:
        J_x_segments_set = (2 ** np.arange(1, 7) - 1).tolist()
    if K_w_segments_set is None:
        K_w_segments_set = (2 ** np.arange(1, 7) - 1).tolist()

    J_x_segments_set = np.asarray(J_x_segments_set, dtype=int)
    K_w_segments_set = np.asarray(K_w_segments_set, dtype=int)
    if J_x_segments_set.size != K_w_segments_set.size:
        raise ValueError("J_x_segments_set and K_w_segments_set must have the same length.")

    # Build all ordered pairs (J1, J2) with J2 > J1
    J_pairs = []
    for a in J_x_segments_set:
        for b in J_x_segments_set:
            if b > a:
                J_pairs.append((int(a), int(b)))
    J_pairs = np.array(J_pairs, dtype=int)
    num_pairs = J_pairs.shape[0]

    # Map each J in X-set to its corresponding K in W-set by index matching on value
    # (assumes both sets are aligned by value; duplicates would be ambiguous)
    J_to_K = {int(J_x_segments_set[i]): int(K_w_segments_set[i]) for i in range(J_x_segments_set.size)}
    K_pairs = np.array([(J_to_K[a], J_to_K[b]) for (a, b) in J_pairs], dtype=int)

    # Storage
    Z_sup = np.empty(num_pairs)
    Z_sup_boot = np.full((boot_num, num_pairs), np.nan)

    it_pairs = trange(num_pairs, disable=not progress, desc="complexity determination")

    for ii in it_pairs:
        J1_seg, J2_seg = int(J_pairs[ii, 0]), int(J_pairs[ii, 1])
        K1_seg, K2_seg = int(K_pairs[ii, 0]), int(K_pairs[ii, 1])

        if K_w_degree < J_x_degree:
            raise ValueError("K_w_degree must be >= J_x_degree")

        # --- W bases (J1/J2 paths)
        if K_w_degree == 0:
            B_w_J1 = B_w_J2 = np.ones((W.shape[0], 1))
        else:
            K_mat_W1 = np.column_stack([np.repeat(K_w_degree, W.shape[1]), np.repeat(K1_seg, W.shape[1])])
            K_mat_W2 = np.column_stack([np.repeat(K_w_degree, W.shape[1]), np.repeat(K2_seg, W.shape[1])])
            B_w_J1 = prodspline(x=W, K=K_mat_W1, knots=knots, basis=basis, x_min=W_min, x_max=W_max)
            B_w_J2 = prodspline(x=W, K=K_mat_W2, knots=knots, basis=basis, x_min=W_min, x_max=W_max)
            if basis != "tensor":
                B_w_J1 = np.column_stack([np.ones((W.shape[0], 1)), B_w_J1])
                B_w_J2 = np.column_stack([np.ones((W.shape[0], 1)), B_w_J2])

        # --- X bases (J1/J2 paths) at sample + grid eval
        if J_x_degree == 0:
            Psi_x_J1 = Psi_x_J2 = np.ones((X.shape[0], 1))
            Psi_x_J1_eval = Psi_x_J2_eval = np.ones((X_grid.shape[0], 1))
        else:
            K_mat_X1 = np.column_stack([np.repeat(J_x_degree, X.shape[1]), np.repeat(J1_seg, X.shape[1])])
            K_mat_X2 = np.column_stack([np.repeat(J_x_degree, X.shape[1]), np.repeat(J2_seg, X.shape[1])])
            Psi_x_J1 = prodspline(x=X, K=K_mat_X1, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
            Psi_x_J2 = prodspline(x=X, K=K_mat_X2, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
            Psi_x_J1_eval = prodspline(x=X, xeval=X_grid, K=K_mat_X1, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
            Psi_x_J2_eval = prodspline(x=X, xeval=X_grid, K=K_mat_X2, knots=knots, basis=basis, x_min=X_min, x_max=X_max)
            if basis != "tensor":
                Psi_x_J1 = np.column_stack([np.ones((X.shape[0], 1)), Psi_x_J1])
                Psi_x_J2 = np.column_stack([np.ones((X.shape[0], 1)), Psi_x_J2])
                Psi_x_J1_eval = np.column_stack([np.ones((X_grid.shape[0], 1)), Psi_x_J1_eval])
                Psi_x_J2_eval = np.column_stack([np.ones((X_grid.shape[0], 1)), Psi_x_J2_eval])

        # --- 2SLS algebra per J ---
        BtB_inv_1 = _ginv(B_w_J1.T @ B_w_J1)
        P_B1 = B_w_J1 @ BtB_inv_1 @ B_w_J1.T
        PsiTP1 = Psi_x_J1.T @ P_B1
        tmp1 = _ginv(PsiTP1 @ Psi_x_J1) @ PsiTP1
        beta1 = tmp1 @ Y
        U1 = Y - (Psi_x_J1 @ beta1)
        hhat_J1 = Psi_x_J1_eval @ beta1

        BtB_inv_2 = _ginv(B_w_J2.T @ B_w_J2)
        P_B2 = B_w_J2 @ BtB_inv_2 @ B_w_J2.T
        PsiTP2 = Psi_x_J2.T @ P_B2
        tmp2 = _ginv(PsiTP2 @ Psi_x_J2) @ PsiTP2
        beta2 = tmp2 @ Y
        U2 = Y - (Psi_x_J2 @ beta2)
        hhat_J2 = Psi_x_J2_eval @ beta2

        # --- Asymptotic variances ---
        Wgt1 = (tmp1.T * U1.ravel()[:, None])     # n x J1
        D1 = Wgt1.T @ Wgt1                         # J1 x J1
        asy_var_J1 = np.sum((Psi_x_J1_eval @ D1) * Psi_x_J1_eval, axis=1)  # (neval,)

        Wgt2 = (tmp2.T * U2.ravel()[:, None])     # n x J2
        D2 = Wgt2.T @ Wgt2                         # J2 x J2
        asy_var_J2 = np.sum((Psi_x_J2_eval @ D2) * Psi_x_J2_eval, axis=1)

        # --- Covariance term ---
        # A1: J1 x n, B2: n x J2
        A1 = (tmp1.T * U1.ravel()[:, None]).T      # J1 x n
        B2 = (tmp2.T * U2.ravel()[:, None])        # n x J2
        M12 = A1 @ B2                               # J1 x J2
        S = (Psi_x_J1_eval @ M12)                   # neval x J2
        asy_cov_J1J2 = np.sum(S * Psi_x_J2_eval, axis=1)  # (neval,)

        # denom (vector) and sup t-stat
        asy_se = np.sqrt(np.maximum(0.0, asy_var_J1 + asy_var_J2 - 2.0 * asy_cov_J1J2))
        Z_sup[ii] = float(np.max(np.abs((hhat_J1.ravel() - hhat_J2.ravel()) / _nzd(asy_se))))

        # --- Bootstrap sup t (same draws across columns not needed here) ---
        it = trange(boot_num, disable=not progress, desc=f"boot sup t ({ii+1}/{num_pairs})")
        for b in it:
            z = np.random.standard_normal(n)
            num = (Psi_x_J1_eval @ (tmp1 @ (U1.ravel() * z).reshape(-1, 1))) \
                - (Psi_x_J2_eval @ (tmp2 @ (U2.ravel() * z).reshape(-1, 1)))
            Z_sup_boot[b, ii] = np.max(np.abs(num.ravel() / _nzd(asy_se)))

    # Max over pairs for each bootstrap draw -> critical value
    Z_boot = np.nanmax(Z_sup_boot, axis=1)
    # R: quantile(..., type=5). We use NumPy's default linear interpolation.
    theta_star = float(np.quantile(Z_boot, 1 - alpha, method="linear"))

    # ----- Lepski choice -----
    num_J = J_x_segments_set.size
    test_mat = np.full((num_J, num_J), np.nan)
    # Fill only upper triangle (j2 > j1) by mapping pairs back to indices
    J_index = {int(v): i for i, v in enumerate(J_x_segments_set)}
    for ii, (j1, j2) in enumerate(J_pairs):
        r = J_index[int(j1)]
        c = J_index[int(j2)]
        test_mat[r, c] = 1.0 if Z_sup[ii] <= 1.1 * theta_star else 0.0

    # Per-row product over strictly larger columns; use NA rules like R:
    test_vec = np.full(num_J, np.nan)
    for i in range(num_J - 1):
        row = test_mat[i, i + 1 :]
        if row.size == 0:
            test_vec[i] = np.nan
        else:
            valid = row[~np.isnan(row)]
            if valid.size == 0:
                test_vec[i] = np.nan
            else:
                test_vec[i] = 1.0 if np.all(valid == 1.0) else 0.0

    # Choose segment J.seg
    if np.all(np.isnan(test_vec) | (test_vec == 0.0)):
        J_seg = int(np.max(J_x_segments_set))
    elif np.any(test_vec == 1.0):
        # smallest J for which row passes
        first_ok = int(np.where(test_vec == 1.0)[0][0])
        J_seg = int(J_x_segments_set[first_ok])
    else:
        J_seg = int(np.min(J_x_segments_set))

    # Dimension for chosen J
    J_hat = int(dimbs(basis=basis,
                      degree=[J_x_degree] * p,
                      segments=[J_seg] * p))

    # Truncated (second-largest) segments and its "dimension" proxy
    if num_J >= 2:
        J_seg_n = int(np.sort(J_x_segments_set)[-2])
    else:
        J_seg_n = int(J_seg)
    J_hat_n = int((J_seg_n + J_x_degree) ** p)

    # Final J.x.seg, J.tilde, K.w.seg (paired by matching J)
    J_x_seg = int(min(J_seg, J_seg_n))
    J_tilde = int(min(J_hat, J_hat_n))
    K_w_seg = int(J_to_K[J_x_seg])

    return dict(
        J.tilde=J_tilde,
        J.hat=J_hat,
        J.hat.n=J_hat_n,
        J.x.seg=J_x_seg,
        K.w.seg=K_w_seg,
        theta.star=theta_star,
    )


In [None]:
import numpy as np
from typing import Optional, Iterable, Dict, Any, Callable


# --------- small helpers (R analogues) ---------

def _match_arg(val: str, choices, name: str) -> str:
    choices = list(choices)
    if val not in choices:
        raise ValueError(f"{name} must be one of {choices}, got {val!r}.")
    return val

def _ginv(A: np.ndarray) -> np.ndarray:
    return np.linalg.pinv(A, rcond=1e-12)

def _is_fullrank(M: np.ndarray) -> bool:
    if M.ndim == 1:
        M = M[:, None]
    return np.linalg.matrix_rank(M) == M.shape[1]

def _as_colvec(y: Iterable[float]) -> np.ndarray:
    y = np.asarray(y, dtype=float).ravel()
    return y.reshape(-1, 1)

def _nzd(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    # Non-zero denominator safeguard (R's NZD)
    return np.where(np.abs(x) < eps, eps, x)

def _identical(A: np.ndarray, B: np.ndarray) -> bool:
    return np.array_equal(np.asarray(A), np.asarray(B))



In [None]:
import numpy as np
from typing import Optional, Dict, Any, Callable


def npiv_choose_J(
    Y,
    X: np.ndarray,
    W: np.ndarray,
    X_grid: Optional[np.ndarray] = None,
    J_x_degree: int = 3,
    K_w_degree: int = 4,
    K_w_smooth: int = 2,
    knots: str = "uniform",                 # {"uniform","quantiles"}
    basis: str = "tensor",                  # {"tensor","additive","glp"}
    X_min: Optional[float] = None,
    X_max: Optional[float] = None,
    W_min: Optional[float] = None,
    W_max: Optional[float] = None,
    grid_num: int = 50,
    boot_num: int = 99,
    check_is_fullrank: bool = False,
    progress: bool = True,
    *,
    prodspline: Callable[..., np.ndarray],
    dimbs: Callable[..., int],
) -> Dict[str, Any]:
    """
    Python port of R's `npiv_choose_J`.

    Parameters
    ----------
    (Same meaning as in R.)
    You must inject:
      - prodspline(x, K, knots, basis, x_min, x_max, xeval=None, deriv_index=None, deriv=None) -> ndarray
      - dimbs(basis, degree, segments) -> int

    Returns
    -------
    dict with:
      J.hat.max, J.hat.n, J.hat, J.tilde, J.x.seg, K.w.seg,
      J.x.segments.set, K.w.segments.set, theta.star
    """
    # Step 1: compute \hat{J}_max and the data-driven grids of segments for X and W
    tmp1 = npiv_Jhat_max(
        X=X,
        W=W,
        J_x_degree=J_x_degree,
        K_w_degree=K_w_degree,
        K_w_smooth=K_w_smooth,
        knots=knots,
        basis=basis,
        X_min=X_min,
        X_max=X_max,
        W_min=W_min,
        W_max=W_max,
        check_is_fullrank=check_is_fullrank,
        progress=progress,
        prodspline=prodspline,
        dimbs=dimbs,
    )

    # Step 2: run Lepski/bootstrapped choice over that grid
    tmp2 = npiv_J(
        Y=Y,
        X=X,
        W=W,
        X_grid=X_grid,
        J_x_degree=J_x_degree,
        K_w_degree=K_w_degree,
        J_x_segments_set=tmp1["J.x.segments.set"],
        K_w_segments_set=tmp1["K.w.segments.set"],
        knots=knots,
        basis=basis,
        X_min=X_min,
        X_max=X_max,
        W_min=W_min,
        W_max=W_max,
        grid_num=grid_num,
        boot_num=boot_num,
        alpha=tmp1["alpha.hat"],
        check_is_fullrank=check_is_fullrank,
        progress=progress,
        prodspline=prodspline,
        dimbs=dimbs,
    )

    # Step 3: return combined results
    return {
        "J.hat.max": tmp1["J.hat.max"],
        "J.hat.n": tmp2["J.hat.n"],
        "J.hat": tmp2["J.hat"],
        "J.tilde": tmp2["J.tilde"],
        "J.x.seg": tmp2["J.x.seg"],
        "K.w.seg": tmp2["K.w.seg"],
        "J.x.segments.set": tmp1["J.x.segments.set"],
        "K.w.segments.set": tmp1["K.w.segments.set"],
        "theta.star": tmp2["theta.star"],
    }


In [5]:
# Example for tensor product:
n = 4
X1 = np.column_stack([np.ones(n), np.linspace(0,1,n)])
X2 = np.column_stack([np.ones(n), np.linspace(1,2,n), np.linspace(1,2,n)**2])
T = tensor_prod_model_matrix([X1, X2])
print(T.shape)  # (n, 2*3) = (4, 6)

# Example wrapper call:
def dummy_npiv_est(**kw):
    # pretend we fit a model; return something minimal
    return {"coef": np.array([1.0]), "se": np.array([0.1])}

res = npiv_default(
    Y=np.random.randn(n),
    X=X1,
    W=X2,
    npiv_est=dummy_npiv_est,
)
print(res["ptm"], res["_class"], res["coef"])


(4, 6)
1.3399985618889332e-05 npiv [1.]
