Skip to content

Commit

Permalink
Merge pull request #17 from LRydin/dev
Browse files Browse the repository at this point in the history
Type hinting, PEP adjustments, full=True output.
  • Loading branch information
LRydin committed Aug 25, 2023
2 parents 0b03a38 + 9fb0abf commit 7e9c370
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 44 deletions.
60 changes: 31 additions & 29 deletions kramersmoyal/binning.py
@@ -1,32 +1,13 @@
from scipy.sparse import csr_matrix
import numpy as np


def bincount1(x, weights, minlength=0):
return np.array(
[np.bincount(x, w, minlength=minlength) for w in weights])


def bincount2(x, weights, minlength=0):
# Small speedup if # of weights is large
assert len(x.shape) == 1

ans_size = x.max() + 1

if (ans_size < minlength):
ans_size = minlength

csr = csr_matrix((np.ones(x.shape[0]), (np.arange(x.shape[0]), x)), shape=[
x.shape[0], ans_size])
return weights * csr


_range = range


# An alternative to Numpy's histogramdd, supporting a weights matrix
# Part of the following code is licensed under the BSD-3 License (from Numpy)
def histogramdd(sample, bins=10, range=None, normed=None, weights=None, density=None, bw=0.0):
# An alternative to Numpy's histogramdd, supporting a weights matrix.
# Part of the following code is licensed under the BSD-3 License (from Numpy).
def histogramdd(sample: np.ndarray, bins: int=10, range: str=None,
normed: str=None, weights: np.ndarray=None, density: np.ndarray=None,
bw: float=0.0) -> (np.ndarray, np.ndarray):

def _get_outer_edges(a, range, bw):
"""
Expand All @@ -37,18 +18,20 @@ def _get_outer_edges(a, range, bw):
first_edge, last_edge = range
if first_edge > last_edge:
raise ValueError(
'max must be larger than min in range parameter.')
'Max must be larger than min in range parameter')
if not (np.isfinite(first_edge) and np.isfinite(last_edge)):
raise ValueError(
"supplied range of [{}, {}] is not finite".format(first_edge, last_edge))
'Supplied range of [{}, {}] '
' is not finite'.format(first_edge, last_edge))
elif a.size == 0:
# handle empty arrays. Can't determine range, so use 0-1.
first_edge, last_edge = 0, 1
else:
first_edge, last_edge = a.min() - bw, a.max() + bw
if not (np.isfinite(first_edge) and np.isfinite(last_edge)):
raise ValueError(
"autodetected range of [{}, {}] is not finite".format(first_edge, last_edge))
'Autodetected range of [{}, {}] '
' is not finite'.format(first_edge, last_edge))

# expand empty range to avoid divide by zero
if first_edge == last_edge:
Expand Down Expand Up @@ -76,7 +59,7 @@ def _get_outer_edges(a, range, bw):
if M != D:
raise ValueError(
'The dimension of bins must be equal to the dimension of the '
' sample x.')
' sample x')
except TypeError:
# bins is an integer
bins = D * [bins]
Expand All @@ -85,7 +68,7 @@ def _get_outer_edges(a, range, bw):
if range is None:
range = (None,) * D
elif len(range) != D:
raise ValueError('range argument must have one entry per dimension')
raise ValueError('Range argument must have one entry per dimension')

# Create edge arrays
for i in _range(D):
Expand Down Expand Up @@ -182,3 +165,22 @@ def _get_outer_edges(a, range, bw):
raise RuntimeError(
"Internal Shape Error")
return hist, edges


def bincount1(x, weights, minlength=0):
return np.array(
[np.bincount(x, w, minlength=minlength) for w in weights])


def bincount2(x, weights, minlength=0):
# Small speedup if # of weights is large
assert len(x.shape) == 1

ans_size = x.max() + 1

if (ans_size < minlength):
ans_size = minlength

csr = csr_matrix((np.ones(x.shape[0]), (np.arange(x.shape[0]), x)), shape=[
x.shape[0], ans_size])
return weights * csr
8 changes: 3 additions & 5 deletions kramersmoyal/kernels.py
Expand Up @@ -15,7 +15,7 @@ def kernel(kernel_func):
https://en.wikipedia.org/wiki/Kernel_(statistics)
"""
@wraps(kernel_func) # just for naming
def decorated(x, bw):
def decorated(x: np.ndarray, bw: float):
def volume_unit_ball(dims: int):
# volume of a unit ball in dimension dims
return np.pi ** (dims / 2.0) / gamma(dims / 2.0 + 1.0)
Expand All @@ -28,10 +28,10 @@ def volume_unit_ball(dims: int):
# Euclidean norm
dist = np.sqrt((x * x).sum(axis=-1))

return kernel_func(dist / bw, dims) / (bw ** dims) / volume_unit_ball(dims)
return kernel_func(dist / bw, dims) \
/ (bw ** dims) / volume_unit_ball(dims)
return decorated


