Skip to content

Commit

Permalink
complex Watson with test case
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasDrude committed Aug 15, 2018
1 parent 72c07ed commit 9ed3c96
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 69 deletions.
1 change: 1 addition & 0 deletions dc_integration/distribution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ComplexAngularCentralGaussian,
ComplexAngularCentralGaussianTrainer,
)
from .complex_watson import ComplexWatson, ComplexWatsonTrainer
from .vmfmm import VMFMM, VMFMMTrainer
from .cacgmm import CACGMM, CACGMMTrainer
from .gcacgmm import GCACGMM, GCACGMMTrainer
186 changes: 122 additions & 64 deletions dc_integration/distribution/complex_watson.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@

from scipy.interpolate import interp1d
from scipy.special import hyp1f1
from cached_property import cached_property

import numpy as np

from dc_integration.distribution.utils import _Parameter
from dc_integration.distribution.utils import _ProbabilisticModel
from dc_integration.utils import is_broadcast_compatible

from dc_integration.utils import get_power_spectral_density_matrix, get_pca

@dataclass
class ComplexWatsonParameters(_Parameter):
mode: np.array = None # Shape (..., D)
concentration: np.array = None # Shape (...)


class ComplexWatson:
@dataclass
class ComplexWatson(_ProbabilisticModel):
"""
>>> # import dc_integration.distributions.complex_watson as cw
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> scales = [
Expand All @@ -39,8 +37,10 @@ class ComplexWatson:
>>> _ = plt.show()
"""

@staticmethod
def pdf(x, loc, scale):
mode: np.array = None # Shape (..., D)
concentration: np.array = None # Shape (...)

def pdf(self, x):
""" Calculates pdf function.
Args:
Expand All @@ -50,10 +50,9 @@ def pdf(x, loc, scale):
Returns:
"""
return np.exp(ComplexWatson.log_pdf(x, loc, scale))
return np.exp(ComplexWatson.log_pdf(x))

@staticmethod
def log_pdf(x, loc, scale):
def log_pdf(self, x):
""" Calculates logarithm of pdf function.
Args:
Expand All @@ -63,14 +62,10 @@ def log_pdf(x, loc, scale):
Returns:
"""
# For now, we assume that the caller does proper expansion
assert x.ndim == loc.ndim
assert x.ndim - 1 == scale.ndim

result = np.einsum('...d,...d', x, loc.conj())
result = np.einsum("...d,...d", x, self.mode[..., None, :].conj())
result = result.real ** 2 + result.imag ** 2
result *= scale
result -= ComplexWatson.log_norm(scale, x.shape[-1])
result *= self.concentration[..., None]
result -= self.log_norm()
return result

@staticmethod
Expand All @@ -87,9 +82,10 @@ def log_norm_low_concentration(scale, dimension):
b_range = np.asarray(b_range)[None, :]

return (
np.log(2) + dimension * np.log(np.pi) -
np.log(math.factorial(dimension - 1)) +
np.log(1 + np.sum(np.cumprod(scale[:, None] / b_range, -1), -1))
np.log(2)
+ dimension * np.log(np.pi)
- np.log(math.factorial(dimension - 1))
+ np.log(1 + np.sum(np.cumprod(scale[:, None] / b_range, -1), -1))
).reshape(shape)

@staticmethod
Expand All @@ -109,13 +105,18 @@ def log_norm_medium_concentration(scale, dimension):
r = np.asarray(r_range)[None, :]

# Mardia1999Watson Equation 3
temp = scale[:, None] ** r * np.exp(-scale[:, None]) / \
np.asarray([math.factorial(_r) for _r in r_range])
temp = (
scale[:, None] ** r
* np.exp(-scale[:, None])
/ np.asarray([math.factorial(_r) for _r in r_range])
)

return (
np.log(2.) + dimension * np.log(np.pi) +
(1. - dimension) * np.log(scale) + scale +
np.log(1. - np.sum(temp, -1))
np.log(2.)
+ dimension * np.log(np.pi)
+ (1. - dimension) * np.log(scale)
+ scale
+ np.log(1. - np.sum(temp, -1))
).reshape(shape)

