Skip to content

Commit

Permalink
Add time to sample conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
EtienneCmb committed Jul 23, 2021
1 parent 1c4d1dd commit 7c44478
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 3 deletions.
5 changes: 4 additions & 1 deletion frites/utils/__init__.py
@@ -1,4 +1,7 @@
"""Global utility functions."""
from .parallel import parallel_func # noqa
from .preproc import savgol_filter, kernel_smoothing, nonsorted_unique # noqa
from .preproc import ( # noqa
savgol_filter, kernel_smoothing, nonsorted_unique, time_to_sample,
get_closest_sample
)
from .wrapper import jit # noqa
76 changes: 76 additions & 0 deletions frites/utils/preproc.py
Expand Up @@ -134,3 +134,79 @@ def nonsorted_unique(data, assert_unique=False):
"Brain regions are not unique for inferring connectivity pairs")
_, u_idx = np.unique(data, return_index=True)
return data[np.sort(u_idx)]


def time_to_sample(values, sf=None, times=None, round='closer', verbose=None):
"""Convert values from time to sample space.
Parameters
----------
values : array_like
Array of values to convert.
sf : float | None
Sampling frequency. If None, the time vector can also be supplied to
infer it.
times : array_like | None
Time vector used to infer the sampling frequency.
round : {'lower', 'closer', 'upper'}
Use either lower, closer or upper rounding. Default is closer
Returns
-------
values : array_like
Values converted to sample space.
"""
set_log_level(verbose)
# get sampling frequency
if isinstance(times, (list, tuple, np.ndarray)):
sf = 1. / (times[1] - times[0])
logger.info(f"Inferring sampling rate : {sf}")
assert isinstance(sf, (int, float)), "Sampling rate is missing"

# array conversion
values = np.asarray(values)
fcn = {'lower': np.floor, 'closer': np.round, 'upper': np.ceil}[round]
values_i = fcn(values * sf).astype(int)

return values_i


def get_closest_sample(ref, values, precision=None, return_precision=False):
"""Get the sample of closest value in a reference time vector.
Parameters
----------
ref : array_like
Reference vector
values : array_like
Values to seek in the reference vector
precision : float | None
Minimum precision to achieve.
return_precision : {True, False}
If true, the precision of length (n_values,) is also returned
Returns
-------
sample : array_like
Array of length (n_values,) containing the sample of closest values in
the reference vector
precisions : array_like
If return_precision, the vector of precisions of length (n_values,) is
also returned
"""
# array conversion
ref, values = np.asarray(ref), np.asarray(values)

# find closest sample
diff = np.abs(ref.reshape(-1, 1) - values.reshape(1, -1)).argmin(0)
precisions = np.abs(values - ref[diff])

if isinstance(precision, (int, float)):

assert precision > 0, "Precision should be strictly positive"
assert np.all(precisions < precision), "Precision not sufficient"

if return_precision:
return (diff, precisions)
else:
return diff
38 changes: 36 additions & 2 deletions frites/utils/tests/test_preproc.py
Expand Up @@ -2,7 +2,8 @@
import numpy as np
import xarray as xr

from frites.utils import savgol_filter, kernel_smoothing, nonsorted_unique
from frites.utils import (savgol_filter, kernel_smoothing, nonsorted_unique,
time_to_sample, get_closest_sample)


class TestPreproc(object):
Expand Down Expand Up @@ -41,6 +42,39 @@ def test_nonsorted_unique(self):
a = ['r2', 'r0', 'r0', 'r1', 'r2']
np.testing.assert_array_equal(nonsorted_unique(a), ['r2', 'r0', 'r1'])

def test_time_to_sample(self):
"""Test the time to sample conversion function."""
sf = 1024.
n_times = 1000
times = np.arange(n_times) / sf

# test straight conversion
values = [1., .5, .25]
val_i_1 = time_to_sample(values, times=times)
val_i_2 = time_to_sample(values, sf=sf)
np.testing.assert_equal(val_i_1, val_i_2)
np.testing.assert_equal(val_i_1, [1024, 512, 256])

# test rounding
np.testing.assert_equal(time_to_sample(.7, sf=sf, round='lower'), 716)
np.testing.assert_equal(time_to_sample(.7, sf=sf, round='upper'), 717)
np.testing.assert_equal(time_to_sample(.7, sf=sf, round='closer'), 717)

def test_get_closest_sample(self):
"""Test function to get the closest sample in a reference vector."""
sf = 1024.
n_times = 1000
times = np.arange(n_times) / sf

# straight testing
closest, precisions = get_closest_sample(
times, [times[4], times[122]], return_precision=True)
np.testing.assert_equal(closest, [4, 122])
np.testing.assert_equal(precisions, [0, 0])

# test precision
closest = get_closest_sample(times, [0.1, 0.2, .4], precision=.1)


if __name__ == '__main__':
TestPreproc().test_kernel_smoothing()
TestPreproc().test_get_closest_sample()

0 comments on commit 7c44478

Please sign in to comment.