Added inhomogeneous_poisson function and relative tests (NeuralEnsemble#132)

* This PR adds an inhomogeneous Poisson process function and accompanying tests
pietroquaglio authored and alperyeg committed Feb 22, 2018
1 parent 0f04e41 commit d125056
Showing 2 changed files with 152 additions and 3 deletions.
93 changes: 93 additions & 0 deletions elephant/spike_train_generation.py
@@ -337,6 +337,99 @@ def homogeneous_poisson_process(rate, t_start=0.0 * ms, t_stop=1000.0 * ms,
        np.random.exponential, (mean_interval,), rate, t_start, t_stop,
        as_array)

def inhomogeneous_poisson_process(rate, as_array=False):
    """
    Returns a spike train whose spikes are a realization of an inhomogeneous
    Poisson process with the given rate profile.

    Parameters
    ----------
    rate : neo.AnalogSignal
        A `neo.AnalogSignal` representing the rate profile evolving over time.
        Its values must all be `>= 0`. The output spiketrain will have
        `t_start = rate.t_start` and `t_stop = rate.t_stop`.
    as_array : bool
        If True, a NumPy array of sorted spike times is returned,
        rather than a SpikeTrain object.

    Raises
    ------
    ValueError : If `rate` contains a negative value or is empty.
    """
    # Check that rate contains only non-negative values and is not empty
    if any(rate < 0) or not rate.size:
        raise ValueError(
            'rate must be a positive non-empty signal, representing the '
            'rate at time t')
    else:
        # Generate a homogeneous Poisson spike train with rate equal to the
        # peak rate of the profile
        max_rate = max(rate)
        homogeneous_poiss = homogeneous_poisson_process(
            rate=max_rate, t_stop=rate.t_stop, t_start=rate.t_start)
        # Compute the rate profile at each spike time by interpolation
        rate_interpolated = _analog_signal_linear_interp(
            signal=rate, times=homogeneous_poiss.magnitude *
            homogeneous_poiss.units)
        # Accept each spike at time t with probability rate(t)/max_rate
        # (thinning of the homogeneous process)
        u = np.random.uniform(size=len(homogeneous_poiss)) * max_rate
        spikes = homogeneous_poiss[u < rate_interpolated.flatten()]
        if as_array:
            spikes = spikes.magnitude
        return spikes
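
# Editor's sketch (not part of this commit): minimal usage of the function
# above, assuming elephant with this patch, neo and quantities are installed.
# The rate profile mirrors the one used in the tests below (20 Hz for 1 s,
# then 200 Hz for 1 s), so spike counts in the second half should be roughly
# ten times higher on average.
#
# >>> import neo
# >>> from quantities import Hz, s
# >>> from elephant.spike_train_generation import inhomogeneous_poisson_process
# >>> rate_profile = neo.AnalogSignal(
# ...     ([[20]] * 1000 + [[200]] * 1000) * Hz, sampling_period=0.001 * s)
# >>> spiketrain = inhomogeneous_poisson_process(rate_profile)
# >>> spiketrain.t_start, spiketrain.t_stop   # inherited from the rate profile
# >>> inhomogeneous_poisson_process(rate_profile, as_array=True)
# (returns a plain, sorted NumPy array of spike times instead of a SpikeTrain)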


def _analog_signal_linear_interp(signal, times):
    '''
    Compute the linear interpolation of a signal at desired times.

    Given the `signal` (neo.AnalogSignal) taking values `s0` and `s1` at two
    consecutive time points `t0` and `t1` `(t0 < t1)`, for every time `t` in
    `times` such that `t0 < t <= t1`, the value of the linear interpolation is
    returned, given by:
    `s = s0 + ((s1 - s0) / (t1 - t0)) * (t - t0)`.

    Parameters
    ----------
    times : Quantity vector (time)
        The time points at which the interpolation is computed
    signal : neo.core.AnalogSignal
        The analog signal containing the discretization of the function to
        interpolate

    Returns
    -------
    out : Quantity array
        The values of the interpolated signal at the time points given by
        `times`

    Notes
    -----
    If `signal` has sampling period `dt = signal.sampling_period`, its values
    are defined at `t = signal.times`, such that `t[i] = signal.t_start + i * dt`.
    The last of these times is lower than `signal.t_stop`:
    `t[-1] = signal.t_stop - dt`.
    For the interpolation at times t such that `t[-1] <= t <= signal.t_stop`,
    the value of `signal` at `signal.t_stop` is taken to be that at time
    `t[-1]`.
    '''
    dt = signal.sampling_period
    t_start = signal.t_start.rescale(signal.times.units)
    t_stop = signal.t_stop.rescale(signal.times.units)

    # Extend the signal (as a dimensionless array) by copying the last value
    # once, and extend its times to t_stop
    signal_extended = np.vstack(
        [signal.magnitude, signal[-1].magnitude]).flatten()
    times_extended = np.hstack([signal.times, t_stop]) * signal.times.units

    # Index of the last signal sample at or before each requested time
    time_ids = np.floor(((times - t_start) / dt).rescale(
        dimensionless).magnitude).astype('i')

    # Compute the slope m of the signal at each time in times
    y1 = signal_extended[time_ids]
    y2 = signal_extended[time_ids + 1]
    m = (y2 - y1) / dt

    # Interpolate the signal at each time in times by linear interpolation
    out = (y1 + m * (times - times_extended[time_ids])) * signal.units
    return out.rescale(signal.units)
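
# Editor's sketch (not part of this commit): a quick numerical check of the
# interpolation formula above on a two-sample signal. Names and values are
# illustrative only; neo and quantities are assumed to be installed.
#
# >>> import neo
# >>> from quantities import Hz, s
# >>> sig = neo.AnalogSignal([[0.0], [10.0]] * Hz, sampling_period=1 * s)
# >>> _analog_signal_linear_interp(signal=sig, times=[0.25, 1.5] * s)
# Expected roughly array([ 2.5, 10. ]) * Hz:
#   at t = 0.25 s, s = s0 + (s1 - s0) / (t1 - t0) * (t - t0) = 0 + 10 * 0.25;
#   at t = 1.5 s (past the last sample time), the last value 10 Hz is held.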

def homogeneous_gamma_process(a, b, t_start=0.0 * ms, t_stop=1000.0 * ms,
                              as_array=False):
62 changes: 59 additions & 3 deletions elephant/test/test_spike_train_generation.py
@@ -14,11 +14,11 @@
import neo
import numpy as np
from numpy.testing.utils import assert_array_almost_equal
-from scipy.stats import kstest, expon
-from quantities import ms, second, Hz, kHz, mV, dimensionless
+from scipy.stats import kstest, expon, poisson
+from quantities import s, ms, second, Hz, kHz, mV, dimensionless
import elephant.spike_train_generation as stgen
from elephant.statistics import isi

-from scipy.stats import expon

def pdiff(a, b):
"""Difference between a and b as a fraction of a
@@ -181,6 +181,62 @@ def test_buffer_overrun(self):
        self.assertLess(expected_last_spike - spiketrain[-1], 4*expected_mean_isi)


class InhomogeneousPoissonProcessTestCase(unittest.TestCase):
    def setUp(self):
        rate_list = [[20] for i in range(1000)] + [[200] for i in range(1000)]
        self.rate_profile = neo.AnalogSignal(
            rate_list * Hz, sampling_period=0.001 * s)
        rate_0 = [[0] for i in range(1000)]
        self.rate_profile_0 = neo.AnalogSignal(
            rate_0 * Hz, sampling_period=0.001 * s)
        rate_negative = [[-1] for i in range(1000)]
        self.rate_profile_negative = neo.AnalogSignal(
            rate_negative * Hz, sampling_period=0.001 * s)

    def test_statistics(self):
        # This is a statistical test that has a non-zero chance of failure
        # during normal operation. Thus, we set the random seed to a value
        # that creates a realization passing the test.
        np.random.seed(seed=12345)

        for rate in [self.rate_profile, self.rate_profile.rescale(kHz)]:
            spiketrain = stgen.inhomogeneous_poisson_process(rate)
            intervals = isi(spiketrain)

            # Computing expected statistics and percentiles
            expected_spike_count = (np.sum(
                rate) * rate.sampling_period).simplified
            percentile_count = poisson.ppf(.999, expected_spike_count)
            expected_min_isi = (1 / np.min(rate))
            expected_max_isi = (1 / np.max(rate))
            percentile_min_isi = expon.ppf(.999, expected_min_isi)
            percentile_max_isi = expon.ppf(.999, expected_max_isi)

            # Testing (each check should fail about 1 in 1000 times)
            self.assertLess(spiketrain.size, percentile_count)
            self.assertLess(np.min(intervals), percentile_min_isi)
            self.assertLess(np.max(intervals), percentile_max_isi)

            # Testing t_start and t_stop
            self.assertEqual(rate.t_stop, spiketrain.t_stop)
            self.assertEqual(rate.t_start, spiketrain.t_start)

            # Testing type
            spiketrain_as_array = stgen.inhomogeneous_poisson_process(
                rate, as_array=True)
            self.assertTrue(isinstance(spiketrain_as_array, np.ndarray))
            self.assertTrue(isinstance(spiketrain, neo.SpikeTrain))

    def test_low_rates(self):
        spiketrain = stgen.inhomogeneous_poisson_process(self.rate_profile_0)
        self.assertEqual(spiketrain.size, 0)

    def test_negative_rates(self):
        self.assertRaises(
            ValueError, stgen.inhomogeneous_poisson_process,
            self.rate_profile_negative)

class HomogeneousGammaProcessTestCase(unittest.TestCase):

    def setUp(self):
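As a side note from the editor (not part of the commit): the statistical bounds asserted in test_statistics above can be re-derived in a few standalone lines. The snippet below is a sketch assuming numpy, scipy and quantities are available; the variable names are illustrative.

import numpy as np
from scipy.stats import poisson
from quantities import Hz, s

# Rate profile as in the test: 20 Hz for 1 s, then 200 Hz for 1 s,
# sampled every millisecond.
rate = np.array([20.0] * 1000 + [200.0] * 1000) * Hz
sampling_period = 0.001 * s

# Expected spike count = time integral of the rate profile:
# (1000 * 20 + 1000 * 200) * 0.001 = 220 spikes on average.
expected_count = float((np.sum(rate) * sampling_period).simplified)

# test_statistics asserts that the realized spike count stays below the
# 99.9th percentile of a Poisson distribution with this mean, and that the
# observed ISIs stay below percentiles of exponential distributions derived
# from the mean ISIs of the two rate levels (1/20 Hz = 50 ms, 1/200 Hz = 5 ms).
percentile_count = poisson.ppf(.999, expected_count)
print(expected_count, percentile_count)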