@staticmethod
Expand All @@ -128,12 +129,14 @@ def log_norm_high_concentration(scale, dimension):
scale = scale.ravel()

return (
np.log(2.) + dimension * np.log(np.pi) +
(1. - dimension) * np.log(scale) + scale
np.log(2.)
+ dimension * np.log(np.pi)
+ (1. - dimension) * np.log(scale)
+ scale
).reshape(shape)

@staticmethod
def log_norm_1F1(scale, dimension):
def log_norm_1f1(scale, dimension):
# This is already good.
# I am unsure if the others are better.
# In https://github.com/scipy/scipy/issues/2957
Expand All @@ -142,7 +145,7 @@ def log_norm_1F1(scale, dimension):
# With scale > 800 this function makes problems.
# Normally scale is thresholded by 100
norm = hyp1f1(1, dimension, scale) * (
2*np.pi**dimension / math.factorial(dimension-1)
2 * np.pi ** dimension / math.factorial(dimension - 1)
)
return np.log(norm)

Expand All @@ -153,77 +156,132 @@ def log_norm_tran_vu(scale, dimension):
scale = scale.ravel()

log_c_high = (
np.log(2.)
+ dimension * np.log(np.pi)
+ (1. - dimension) * np.log(scale)
+ scale
np.log(2.)
+ dimension * np.log(np.pi)
+ (1. - dimension) * np.log(scale)
+ scale
)

r_range = np.arange(dimension - 2 + 1)
r = r_range[None, :]
# Mardia1999Watson Equation 3
temp = scale[:, None] ** r * np.exp(-scale[:, None]) / \
np.array([math.factorial(_r) for _r in r_range])