@kernel
def epanechnikov(x: np.ndarray, dims: int) -> np.ndarray:
"""
Expand Down Expand Up @@ -79,7 +79,6 @@ def triagular(x: np.ndarray, dims: int) -> np.ndarray:
kernel[mask] = (1.0 - np.abs(x[mask])) / normalisation
return kernel


@kernel
def quartic(x: np.ndarray, dims: int) -> np.ndarray:
"""
Expand All @@ -92,7 +91,6 @@ def quartic(x: np.ndarray, dims: int) -> np.ndarray:
kernel[mask] = ((1.0 - x2[mask]) ** 2) / normalisation
return kernel


def silvermans_rule(timeseries: np.ndarray) -> float:
n, dims = timeseries.shape
sigma = np.std(timeseries, axis=0)
Expand Down
29 changes: 20 additions & 9 deletions kramersmoyal/kmc.py
Expand Up @@ -8,7 +8,8 @@

def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
kernel: callable=epanechnikov, bw: float=None, tol: float=1e-10,
conv_method: str='auto', center_edges: bool=True) -> np.ndarray:
conv_method: str='auto', center_edges: bool=True,
full: bool=False) -> (np.ndarray, np.ndarray):
"""
Estimates the Kramers─Moyal coefficients from a timeseries using a kernel
estimator method. `km` can calculate the Kramers─Moyal coefficients for a
Expand Down Expand Up @@ -57,8 +58,8 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
powers = np.array([[0, 0, 1, 1, 0, 1, 2, 2, 2],
[0, 1, 0, 1, 2, 2, 0, 1, 2]]).T
Set `verbose=True` to print out `powers`. The order that they appear
dictactes the order in the output `kmc`.
Set `full=True` to output `powers`. The order that they appear dictactes
the order in the output `kmc`.
kernel: callable (default `epanechnikov`)
Kernel used to convolute with the Kramers-Moyal coefficients. To select
Expand All @@ -68,7 +69,8 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
bw: float (default `None`)
Desired bandwidth of the kernel. A value of 1 occupies the full space of
the bin space. Recommended are values `0.005 < bw < 0.5`.
the bin space. Recommended are values `0.005 < bw < 0.5`. Set
`full=True` to output `bw`.
tol: float (default `1e-10`)
Round to zero absolute values smaller than `tol`, after the
Expand All @@ -82,6 +84,9 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
Whether to return the bin centers or the bin edges (since for `n` bins
there are `n + 1` edges).
full: bool (default `False`)
Outputs bandwidth `bw` and powers `powers`.
Returns
-------
kmc: np.ndarray
Expand All @@ -94,6 +99,12 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
The bin edges with shape `(D, bins.shape)` of the estimated
Kramers─Moyal coefficients.
(..., bw, powers): tuple
This is only returned if `full=True`:
* The bandwidth `bw`,
* An array of the `powers`.
References
----------
.. [Lamouroux2009] D. Lamouroux and K. Lehnertz, "Kernel-based regression of
Expand All @@ -117,7 +128,7 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,

# Tranforming powers into right shape
if isinstance(powers, int):
# complicated way of obtaing power in all dimensions
# complicated way of obtaing powers in all dimensions
powers = np.array(sorted(product(*(range(powers + 1),) * dims),
key=lambda x: (max(x), x)))

Expand All @@ -127,9 +138,6 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,

if not (powers[0] == [0] * dims).all():
powers = np.array([[0] * dims, *powers])
trim_output = True
else:
trim_output = False

assert (powers[0] == [0] * dims).all(), "First power must be zero"
assert dims == powers.shape[1], "Powers not matching timeseries' dimension"
Expand Down Expand Up @@ -164,7 +172,10 @@ def km(timeseries: np.ndarray, bins: str='default', powers: int=4,
if center_edges:
edges = [edge[:-1] + 0.5 * (edge[1] - edge[0]) for edge in edges]

return (kmc, edges) if not trim_output else (kmc[1:], edges)
if not full:
return (kmc, edges)
else:
return (kmc, edges, bw, powers)


def _km(timeseries: np.ndarray, bins: np.ndarray, powers: np.ndarray,
Expand Down
3 changes: 2 additions & 1 deletion test/kernels_test.py
Expand Up @@ -13,6 +13,7 @@ def test_kernels():
kernel_ = kernel(mesh, bw=bw).reshape(
*(edge.size for edge in edges))
passed = np.allclose(kernel_.sum() * dx, 1, atol=1e-2)
print("Kernel {0:10s}\t with {1:.2f} bandwidth at {2}D passed: {3}".format(
print("Kernel {0:10s}\t with {1:.2f} bandwidth at "
"{2}D passed: {3}".format(
kernel.__name__, bw, dim, passed))
print()
8 changes: 8 additions & 0 deletions test/kmc_test.py
Expand Up @@ -94,3 +94,11 @@ def test_kmc():
kmc, edges = km(timeseries, powers=4, bins=[25, 35])
assert isinstance(kmc, np.ndarray)
assert len(edges) == 2

# test output
timeseries = np.random.normal(loc=0, scale=1, size=(N, 1))
kmc, edges, bw, powers = km(timeseries, full=True)
assert isinstance(kmc, np.ndarray)
assert len(edges) == 1
assert isinstance(bw, float)
assert isinstance(powers, np.ndarray)

0 comments on commit 7e9c370

Please sign in to comment.