From 9b2529b7dcdeb7b94628599ea5ca873eb2e95965 Mon Sep 17 00:00:00 2001
From: zhengp0
Date: Thu, 18 Jan 2024 10:24:45 -0800
Subject: [PATCH] Switch to eigh from eig to avoid complex values

---
 src/regmod/models/model.py | 140 +++++++++++++++++++++----------------
 tests/test_tobitmodel.py   |  34 ++++-----
 2 files changed, 96 insertions(+), 78 deletions(-)

diff --git a/src/regmod/models/model.py b/src/regmod/models/model.py
index b0683de..ebe8b6c 100644
--- a/src/regmod/models/model.py
+++ b/src/regmod/models/model.py
@@ -127,25 +127,29 @@ class Model:
     param_names: Tuple[str] = None
     default_param_specs: Dict[str, Dict] = None
 
-    def __init__(self,
-                 data: Data,
-                 params: Optional[List[Parameter]] = None,
-                 param_specs: Optional[Dict[str, Dict]] = None):
+    def __init__(
+        self,
+        data: Data,
+        params: Optional[List[Parameter]] = None,
+        param_specs: Optional[Dict[str, Dict]] = None,
+    ):
         if params is None and param_specs is None:
             raise ValueError("Must provide `params` or `param_specs`")
 
         if params is not None:
-            param_dict = {
-                param.name: param
-                for param in params
-            }
-            self.params = [param_dict[param_name]
-                           for param_name in self.param_names]
+            param_dict = {param.name: param for param in params}
+            self.params = [param_dict[param_name] for param_name in self.param_names]
         else:
-            self.params = [Parameter(param_name,
-                                     **{**self.default_param_specs[param_name],
-                                        **param_specs[param_name]})
-                           for param_name in self.param_names]
+            self.params = [
+                Parameter(
+                    param_name,
+                    **{
+                        **self.default_param_specs[param_name],
+                        **param_specs[param_name],
+                    },
+                )
+                for param_name in self.param_names
+            ]
 
         self.data = data
         if not self.data.is_empty():
@@ -199,19 +203,23 @@ def get_vcov(self, coefs: ndarray) -> ndarray:
         hessian = self.hessian(coefs)
         if isinstance(hessian, Matrix):
             hessian = hessian.to_numpy()
-        eig_vals, eig_vecs = np.linalg.eig(hessian)
+        eig_vals, eig_vecs = np.linalg.eigh(hessian)
         if np.isclose(eig_vals, 0.0).any():
-            raise ValueError("singular Hessian matrix, please add priors or "
-                             "reduce number of variables")
+            raise ValueError(
+                "singular Hessian matrix, please add priors or "
+                "reduce number of variables"
+            )
         inv_hessian = (eig_vecs / eig_vals).dot(eig_vecs.T)
 
         jacobian2 = self.jacobian2(coefs)
         if isinstance(jacobian2, Matrix):
             jacobian2 = jacobian2.to_numpy()
-        eig_vals = np.linalg.eigvals(jacobian2)
+        eig_vals = np.linalg.eigvalsh(jacobian2)
         if np.isclose(eig_vals, 0.0).any():
-            raise ValueError("singular Jacobian matrix, please add priors or "
-                             "reduce number of variables")
+            raise ValueError(
+                "singular Jacobian matrix, please add priors or "
+                "reduce number of variables"
+            )
 
         vcov = inv_hessian.dot(jacobian2)
         vcov = inv_hessian.dot(vcov.T)
@@ -317,8 +325,10 @@ def get_params(self, coefs: ndarray) -> List[ndarray]:
             The parameters.
         """
         coefs = self.split_coefs(coefs)
-        return [param.get_param(coefs[i], self.data, mat=self.mat[i])
-                for i, param in enumerate(self.params)]
+        return [
+            param.get_param(coefs[i], self.data, mat=self.mat[i])
+            for i, param in enumerate(self.params)
+        ]
 
     def get_dparams(self, coefs: ndarray) -> List[ndarray]:
         """Get the derivative of the parameters.
@@ -334,8 +344,10 @@ def get_dparams(self, coefs: ndarray) -> List[ndarray]:
             The derivative of the parameters.
""" coefs = self.split_coefs(coefs) - return [param.get_dparam(coefs[i], self.data, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_dparam(coefs[i], self.data, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] def get_d2params(self, coefs: ndarray) -> List[ndarray]: """Get the second order derivative of the parameters. @@ -351,8 +363,10 @@ def get_d2params(self, coefs: ndarray) -> List[ndarray]: The second order derivative of the parameters. """ coefs = self.split_coefs(coefs) - return [param.get_d2param(coefs[i], self.data, mat=self.mat[i]) - for i, param in enumerate(self.params)] + return [ + param.get_d2param(coefs[i], self.data, mat=self.mat[i]) + for i, param in enumerate(self.params) + ] def nll(self, params: List[ndarray]) -> ndarray: """Negative log likelihood. @@ -400,9 +414,7 @@ def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]: """ raise NotImplementedError() - def get_ui(self, - params: List[ndarray], - bounds: Tuple[float, float]) -> ndarray: + def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray: """Get uncertainty interval, used for the trimming algorithm. Parameters @@ -419,9 +431,7 @@ def get_ui(self, """ raise NotImplementedError() - def detect_outliers(self, - coefs: ndarray, - bounds: Tuple[float, float]) -> ndarray: + def detect_outliers(self, coefs: ndarray, bounds: Tuple[float, float]) -> ndarray: """Detect outliers. Parameters @@ -453,9 +463,12 @@ def objective_from_gprior(self, coefs: ndarray) -> float: float Objective function value. """ - val = 0.5*np.sum((coefs - self.gvec[0])**2/self.gvec[1]**2) + val = 0.5 * np.sum((coefs - self.gvec[0]) ** 2 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - val += 0.5*np.sum((self.linear_gmat.dot(coefs) - self.linear_gvec[0])**2/self.linear_gvec[1]**2) + val += 0.5 * np.sum( + (self.linear_gmat.dot(coefs) - self.linear_gvec[0]) ** 2 + / self.linear_gvec[1] ** 2 + ) return val def gradient_from_gprior(self, coefs: ndarray) -> ndarray: @@ -471,9 +484,11 @@ def gradient_from_gprior(self, coefs: ndarray) -> ndarray: ndarray Graident vector. """ - grad = (coefs - self.gvec[0])/self.gvec[1]**2 + grad = (coefs - self.gvec[0]) / self.gvec[1] ** 2 if self.linear_gvec.size > 0: - grad += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat.dot(coefs) - self.linear_gvec[0]) + grad += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat.dot(coefs) - self.linear_gvec[0] + ) return grad def hessian_from_gprior(self) -> ndarray: @@ -484,15 +499,17 @@ def hessian_from_gprior(self) -> ndarray: ndarray Hessian matrix. """ - hess = np.diag(1.0/self.gvec[1]**2) + hess = np.diag(1.0 / self.gvec[1] ** 2) if self.linear_gvec.size > 0: - hess += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat) + hess += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot( + self.linear_gmat + ) return hess def get_nll_terms(self, coefs: ndarray) -> ndarray: params = self.get_params(coefs) nll_terms = self.nll(params) - nll_terms = self.data.weights*nll_terms + nll_terms = self.data.weights * nll_terms return nll_terms def objective(self, coefs: ndarray) -> float: @@ -509,8 +526,7 @@ def objective(self, coefs: ndarray) -> float: Objective value. """ nll_terms = self.get_nll_terms(coefs) - return self.data.trim_weights.dot(nll_terms) + \ - self.objective_from_gprior(coefs) + return self.data.trim_weights.dot(nll_terms) + self.objective_from_gprior(coefs) def gradient(self, coefs: ndarray) -> ndarray: """Gradient function. 
@@ -528,11 +544,10 @@ def gradient(self, coefs: ndarray) -> ndarray:
         params = self.get_params(coefs)
         dparams = self.get_dparams(coefs)
         grad_params = self.dnll(params)
-        weights = self.data.weights*self.data.trim_weights
-        return np.hstack([
-            dparams[i].T.dot(weights*grad_params[i])
-            for i in range(self.num_params)
-        ]) + self.gradient_from_gprior(coefs)
+        weights = self.data.weights * self.data.trim_weights
+        return np.hstack(
+            [dparams[i].T.dot(weights * grad_params[i]) for i in range(self.num_params)]
+        ) + self.gradient_from_gprior(coefs)
 
     def hessian(self, coefs: ndarray) -> ndarray:
         """Hessian function.
@@ -552,14 +567,16 @@ def hessian(self, coefs: ndarray) -> ndarray:
         d2params = self.get_d2params(coefs)
         grad_params = self.dnll(params)
         hess_params = self.d2nll(params)
-        weights = self.data.weights*self.data.trim_weights
+        weights = self.data.weights * self.data.trim_weights
         hess = [
-            [(dparams[i].T*(weights*hess_params[i][j])).dot(dparams[j])
-             for j in range(self.num_params)]
+            [
+                (dparams[i].T * (weights * hess_params[i][j])).dot(dparams[j])
+                for j in range(self.num_params)
+            ]
             for i in range(self.num_params)
         ]
         for i in range(self.num_params):
-            hess[i][i] += np.tensordot(weights*grad_params[i], d2params[i], axes=1)
+            hess[i][i] += np.tensordot(weights * grad_params[i], d2params[i], axes=1)
         return np.block(hess) + self.hessian_from_gprior()
 
     def jacobian2(self, coefs: ndarray) -> ndarray:
@@ -578,17 +595,14 @@ def jacobian2(self, coefs: ndarray) -> ndarray:
         params = self.get_params(coefs)
         dparams = self.get_dparams(coefs)
         grad_params = self.dnll(params)
-        weights = self.data.weights*self.data.trim_weights
-        jacobian = np.vstack([
-            dparams[i].T*(weights*grad_params[i])
-            for i in range(self.num_params)
-        ])
+        weights = self.data.weights * self.data.trim_weights
+        jacobian = np.vstack(
+            [dparams[i].T * (weights * grad_params[i]) for i in range(self.num_params)]
+        )
         jacobian2 = jacobian.dot(jacobian.T) + self.hessian_from_gprior()
         return jacobian2
 
-    def fit(self,
-            optimizer: Callable = scipy_optimize,
-            **optimizer_options):
+    def fit(self, optimizer: Callable = scipy_optimize, **optimizer_options):
         """Fit function.
 
         Parameters
@@ -631,7 +645,9 @@ def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
         return df
 
     def __repr__(self) -> str:
-        return (f"{type(self).__name__}("
-                f"bnum_obs={self.data.num_obs}, "
-                f"num_params={self.num_params}, "
-                f"size={self.size})")
+        return (
+            f"{type(self).__name__}("
+            f"num_obs={self.data.num_obs}, "
+            f"num_params={self.num_params}, "
+            f"size={self.size})"
+        )
diff --git a/tests/test_tobitmodel.py b/tests/test_tobitmodel.py
index 2e0ddef..b1fa7dc 100644
--- a/tests/test_tobitmodel.py
+++ b/tests/test_tobitmodel.py
@@ -27,7 +27,7 @@ def data(df):
 def param_specs():
     specs = {
         "mu": {"variables": [Variable("x"), Variable("intercept")]},
-        "sigma": {"variables": [Variable("intercept")]}
+        "sigma": {"variables": [Variable("intercept")]},
     }
     return specs
 
@@ -50,8 +50,7 @@ def test_neg_obs(df, param_specs):
     """ValueError if data contains negative observations."""
     with pytest.raises(ValueError, match="requires non-negative observations"):
         TobitModel(
-            data=Data(col_obs="y", col_covs=["x"], df=df),
-            param_specs=param_specs
+            data=Data(col_obs="y", col_covs=["x"], df=df), param_specs=param_specs
         )
 
 
@@ -62,8 +61,8 @@ def test_vcov_output(model):
 
     # Old version
     H = model.hessian(coefs)
-    eig_vals, eig_vecs = np.linalg.eig(H)
-    inv_H = (eig_vecs/eig_vals).dot(eig_vecs.T)
+    eig_vals, eig_vecs = np.linalg.eigh(H)
+    inv_H = (eig_vecs / eig_vals).dot(eig_vecs.T)
     J = model.jacobian2(coefs)
     vcov_old = inv_H.dot(J)
     vcov_old = inv_H.dot(vcov_old.T)
@@ -84,18 +83,19 @@ def test_pred_values(model):
 
 def test_model_no_variables():
     num_obs = 5
-    df = pd.DataFrame({
-        "obs": np.random.rand(num_obs)*10,
-        "offset": np.ones(num_obs),
-    })
+    df = pd.DataFrame(
+        {
+            "obs": np.random.rand(num_obs) * 10,
+            "offset": np.ones(num_obs),
+        }
+    )
     data = Data(
         col_obs="obs",
         col_offset="offset",
        df=df,
     )
     model = TobitModel(
-        data,
-        param_specs={"mu": {"offset": "offset"}, "sigma": {"offset": "offset"}}
+        data, param_specs={"mu": {"offset": "offset"}, "sigma": {"offset": "offset"}}
     )
     coefs = np.array([])
     grad = model.gradient(coefs)
@@ -109,10 +109,12 @@ def test_model_one_variable():
     num_obs = 5
-    df = pd.DataFrame({
-        "obs": np.random.rand(num_obs)*10,
-        "offset": np.ones(num_obs),
-    })
+    df = pd.DataFrame(
+        {
+            "obs": np.random.rand(num_obs) * 10,
+            "offset": np.ones(num_obs),
+        }
+    )
     data = Data(
         col_obs="obs",
         col_offset="offset",
         df=df,
     )
     model = TobitModel(
         data,
         param_specs={
             "sigma": {"offset": "offset"},
             "mu": {"variables": [Variable("intercept")]},
-        }
+        },
     )
     model.fit()
     assert model.opt_coefs.size == 1
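
Note (commentary, not part of the patch): the Hessian and the Gram matrix
returned by jacobian2 are symmetric by construction, so their spectra are
real. np.linalg.eig makes no symmetry assumption and can return arrays of
complex dtype (for instance when floating-point noise leaves the assembled
matrix slightly asymmetric), which then leaks complex values into vcov.
np.linalg.eigh treats the input as symmetric and always returns real
eigenvalues and orthonormal eigenvectors. A minimal sketch of the idea and
of the inverse-through-eigendecomposition trick used in get_vcov; the 2x2
matrices H and J below are made-up stand-ins for model.hessian(coefs) and
model.jacobian2(coefs):

    import numpy as np

    # Made-up symmetric stand-in for the Hessian.
    H = np.array([[2.0, 1.0], [1.0, 3.0]])

    # eigh assumes a symmetric input and is guaranteed to return real
    # eigenvalues (in ascending order) and orthonormal eigenvectors,
    # whereas eig may hand back a complex-dtype result.
    eig_vals, eig_vecs = np.linalg.eigh(H)
    assert eig_vals.dtype == np.float64

    # Inverse through the eigendecomposition, as written in get_vcov:
    # H^{-1} = V diag(1/lambda) V^T, i.e. (V / lambda) @ V^T.
    inv_H = (eig_vecs / eig_vals).dot(eig_vecs.T)
    assert np.allclose(inv_H, np.linalg.inv(H))

    # Sandwich estimator as assembled in get_vcov: H^{-1} J (H^{-1})^T.
    J = np.array([[1.0, 0.2], [0.2, 0.5]])  # made-up stand-in for jacobian2
    vcov = inv_H.dot(J).dot(inv_H.T)

The division (eig_vecs / eig_vals) broadcasts over columns, scaling each
eigenvector by the reciprocal of its eigenvalue, which is why the near-zero
eigenvalue check above guards this step.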