log_c_medium = log_c_high + (
+ np.log(1. - np.sum(temp, -1))
temp = (
scale[:, None] ** r
* np.exp(-scale[:, None])
/ np.array([math.factorial(_r) for _r in r_range])
)

log_c_medium = log_c_high + (+np.log(1. - np.sum(temp, -1)))

# Mardia1999Watson Equation 4, Taylor series
b_range = np.arange(dimension, dimension + 20 - 1 + 1)
b_range = b_range[None, :]

log_c_low = (
np.log(2)
+ dimension * np.log(np.pi)
- np.log(math.factorial(dimension - 1))
+ np.log(
1
+ np.sum(np.cumprod(scale[:, None] / b_range, -1), -1)
)
np.log(2)
+ dimension * np.log(np.pi)
- np.log(math.factorial(dimension - 1))
+ np.log(1 + np.sum(np.cumprod(scale[:, None] / b_range, -1), -1))
)

# A good boundary between low and medium is kappa = 1/D. This can be
# motivated by plotting each term for very small values and for
# values from [0.1, 1].
# A good boundary between medium and high is when
# kappa * exp(kappa) < epsilon. Choosing kappa = 100 is sufficient.
log_c_low[scale >= 1/dimension] = log_c_medium[scale >= 1/dimension]
log_c_low[scale >= 1 / dimension] = log_c_medium[
scale >= 1 / dimension
]
log_c_low[scale >= 100] = log_c_medium[scale >= 100]
return log_c_low.reshape(shape)

log_norm = log_norm_1F1
def log_norm(self):
return self.log_norm_1f1(self.concentration, self.mode.shape[-1])


def __init__(self, dimension, max_concentration=100, spline_markers=100):
class ComplexWatsonTrainer:
def __init__(
self, dimension=None, max_concentration=100, spline_markers=100
):
"""
Args:
dimension: Feature dimension. If you do not provide this when
initializing the trainer, it will be inferred when the fit
function is called.
max_concentration:
spline_markers:
"""
self.dimension = dimension
self.max_concentration = max_concentration
self.spline_markers = spline_markers
self.spline = self._get_spline()

def _get_spline(self):
"""Defines a qubic spline to fit concentration parameter."""
@cached_property
def spline(self):
"""Defines a cubic spline to fit concentration parameter."""
assert self.dimension is not None, (
"You need to specify dimension. This can be done at object "
"instantiation or it can be inferred when using the fit function."
)
x = np.logspace(
-3, np.log10(self.max_concentration), self.spline_markers
)
y = (
hyp1f1(2, self.dimension + 1, x)
/ (self.dimension * hyp1f1(1, self.dimension, x))
y = hyp1f1(2, self.dimension + 1, x) / (
self.dimension * hyp1f1(1, self.dimension, x)
)
return interp1d(
y, x,
kind='quadratic', assume_sorted=True, bounds_error=False,
fill_value=(0, self.max_concentration)
y,
x,
kind="quadratic",
assume_sorted=True,
bounds_error=False,
fill_value=(0, self.max_concentration),
)

def hypergeometric_ratio_inverse(self, eigenvalues):
"""
This is twice as slow as interpolation with Tran Vu's C-code.
>>> cw = ComplexWatson(5)
>>> cw.hypergeometric_ratio_inverse([0, 1/5, 1/5 + 1e-4, 0.9599999, 1])
>>> t = ComplexWatsonTrainer(5)
>>> t.hypergeometric_ratio_inverse([0, 1/5, 1/5 + 1e-4, 0.9599999, 1])
"""

return self.spline(eigenvalues)

def fit(self) -> ComplexWatsonParameters:
pass
def fit(self, x, saliency=None) -> ComplexWatson:
assert np.iscomplexobj(x), x.dtype
x = x / np.maximum(
np.linalg.norm(x, axis=-1, keepdims=True), np.finfo(x.dtype).tiny
)
if saliency is not None:
assert is_broadcast_compatible(x.shape[:-1], saliency.shape), (
x.shape,
saliency.shape,
)
return self._fit(x, saliency=saliency)

def _fit(self, x, saliency) -> ComplexWatson:
if self.dimension is None:
self.dimension = x.shape[-1]
else:
assert self.dimension == x.shape[-1], (
"You initialized the trainer with a different dimension than "
"you are using to fit a model. Use a new trainer, when you "
"change the dimension."
)

if saliency is None:
covariance = np.einsum(
"...nd,...nD->...dD", x, x.conj()
)
denominator = np.array(x.shape[-2])
else:
covariance = np.einsum(
"...n,...nd,...nD->...dD", saliency, x, x.conj()
)
denominator = np.einsum("...n->...", saliency)[..., None, None]

covariance /= denominator
mode, eigenvalues = get_pca(covariance)
concentration = self.hypergeometric_ratio_inverse(eigenvalues)
return ComplexWatson(mode=mode, concentration=concentration)
7 changes: 2 additions & 5 deletions dc_integration/distribution/cwmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
get_power_spectral_density_matrix,
get_pca,
)
from dc_integration.distribution.utils import (
_unit_norm,
_Parameter,
)
from dc_integration.distribution.utils import _ProbabilisticModel


@dataclass
class ComplexWatsonMixtureModelParameters(_Parameter):
class ComplexWatsonMixtureModelParameters(_ProbabilisticModel):
complex_watson: ComplexWatsonParameters \
= field(default_factory=ComplexWatsonParameters)
mixture_weights: np.array = None
Expand Down
20 changes: 20 additions & 0 deletions tests/test_distribution/test_complex_watson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import numpy as np
from numpy.testing import assert_equal
import unittest
from dc_integration.distribution import (
ComplexAngularCentralGaussian,
ComplexWatsonTrainer,
)


class TestComplexWatson(unittest.TestCase):
def test_complex_watson_shapes(self):
covariance = np.array(
[[10, 1 + 1j, 1 + 1j], [1 - 1j, 5, 1], [1 - 1j, 1, 2]]
)
covariance /= np.trace(covariance)
model = ComplexAngularCentralGaussian(covariance=covariance)
x = model.sample(size=(10000,))
model = ComplexWatsonTrainer().fit(x)
assert_equal(model.mode.shape, (3,))
assert_equal(model.concentration.shape, ())

0 comments on commit 9ed3c96

Please sign in to comment.