From 73b4a4bd4b55cf4599d2743544ac68d490242296 Mon Sep 17 00:00:00 2001 From: Peng Zheng Date: Wed, 27 Mar 2024 15:21:20 +0800 Subject: [PATCH] update the type hint --- src/regmod/composite_models/base.py | 74 +++---- src/regmod/function.py | 82 +++---- src/regmod/models/gaussian.py | 74 +++---- src/regmod/models/model.py | 297 ++++++++++++++------------ src/regmod/models/negativebinomial.py | 64 +++--- src/regmod/models/pogit.py | 32 ++- src/regmod/models/poisson.py | 70 +++--- src/regmod/models/utils.py | 16 +- src/regmod/models/weibull.py | 48 +++-- src/regmod/optimizer.py | 92 ++++---- src/regmod/parameter.py | 201 +++++++++-------- src/regmod/prior.py | 66 +++--- src/regmod/utils.py | 61 +++--- src/regmod/variable.py | 141 ++++++------ 14 files changed, 672 insertions(+), 646 deletions(-) diff --git a/src/regmod/composite_models/base.py b/src/regmod/composite_models/base.py index 2b1f476..ddcdca3 100644 --- a/src/regmod/composite_models/base.py +++ b/src/regmod/composite_models/base.py @@ -1,6 +1,7 @@ """ Base Model """ + import logging from copy import deepcopy from typing import Dict, List, Optional @@ -19,15 +20,9 @@ logger = logging.getLogger(__name__) link_funs = { - "gaussian": fun_dict[ - GaussianModel.default_param_specs["mu"]["inv_link"] - ].inv_fun, - "poisson": fun_dict[ - PoissonModel.default_param_specs["lam"]["inv_link"] - ].inv_fun, - "binomial": fun_dict[ - BinomialModel.default_param_specs["p"]["inv_link"] - ].inv_fun, + "gaussian": fun_dict[GaussianModel.default_param_specs["mu"]["inv_link"]].inv_fun, + "poisson": fun_dict[PoissonModel.default_param_specs["lam"]["inv_link"]].inv_fun, + "binomial": fun_dict[BinomialModel.default_param_specs["p"]["inv_link"]].inv_fun, } model_constructors = { @@ -86,19 +81,23 @@ class BaseModel(NodeModel): Overwrite the append function in NodeModel. """ - def __init__(self, - name: str, - y: str, - variables: List[Variable], - df: Optional[pd.DataFrame] = None, - weights: str = "weights", - mtype: str = "gaussian", - prior_mask: Optional[Dict] = None, - **param_specs): + def __init__( + self, + name: str, + y: str, + variables: List[Variable], + df: Optional[DataFrame] = None, + weights: str = "weights", + mtype: str = "gaussian", + prior_mask: Optional[Dict] = None, + **param_specs, + ): super().__init__(name) - if any(mtype not in model_config - for model_config in (link_funs, model_constructors)): + if any( + mtype not in model_config + for model_config in (link_funs, model_constructors) + ): raise ValueError(f"Not supported model type {mtype}") data = deepcopy(data) variables = list(deepcopy(variables)) @@ -108,9 +107,7 @@ def __init__(self, self.df = df self.weights = weights self.variables = {v.name: v for v in variables} - self.param_specs = {"variables": variables, - "use_offset": True, - **param_specs} + self.param_specs = {"variables": variables, "use_offset": True, **param_specs} self.model = None self.prior_mask = {} if prior_mask is None else prior_mask @@ -149,10 +146,7 @@ def fit(self, **fit_options): if self.model is None: model_constructor = model_constructors[self.mtype] self.model = model_constructor( - self.y, - df=self.df, - weights=self.weights, - param_specs=self.param_specs + self.y, df=self.df, weights=self.weights, param_specs=self.param_specs ) self.model.fit(**fit_options) message = f"fit_node;finish;{self.level};{self.name};" @@ -179,17 +173,19 @@ def get_draws(self, df: DataFrame = None, size: int = 1000) -> DataFrame: pred_data = self.model.df.copy() pred_data.attach_df(df) - coefs_draws = np.random.multivariate_normal(self.model.opt_coefs, - self.model.opt_vcov, - size=size) - draws = np.vstack([ - self.model.params[0].get_param(coefs_draw, pred_data) - for coefs_draw in coefs_draws - ]) - df_draws = pd.DataFrame( + coefs_draws = np.random.multivariate_normal( + self.model.opt_coefs, self.model.opt_vcov, size=size + ) + draws = np.vstack( + [ + self.model.params[0].get_param(coefs_draw, pred_data) + for coefs_draw in coefs_draws + ] + ) + df_draws = DataFrame( draws.T, columns=[f"{self.col_value}_{i}" for i in range(size)], - index=df.index + index=df.index, ) return pd.concat([df, df_draws], axis=1) @@ -216,8 +212,7 @@ def get_posterior(self) -> Dict: # use minimum standard deviation of the posterior distribution sd = np.maximum(0.1, np.sqrt(np.diag(self.model.opt_vcov))) vnames = [v.name for v in self.param_specs["variables"]] - slices = sizes_to_slices([self.variables[name].size - for name in vnames]) + slices = sizes_to_slices([self.variables[name].size for name in vnames]) return { name: GaussianPrior(mean=mean[slices[i]], sd=sd[slices[i]]) for i, name in enumerate(vnames) @@ -240,8 +235,7 @@ def append(self, node: NodeModel, rank: int = 0): primary children. """ if rank >= 1: - raise ValueError(f"{type(self).__name__} can only have primary " - "link.") + raise ValueError(f"{type(self).__name__} can only have primary " "link.") super().append(node, rank=rank) def __repr__(self) -> str: diff --git a/src/regmod/function.py b/src/regmod/function.py index 0b113f3..f818020 100644 --- a/src/regmod/function.py +++ b/src/regmod/function.py @@ -1,8 +1,9 @@ """ Function module """ + from dataclasses import dataclass, field -from typing import Callable +from regmod._typing import Callable import numpy as np @@ -60,8 +61,8 @@ def exp_d2fun(x): def expit_fun(x): neg_indices = x < 0 - z = np.exp(-np.sqrt(x*x)) - y = 1/(1 + z) + z = np.exp(-np.sqrt(x * x)) + y = 1 / (1 + z) if np.isscalar(x): if neg_indices: y = 1 - y @@ -71,15 +72,15 @@ def expit_fun(x): def expit_dfun(x): - z = np.exp(-np.sqrt(x*x)) - y = z/(1 + z)**2 + z = np.exp(-np.sqrt(x * x)) + y = z / (1 + z) ** 2 return y def expit_d2fun(x): neg_indices = x < 0 - z = np.exp(-np.sqrt(x*x)) - y = z*(z - 1)/(z + 1)**3 + z = np.exp(-np.sqrt(x * x)) + y = z * (z - 1) / (z + 1) ** 3 if np.isscalar(x): if neg_indices: y = -y @@ -93,55 +94,54 @@ def log_fun(x): def log_dfun(x): - return 1/x + return 1 / x def log_d2fun(x): - return -1/x**2 + return -1 / x**2 def logit_fun(x): - return np.log(x/(1 - x)) + return np.log(x / (1 - x)) def logit_dfun(x): - return 1/(x*(1 - x)) + return 1 / (x * (1 - x)) def logit_d2fun(x): - return (2*x - 1)/(x*(1 - x))**2 + return (2 * x - 1) / (x * (1 - x)) ** 2 fun_list = [ - SmoothFunction(name="identity", - fun=identity_fun, - inv_fun=identity_fun, - dfun=identity_dfun, - d2fun=identity_d2fun), - SmoothFunction(name="exp", - fun=exp_fun, - inv_fun=log_fun, - dfun=exp_dfun, - d2fun=exp_d2fun), - SmoothFunction(name="expit", - fun=expit_fun, - inv_fun=logit_fun, - dfun=expit_dfun, - d2fun=expit_d2fun), - SmoothFunction(name="log", - fun=log_fun, - inv_fun=exp_fun, - dfun=log_dfun, - d2fun=log_d2fun), - SmoothFunction(name="logit", - fun=logit_fun, - inv_fun=expit_fun, - dfun=logit_dfun, - d2fun=logit_d2fun), + SmoothFunction( + name="identity", + fun=identity_fun, + inv_fun=identity_fun, + dfun=identity_dfun, + d2fun=identity_d2fun, + ), + SmoothFunction( + name="exp", fun=exp_fun, inv_fun=log_fun, dfun=exp_dfun, d2fun=exp_d2fun + ), + SmoothFunction( + name="expit", + fun=expit_fun, + inv_fun=logit_fun, + dfun=expit_dfun, + d2fun=expit_d2fun, + ), + SmoothFunction( + name="log", fun=log_fun, inv_fun=exp_fun, dfun=log_dfun, d2fun=log_d2fun + ), + SmoothFunction( + name="logit", + fun=logit_fun, + inv_fun=expit_fun, + dfun=logit_dfun, + d2fun=logit_d2fun, + ), ] -fun_dict = { - fun.name: fun - for fun in fun_list -} +fun_dict = {fun.name: fun for fun in fun_list} diff --git a/src/regmod/models/gaussian.py b/src/regmod/models/gaussian.py index ad2bc0b..4947834 100644 --- a/src/regmod/models/gaussian.py +++ b/src/regmod/models/gaussian.py @@ -1,14 +1,12 @@ """ Gaussian Model """ -from typing import Callable, List, Tuple import numpy as np -import pandas as pd -from numpy.typing import NDArray from scipy.stats import norm from regmod.optimizer import msca_optimize +from regmod._typing import Callable, NDArray, DataFrame from .model import Model from .utils import model_post_init @@ -18,7 +16,7 @@ class GaussianModel(Model): param_names = ("mu",) default_param_specs = {"mu": {"inv_link": "identity"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) self.mat[0], self.cmat, self.cvec = model_post_init( self.mat[0], self.uvec, self.linear_umat, self.linear_uvec @@ -34,17 +32,14 @@ def objective(self, coefs: NDArray) -> float: ------- float Objective value. + """ inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) - weights = self.weights*self.trim_weights - obj_param = weights * 0.5 * ( - param - self.y - )**2 + weights = self.weights * self.trim_weights + obj_param = weights * 0.5 * (param - self.y) ** 2 return obj_param.sum() + self.objective_from_gprior(coefs) def gradient(self, coefs: NDArray) -> NDArray: @@ -59,19 +54,16 @@ def gradient(self, coefs: NDArray) -> NDArray: ------- NDArray Gradient vector. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - param - self.y - ) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (param - self.y) * dparam return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs) @@ -87,20 +79,17 @@ def hessian(self, coefs: NDArray) -> NDArray: ------- NDArray Hessian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) d2param = inv_link.d2fun(lin_param) - weights = self.weights*self.trim_weights - hess_param = weights * ( - dparam**2 + (param - self.y)*d2param - ) + weights = self.weights * self.trim_weights + hess_param = weights * (dparam**2 + (param - self.y) * d2param) scaled_mat = mat.scale_rows(hess_param) hess_mat = mat.T.dot(scaled_mat) @@ -119,47 +108,44 @@ def jacobian2(self, coefs: NDArray) -> NDArray: ------- NDArray Jacobian matrix. + """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights grad_param = weights * (param - self.y) * dparam jacobian = mat.T.scale_cols(grad_param) hess_mat_gprior = type(jacobian)(self.hessian_from_gprior()) jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior return jacobian2 - def fit(self, - optimizer: Callable = msca_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = msca_optimize, **optimizer_options): """Fit function. Parameters ---------- optimizer : Callable, optional Model solver, by default scipy_optimize. + """ - super().fit( - optimizer=optimizer, - **optimizer_options - ) + super().fit(optimizer=optimizer, **optimizer_options) - def nll(self, params: List[NDArray]) -> NDArray: - return 0.5*(params[0] - self.y)**2 + def nll(self, params: list[NDArray]) -> NDArray: + return 0.5 * (params[0] - self.y) ** 2 - def dnll(self, params: List[NDArray]) -> List[NDArray]: + def dnll(self, params: list[NDArray]) -> list[NDArray]: return [params[0] - self.y] - def d2nll(self, params: List[NDArray]) -> List[NDArray]: + def d2nll(self, params: list[NDArray]) -> list[NDArray]: return [[np.ones(self.df.shape[0])]] - def get_ui(self, params: List[NDArray], bounds: Tuple[float, float]) -> NDArray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: mean = params[0] - sd = 1.0/np.sqrt(self.weights) - return [norm.ppf(bounds[0], loc=mean, scale=sd), - norm.ppf(bounds[1], loc=mean, scale=sd)] + sd = 1.0 / np.sqrt(self.weights) + return [ + norm.ppf(bounds[0], loc=mean, scale=sd), + norm.ppf(bounds[1], loc=mean, scale=sd), + ] diff --git a/src/regmod/models/model.py b/src/regmod/models/model.py index 795edc1..5792f05 100644 --- a/src/regmod/models/model.py +++ b/src/regmod/models/model.py @@ -1,18 +1,15 @@ """ Model module """ -from typing import Callable, Dict, List, Optional, Tuple, Union import numpy as np -import pandas as pd -from msca.linalg.matrix import Matrix -from numpy import ndarray from scipy.linalg import block_diag from scipy.sparse import csc_matrix from regmod.optimizer import scipy_optimize from regmod.parameter import Parameter from regmod.utils import sizes_to_slices +from regmod._typing import Callable, NDArray, DataFrame, Matrix class Model: @@ -26,10 +23,10 @@ class Model: Column name for the weight of each observation data Data frame that contains all information - params : Optional[List[Parameter]], optional + params : list[Parameter], optional A list of parameters. Default to None. - param_specs : Optional[Dict[str, Dict]], optional - Dictionary of all parameter specifications. Default to None. + param_specs : dict[str, dict], optional + dictionary of all parameter specifications. Default to None. Raises ------ @@ -40,30 +37,34 @@ class Model: """ - param_names: Tuple[str] = None - default_param_specs: Dict[str, Dict] = None - - def __init__(self, - y: str, - weights: str = "weights", - df: Optional[pd.DataFrame] = None, - params: Optional[List[Parameter]] = None, - param_specs: Optional[Dict[str, Dict]] = None): + param_names: tuple[str] = None + default_param_specs: dict[str, dict] = None + + def __init__( + self, + y: str, + weights: str = "weights", + df: DataFrame | None = None, + params: list[Parameter] | None = None, + param_specs: dict[str, dict] | None = None, + ): if params is None and param_specs is None: raise ValueError("Must provide `params` or `param_specs`") if params is not None: - param_dict = { - param.name: param - for param in params - } - self.params = [param_dict[param_name] - for param_name in self.param_names] + param_dict = {param.name: param for param in params} + self.params = [param_dict[param_name] for param_name in self.param_names] else: - self.params = [Parameter(param_name, - **{**self.default_param_specs[param_name], - **param_specs[param_name]}) - for param_name in self.param_names] + self.params = [ + Parameter( + param_name, + **{ + **self.default_param_specs[param_name], + **param_specs[param_name], + }, + ) + for param_name in self.param_names + ] self._y = y self._weights = weights self.df = df @@ -83,7 +84,7 @@ def __init__(self, self._opt_coefs = None self._opt_vcov = None - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): self.df = df for param in self.params: param.check_data(df) @@ -104,34 +105,36 @@ def attach_df(self, df: pd.DataFrame): self.weights = self.df[self._weights].to_numpy() @property - def opt_coefs(self) -> Union[None, ndarray]: + def opt_coefs(self) -> NDArray | None: return self._opt_coefs @opt_coefs.setter - def opt_coefs(self, coefs: ndarray): + def opt_coefs(self, coefs: NDArray): coefs = np.asarray(coefs) if coefs.size != self.size: raise ValueError("Coefficients size not match.") self._opt_coefs = coefs @property - def opt_vcov(self) -> Union[None, ndarray]: + def opt_vcov(self) -> NDArray | None: return self._opt_vcov @opt_vcov.setter - def opt_vcov(self, vcov: ndarray): + def opt_vcov(self, vcov: NDArray): vcov = np.asarray(vcov) self._opt_vcov = vcov - def get_vcov(self, coefs: ndarray) -> ndarray: + def get_vcov(self, coefs: NDArray) -> NDArray: hessian = self.hessian(coefs) if isinstance(hessian, Matrix): hessian = hessian.to_numpy() - #We probably don't want to be eigendecomposing + # We probably don't want to be eigendecomposing eig_vals, eig_vecs = np.linalg.eigh(hessian) if np.isclose(eig_vals, 0.0).any(): - raise ValueError("singular Hessian matrix, please add priors or " - "reduce number of variables") + raise ValueError( + "singular Hessian matrix, please add priors or " + "reduce number of variables" + ) inv_hessian = (eig_vecs / eig_vals).dot(eig_vecs.T) jacobian2 = self.jacobian2(coefs) @@ -139,230 +142,234 @@ def get_vcov(self, coefs: ndarray) -> ndarray: jacobian2 = jacobian2.to_numpy() eig_vals = np.linalg.eigvalsh(jacobian2) if np.isclose(eig_vals, 0.0).any(): - raise ValueError("singular Jacobian matrix, please add priors or " - "reduce number of variables") + raise ValueError( + "singular Jacobian matrix, please add priors or " + "reduce number of variables" + ) vcov = inv_hessian.dot(jacobian2) vcov = inv_hessian.dot(vcov.T) return vcov - def get_mat(self) -> List[ndarray]: + def get_mat(self) -> list[NDArray]: """Get the design matrices. Returns ------- - List[ndarray] + list[NDArray] The design matrices. """ return [param.get_mat(self.df) for param in self.params] - def get_uvec(self) -> ndarray: + def get_uvec(self) -> NDArray: """Get the direct Uniform prior array. Returns ------- - ndarray + NDArray The direct Uniform prior array. """ return np.hstack([param.get_uvec() for param in self.params]) - def get_gvec(self) -> ndarray: + def get_gvec(self) -> NDArray: """Get the direct Gaussian prior array. Returns ------- - ndarray + NDArray The direct Gaussian prior array. """ return np.hstack([param.get_gvec() for param in self.params]) - def get_linear_uvec(self) -> ndarray: + def get_linear_uvec(self) -> NDArray: """Get the linear Uniform prior array. Returns ------- - ndarray + NDArray The linear Uniform prior array. """ return np.hstack([param.get_linear_uvec() for param in self.params]) - def get_linear_gvec(self) -> ndarray: + def get_linear_gvec(self) -> NDArray: """Get the linear Gaussian prior array. Returns ------- - ndarray + NDArray The linear Gaussian prior array. """ return np.hstack([param.get_linear_gvec() for param in self.params]) - def get_linear_umat(self) -> ndarray: + def get_linear_umat(self) -> NDArray: """Get the linear Uniform prior design matrix. Returns ------- - ndarray + NDArray The linear Uniform prior design matrix. """ return block_diag(*[param.get_linear_umat() for param in self.params]) - def get_linear_gmat(self) -> ndarray: + def get_linear_gmat(self) -> NDArray: """Get the linear Gaussian prior design matrix. Returns ------- - ndarray + NDArray The linear Gaussian prior design matrix. """ return block_diag(*[param.get_linear_gmat() for param in self.params]) - def split_coefs(self, coefs: ndarray) -> List[ndarray]: + def split_coefs(self, coefs: NDArray) -> list[NDArray]: """Split coefficients into pieces for each parameter. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] A list of splitted coefficients for each parameter. """ assert len(coefs) == self.size return [coefs[index] for index in self.indices] - def get_params(self, coefs: ndarray) -> List[ndarray]: + def get_params(self, coefs: NDArray) -> list[NDArray]: """Get the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The parameters. """ coefs = self.split_coefs(coefs) - return [param.get_param(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_param(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def get_dparams(self, coefs: ndarray) -> List[ndarray]: + def get_dparams(self, coefs: NDArray) -> list[NDArray]: """Get the derivative of the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The derivative of the parameters. """ coefs = self.split_coefs(coefs) - return [param.get_dparam(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_dparam(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def get_d2params(self, coefs: ndarray) -> List[ndarray]: + def get_d2params(self, coefs: NDArray) -> list[NDArray]: """Get the second order derivative of the parameters. Parameters ---------- - coefs : ndarray + coefs : NDArray The coefficients array. Returns ------- - List[ndarray] + list[NDArray] The second order derivative of the parameters. """ coefs = self.split_coefs(coefs) - return [param.get_d2param(coefs[i], self.df, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_d2param(coefs[i], self.df, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] - def nll(self, params: List[ndarray]) -> ndarray: + def nll(self, params: list[NDArray]) -> NDArray: """Negative log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - ndarray + NDArray An array of negative log likelihood for each observation. """ raise NotImplementedError() - def dnll(self, params: List[ndarray]) -> List[ndarray]: + def dnll(self, params: list[NDArray]) -> list[NDArray]: """Derivative of negative the log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - List[ndarray] + list[NDArray] A list of derivatives for each parameter and each observation. """ raise NotImplementedError() - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: """Second order derivative of the negative log likelihood. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters. Returns ------- - List[List[ndarray]] + list[list[NDArray]] A list of list of second order derivatives for each parameter and each observation. """ raise NotImplementedError() - def get_ui(self, - params: List[ndarray], - bounds: Tuple[float, float]) -> ndarray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: """Get uncertainty interval, used for the trimming algorithm. Parameters ---------- - params : List[ndarray] + params : list[NDArray] A list of parameters - bounds : Tuple[float, float] + bounds : tuple[float, float] Quantile bounds for the uncertainty interval. Returns ------- - ndarray + NDArray An array with uncertainty interval for each observation. """ raise NotImplementedError() - def detect_outliers(self, - coefs: ndarray, - bounds: Tuple[float, float]) -> ndarray: + def detect_outliers(self, coefs: NDArray, bounds: tuple[float, float]) -> NDArray: """Detect outliers. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. - bounds : Tuple[float, float] + bounds : tuple[float, float] Quantile bounds for the inliers. Returns ------- - ndarray + NDArray A boolean array that indicate if observations are outliers. """ params = self.get_params(coefs) @@ -370,12 +377,12 @@ def detect_outliers(self, obs = self.df[self.y].to_numpy() return (obs < ui[0]) | (obs > ui[1]) - def objective_from_gprior(self, coefs: ndarray) -> float: + def objective_from_gprior(self, coefs: NDArray) -> float: """Objective function from the Gaussian priors. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns @@ -383,54 +390,61 @@ def objective_from_gprior(self, coefs: ndarray) -> float: float Objective function value. """ - val = 0.5*np.sum((coefs - self.gvec[0])**2/self.gvec[1]**2) + val = 0.5 * np.sum((coefs - self.gvec[0]) ** 2 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - val += 0.5*np.sum((self.linear_gmat.dot(coefs) - self.linear_gvec[0])**2/self.linear_gvec[1]**2) + val += 0.5 * np.sum( + (self.linear_gmat.dot(coefs) - self.linear_gvec[0]) ** 2 + / self.linear_gvec[1] ** 2 + ) return val - def gradient_from_gprior(self, coefs: ndarray) -> ndarray: + def gradient_from_gprior(self, coefs: NDArray) -> NDArray: """Graident function from the Gaussian priors. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Graident vector. """ - grad = (coefs - self.gvec[0])/self.gvec[1]**2 + grad = (coefs - self.gvec[0]) / self.gvec[1] ** 2 if self.linear_gvec.size > 0: - grad += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat.dot(coefs) - self.linear_gvec[0]) + grad += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat.dot(coefs) - self.linear_gvec[0] + ) return grad - def hessian_from_gprior(self) -> ndarray: + def hessian_from_gprior(self) -> NDArray: """Hessian matrix from the Gaussian prior. Returns ------- - ndarray + NDArray Hessian matrix. """ - hess = np.diag(1.0/self.gvec[1]**2) + hess = np.diag(1.0 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - hess += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat) + hess += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat + ) return hess - def get_nll_terms(self, coefs: ndarray) -> ndarray: + def get_nll_terms(self, coefs: NDArray) -> NDArray: params = self.get_params(coefs) nll_terms = self.nll(params) - nll_terms = self.weights*nll_terms + nll_terms = self.weights * nll_terms return nll_terms - def objective(self, coefs: ndarray) -> float: + def objective(self, coefs: NDArray) -> float: """Objective function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns @@ -439,42 +453,40 @@ def objective(self, coefs: ndarray) -> float: Objective value. """ nll_terms = self.get_nll_terms(coefs) - return self.trim_weights.dot(nll_terms) + \ - self.objective_from_gprior(coefs) + return self.trim_weights.dot(nll_terms) + self.objective_from_gprior(coefs) - def gradient(self, coefs: ndarray) -> ndarray: + def gradient(self, coefs: NDArray) -> NDArray: """Gradient function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Gradient vector. """ params = self.get_params(coefs) dparams = self.get_dparams(coefs) grad_params = self.dnll(params) - weights = self.weights*self.trim_weights - return np.hstack([ - dparams[i].T.dot(weights*grad_params[i]) - for i in range(self.num_params) - ]) + self.gradient_from_gprior(coefs) + weights = self.weights * self.trim_weights + return np.hstack( + [dparams[i].T.dot(weights * grad_params[i]) for i in range(self.num_params)] + ) + self.gradient_from_gprior(coefs) - def hessian(self, coefs: ndarray) -> ndarray: + def hessian(self, coefs: NDArray) -> NDArray: """Hessian function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Hessian matrix. """ params = self.get_params(coefs) @@ -482,43 +494,42 @@ def hessian(self, coefs: ndarray) -> ndarray: d2params = self.get_d2params(coefs) grad_params = self.dnll(params) hess_params = self.d2nll(params) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights hess = [ - [(dparams[i].T*(weights*hess_params[i][j])).dot(dparams[j]) - for j in range(self.num_params)] + [ + (dparams[i].T * (weights * hess_params[i][j])).dot(dparams[j]) + for j in range(self.num_params) + ] for i in range(self.num_params) ] for i in range(self.num_params): - hess[i][i] += np.tensordot(weights*grad_params[i], d2params[i], axes=1) + hess[i][i] += np.tensordot(weights * grad_params[i], d2params[i], axes=1) return np.block(hess) + self.hessian_from_gprior() - def jacobian2(self, coefs: ndarray) -> ndarray: + def jacobian2(self, coefs: NDArray) -> NDArray: """Jacobian function. Parameters ---------- - coefs : ndarray + coefs : NDArray Given coefficients. Returns ------- - ndarray + NDArray Jacobian matrix. """ params = self.get_params(coefs) dparams = self.get_dparams(coefs) grad_params = self.dnll(params) - weights = self.weights*self.trim_weights - jacobian = np.vstack([ - dparams[i].T*(weights*grad_params[i]) - for i in range(self.num_params) - ]) + weights = self.weights * self.trim_weights + jacobian = np.vstack( + [dparams[i].T * (weights * grad_params[i]) for i in range(self.num_params)] + ) jacobian2 = jacobian.dot(jacobian.T) + self.hessian_from_gprior() return jacobian2 - def fit(self, - optimizer: Callable = scipy_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = scipy_optimize, **optimizer_options): """Fit function. Parameters @@ -533,18 +544,18 @@ def fit(self, return optimizer(self, **optimizer_options) - def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame: + def predict(self, df: DataFrame | None = None) -> DataFrame: """Predict the parameters. Parameters ---------- - df : pd.DataFrame, optional + df : DataFrame, optional Data Frame with prediction data. If it is None, using the training data. Returns ------- - pd.DataFrame + DataFrame Data frame with predicted parameters. """ if df is None: @@ -558,7 +569,9 @@ def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame: return df def __repr__(self) -> str: - return (f"{type(self).__name__}(" - f"bnum_y={self.df.shape[0]}, " - f"num_params={self.num_params}, " - f"size={self.size})") + return ( + f"{type(self).__name__}(" + f"bnum_y={self.df.shape[0]}, " + f"num_params={self.num_params}, " + f"size={self.size})" + ) diff --git a/src/regmod/models/negativebinomial.py b/src/regmod/models/negativebinomial.py index 49b802e..d8d59e0 100644 --- a/src/regmod/models/negativebinomial.py +++ b/src/regmod/models/negativebinomial.py @@ -1,44 +1,54 @@ """ Negative Binomial Model """ -# pylint: disable=no-name-in-module -from typing import List, Tuple +# pylint: disable=no-name-in-module import numpy as np -import pandas as pd -from numpy import ndarray from scipy.special import digamma, loggamma, polygamma from scipy.stats import nbinom +from regmod._typing import NDArray, DataFrame from .model import Model class NegativeBinomialModel(Model): - param_names: List[str] = ("n", "p") - default_param_specs = {"n": {"inv_link": "exp"}, - "p": {"inv_link": "expit"}} + param_names: list[str] = ("n", "p") + default_param_specs = {"n": {"inv_link": "exp"}, "p": {"inv_link": "expit"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not np.all(self.y >= 0): - raise ValueError("Negative-Binomial model requires observations to be non-negative.") - - def nll(self, params: List[ndarray]) -> ndarray: - return -(loggamma(params[0] + self.y) - - loggamma(params[0]) + - self.y*np.log(1 - params[1]) + - params[0]*np.log(params[1])) - - def dnll(self, params: List[ndarray]) -> List[ndarray]: - return [-(digamma(params[0] + self.y) - digamma(params[0]) + np.log(params[1])), - self.y/(1 - params[1]) - params[0]/params[1]] - - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: - return [[polygamma(1, params[0]) - polygamma(1, params[0] + self.y), -1/params[1]], - [-1/params[1], params[0]/params[1]**2 + self.y/(1 - params[1])**2]] - - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> np.ndarray: + raise ValueError( + "Negative-Binomial model requires observations to be non-negative." + ) + + def nll(self, params: list[NDArray]) -> NDArray: + return -( + loggamma(params[0] + self.y) + - loggamma(params[0]) + + self.y * np.log(1 - params[1]) + + params[0] * np.log(params[1]) + ) + + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [ + -(digamma(params[0] + self.y) - digamma(params[0]) + np.log(params[1])), + self.y / (1 - params[1]) - params[0] / params[1], + ] + + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: + return [ + [ + polygamma(1, params[0]) - polygamma(1, params[0] + self.y), + -1 / params[1], + ], + [ + -1 / params[1], + params[0] / params[1] ** 2 + self.y / (1 - params[1]) ** 2, + ], + ] + + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: n = params[0] p = params[1] - return [nbinom.ppf(bounds[0], n=n, p=p), - nbinom.ppf(bounds[1], n=n, p=p)] + return [nbinom.ppf(bounds[0], n=n, p=p), nbinom.ppf(bounds[1], n=n, p=p)] diff --git a/src/regmod/models/pogit.py b/src/regmod/models/pogit.py index e47e326..4965b22 100644 --- a/src/regmod/models/pogit.py +++ b/src/regmod/models/pogit.py @@ -1,40 +1,34 @@ """ Pogit Model """ -from typing import List, Tuple import numpy as np -import pandas as pd -from numpy import ndarray from scipy.stats import poisson +from regmod._typing import NDArray, DataFrame from .model import Model class PogitModel(Model): param_names = ("p", "lam") - default_param_specs = {"p": {"inv_link": "expit"}, - "lam": {"inv_link": "exp"}} + default_param_specs = {"p": {"inv_link": "expit"}, "lam": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y >= 0): raise ValueError("Pogit model requires observations to be non-negagive.") - def nll(self, params: List[ndarray]) -> ndarray: - mean = params[0]*params[1] - return mean - self.y*np.log(mean) + def nll(self, params: list[NDArray]) -> NDArray: + mean = params[0] * params[1] + return mean - self.y * np.log(mean) - def dnll(self, params: List[ndarray]) -> List[ndarray]: - return [params[1] - self.y/params[0], - params[0] - self.y/params[1]] + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [params[1] - self.y / params[0], params[0] - self.y / params[1]] - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: ones = np.ones(self.df.shape[0]) - return [[self.y/params[0]**2, ones], - [ones, self.y/params[1]**2]] + return [[self.y / params[0] ** 2, ones], [ones, self.y / params[1] ** 2]] - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray: - mean = params[0]*params[1] - return [poisson.ppf(bounds[0], mu=mean), - poisson.ppf(bounds[1], mu=mean)] + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: + mean = params[0] * params[1] + return [poisson.ppf(bounds[0], mu=mean), poisson.ppf(bounds[1], mu=mean)] diff --git a/src/regmod/models/poisson.py b/src/regmod/models/poisson.py index c2dc929..6828512 100644 --- a/src/regmod/models/poisson.py +++ b/src/regmod/models/poisson.py @@ -1,15 +1,12 @@ """ Poisson Model """ -from typing import Callable, List, Tuple import numpy as np -import pandas as pd -from numpy.typing import NDArray from scipy.stats import poisson from regmod.optimizer import msca_optimize - +from regmod._typing import Callable, NDArray, DataFrame from .model import Model from .utils import model_post_init @@ -18,7 +15,7 @@ class PoissonModel(Model): param_names = ("lam",) default_param_specs = {"lam": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y >= 0): raise ValueError("Poisson model requires observations to be non-negagive.") @@ -38,15 +35,11 @@ def objective(self, coefs: NDArray) -> float: Objective value. """ inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) - weights = self.weights*self.trim_weights - obj_param = weights * ( - param - self.y * np.log(param) - ) + weights = self.weights * self.trim_weights + obj_param = weights * (param - self.y * np.log(param)) return obj_param.sum() + self.objective_from_gprior(coefs) def gradient(self, coefs: NDArray) -> NDArray: @@ -64,16 +57,12 @@ def gradient(self, coefs: NDArray) -> NDArray: """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * ( - 1 - self.y / param - ) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (1 - self.y / param) * dparam return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs) @@ -92,17 +81,14 @@ def hessian(self, coefs: NDArray) -> NDArray: """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) d2param = inv_link.d2fun(lin_param) - weights = self.weights*self.trim_weights + weights = self.weights * self.trim_weights hess_param = weights * ( - self.y / param**2 * dparam**2 + - (1 - self.y / param) * d2param + self.y / param**2 * dparam**2 + (1 - self.y / param) * d2param ) scaled_mat = mat.scale_rows(hess_param) @@ -125,21 +111,17 @@ def jacobian2(self, coefs: NDArray) -> NDArray: """ mat = self.mat[0] inv_link = self.params[0].inv_link - lin_param = self.params[0].get_lin_param( - coefs, self.df, mat=self.mat[0] - ) + lin_param = self.params[0].get_lin_param(coefs, self.df, mat=self.mat[0]) param = inv_link.fun(lin_param) dparam = inv_link.dfun(lin_param) - weights = self.weights*self.trim_weights - grad_param = weights * (1.0 - self.y/param) * dparam + weights = self.weights * self.trim_weights + grad_param = weights * (1.0 - self.y / param) * dparam jacobian = mat.T.scale_cols(grad_param) hess_mat_gprior = type(jacobian)(self.hessian_from_gprior()) jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior return jacobian2 - def fit(self, - optimizer: Callable = msca_optimize, - **optimizer_options): + def fit(self, optimizer: Callable = msca_optimize, **optimizer_options): """Fit function. Parameters @@ -147,21 +129,17 @@ def fit(self, optimizer : Callable, optional Model solver, by default scipy_optimize. """ - super().fit( - optimizer=optimizer, - **optimizer_options - ) + super().fit(optimizer=optimizer, **optimizer_options) - def nll(self, params: List[NDArray]) -> NDArray: - return params[0] - self.y*np.log(params[0]) + def nll(self, params: list[NDArray]) -> NDArray: + return params[0] - self.y * np.log(params[0]) - def dnll(self, params: List[NDArray]) -> List[NDArray]: - return [1.0 - self.y/params[0]] + def dnll(self, params: list[NDArray]) -> list[NDArray]: + return [1.0 - self.y / params[0]] - def d2nll(self, params: List[NDArray]) -> List[List[NDArray]]: - return [[self.y/params[0]**2]] + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: + return [[self.y / params[0] ** 2]] - def get_ui(self, params: List[NDArray], bounds: Tuple[float, float]) -> NDArray: + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: mean = params[0] - return [poisson.ppf(bounds[0], mu=mean), - poisson.ppf(bounds[1], mu=mean)] + return [poisson.ppf(bounds[0], mu=mean), poisson.ppf(bounds[1], mu=mean)] diff --git a/src/regmod/models/utils.py b/src/regmod/models/utils.py index 0cdd39a..f6fcb96 100644 --- a/src/regmod/models/utils.py +++ b/src/regmod/models/utils.py @@ -1,9 +1,7 @@ -from typing import Tuple - import numpy as np -from msca.linalg.matrix import Matrix, asmatrix -from numpy.typing import NDArray +from msca.linalg.matrix import asmatrix from scipy.sparse import csc_matrix +from regmod._typing import NDArray, Matrix def model_post_init( @@ -11,7 +9,7 @@ def model_post_init( uvec: NDArray, linear_umat: NDArray, linear_uvec: NDArray, -) -> Tuple[Matrix, Matrix, NDArray]: +) -> tuple[Matrix, Matrix, NDArray]: # design matrix issparse = mat.size == 0 or ((mat == 0).sum() / mat.size) > 0.95 if issparse: @@ -31,12 +29,8 @@ def model_post_init( cmat = cmat / scale[:, np.newaxis] cvec = cvec / scale - cmat = np.vstack([ - -cmat[~np.isneginf(cvec[0])], cmat[~np.isposinf(cvec[1])] - ]) - cvec = np.hstack([ - -cvec[0][~np.isneginf(cvec[0])], cvec[1][~np.isposinf(cvec[1])] - ]) + cmat = np.vstack([-cmat[~np.isneginf(cvec[0])], cmat[~np.isposinf(cvec[1])]]) + cvec = np.hstack([-cvec[0][~np.isneginf(cvec[0])], cvec[1][~np.isposinf(cvec[1])]]) if issparse: cmat = csc_matrix(cmat).astype(np.float64) cmat = asmatrix(cmat) diff --git a/src/regmod/models/weibull.py b/src/regmod/models/weibull.py index a17ec46..e9a5885 100644 --- a/src/regmod/models/weibull.py +++ b/src/regmod/models/weibull.py @@ -1,13 +1,11 @@ """ Weibull Model """ -from typing import List, Tuple import numpy as np -import pandas as pd -from numpy import ndarray from scipy.stats import weibull_min +from regmod._typing import NDArray, DataFrame from .model import Model @@ -15,29 +13,43 @@ class WeibullModel(Model): param_names = ("b", "k") default_param_specs = {"b": {"inv_link": "exp"}, "k": {"inv_link": "exp"}} - def attach_df(self, df: pd.DataFrame): + def attach_df(self, df: DataFrame): super().attach_df(df) if not all(self.y > 0): raise ValueError("Weibull model requires observations to be positive.") - def nll(self, params: List[ndarray]) -> ndarray: + def nll(self, params: list[NDArray]) -> NDArray: t = self.y ln_t = np.log(t) - return params[0]*(t**params[1]) - (params[1] - 1)*ln_t - np.log(params[0]) - np.log(params[1]) - - def dnll(self, params: List[ndarray]) -> List[ndarray]: + return ( + params[0] * (t ** params[1]) + - (params[1] - 1) * ln_t + - np.log(params[0]) + - np.log(params[1]) + ) + + def dnll(self, params: list[NDArray]) -> list[NDArray]: t = self.y ln_t = np.log(t) - return [t**params[1] - 1/params[0], - ln_t*params[0]*(t**params[1]) - ln_t - 1/params[1]] + return [ + t ** params[1] - 1 / params[0], + ln_t * params[0] * (t ** params[1]) - ln_t - 1 / params[1], + ] - def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: + def d2nll(self, params: list[NDArray]) -> list[list[NDArray]]: t = self.y ln_t = np.log(t) - return [[1/params[0]**2, ln_t*(t**params[1])], - [ln_t*(t**params[1]), 1/params[1]**2 + params[0]*(ln_t**2)*(t**params[1])]] - - def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray: - scale = 1 / params[0]**(1 / params[1]) - return [weibull_min.ppf(bounds[0], c=params[1], scale=scale), - weibull_min.ppf(bounds[1], c=params[1], scale=scale)] + return [ + [1 / params[0] ** 2, ln_t * (t ** params[1])], + [ + ln_t * (t ** params[1]), + 1 / params[1] ** 2 + params[0] * (ln_t**2) * (t ** params[1]), + ], + ] + + def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: + scale = 1 / params[0] ** (1 / params[1]) + return [ + weibull_min.ppf(bounds[0], c=params[1], scale=scale), + weibull_min.ppf(bounds[1], c=params[1], scale=scale), + ] diff --git a/src/regmod/optimizer.py b/src/regmod/optimizer.py index 382e3e3..aef8f1c 100644 --- a/src/regmod/optimizer.py +++ b/src/regmod/optimizer.py @@ -1,18 +1,17 @@ """ Optimizer module """ -from typing import Callable, Dict, Optional import numpy as np from msca.optim.prox import proj_capped_simplex from msca.optim.solver import IPSolver, NTSolver -from numpy.typing import NDArray from scipy.optimize import LinearConstraint, minimize +from regmod._typing import Callable, NDArray -def scipy_optimize(model: "Model", - x0: Optional[NDArray] = None, - options: Optional[Dict] = None) -> NDArray: +def scipy_optimize( + model: "Model", x0: NDArray | None = None, options: dict | None = None +) -> NDArray: """Scipy trust-region optimizer. Parameters @@ -22,7 +21,7 @@ def scipy_optimize(model: "Model", x0 : NDArray, optional Initial guess for the variable, by default None. If `None` use zero vector as the initial guess. - options : Dict, optional + options : dict, optional Scipy solver options, by default None. Returns @@ -32,19 +31,26 @@ def scipy_optimize(model: "Model", """ x0 = np.zeros(model.size) if x0 is None else x0 bounds = model.uvec.T - constraints = [LinearConstraint( - model.linear_umat, - model.linear_uvec[0], - model.linear_uvec[1] - )] if model.linear_uvec.size > 0 else [] - - result = minimize(model.objective, x0, - method="trust-constr", - jac=model.gradient, - hess=model.hessian, - constraints=constraints, - bounds=bounds, - options=options) + constraints = ( + [ + LinearConstraint( + model.linear_umat, model.linear_uvec[0], model.linear_uvec[1] + ) + ] + if model.linear_uvec.size > 0 + else [] + ) + + result = minimize( + model.objective, + x0, + method="trust-constr", + jac=model.gradient, + hess=model.hessian, + constraints=constraints, + bounds=bounds, + options=options, + ) model.opt_result = result model.opt_coefs = result.x.copy() @@ -52,25 +58,17 @@ def scipy_optimize(model: "Model", return result.x -def msca_optimize(model: "Model", - x0: Optional[NDArray] = None, - options: Optional[Dict] = None) -> NDArray: +def msca_optimize( + model: "Model", x0: NDArray | None = None, options: dict | None = None +) -> NDArray: x0 = np.zeros(model.size) if x0 is None else x0 options = options or {} if model.cmat.size == 0: - solver = NTSolver( - model.objective, - model.gradient, - model.hessian - ) + solver = NTSolver(model.objective, model.gradient, model.hessian) else: solver = IPSolver( - model.objective, - model.gradient, - model.hessian, - model.cmat, - model.cvec + model.objective, model.gradient, model.hessian, model.cmat, model.cvec ) result = solver.minimize(x0=x0, **options) model.opt_result = result @@ -109,18 +107,21 @@ def trimming(optimize: Callable) -> Callable: Callable Trimming optimization solver. """ - def optimize_with_trimming(model: "Model", - x0: NDArray = None, - options: Dict = None, - trim_steps: int = 3, - inlier_pct: float = 0.95) -> NDArray: + + def optimize_with_trimming( + model: "Model", + x0: NDArray = None, + options: dict = None, + trim_steps: int = 3, + inlier_pct: float = 0.95, + ) -> NDArray: if trim_steps < 2: raise ValueError("At least two trimming steps.") if inlier_pct < 0.0 or inlier_pct > 1.0: raise ValueError("inlier_pct has to be between 0 and 1.") coefs = optimize(model, x0, options) if inlier_pct < 1.0: - bounds = (0.5 - 0.5*inlier_pct, 0.5 + 0.5*inlier_pct) + bounds = (0.5 - 0.5 * inlier_pct, 0.5 + 0.5 * inlier_pct) index = model.detect_outliers(coefs, bounds) if index.sum() > 0: masks = np.append(np.linspace(1.0, 0.0, trim_steps)[1:], 0.0) @@ -129,6 +130,7 @@ def optimize_with_trimming(model: "Model", coefs = optimize(model, coefs, options) index = model.detect_outliers(coefs, bounds) return coefs + return optimize_with_trimming @@ -136,10 +138,10 @@ def original_trimming(optimize: Callable) -> Callable: def optimize_with_trimming( model: "Model", x0: NDArray = None, - options: Optional[Dict] = None, + options: dict | None = None, trim_steps: int = 10, step_size: float = 10.0, - inlier_pct: float = 0.95 + inlier_pct: float = 0.95, ) -> NDArray: if trim_steps < 2: raise ValueError("At least two trimming steps.") @@ -147,24 +149,24 @@ def optimize_with_trimming( raise ValueError("inlier_pct has to be between 0 and 1.") coefs = optimize(model, x0, options) if inlier_pct < 1.0: - num_inliers = int(inlier_pct*model.y.size) + num_inliers = int(inlier_pct * model.y.size) counter = 0 success = False while (counter < trim_steps) and (not success): counter += 1 nll_terms = model.get_nll_terms(coefs) model.trim_weights = proj_capped_simplex( - model.trim_weights - step_size*nll_terms, - num_inliers + model.trim_weights - step_size * nll_terms, num_inliers ) coefs = optimize(model, x0, options) success = all( - np.isclose(model.trim_weights, 0.0) | - np.isclose(model.trim_weights, 1.0) + np.isclose(model.trim_weights, 0.0) + | np.isclose(model.trim_weights, 1.0) ) if not success: sort_indices = np.argsort(model.trim_weights) model.trim_weights[sort_indices[-num_inliers:]] = 1.0 model.trim_weights[sort_indices[:-num_inliers]] = 0.0 return coefs + return optimize_with_trimming diff --git a/src/regmod/parameter.py b/src/regmod/parameter.py index 29b68e2..d87db67 100644 --- a/src/regmod/parameter.py +++ b/src/regmod/parameter.py @@ -1,16 +1,16 @@ """ Parameter module """ + from dataclasses import dataclass, field -from typing import Optional, Union import numpy as np -import pandas as pd from scipy.linalg import block_diag from regmod.function import SmoothFunction, fun_dict from regmod.prior import LinearGaussianPrior, LinearUniformPrior from regmod.variable import SplineVariable, Variable +from regmod._typing import NDArray, DataFrame @dataclass @@ -40,36 +40,44 @@ class Parameter: name: str variables: list[Variable] = field(default_factory=list, repr=False) - inv_link: Union[str, SmoothFunction] = field(default="identity", repr=False) - offset: Optional[str] = field(default=None, repr=False) + inv_link: str | SmoothFunction = field(default="identity", repr=False) + offset: str | None = field(default=None, repr=False) linear_gpriors: list[LinearGaussianPrior] = field(default_factory=list, repr=False) linear_upriors: list[LinearUniformPrior] = field(default_factory=list, repr=False) def __post_init__(self): if isinstance(self.inv_link, str): self.inv_link = fun_dict[self.inv_link] - assert isinstance(self.inv_link, SmoothFunction), \ - "inv_link has to be an instance of SmoothFunction." - assert all([isinstance(prior, LinearGaussianPrior) for prior in self.linear_gpriors]), \ - "linear_gpriors has to be a list of LinearGaussianPrior." - assert all([isinstance(prior, LinearUniformPrior) for prior in self.linear_upriors]), \ - "linear_upriors has to be a list of LinearUniformPrior." - assert all([prior.mat.shape[1] == self.size for prior in self.linear_gpriors]), \ - "linear_gpriors size not match." - assert all([prior.mat.shape[1] == self.size for prior in self.linear_upriors]), \ - "linear_upriors size not match." - assert all([isinstance(var, Variable) for var in self.variables]), \ - "variables has to be a list of Variable." + assert isinstance( + self.inv_link, SmoothFunction + ), "inv_link has to be an instance of SmoothFunction." + assert all( + [isinstance(prior, LinearGaussianPrior) for prior in self.linear_gpriors] + ), "linear_gpriors has to be a list of LinearGaussianPrior." + assert all( + [isinstance(prior, LinearUniformPrior) for prior in self.linear_upriors] + ), "linear_upriors has to be a list of LinearUniformPrior." + assert all( + [prior.mat.shape[1] == self.size for prior in self.linear_gpriors] + ), "linear_gpriors size not match." + assert all( + [prior.mat.shape[1] == self.size for prior in self.linear_upriors] + ), "linear_upriors size not match." + assert all( + [isinstance(var, Variable) for var in self.variables] + ), "variables has to be a list of Variable." if len(self.variables) == 0 and self.offset is None: - raise ValueError("Please provide at least one variable when " - "parameter does not have an offset") + raise ValueError( + "Please provide at least one variable when " + "parameter does not have an offset" + ) @property def size(self) -> int: """Size of the parameter.""" return sum([var.size for var in self.variables]) - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Attach data to all variables. Parameters @@ -81,7 +89,7 @@ def check_data(self, df: pd.DataFrame): for var in self.variables: var.check_data(df) - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get the design matrix. Parameters @@ -91,7 +99,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray The design matrix. """ @@ -99,125 +107,155 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: return np.empty(shape=(df.shape[0], 0)) return np.hstack([var.get_mat(df) for var in self.variables]) - def get_uvec(self) -> np.ndarray: + def get_uvec(self) -> NDArray: """Get the direct Uniform prior. Returns ------- - np.ndarray + NDArray Uniform prior information array. """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) return np.hstack([var.get_uvec() for var in self.variables]) - def get_gvec(self) -> np.ndarray: + def get_gvec(self) -> NDArray: """Get the direct Gaussian prior. Returns ------- - np.ndarray + NDArray Gaussian prior information array. """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) return np.hstack([var.get_gvec() for var in self.variables]) - def get_linear_uvec(self) -> np.ndarray: + def get_linear_uvec(self) -> NDArray: """Get the linear Uniform prior vector. Returns ------- - np.ndarray + NDArray Uniform prior information array. """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) - uvec = np.hstack([ - var.get_linear_uvec() if isinstance(var, SplineVariable) else np.empty((2, 0)) - for var in self.variables - ]) + uvec = np.hstack( + [ + ( + var.get_linear_uvec() + if isinstance(var, SplineVariable) + else np.empty((2, 0)) + ) + for var in self.variables + ] + ) if len(self.linear_upriors) > 0: - uvec = np.hstack([uvec] + [np.vstack([prior.lb, prior.ub]) - for prior in self.linear_upriors]) + uvec = np.hstack( + [uvec] + + [np.vstack([prior.lb, prior.ub]) for prior in self.linear_upriors] + ) return uvec - def get_linear_gvec(self) -> np.ndarray: + def get_linear_gvec(self) -> NDArray: """Get the linear Gaussian prior vector. Returns ------- - np.ndarray + NDArray Gaussian prior information array. """ if len(self.variables) == 0: return np.empty(shape=(2, 0)) - gvec = np.hstack([ - var.get_linear_gvec() if isinstance(var, SplineVariable) else np.empty((2, 0)) - for var in self.variables - ]) + gvec = np.hstack( + [ + ( + var.get_linear_gvec() + if isinstance(var, SplineVariable) + else np.empty((2, 0)) + ) + for var in self.variables + ] + ) if len(self.linear_gpriors) > 0: - gvec = np.hstack([gvec] + [np.vstack([prior.mean, prior.sd]) - for prior in self.linear_gpriors]) + gvec = np.hstack( + [gvec] + + [np.vstack([prior.mean, prior.sd]) for prior in self.linear_gpriors] + ) return gvec - def get_linear_umat(self) -> np.ndarray: + def get_linear_umat(self) -> NDArray: """Get the linear Uniform prior matrix. Returns ------- - np.ndarray + NDArray Uniform prior design matrix. """ if len(self.variables) == 0: return np.empty(shape=(0, 0)) - umat = block_diag(*[ - var.get_linear_umat() if isinstance(var, SplineVariable) else np.empty((0, 1)) - for var in self.variables - ]) + umat = block_diag( + *[ + ( + var.get_linear_umat() + if isinstance(var, SplineVariable) + else np.empty((0, 1)) + ) + for var in self.variables + ] + ) if len(self.linear_upriors) > 0: umat = np.vstack([umat] + [prior.mat for prior in self.linear_upriors]) return umat - def get_linear_gmat(self) -> np.ndarray: + def get_linear_gmat(self) -> NDArray: """Get the linear Gaussian prior matrix. Returns ------- - np.ndarray + NDArray Gaussian prior design matrix. """ if len(self.variables) == 0: return np.empty(shape=(0, 0)) - gmat = block_diag(*[ - var.get_linear_gmat() if isinstance(var, SplineVariable) else np.empty((0, 1)) - for var in self.variables - ]) + gmat = block_diag( + *[ + ( + var.get_linear_gmat() + if isinstance(var, SplineVariable) + else np.empty((0, 1)) + ) + for var in self.variables + ] + ) if len(self.linear_gpriors) > 0: gmat = np.vstack([gmat] + [prior.mat for prior in self.linear_gpriors]) return gmat - def get_lin_param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None, - return_mat: bool = False) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]: + def get_lin_param( + self, + coefs: NDArray, + df: DataFrame, + mat: NDArray = None, + return_mat: bool = False, + ) -> NDArray | tuple[NDArray, NDArray]: """Get the parameter before apply the link function. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. return_mat : bool, optional If `True`, return the created design matrix, by default False. Returns ------- - Union[np.ndarray, tuple[np.ndarray, np.ndarray]] + Union[NDArray, tuple[NDArray, NDArray]] Linear parameter vector, or when `return_mat=True` also returns the design matrix. @@ -233,75 +271,70 @@ def get_lin_param(self, return lin_param, mat return lin_param - def get_param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_param(self, coefs: NDArray, df: DataFrame, mat: NDArray = None) -> NDArray: """Get the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the parameter. """ lin_param = self.get_lin_param(coefs, df, mat) return self.inv_link.fun(lin_param) - def get_dparam(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_dparam(self, coefs: NDArray, df: DataFrame, mat: NDArray = None) -> NDArray: """Get the derivative of the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the derivative of the parameter. """ if len(self.variables) == 0: return np.empty((df.shape[0], 0)) lin_param, mat = self.get_lin_param(coefs, df, mat, return_mat=True) - return self.inv_link.dfun(lin_param)[:, None]*mat + return self.inv_link.dfun(lin_param)[:, None] * mat - def get_d2param(self, - coefs: np.ndarray, - df: pd.DataFrame, - mat: np.ndarray = None) -> np.ndarray: + def get_d2param( + self, coefs: NDArray, df: DataFrame, mat: NDArray = None + ) -> NDArray: """Get the second order derivative of the parameter. Parameters ---------- - coefs : np.ndarray + coefs : NDArray Coefficients for the design matrix. df Data frame that contains all the covariates. - mat : np.ndarray, optional + mat : NDArray, optional Alternative design matrix, by default None. Returns ------- - np.ndarray + NDArray Returns the second order derivative of the parameter. """ if len(self.variables) == 0: return np.empty((df.shape[0], 0, 0)) lin_param, mat = self.get_lin_param(coefs, df, mat, return_mat=True) - return self.inv_link.d2fun(lin_param)[:, None, None]*(mat[..., None]*mat[:, None, :]) + return self.inv_link.d2fun(lin_param)[:, None, None] * ( + mat[..., None] * mat[:, None, :] + ) diff --git a/src/regmod/prior.py b/src/regmod/prior.py index ea88fb1..5485f50 100644 --- a/src/regmod/prior.py +++ b/src/regmod/prior.py @@ -1,14 +1,13 @@ """ Prior module """ -from collections.abc import Iterable + from dataclasses import dataclass, field -from typing import Any, List import numpy as np from xspline import XSpline -from regmod.utils import default_vec_factory +from regmod._typing import Any, Iterable, NDArray @dataclass @@ -40,12 +39,12 @@ class Prior: size: int = None - def process_size(self, vecs: List[Any]): + def process_size(self, vecs: list[Any]): """Infer and validate size from given vector information. Parameters ---------- - vecs : List[Any] + vecs : list[Any] Vector information of the prior. For Gaussian prior it will be mean and standard deviation. For Uniform prior it will be lower and upper bounds. @@ -73,10 +72,10 @@ class GaussianPrior(Prior): size : Optional[int], optional Size of variable. Default is `None`. When it is `None`, size is inferred from the vector information of the prior. - mean : Union[float, np.ndarray], default=0 + mean : Union[float, NDArray], default=0 Mean of the Gaussian prior. Default is 0. If it is a scalar, it will be extended to an array with `self.size`. - sd : Union[float, np.ndarray], default=np.inf + sd : Union[float, NDArray], default=np.inf Standard deviation of the Gaussian prior. Default is `np.inf`. If it is a scalar, it will be extended to an array with `self.size`. @@ -84,9 +83,9 @@ class GaussianPrior(Prior): ---------- size : int Size of the variable. - mean : ndarray + mean : NDArray Mean of the Gaussian prior. - sd : ndarray + sd : NDArray Standard deviation of the Gaussian prior. Raises @@ -99,8 +98,8 @@ class GaussianPrior(Prior): Raised when any value in standard deviation vector is non-positive. """ - mean: np.ndarray = field(default=0.0, repr=False) - sd: np.ndarray = field(default=np.inf, repr=False) + mean: NDArray = field(default=0.0, repr=False) + sd: NDArray = field(default=np.inf, repr=False) def __post_init__(self): self.process_size([self.mean, self.sd]) @@ -127,10 +126,10 @@ class UniformPrior(Prior): size : Optional[int], optional Size of variable. Default is `None`. When it is `None`, size is inferred from the vector information of the prior. - lb : Union[float, np.ndarray], default=-np.inf + lb : Union[float, NDArray], default=-np.inf Lower bound of Uniform prior. Default is `-np.inf`. If it is a scalar, it will be extended to an array with `self.size`. - ub : Union[float, np.ndarray], default=np.inf + ub : Union[float, NDArray], default=np.inf Upper bound of the Uniform prior. Default is `np.inf`. If it is a scalar,it will be extended to an array with `self.size`. @@ -138,9 +137,9 @@ class UniformPrior(Prior): ---------- size : int Size of the variable. - lb : ndarray + lb : NDArray Lower bound of Uniform prior. - ub : ndarray + ub : NDArray Upper bound of Uniform prior. Raises @@ -153,8 +152,8 @@ class UniformPrior(Prior): Raised if lower bound is greater than upper bound. """ - lb: np.ndarray = field(default=-np.inf, repr=False) - ub: np.ndarray = field(default=np.inf, repr=False) + lb: NDArray = field(default=-np.inf, repr=False) + ub: NDArray = field(default=np.inf, repr=False) def __post_init__(self): self.process_size([self.lb, self.ub]) @@ -169,9 +168,7 @@ def __post_init__(self): if self.ub.size != self.size: raise ValueError("Upper bound vector size does not match.") if any(self.lb > self.ub): - ValueError( - "Lower bounds must be less than or equal to upper bounds." - ) + ValueError("Lower bounds must be less than or equal to upper bounds.") @dataclass @@ -180,7 +177,7 @@ class LinearPrior: Parameters ---------- - mat : np.ndarray, optional + mat : NDArray, optional Linear mapping for the prior. Default is an empty matrix. size : Optional[int], optional Size of the prior. Default is `None`. If it is `None`, the size will @@ -188,7 +185,7 @@ class LinearPrior: Attributes ---------- - mat : np.ndarray + mat : NDArray Linear mapping for the prior. size : int Size of the prior. @@ -199,8 +196,7 @@ class LinearPrior: Indicate if the prior is empty. """ - mat: np.ndarray = field(default_factory=lambda: np.empty(shape=(0, 1)), - repr=False) + mat: NDArray = field(default_factory=lambda: np.empty(shape=(0, 1)), repr=False) size: int = None def __post_init__(self): @@ -270,13 +266,14 @@ class SplinePrior(LinearPrior): domain_type: str = field(default="rel", repr=False) def __post_init__(self): - assert self.domain_lb <= self.domain_ub, \ - "Domain lower bound must be less or equal than upper bound." - assert self.domain_type in ["rel", "abs"], \ - "Domain type must be 'rel' or 'abs'." + assert ( + self.domain_lb <= self.domain_ub + ), "Domain lower bound must be less or equal than upper bound." + assert self.domain_type in ["rel", "abs"], "Domain type must be 'rel' or 'abs'." if self.domain_type == "rel": - assert self.domain_lb >= 0.0 and self.domain_ub <= 1.0, \ - "Using relative domain, bounds must be numbers between 0 and 1." + assert ( + self.domain_lb >= 0.0 and self.domain_ub <= 1.0 + ), "Using relative domain, bounds must be numbers between 0 and 1." def attach_spline(self, spline: XSpline): """Attach the spline to process the domain. @@ -289,14 +286,15 @@ def attach_spline(self, spline: XSpline): knots_lb = spline.knots[0] knots_ub = spline.knots[-1] if self.domain_type == "rel": - points_lb = knots_lb + (knots_ub - knots_lb)*self.domain_lb - points_ub = knots_lb + (knots_ub - knots_lb)*self.domain_ub + points_lb = knots_lb + (knots_ub - knots_lb) * self.domain_lb + points_ub = knots_lb + (knots_ub - knots_lb) * self.domain_ub else: points_lb = self.domain_lb points_ub = self.domain_ub points = np.linspace(points_lb, points_ub, self.size) - self.mat = spline.design_dmat(points, order=self.order, - l_extra=True, r_extra=True) + self.mat = spline.design_dmat( + points, order=self.order, l_extra=True, r_extra=True + ) super().__post_init__() diff --git a/src/regmod/utils.py b/src/regmod/utils.py index 1a9875b..9d514c9 100644 --- a/src/regmod/utils.py +++ b/src/regmod/utils.py @@ -1,17 +1,18 @@ """ Utility classes and functions """ + from dataclasses import dataclass -from typing import Any, List, Optional import numpy as np from xspline import XSpline +from regmod._typing import Any, NDArray + -def default_vec_factory(vec: Any, - size: int, - default_value: float, - vec_name: str = 'vec') -> np.ndarray: +def default_vec_factory( + vec: Any, size: int, default_value: float, vec_name: str = "vec" +) -> NDArray: """Validate or create the vector. Parameters @@ -30,7 +31,7 @@ def default_vec_factory(vec: Any, Returns ------- - np.ndarray + NDArray Created or validated vector. Notes @@ -46,12 +47,12 @@ def default_vec_factory(vec: Any, return vec -def check_size(vec: np.ndarray, size: int, vec_name: str = 'vec') -> None: +def check_size(vec: NDArray, size: int, vec_name: str = "vec") -> None: """Check the size of the vector. Parameters ---------- - vec : np.ndarray + vec : NDArray Vector to be validated. size : int The assumption size of the vector. @@ -76,7 +77,7 @@ class SplineSpecs: Attributes ---------- - knots : np.ndarray + knots : NDArray Knots placement of the spline. Depends on `knots_type` this will be used differently. degree : int, default=3 @@ -106,7 +107,7 @@ class SplineSpecs: Create the spline from given vector as the data. """ - knots: np.ndarray + knots: NDArray degree: int = 3 l_linear: bool = False r_linear: bool = False @@ -114,22 +115,26 @@ class SplineSpecs: knots_type: str = "abs" def __post_init__(self): - assert self.knots_type in ["abs", "rel_domain", "rel_freq"], \ - "Knots type must be one of 'abs', 'rel_domain' or 'rel_freq'." + assert self.knots_type in [ + "abs", + "rel_domain", + "rel_freq", + ], "Knots type must be one of 'abs', 'rel_domain' or 'rel_freq'." @property def num_spline_bases(self) -> int: """Number of the spline bases.""" - inner_knots = self.knots[int(self.l_linear): - len(self.knots) - int(self.r_linear)] + inner_knots = self.knots[ + int(self.l_linear) : len(self.knots) - int(self.r_linear) + ] return len(inner_knots) - 2 + self.degree + int(self.include_first_basis) - def create_spline(self, vec: Optional[np.ndarray] = None) -> XSpline: + def create_spline(self, vec: NDArray | None = None) -> XSpline: """Create spline from the given vector. Parameters ---------- - vec : Optional[np.ndarray], optional + vec : Optional[NDArray], optional Given vector as the data. Default to `None`. When it is `None` it requires `knots_type` to be `abs`. @@ -146,27 +151,31 @@ def create_spline(self, vec: Optional[np.ndarray] = None) -> XSpline: if self.knots_type == "abs": knots = self.knots else: - assert vec is not None, \ - "Using relative knots, must provide a vector to finalize knots." + assert ( + vec is not None + ), "Using relative knots, must provide a vector to finalize knots." if self.knots_type == "rel_domain": lb = np.min(vec) ub = np.max(vec) - knots = lb + self.knots*(ub - lb) + knots = lb + self.knots * (ub - lb) else: knots = np.quantile(vec, self.knots) - return XSpline(knots, self.degree, - l_linear=self.l_linear, - r_linear=self.r_linear, - include_first_basis=self.include_first_basis) + return XSpline( + knots, + self.degree, + l_linear=self.l_linear, + r_linear=self.r_linear, + include_first_basis=self.include_first_basis, + ) -def sizes_to_slices(sizes: List[int]) -> List[slice]: +def sizes_to_slices(sizes: list[int]) -> list[slice]: """Convert a list of sizes to a list of slices. Parameters ---------- - sizes : List[int] + sizes : list[int] A list of positive integers representing sizes of the groups. Raises @@ -176,7 +185,7 @@ def sizes_to_slices(sizes: List[int]) -> List[slice]: Returns ------- - List[slice] + list[slice] A list of slices converted from sizes. Examples diff --git a/src/regmod/variable.py b/src/regmod/variable.py index 5cbf2d6..af6142e 100644 --- a/src/regmod/variable.py +++ b/src/regmod/variable.py @@ -1,19 +1,26 @@ """ Variable module """ -from collections.abc import Iterable + from copy import deepcopy from dataclasses import dataclass, field -from typing import List, Optional, Union import numpy as np -import pandas as pd from xspline import XSpline -from regmod.prior import (GaussianPrior, LinearGaussianPrior, LinearPrior, - LinearUniformPrior, Prior, SplineGaussianPrior, - SplinePrior, SplineUniformPrior, UniformPrior) +from regmod.prior import ( + GaussianPrior, + LinearGaussianPrior, + LinearPrior, + LinearUniformPrior, + Prior, + SplineGaussianPrior, + SplinePrior, + SplineUniformPrior, + UniformPrior, +) from regmod.utils import SplineSpecs +from regmod._typing import Iterable, NDArray, DataFrame @dataclass @@ -26,7 +33,7 @@ class Variable: ---------- name : str Name of the variable corresponding to the column name in the data frame. - priors : List[Prior], optional + priors : list[Prior], optional A list of priors for the variable. Default is an empty list. Attributes @@ -34,7 +41,7 @@ class Variable: size name : str Name of the variable corresponding to the column name in the data frame. - priors : List[Prior] + priors : list[Prior] A list of priors for the variable. gprior : GaussianPrior Direct Gaussian prior in `priors`. @@ -68,7 +75,7 @@ class Variable: """ name: str - priors: List[Prior] = field(default_factory=list, repr=False) + priors: list[Prior] = field(default_factory=list, repr=False) gprior: GaussianPrior = field(default=None, init=False, repr=False) uprior: UniformPrior = field(default=None, init=False, repr=False) @@ -94,18 +101,16 @@ def process_priors(self): if self.gprior is not None: self.priors.remove(self.gprior) self.gprior = prior - assert self.gprior.size == self.size, \ - "Gaussian prior size not match." + assert self.gprior.size == self.size, "Gaussian prior size not match." elif isinstance(prior, UniformPrior): if self.uprior is not None: self.priors.remove(self.uprior) self.uprior = prior - assert self.uprior.size == self.size, \ - "Uniform prior size not match." + assert self.uprior.size == self.size, "Uniform prior size not match." else: raise ValueError("Unknown prior type.") - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Check if the data contains the column name `name`. Parameters @@ -132,12 +137,12 @@ def reset_priors(self) -> None: self.gprior = None self.uprior = None - def add_priors(self, priors: Union[Prior, List[Prior]]) -> None: + def add_priors(self, priors: Prior | list[Prior]) -> None: """Add priors. Parameters ---------- - priors : Union[Prior, List[Prior]] + priors : Union[Prior, list[Prior]] Priors to be added. """ if not isinstance(priors, list): @@ -145,12 +150,12 @@ def add_priors(self, priors: Union[Prior, List[Prior]]) -> None: self.priors.extend(priors) self.process_priors() - def rm_priors(self, indices: Union[int, List[int], List[bool]]) -> None: + def rm_priors(self, indices: int | list[int] | list[bool]) -> None: """Remove priors. Parameters ---------- - indices : Union[int, List[int], List[bool]] + indices : Union[int, list[int], list[bool]] Indicies of the priors that need to be removed. Indicies come in the forms of integer, list of integers or list of booleans. When it is integer or list of integers, it requires the integer is within the @@ -170,21 +175,27 @@ def rm_priors(self, indices: Union[int, List[int], List[bool]]) -> None: if isinstance(indices, int): indices = [indices] else: - assert isinstance(indices, Iterable), \ - "Indies must be int, List[int], or List[bool]." - if all([not isinstance(index, bool) and isinstance(index, int) - for index in indices]): + assert isinstance( + indices, Iterable + ), "Indies must be int, list[int], or list[bool]." + if all( + [ + not isinstance(index, bool) and isinstance(index, int) + for index in indices + ] + ): indices = [i in indices for i in range(len(self.priors))] - assert all([isinstance(index, bool) for index in indices]), \ - "Index type not consistent." - assert len(indices) == len(self.priors), \ - "Index size not match with number of priors." - self.priors = [self.priors[i] for i, index in enumerate(indices) - if not index] + assert all( + [isinstance(index, bool) for index in indices] + ), "Index type not consistent." + assert len(indices) == len( + self.priors + ), "Index size not match with number of priors." + self.priors = [self.priors[i] for i, index in enumerate(indices) if not index] self.reset_priors() self.process_priors() - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get design matrix. Parameters @@ -194,7 +205,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray Design matrix. """ @@ -203,12 +214,12 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: return np.ones((df.shape[0], 1)) return df[[self.name]].to_numpy() - def get_gvec(self) -> np.ndarray: + def get_gvec(self) -> NDArray: """Get direct Gaussian prior vector. Returns ------- - np.ndarray + NDArray Direct Gaussian prior vector. """ if self.gprior is None: @@ -217,12 +228,12 @@ def get_gvec(self) -> np.ndarray: gvec = np.vstack([self.gprior.mean, self.gprior.sd]) return gvec - def get_uvec(self) -> np.ndarray: + def get_uvec(self) -> NDArray: """Get direct Uniform prior vector. Returns ------- - np.ndarray + NDArray Direct Uniform prior vector. """ if self.uprior is None: @@ -254,10 +265,10 @@ class SplineVariable(Variable): spline_specs : SplineSpecs, optional Spline settings used to create spline object. Recommend to use only when use `knots_type={'rel_domain', 'rel_freq'}. Default to be `None`. - linear_gpriors : List[LinearPrior], optional + linear_gpriors : list[LinearPrior], optional A list of linear Gaussian priors usually for shape priors of the spline. Default to be an empty list. - linear_upriors : List[LinearPrior], optional + linear_upriors : list[LinearPrior], optional A list of linear Uniform priors usually for shape priors of the spline. spline. Default to be an empty list. @@ -267,9 +278,9 @@ class SplineVariable(Variable): Spline object that in charge of creating design matrix. spline_specs : SplineSpecs Spline settings used to create spline object. - linear_gpriors : List[LinearPrior] + linear_gpriors : list[LinearPrior] A list of linear Gaussian priors usually for shape priors of the spline. - linear_upriors : List[LinearPrior] + linear_upriors : list[LinearPrior] A list of linear Uniform priors usually for shape priors of the spline. Methods @@ -296,15 +307,15 @@ class SplineVariable(Variable): spline: XSpline = field(default=None, repr=False) spline_specs: SplineSpecs = field(default=None, repr=False) - linear_gpriors: List[LinearPrior] = field(default_factory=list, repr=False) - linear_upriors: List[LinearPrior] = field(default_factory=list, repr=False) + linear_gpriors: list[LinearPrior] = field(default_factory=list, repr=False) + linear_upriors: list[LinearPrior] = field(default_factory=list, repr=False) def __post_init__(self): if (self.spline is None) and (self.spline_specs is None): raise ValueError("At least one of spline and spline_specs is not None.") self.process_priors() - def check_data(self, df: pd.DataFrame): + def check_data(self, df: DataFrame): """Check if the data contains the column name `name`. And create the spline object, if only `spline_specs` is provided. @@ -344,14 +355,12 @@ def process_priors(self): if self.gprior is not None: self.priors.remove(self.gprior) self.gprior = prior - assert self.gprior.size == self.size, \ - "Gaussian prior size not match." + assert self.gprior.size == self.size, "Gaussian prior size not match." elif isinstance(prior, UniformPrior): if self.uprior is not None: self.priors.remove(self.uprior) self.uprior = prior - assert self.uprior.size == self.size, \ - "Uniform prior size not match." + assert self.uprior.size == self.size, "Uniform prior size not match." else: raise ValueError("Unknown prior type.") @@ -371,7 +380,7 @@ def reset_priors(self): self.linear_gpriors = list() self.linear_upriors = list() - def get_mat(self, df: pd.DataFrame) -> np.ndarray: + def get_mat(self, df: DataFrame) -> NDArray: """Get design matrix. Parameters @@ -381,7 +390,7 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: Returns ------- - np.ndarray + NDArray Design matrix. """ @@ -389,41 +398,39 @@ def get_mat(self, df: pd.DataFrame) -> np.ndarray: cov = df[self.name].to_numpy() return self.spline.design_mat(cov, l_extra=True, r_extra=True) - def get_linear_uvec(self) -> np.ndarray: + def get_linear_uvec(self) -> NDArray: """Get linear Uniform prior vector. Returns ------- - np.ndarray + NDArray Linear uniform prior vector. """ if not self.linear_upriors: uvec = np.empty((2, 0)) else: - uvec = np.hstack([ - np.vstack([prior.lb, prior.ub]) - for prior in self.linear_upriors - ]) + uvec = np.hstack( + [np.vstack([prior.lb, prior.ub]) for prior in self.linear_upriors] + ) return uvec - def get_linear_gvec(self) -> np.ndarray: + def get_linear_gvec(self) -> NDArray: """Get linear Gaussian prior vector. Returns ------- - np.ndarray + NDArray Linear Gaussian prior vector. """ if not self.linear_gpriors: gvec = np.empty((2, 0)) else: - gvec = np.hstack([ - np.vstack([prior.mean, prior.sd]) - for prior in self.linear_gpriors - ]) + gvec = np.hstack( + [np.vstack([prior.mean, prior.sd]) for prior in self.linear_gpriors] + ) return gvec - def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: + def get_linear_umat(self, data: DataFrame | None = None) -> NDArray: """Get linear Uniform prior design matrix. Parameters @@ -438,7 +445,7 @@ def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: Returns ------- - np.ndarray: + NDArray: Linear Uniform prior design matrix. """ if not self.linear_upriors: @@ -447,12 +454,10 @@ def get_linear_umat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: if self.spline is None: assert data is not None, "Must check data to create spline first." self.check_data(data) - umat = np.vstack([ - prior.mat for prior in self.linear_upriors - ]) + umat = np.vstack([prior.mat for prior in self.linear_upriors]) return umat - def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: + def get_linear_gmat(self, data: DataFrame | None = None) -> NDArray: """Get linear Gaussian prior design matrix. Parameters @@ -467,7 +472,7 @@ def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: Returns ------- - np.ndarray: + NDArray: Linear Gaussian prior design matrix. """ if not self.linear_gpriors: @@ -476,7 +481,5 @@ def get_linear_gmat(self, df: Optional[pd.DataFrame] = None) -> np.ndarray: if self.spline is None: assert data is not None, "Must check data to create spline first." self.check_data(data) - gmat = np.vstack([ - prior.mat for prior in self.linear_gpriors - ]) + gmat = np.vstack([prior.mat for prior in self.linear_gpriors]) return gmat