diff --git a/src/lamatrix/__init__.py b/src/lamatrix/__init__.py index 21eb1a2..0ae5880 100644 --- a/src/lamatrix/__init__.py +++ b/src/lamatrix/__init__.py @@ -36,6 +36,7 @@ def _META_DATA(): import numpy as np +from .bound import * from .combine import * # noqa: E402, F401 from .models.astrophysical import * # noqa: E402, F401 from .models.gaussian import * # noqa: E402, F401 diff --git a/src/lamatrix/bound.py b/src/lamatrix/bound.py new file mode 100644 index 0000000..1dfefdb --- /dev/null +++ b/src/lamatrix/bound.py @@ -0,0 +1,163 @@ +import numpy as np + +from .combine import StackedIndependentGenerator + +__all__ = ["BoundedGenerator"] + + +class BoundedGenerator(StackedIndependentGenerator): + def __init__(self, generator, bounds: list): + self.generator = generator + self.x_name = self.generator.x_name + if isinstance(bounds, slice): + self._latex_bounds = self._slice_bounds_to_latex( + bounds.start, bounds.stop, bounds.step + ) + self.nbounds = (bounds.stop - bounds.start) // bounds.step + + else: + self._latex_bounds = self._bounds_to_latex(bounds) + self.nbounds = len(bounds) + + self.bounds = bounds + self.fit_mu = None + self.fit_sigma = None + + def _bounds_to_latex(self, bounds): + bound_latex = [ + f"""\\[b_{{{idx}}}(\\mathbf{{{self.x_name}}}) = + \\begin{{cases}} + \\mathbf{{{self.x_name}}}, & \\text{{if }} {bound[0]} < \\mathbf{{{self.x_name}}} \\leq {bound[1]} \\\\ + 0, & \\text{{otherwise}} + \\end{{cases}}\\]""" + for idx, bound in enumerate(bounds) + ] + return "\n".join(bound_latex) + + def _slice_bounds_to_latex(self, start, stop, step): + step_str = f"{step}{{\cdot}}" if step != 1 else "" + nbounds = (stop - start) // step + if nbounds < 4: + def_str1 = "0" + else: + def_str1 = ", ".join([f"{s}" for s in np.arange(0, 3)]) + def_str = f"For $i = {def_str1}, \\ldots, {nbounds} $ define:" + bound_latex = [ + f"""b_{{i}}(\\mathbf{{{self.x_name}}}) = + \\begin{{cases}} + \\mathbf{{{self.x_name}}}, & \\text{{if }} ({start} + {step_str}i) < \\mathbf{{{self.x_name}}} \\leq ({start} + {step_str}(i + 1)) \\\\ + 0, & \\text{{otherwise}} + \\end{{cases}}""" + ] + return def_str + "\[" + "\n".join(bound_latex) + "\]" + + def __repr__(self): + str1 = f"{type(self).__name__}({type(self.generator).__name__}({', '.join(list(self.arg_names))}))[n, {self.width}]" + + return str1 + + @property + def mu(self): + return self.prior_mu if self.fit_mu is None else self.fit_mu + + @property + def sigma(self): + return self.prior_sigma if self.fit_sigma is None else self.fit_sigma + + @property + def prior_mu(self): + return np.hstack([self.generator.prior_mu] * self.nbounds) + + @property + def prior_sigma(self): + return np.hstack([self.generator.prior_sigma] * self.nbounds) + + @property + def arg_names(self): + return self.generator.arg_names + + @property + def width(self): + return self.generator.width * self.nbounds + + @property + def _equation(self): + eqn = ["b_{i}(" + eqn + ")" for eqn in self.generator._equation] + return eqn + + @property + def equation(self): + func_signature = ", ".join([f"\\mathbf{{{a}}}" for a in self.arg_names]) + n = self.nbounds + return ( + f"\\[f({func_signature}) = " + + " + ".join( + [ + f"\\sum_{{i=0}}^{{{n}}} w_{{{{i}}, {coeff} }} {e}" + for coeff, e in enumerate(self._equation) + ] + ) + + "\\]" + ) + + def to_latex(self): + return "\n".join([self._latex_bounds, self.equation, self._to_latex_table()]) + + def design_matrix(self, *args, **kwargs): + if not self.arg_names.issubset(set(kwargs.keys())): + raise ValueError(f"Expected {self.arg_names} to be passed.") + x = kwargs.get(self.x_name) + if isinstance(self.bounds, slice): + bounds_list = [ + (a, a + self.bounds.step) + for a in np.arange( + self.bounds.start, self.bounds.stop, self.bounds.step + ) + ] + else: + bounds_list = self.bounds + return np.hstack( + [ + self.generator.design_matrix(x=x) * ((x >= b[0]) & (x < b[1]))[:, None] + for b in bounds_list + ] + ) + + def fit(self, *args, **kwargs): + self.fit_mu, self.fit_sigma = self._fit(*args, **kwargs) + + @property + def table_properties(self): + return [ + ( + "w_{{{i}, {idx}}}", + (self.mu[idx], self.sigma[idx]), + (self.prior_mu[idx], self.prior_sigma[idx]), + ) + for idx in range(self.width) + ] + + def _to_latex_table(self): + latex_table = "\\begin{table}[h!]\n\\centering\n" + latex_table += "\\begin{tabular}{|c|c|c|}\n\\hline\n" + latex_table += "Coefficient & Best Fit & Prior \\\\\\hline\n" + idx = 0 + for tm in self._get_table_matter(): + latex_table += tm.format( + idx=idx % self.nbounds, i=(idx - (idx % self.nbounds)) // self.nbounds + ) + idx += 1 + latex_table += "\\end{tabular}\n\\end{table}" + return latex_table + + def __getitem__(self, key): + g = self.generator.copy() + attrs = ["fit_mu", "fit_sigma"] + for attr in attrs: + setattr( + g, attr, getattr(self, attr).reshape((self.nbounds, len(self)))[key] + ) + return g + + def __len__(self): + return self.width // self.nbounds diff --git a/src/lamatrix/combine.py b/src/lamatrix/combine.py index ed95299..9dbe5c9 100644 --- a/src/lamatrix/combine.py +++ b/src/lamatrix/combine.py @@ -188,6 +188,7 @@ def save(self, filename: str): data_to_store["metadata"] = _META_DATA() json.dump(data_to_store, json_file, indent=4) + class StackedDependentGenerator(StackedIndependentGenerator): @property @@ -238,6 +239,9 @@ def __getitem__(self, key): raise AttributeError( "Can not extract individual generators from a dependent stacked generator." ) - @property + + @property def gradient(self): - raise AttributeError("Can not create a gradient for a dependent stacked generator.") + raise AttributeError( + "Can not create a gradient for a dependent stacked generator." + ) diff --git a/src/lamatrix/generator.py b/src/lamatrix/generator.py index 76e0794..e3b398d 100644 --- a/src/lamatrix/generator.py +++ b/src/lamatrix/generator.py @@ -101,10 +101,8 @@ def copy(self): return deepcopy(self) def __repr__(self): - fit = "" if self.fit_mu is not None else "" - return ( - f"{type(self).__name__}({', '.join(list(self.arg_names))})[n, {self.width}] {fit}" - ) + fit = "fit" if self.fit_mu is not None else "" + return f"{type(self).__name__}({', '.join(list(self.arg_names))})[n, {self.width}] {fit}" # def __add__(self, other): # if isinstance(other, Generator): @@ -128,7 +126,7 @@ def format_significant_figures(mean, error): if error == 0: sig_figures = 0 else: - sig_figures = -int(math.floor(math.log10(abs(error)))) + sig_figures = np.max([0, -int(math.floor(math.log10(abs(error))))]) # Format mean and error to have the same number of decimal places formatted_mean = f"{mean:.{sig_figures}f}" diff --git a/src/lamatrix/models/gaussian.py b/src/lamatrix/models/gaussian.py index 5e69555..a6ee902 100644 --- a/src/lamatrix/models/gaussian.py +++ b/src/lamatrix/models/gaussian.py @@ -169,7 +169,7 @@ def table_properties(self): @property def _equation(self): return [ - f"", + f"\\mathbf{{{self.x_name}}}^0", f"\mathbf{{{self.x_name}}}^{2}", f"\mathbf{{{self.y_name}}}^{2}", f"\mathbf{{{self.x_name}}}\mathbf{{{self.y_name}}}", @@ -189,7 +189,7 @@ def to_latex(self): ) @property - def derivative(self): + def gradient(self): return dlnGaussian2DGenerator( stddev_x=self.stddev_x[0], stddev_y=self.stddev_y[0], @@ -306,4 +306,4 @@ def table_properties(self): def _equation(self): dfdx = f"\\left(-\\frac{{1}}{{1-\\rho^2}}\\left(\\frac{{\\mathbf{{{self.x_name}}}}}{{\\sigma_x^2}} - \\rho\\frac{{\\mathbf{{{self.y_name}}}}}{{\\sigma_x\\sigma_y}}\\right)\\right)" dfdy = f"\\left(-\\frac{{1}}{{1-\\rho^2}}\\left(\\frac{{\\mathbf{{{self.y_name}}}}}{{\\sigma_x^2}} - \\rho\\frac{{\\mathbf{{{self.x_name}}}}}{{\\sigma_x\\sigma_y}}\\right)\\right)" - return ["", dfdx, dfdy] + return [f"\\mathbf{{{self.x_name}}}^0", dfdx, dfdy] diff --git a/src/lamatrix/models/simple.py b/src/lamatrix/models/simple.py index b9a331c..715d75c 100644 --- a/src/lamatrix/models/simple.py +++ b/src/lamatrix/models/simple.py @@ -83,13 +83,18 @@ def _equation(self): eqn = [ f"\mathbf{{{self.x_name}}}^{{{idx}}}" for idx in range(self.polyorder + 1) ] - eqn[0] = "" + # eqn[0] = "" return eqn @property def gradient(self): - return dPolynomial1DGenerator(weights=self.mu, x_name=self.x_name, polyorder=self.polyorder, data_shape=self.data_shape, offset_prior=(self.mu[1], self.sigma[1]) -) + return dPolynomial1DGenerator( + weights=self.mu, + x_name=self.x_name, + polyorder=self.polyorder, + data_shape=self.data_shape, + offset_prior=(self.mu[1], self.sigma[1]), + ) class dPolynomial1DGenerator(MathMixins, Generator): @@ -152,7 +157,12 @@ def design_matrix(self, *args, **kwargs): if not self.arg_names.issubset(set(kwargs.keys())): raise ValueError(f"Expected {self.arg_names} to be passed.") x = kwargs.get(self.x_name).ravel() - return np.vstack([(idx + 1) * w * x**idx for idx, w in zip(range(self.polyorder), self.weights[1:])]).T + return np.vstack( + [ + (idx + 1) * w * x**idx + for idx, w in zip(range(self.polyorder), self.weights[1:]) + ] + ).T def fit(self, *args, **kwargs): self.fit_mu, self.fit_sigma = self._fit(*args, **kwargs) @@ -164,13 +174,13 @@ def offset(self): @property def _equation(self): eqn = [ - f"({idx + 1}v_{{\\textit{{poly}}, {idx + 1}}}\mathbf{{{self.x_name}}}^{{{idx}}})" for idx in range(self.polyorder) + f"({idx + 1}v_{{\\textit{{poly}}, {idx + 1}}}\mathbf{{{self.x_name}}}^{{{idx}}})" + for idx in range(self.polyorder) ] - eqn[0] = "" + # eqn[0] = "" return eqn - class SinusoidGenerator(MathMixins, Generator): def __init__( self, @@ -248,7 +258,7 @@ def frq(term): return np.hstack( [ - "", + f"\\mathbf{{{self.x_name}}}^0", *[ [ f"\sin({frq(idx)}\\mathbf{{{self.x_name}}})", @@ -333,8 +343,12 @@ def design_matrix(self, *args, **kwargs): [ x**0, *[ - [w1*np.cos(x * (idx + 1)), -w2*np.sin(x * (idx + 1))] - for idx, w1, w2 in zip(range(self.nterms), self.weights[1:][::2], self.weights[1:][1::2]) + [w1 * np.cos(x * (idx + 1)), -w2 * np.sin(x * (idx + 1))] + for idx, w1, w2 in zip( + range(self.nterms), + self.weights[1:][::2], + self.weights[1:][1::2], + ) ], ] ).T @@ -349,7 +363,7 @@ def frq(term): return np.hstack( [ - "", + f"\\mathbf{{{self.x_name}}}^0", *[ [ f"v_{{\\textit{{sin}}, {idx * 2}}}\cos({frq(idx)}\\mathbf{{{self.x_name}}})", diff --git a/src/lamatrix/models/spline.py b/src/lamatrix/models/spline.py index 008aa29..24f043b 100644 --- a/src/lamatrix/models/spline.py +++ b/src/lamatrix/models/spline.py @@ -150,7 +150,7 @@ def offset(self): @property def _equation(self): return [ - "", + f"\\mathbf{{{self.x_name}}}^0", *[ f"N_{{{idx},k}}(\\mathbf{{{self.x_name}}})" for idx in np.arange(1, self.width) @@ -277,6 +277,6 @@ def table_properties(self): @property def _equation(self): return [ - "", + f"\\mathbf{{{self.x_name}}}^0", f"\\frac{{\\partial \\left( \\sum_{{i=1}}^{{{len(self.knots) - self.splineorder}}} v_{{\\textit{{spline}}, i}} N_{{i,{self.splineorder}}}(\\mathbf{{{self.x_name}}})\\right)}}{{\\partial \mathbf{{{self.x_name}}}}}", ] diff --git a/tests/test_generator.py b/tests/test_generator.py index a0e4af1..074204f 100644 --- a/tests/test_generator.py +++ b/tests/test_generator.py @@ -1,14 +1,9 @@ import numpy as np -from lamatrix import ( - Polynomial1DGenerator, - SinusoidGenerator, - Spline1DGenerator, - StackedIndependentGenerator, - dlnGaussian2DGenerator, - lnGaussian2DGenerator, - load, -) +from lamatrix import (BoundedGenerator, Polynomial1DGenerator, + SinusoidGenerator, Spline1DGenerator, + StackedIndependentGenerator, dlnGaussian2DGenerator, + lnGaussian2DGenerator, load) def test_lngauss(): @@ -71,7 +66,7 @@ def test_poly1d(): x = (np.arange(200) - 200) / 200 x2 = (np.arange(-300, 300, 10) - 200) / 200 - data = (4.658*x**3 + 1.23*x**2 + 3 * x + 0.3) + data = 4.658 * x**3 + 1.23 * x**2 + 3 * x + 0.3 true_g = np.gradient(data, x) data += np.random.normal(0, 0.01, size=200) # s = np.random.choice(np.arange(200), size=5) @@ -83,29 +78,31 @@ def test_poly1d(): outlier_mask = np.abs(data - g.evaluate(x=x)) / errors < 3 g.fit(x=x, data=data, errors=errors, mask=outlier_mask) - #assert np.allclose([0.3, 3, 1.23, 4.658], g.mu, atol=g.sigma * 2) + # assert np.allclose([0.3, 3, 1.23, 4.658], g.mu, atol=g.sigma * 2) g.evaluate(x=x2) dg = g.gradient model_g = g.gradient.design_matrix(x=x).dot([1, 1, 1]) - analytical_grad = (3*4.658*x**2 + 2*1.23*x**1 + 3) + analytical_grad = 3 * 4.658 * x**2 + 2 * 1.23 * x**1 + 3 np.allclose(true_g[1:-1], analytical_grad[1:-1], model_g[1:-1], atol=0.001) + def test_sinusoid(): true_w = np.random.normal(size=3) - x = np.arange(-2*np.pi, 2*np.pi, 0.01) + x = np.arange(-2 * np.pi, 2 * np.pi, 0.01) p = SinusoidGenerator() data = p.design_matrix(x=x).dot(true_w) data += np.random.normal(0, 0.05, size=len(x)) errors = np.zeros(len(x)) + 0.05 p.fit(x=x, data=data, errors=errors) - np.allclose(true_w, p.mu, atol=p.sigma*3) + np.allclose(true_w, p.mu, atol=p.sigma * 3) true_g = np.gradient(p.design_matrix(x=x).dot(true_w), x) g = p.gradient.design_matrix(x=x)[:, 1:].dot([1, 1]) assert np.allclose(true_g[1:-1], g[1:-1], atol=0.005) + def test_polycombine(): c, r = np.meshgrid(np.arange(-10, 10, 0.2), np.arange(-10, 10, 0.2), indexing="ij") @@ -125,7 +122,7 @@ def test_polycombine(): def test_spline(): x = np.linspace(0, 10, 100) # Independent variable - true_g = np.gradient(np.sin(2*x), x) + true_g = np.gradient(np.sin(2 * x), x) y = np.random.normal(0, 0.1, 100) + np.sin( 2 * (x) ) # Dependent variable, replace with your time-series data @@ -142,7 +139,9 @@ def test_spline(): dmodel.fit(x=x, data=y2 - model.evaluate(x=x), errors=ye) assert np.abs((dmodel.shift_x[0] - 0.2)) / dmodel.shift_x[1] < 10 - assert np.allclose(true_g[10:-10], dmodel.design_matrix(x=x)[:, 1:].dot([1])[10:-10], atol=1) + assert np.allclose( + true_g[10:-10], dmodel.design_matrix(x=x)[:, 1:].dot([1])[10:-10], atol=1 + ) s1 = Spline1DGenerator(knots=np.arange(-10, 10, 3), x_name="x") s2 = Spline1DGenerator(knots=np.arange(-10, 10, 3), x_name="y") @@ -167,3 +166,17 @@ def test_save(): assert p[0].x_name == "c" assert p[1].x_name == "r" assert p[0].polyorder == 2 + + +def test_bounded(): + p = Polynomial1DGenerator("x", polyorder=2, prior_sigma=[100, 100, 100]) + bg = BoundedGenerator(p, slice(-10, 10, 5)) + x = np.arange(-10, 10, 0.1) + true_w = np.random.normal(0, 1, size=3) + data = p.design_matrix(x=x).dot(true_w) + bg.fit(x=x, data=data) + assert np.allclose( + bg.fit_mu.reshape((bg.nbounds, len(bg))), + true_w, + atol=bg.fit_sigma.reshape((bg.nbounds, len(bg))) * 2, + )