Switch from eig to eigh to avoid complex values
zhengp0 committed Jan 18, 2024 · 1 parent 75930a8 · commit 9b2529b
Showing 2 changed files with 96 additions and 78 deletions.
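Why the change: np.linalg.eig treats its input as a general matrix, so tiny floating-point asymmetries in a numerically assembled Hessian can produce complex eigenpairs, whereas np.linalg.eigh assumes a symmetric (Hermitian) matrix and always returns real eigenvalues. A minimal standalone sketch of the failure mode (illustrative values, not part of this commit):

    import numpy as np

    # Symmetric up to rounding noise: the off-diagonal entries differ in sign.
    H = np.array([[2.0, 1e-9],
                  [-1e-9, 2.0]])

    print(np.linalg.eigvals(H))   # [2.+1.e-09j 2.-1.e-09j] -- complex dtype
    print(np.linalg.eigvalsh(H))  # [2. 2.] -- real; reads only one triangle of H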
140 changes: 78 additions & 62 deletions src/regmod/models/model.py
@@ -127,25 +127,29 @@ class Model:
param_names: Tuple[str] = None
default_param_specs: Dict[str, Dict] = None

-    def __init__(self,
-                 data: Data,
-                 params: Optional[List[Parameter]] = None,
-                 param_specs: Optional[Dict[str, Dict]] = None):
+    def __init__(
+        self,
+        data: Data,
+        params: Optional[List[Parameter]] = None,
+        param_specs: Optional[Dict[str, Dict]] = None,
+    ):
if params is None and param_specs is None:
raise ValueError("Must provide `params` or `param_specs`")

if params is not None:
-            param_dict = {
-                param.name: param
-                for param in params
-            }
-            self.params = [param_dict[param_name]
-                           for param_name in self.param_names]
+            param_dict = {param.name: param for param in params}
+            self.params = [param_dict[param_name] for param_name in self.param_names]
else:
-            self.params = [Parameter(param_name,
-                                     **{**self.default_param_specs[param_name],
-                                        **param_specs[param_name]})
-                           for param_name in self.param_names]
+            self.params = [
+                Parameter(
+                    param_name,
+                    **{
+                        **self.default_param_specs[param_name],
+                        **param_specs[param_name],
+                    },
+                )
+                for param_name in self.param_names
+            ]

self.data = data
if not self.data.is_empty():
@@ -199,19 +203,23 @@ def get_vcov(self, coefs: ndarray) -> ndarray:
hessian = self.hessian(coefs)
if isinstance(hessian, Matrix):
hessian = hessian.to_numpy()
-        eig_vals, eig_vecs = np.linalg.eig(hessian)
+        eig_vals, eig_vecs = np.linalg.eigh(hessian)
if np.isclose(eig_vals, 0.0).any():
raise ValueError("singular Hessian matrix, please add priors or "
"reduce number of variables")
raise ValueError(
"singular Hessian matrix, please add priors or "
"reduce number of variables"
)
inv_hessian = (eig_vecs / eig_vals).dot(eig_vecs.T)

jacobian2 = self.jacobian2(coefs)
if isinstance(jacobian2, Matrix):
jacobian2 = jacobian2.to_numpy()
-        eig_vals = np.linalg.eigvals(jacobian2)
+        eig_vals = np.linalg.eigvalsh(jacobian2)
if np.isclose(eig_vals, 0.0).any():
raise ValueError("singular Jacobian matrix, please add priors or "
"reduce number of variables")
raise ValueError(
"singular Jacobian matrix, please add priors or "
"reduce number of variables"
)

vcov = inv_hessian.dot(jacobian2)
vcov = inv_hessian.dot(vcov.T)
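Context for the hunk above: get_vcov assembles the sandwich estimator vcov = H^-1 J H^-1, inverting the Hessian through its eigendecomposition. A condensed standalone sketch, assuming H is symmetric positive definite and J is the score cross-product returned by jacobian2:

    import numpy as np

    def sandwich_vcov(H: np.ndarray, J: np.ndarray) -> np.ndarray:
        eig_vals, eig_vecs = np.linalg.eigh(H)  # symmetric H -> real spectrum
        if np.isclose(eig_vals, 0.0).any():
            raise ValueError("singular Hessian matrix")
        # (V / w) @ V.T scales column j of V by 1 / w[j]: V diag(1/w) V^T = H^-1.
        inv_H = (eig_vecs / eig_vals) @ eig_vecs.T
        return inv_H @ J @ inv_H.T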
@@ -317,8 +325,10 @@ def get_params(self, coefs: ndarray) -> List[ndarray]:
The parameters.
"""
coefs = self.split_coefs(coefs)
-        return [param.get_param(coefs[i], self.data, mat=self.mat[i])
-                for i, param in enumerate(self.params)]
+        return [
+            param.get_param(coefs[i], self.data, mat=self.mat[i])
+            for i, param in enumerate(self.params)
+        ]

def get_dparams(self, coefs: ndarray) -> List[ndarray]:
"""Get the derivative of the parameters.
@@ -334,8 +344,10 @@ def get_dparams(self, coefs: ndarray) -> List[ndarray]:
The derivative of the parameters.
"""
coefs = self.split_coefs(coefs)
-        return [param.get_dparam(coefs[i], self.data, mat=self.mat[i])
-                for i, param in enumerate(self.params)]
+        return [
+            param.get_dparam(coefs[i], self.data, mat=self.mat[i])
+            for i, param in enumerate(self.params)
+        ]

def get_d2params(self, coefs: ndarray) -> List[ndarray]:
"""Get the second order derivative of the parameters.
@@ -351,8 +363,10 @@ def get_d2params(self, coefs: ndarray) -> List[ndarray]:
The second order derivative of the parameters.
"""
coefs = self.split_coefs(coefs)
-        return [param.get_d2param(coefs[i], self.data, mat=self.mat[i])
-                for i, param in enumerate(self.params)]
+        return [
+            param.get_d2param(coefs[i], self.data, mat=self.mat[i])
+            for i, param in enumerate(self.params)
+        ]

def nll(self, params: List[ndarray]) -> ndarray:
"""Negative log likelihood.
@@ -400,9 +414,7 @@ def d2nll(self, params: List[ndarray]) -> List[List[ndarray]]:
"""
raise NotImplementedError()

-    def get_ui(self,
-               params: List[ndarray],
-               bounds: Tuple[float, float]) -> ndarray:
+    def get_ui(self, params: List[ndarray], bounds: Tuple[float, float]) -> ndarray:
"""Get uncertainty interval, used for the trimming algorithm.
Parameters
@@ -419,9 +431,7 @@ def get_ui
"""
raise NotImplementedError()

-    def detect_outliers(self,
-                        coefs: ndarray,
-                        bounds: Tuple[float, float]) -> ndarray:
+    def detect_outliers(self, coefs: ndarray, bounds: Tuple[float, float]) -> ndarray:
"""Detect outliers.
Parameters
@@ -453,9 +463,12 @@ def objective_from_gprior(self, coefs: ndarray) -> float:
float
Objective function value.
"""
-        val = 0.5*np.sum((coefs - self.gvec[0])**2/self.gvec[1]**2)
+        val = 0.5 * np.sum((coefs - self.gvec[0]) ** 2 / self.gvec[1] ** 2)
if self.linear_gvec.size > 0:
-            val += 0.5*np.sum((self.linear_gmat.dot(coefs) - self.linear_gvec[0])**2/self.linear_gvec[1]**2)
+            val += 0.5 * np.sum(
+                (self.linear_gmat.dot(coefs) - self.linear_gvec[0]) ** 2
+                / self.linear_gvec[1] ** 2
+            )
return val

def gradient_from_gprior(self, coefs: ndarray) -> ndarray:
@@ -471,9 +484,11 @@ def gradient_from_gprior(self, coefs: ndarray) -> ndarray:
ndarray
Gradient vector.
"""
-        grad = (coefs - self.gvec[0])/self.gvec[1]**2
+        grad = (coefs - self.gvec[0]) / self.gvec[1] ** 2
if self.linear_gvec.size > 0:
-            grad += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat.dot(coefs) - self.linear_gvec[0])
+            grad += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot(
+                self.linear_gmat.dot(coefs) - self.linear_gvec[0]
+            )
return grad

def hessian_from_gprior(self) -> ndarray:
@@ -484,15 +499,17 @@ def hessian_from_gprior(self) -> ndarray:
ndarray
Hessian matrix.
"""
-        hess = np.diag(1.0/self.gvec[1]**2)
+        hess = np.diag(1.0 / self.gvec[1] ** 2)
if self.linear_gvec.size > 0:
-            hess += (self.linear_gmat.T/self.linear_gvec[1]**2).dot(self.linear_gmat)
+            hess += (self.linear_gmat.T / self.linear_gvec[1] ** 2).dot(
+                self.linear_gmat
+            )
return hess

def get_nll_terms(self, coefs: ndarray) -> ndarray:
params = self.get_params(coefs)
nll_terms = self.nll(params)
-        nll_terms = self.data.weights*nll_terms
+        nll_terms = self.data.weights * nll_terms
return nll_terms

def objective(self, coefs: ndarray) -> float:
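The *_from_gprior methods above implement a Gaussian penalty, 0.5 * sum(((coefs - mean) / sd) ** 2), plus an optional penalty on a linear map of the coefficients. A quick finite-difference check of the gradient against the objective (hypothetical two-coefficient gvec, no linear prior):

    import numpy as np

    gvec = np.array([[0.0, 1.0],   # prior means
                     [1.0, 2.0]])  # prior standard deviations

    def obj(c):
        return 0.5 * np.sum((c - gvec[0]) ** 2 / gvec[1] ** 2)

    def grad(c):
        return (c - gvec[0]) / gvec[1] ** 2

    c, eps = np.array([0.3, -0.7]), 1e-6
    fd = np.array([(obj(c + eps * e) - obj(c - eps * e)) / (2 * eps)
                   for e in np.eye(2)])
    assert np.allclose(fd, grad(c))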
@@ -509,8 +526,7 @@ def objective(self, coefs: ndarray) -> float:
Objective value.
"""
nll_terms = self.get_nll_terms(coefs)
-        return self.data.trim_weights.dot(nll_terms) + \
-            self.objective_from_gprior(coefs)
+        return self.data.trim_weights.dot(nll_terms) + self.objective_from_gprior(coefs)

def gradient(self, coefs: ndarray) -> ndarray:
"""Gradient function.
@@ -528,11 +544,10 @@ def gradient(self, coefs: ndarray) -> ndarray:
params = self.get_params(coefs)
dparams = self.get_dparams(coefs)
grad_params = self.dnll(params)
-        weights = self.data.weights*self.data.trim_weights
-        return np.hstack([
-            dparams[i].T.dot(weights*grad_params[i])
-            for i in range(self.num_params)
-        ]) + self.gradient_from_gprior(coefs)
+        weights = self.data.weights * self.data.trim_weights
+        return np.hstack(
+            [dparams[i].T.dot(weights * grad_params[i]) for i in range(self.num_params)]
+        ) + self.gradient_from_gprior(coefs)

def hessian(self, coefs: ndarray) -> ndarray:
"""Hessian function.
@@ -552,14 +567,16 @@ def hessian(self, coefs: ndarray) -> ndarray:
d2params = self.get_d2params(coefs)
grad_params = self.dnll(params)
hess_params = self.d2nll(params)
-        weights = self.data.weights*self.data.trim_weights
+        weights = self.data.weights * self.data.trim_weights
hess = [
-            [(dparams[i].T*(weights*hess_params[i][j])).dot(dparams[j])
-             for j in range(self.num_params)]
+            [
+                (dparams[i].T * (weights * hess_params[i][j])).dot(dparams[j])
+                for j in range(self.num_params)
+            ]
for i in range(self.num_params)
]
for i in range(self.num_params):
-            hess[i][i] += np.tensordot(weights*grad_params[i], d2params[i], axes=1)
+            hess[i][i] += np.tensordot(weights * grad_params[i], d2params[i], axes=1)
return np.block(hess) + self.hessian_from_gprior()

def jacobian2(self, coefs: ndarray) -> ndarray:
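Two numpy idioms in hessian above are easy to misread: np.tensordot(weights * grad_params[i], d2params[i], axes=1) contracts the observation axis of a (num_obs, n, n) second-derivative stack into an (n, n) block, and np.block stitches the num_params x num_params grid of blocks into the full matrix. A toy sketch with made-up shapes:

    import numpy as np

    num_obs, n = 4, 2
    w_g = np.full(num_obs, 0.5)     # stands in for weights * grad_params[i]
    d2p = np.ones((num_obs, n, n))  # stands in for d2params[i]

    diag_block = np.tensordot(w_g, d2p, axes=1)  # (2, 2): summed over observations
    off_block = np.zeros((n, n))

    full = np.block([[diag_block, off_block],
                     [off_block, diag_block]])
    print(full.shape)  # (4, 4)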
@@ -578,17 +595,14 @@ def jacobian2(self, coefs: ndarray) -> ndarray:
params = self.get_params(coefs)
dparams = self.get_dparams(coefs)
grad_params = self.dnll(params)
-        weights = self.data.weights*self.data.trim_weights
-        jacobian = np.vstack([
-            dparams[i].T*(weights*grad_params[i])
-            for i in range(self.num_params)
-        ])
+        weights = self.data.weights * self.data.trim_weights
+        jacobian = np.vstack(
+            [dparams[i].T * (weights * grad_params[i]) for i in range(self.num_params)]
+        )
jacobian2 = jacobian.dot(jacobian.T) + self.hessian_from_gprior()
return jacobian2
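jacobian2 above builds the "meat" of the sandwich: each observation's weighted score contribution becomes one column, and the cross-product of that stack (plus the Gaussian-prior Hessian) is returned. A toy standalone sketch with hypothetical shapes for two parameters:

    import numpy as np

    rng = np.random.default_rng(0)
    num_obs, n1, n2 = 6, 2, 3
    dparams = [rng.normal(size=(num_obs, n1)), rng.normal(size=(num_obs, n2))]
    grad_params = [rng.normal(size=num_obs), rng.normal(size=num_obs)]
    weights = np.ones(num_obs)

    jacobian = np.vstack(
        [dparams[i].T * (weights * grad_params[i]) for i in range(2)]
    )  # (n1 + n2, num_obs): column k is observation k's score contribution
    J = jacobian @ jacobian.T  # (n1 + n2, n1 + n2) score cross-product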

-    def fit(self,
-            optimizer: Callable = scipy_optimize,
-            **optimizer_options):
+    def fit(self, optimizer: Callable = scipy_optimize, **optimizer_options):
"""Fit function.
Parameters
@@ -631,7 +645,9 @@ def predict(self, df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
return df

def __repr__(self) -> str:
return (f"{type(self).__name__}("
f"bnum_obs={self.data.num_obs}, "
f"num_params={self.num_params}, "
f"size={self.size})")
return (
f"{type(self).__name__}("
f"bnum_obs={self.data.num_obs}, "
f"num_params={self.num_params}, "
f"size={self.size})"
)
34 changes: 18 additions & 16 deletions tests/test_tobitmodel.py
@@ -27,7 +27,7 @@ def data(df):
def param_specs():
specs = {
"mu": {"variables": [Variable("x"), Variable("intercept")]},
"sigma": {"variables": [Variable("intercept")]}
"sigma": {"variables": [Variable("intercept")]},
}
return specs

@@ -50,8 +50,7 @@ def test_neg_obs(df, param_specs):
"""ValueError if data contains negative observations."""
with pytest.raises(ValueError, match="requires non-negative observations"):
TobitModel(
data=Data(col_obs="y", col_covs=["x"], df=df),
param_specs=param_specs
data=Data(col_obs="y", col_covs=["x"], df=df), param_specs=param_specs
)


Expand All @@ -62,8 +61,8 @@ def test_vcov_output(model):

# Old version
H = model.hessian(coefs)
-    eig_vals, eig_vecs = np.linalg.eig(H)
-    inv_H = (eig_vecs/eig_vals).dot(eig_vecs.T)
+    eig_vals, eig_vecs = np.linalg.eigh(H)
+    inv_H = (eig_vecs / eig_vals).dot(eig_vecs.T)
J = model.jacobian2(coefs)
vcov_old = inv_H.dot(J)
vcov_old = inv_H.dot(vcov_old.T)
@@ -84,18 +83,19 @@ def test_pred_values(model):

def test_model_no_variables():
num_obs = 5
-    df = pd.DataFrame({
-        "obs": np.random.rand(num_obs)*10,
-        "offset": np.ones(num_obs),
-    })
+    df = pd.DataFrame(
+        {
+            "obs": np.random.rand(num_obs) * 10,
+            "offset": np.ones(num_obs),
+        }
+    )
data = Data(
col_obs="obs",
col_offset="offset",
df=df,
)
model = TobitModel(
-        data,
-        param_specs={"mu": {"offset": "offset"}, "sigma": {"offset": "offset"}}
+        data, param_specs={"mu": {"offset": "offset"}, "sigma": {"offset": "offset"}}
)
coefs = np.array([])
grad = model.gradient(coefs)
@@ -109,10 +109,12 @@ def test_model_no_variables():

def test_model_one_variable():
num_obs = 5
-    df = pd.DataFrame({
-        "obs": np.random.rand(num_obs)*10,
-        "offset": np.ones(num_obs),
-    })
+    df = pd.DataFrame(
+        {
+            "obs": np.random.rand(num_obs) * 10,
+            "offset": np.ones(num_obs),
+        }
+    )
data = Data(
col_obs="obs",
col_offset="offset",
@@ -123,7 +125,7 @@ def test_model_one_variable():
param_specs={
"sigma": {"offset": "offset"},
"mu": {"variables": [Variable("intercept")]},
-        }
+        },
)
model.fit()
assert model.opt_coefs.size == 1
