Skip to content

Commit

Permalink
Merge pull request #467 from appukuttan-shailesh/master
Browse files Browse the repository at this point in the history
Fixes issues #437, #442, #445, #451, #465
  • Loading branch information
apdavison committed Mar 17, 2017
2 parents 45f348d + 8a8498a commit e3ed9eb
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 11 deletions.
39 changes: 32 additions & 7 deletions pyNN/brian/standardmodels/electrodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import numpy
import brian
from brian import ms, nA, Hz, network_operation, amp as ampere
from pyNN.standardmodels import electrodes, build_translations, StandardCurrentSource
from pyNN.parameters import ParameterSpace, Sequence
Expand Down Expand Up @@ -56,8 +57,10 @@ def set_native_parameters(self, parameters):
self._reset()

def _reset(self):
self.i = 0
self.running = True
# self.i reset to 0 only at the start of a new run; not for continuation of existing runs
if not hasattr(self, 'running') or self.running == False:
self.i = 0
self.running = True
if self._is_computed:
self._generate()

Expand All @@ -71,7 +74,8 @@ def inject_into(self, cell_list):
self.indices.extend([cell.parent.id_to_index(cell)])

def _update_current(self):
if self.running and simulator.state.t >= self.times[self.i] * 1e3:
# check if current timestamp is within dt/2 of target time; Brian uses seconds as unit of time
if self.running and abs(simulator.state.t - self.times[self.i] * 1e3) < (simulator.state.dt/2.0):
for cell, idx in zip(self.cell_list, self.indices):
if not self._is_playable:
cell.parent.brian_group.i_inj[idx] = self.amplitudes[self.i] * ampere
Expand All @@ -80,7 +84,17 @@ def _update_current(self):
self.i += 1
if self.i >= len(self.times):
self.running = False
if self._is_playable:
# ensure that currents are set to 0 after t_stop
for cell, idx in zip(self.cell_list, self.indices):
cell.parent.brian_group.i_inj[idx] = 0

def _record(self):
self.i_state_monitor = brian.StateMonitor(self.cell_list[0].parent.brian_group[self.indices[0]], 'i_inj', record=0)
simulator.state.network.add(self.i_state_monitor)

def _get_data(self):
return numpy.array((self.i_state_monitor.times / ms, self.i_state_monitor[0] / nA))

class StepCurrentSource(BrianCurrentSource, electrodes.StepCurrentSource):
__doc__ = electrodes.StepCurrentSource.__doc__
Expand Down Expand Up @@ -112,10 +126,13 @@ def __init__(self, **parameters):
self._generate()

def _generate(self):
self.times = numpy.arange(self.start, self.stop, simulator.state.dt)
# Note: Brian uses seconds as unit of time
temp_num_t = len(numpy.arange(self.start, self.stop + simulator.state.dt * 1e-3, simulator.state.dt * 1e-3))
self.times = numpy.array([self.start+(i*simulator.state.dt*1e-3) for i in range(temp_num_t)])

def _compute(self, time):
return self.amplitude * numpy.sin(time * 2 * numpy.pi * self.frequency / 1000. + 2 * numpy.pi * self.phase / 360)
# Note: Brian uses seconds as unit of time; frequency is specified in Hz; thus no conversion required
return self.offset + self.amplitude * numpy.sin((time-self.start) * 2 * numpy.pi * self.frequency + 2 * numpy.pi * self.phase / 360)


class DCSource(BrianCurrentSource, electrodes.DCSource):
Expand All @@ -140,6 +157,12 @@ def _generate(self):
else:
self.times = [0.0, self.start, self.stop]
self.amplitudes = [0.0, self.amplitude, 0.0]
# ensures proper handling of changes in parameters on the fly
if self.start < simulator.state.t*1e-3 < self.stop:
self.times.insert(-1, simulator.state.t*1e-3)
self.amplitudes.insert(-1, self.amplitude)
if (self.start==0 and self.i==2) or (self.start!=0 and self.i==3):
self.i -= 1


class NoisyCurrentSource(BrianCurrentSource, electrodes.NoisyCurrentSource):
Expand All @@ -160,7 +183,9 @@ def __init__(self, **parameters):
self._generate()

def _generate(self):
self.times = numpy.arange(self.start, self.stop, simulator.state.dt)
temp_num_t = len(numpy.arange(self.start, self.stop, max(self.dt, simulator.state.dt * 1e-3)))
self.times = numpy.array([self.start+(i*max(self.dt, simulator.state.dt * 1e-3)) for i in range(temp_num_t)])
self.times = numpy.append(self.times, self.stop)

def _compute(self, time):
return self.mean + (self.stdev * self.dt) * numpy.random.randn()
return self.mean + self.stdev * numpy.random.randn()
6 changes: 3 additions & 3 deletions pyNN/neuron/standardmodels/electrodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __init__(self, **parameters):
def _generate(self):
## Not efficient at all... Is there a way to have those vectors computed on the fly ?
## Otherwise should have a buffer mechanism
self.times = numpy.arange(self.start, self.stop + simulator.state.dt, simulator.state.dt)
tmp = numpy.arange(0, self.stop - self.start, simulator.state.dt)
self.amplitudes = self.mean + (self.stdev * self.dt) * numpy.random.randn(len(tmp))
self.times = numpy.arange(self.start, self.stop, max(self.dt, simulator.state.dt))
self.times = numpy.append(self.times, self.stop)
self.amplitudes = self.mean + self.stdev * numpy.random.randn(len(self.times))
self.amplitudes[-1] = 0.0
199 changes: 198 additions & 1 deletion test/system/scenarios/test_electrodes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@


from nose.tools import assert_equal
from nose.tools import assert_equal, assert_true
from numpy.testing import assert_array_equal
import quantities as pq
import numpy
from .registry import register

try:
import scipy
have_scipy = True
except ImportError:
have_scipy = False
from nose.plugins.skip import SkipTest


@register(exclude=["nemo"])
def test_changing_electrode(sim):
Expand Down Expand Up @@ -95,10 +103,199 @@ def issue321(sim):
assert abs((v[-3:, 1] - v[-3:, 0]).max()) < 0.2


@register()
def issue437(sim):
"""
Checks whether NoisyCurrentSource works properly, by verifying that:
1) no change in vm before start time
2) change in vm at dt after start time
3) monotonic decay of vm after stop time
4) noise.dt is properly implemented
Note: On rare occasions this test might fail as the signal is stochastic.
Test implementation makes use of certain approximations for thresholding.
If fails, run the test again to confirm. Passes 9/10 times on first attempt.
"""
if not have_scipy:
raise SkipTest

v_rest = -60.0 # for this test keep v_rest < v_reset
sim.setup(timestep=0.1, min_delay=0.1)
cells = sim.Population(2, sim.IF_curr_alpha(tau_m=20.0, cm=1.0, v_rest=v_rest,
v_reset=-55.0, tau_refrac=5.0))
cells.initialize(v=-60.0)

#We test two cases: dt = simulator.state.dt and dt != simulator.state.dt
t_start = 25.0
t_stop = 150.0
dt_0 = 0.1
dt_1 = 1.0
noise_0 = sim.NoisyCurrentSource(mean=0.5, stdev=0.25, start=t_start, stop=t_stop, dt=dt_0)
noise_1 = sim.NoisyCurrentSource(mean=0.5, stdev=0.25, start=t_start, stop=t_stop, dt=dt_1)
cells[0].inject(noise_0)
cells[1].inject(noise_1)

cells.record('v')
sim.run(200.0)
v = cells.get_data().segments[0].filter(name="v")[0]
v0 = v[:, 0]
v1 = v[:, 1]
t = v.times
sim.end()

t_start_ind = numpy.argmax(t >= t_start)
t_stop_ind = numpy.argmax(t >= t_stop)

# test for no change in vm before start time
# note: exact matches not appropriate owing to floating point rounding errors
assert_true (all(abs(val0 - v_rest*pq.mV) < 1e-9 and abs(val1 - v_rest*pq.mV) < 1e-9 for val0, val1 in zip(v0[:t_start_ind+1], v1[:t_start_ind+1])))

# test for change in vm at dt after start time
assert_true (abs(v0[t_start_ind+1] - v_rest*pq.mV) >= 1e-9 and abs(v1[t_start_ind+1] - v_rest*pq.mV) >= 1e-9)

# test for monotonic decay of vm after stop time
assert_true (all(val0 >= val0_next and val1 >= val1_next for val0, val0_next, val1, val1_next in zip(v0[t_stop_ind:], v0[t_stop_ind+1:], v1[t_stop_ind:], v1[t_stop_ind+1:])))

# test for ensuring noise.dt is properly implemented; checking first instance for each
# recording current profiles not implemented currently, thus using double derivative of vm
# necessary to upsample signal with noise of dt; else fails in certain scenarios
# Test implementation makes use of certain approximations for thresholding.
# Note: there can be a much simpler check for this once recording current profiles enabled (for all simulators).
# Test implementation makes use of certain approximations for thresholding; hence taking mode of initial values
t_up = numpy.arange(float(min(t)), float(max(t))+dt_0/10.0, dt_0/10.0)
v0_up = numpy.interp(t_up, t, v0)
v1_up = numpy.interp(t_up, t, v1)
d2_v0_up = numpy.diff(v0_up, n=2)
d2_v1_up = numpy.diff(v1_up, n=2)
dt_0_list = [ j for (i,j) in zip(d2_v0_up, t_up) if abs(i) >= 0.00005 ]
dt_1_list = [ j for (i,j) in zip(d2_v1_up, t_up) if abs(i) >= 0.00005 ]
dt_0_list_diff = numpy.diff(dt_0_list, n=1)
dt_1_list_diff = numpy.diff(dt_1_list, n=1)
dt_0_mode = scipy.stats.mode(dt_0_list_diff[0:10])[0][0]
dt_1_mode = scipy.stats.mode(dt_1_list_diff[0:10])[0][0]
assert_true (abs(dt_0_mode - dt_0) < 1e-9 or abs(dt_1_mode - dt_1) < 1e-9)


@register()
def issue442(sim):
"""
Checks whether ACSource works properly, by verifying that:
1) no change in vm before start time
2) change in vm at dt after start time
3) monotonic decay of vm after stop time
4) accurate frequency of output signal
5) offset included in output signal
"""
v_rest = -60.0
sim.setup(timestep=0.1, min_delay=0.1)
cells = sim.Population(1, sim.IF_curr_alpha(tau_m=20.0, cm=1.0, v_rest=v_rest,
v_reset=-65.0, tau_refrac=5.0))
cells.initialize(v=v_rest)

# set t_start, t_stop and freq such that
# "freq*1e-3*(t_stop-t_start)" is an integral value
t_start = 22.5
t_stop = 122.5
freq = 100.0
acsource = sim.ACSource(start=t_start, stop=t_stop, amplitude=0.5, offset=0.1, frequency=freq, phase=0.0)
cells[0].inject(acsource)

cells.record('v')
sim.run(150.0)
v = cells.get_data().segments[0].filter(name="v")[0]
v0 = v[:, 0]
t = v.times
sim.end()

t_start_ind = numpy.argmax(t >= t_start)
t_stop_ind = numpy.argmax(t >= t_stop)

# test for no change in vm before start time
# note: exact matches not appropriate owing to floating point rounding errors
assert_true (all(abs(val0 - v_rest*pq.mV) < 1e-9 for val0 in v0[:t_start_ind+1]))

# test for change in vm at dt after start time
assert_true (abs(v0[t_start_ind+1] - v0[t_start_ind]) >= 1e-9)

# test for monotonic decay of vm after stop time
assert_true (all(val0 >= val0_next for val0, val0_next in zip(v0[t_stop_ind:], v0[t_stop_ind+1:])))

# test for accurate frequency; simply counts peaks
peak_ctr = 0
peak_ind = []
for i in range(t_stop_ind-t_start_ind):
if v0[t_start_ind+i-1] < v0[t_start_ind+i] and v0[t_start_ind+i] >= v0[t_start_ind+i+1]:
peak_ctr+=1
peak_ind.append(t_start_ind+i)
assert_equal(peak_ctr, freq*1e-3*(t_stop-t_start))
# also test for offset; peaks initially increase in magnitude
assert_true (v0[peak_ind[0]] < v0[peak_ind[1]] and v0[peak_ind[1]] < v0[peak_ind[2]])


@register(exclude=["nest"])
def issue445(sim):
"""
This test basically checks if a new value of current is calculated at every
time step, and that the total number of time steps is as expected theoretically
Note: NEST excluded as recording of electrode currents still to be implemented
"""
sim_dt = 0.1
simtime = 200.0
sim.setup(timestep=sim_dt, min_delay=1.5)
cells = sim.Population(1, sim.IF_curr_exp(v_thresh=-55.0, tau_refrac=5.0))
t_start=50.0
t_stop=125.0
acsource = sim.ACSource(start=t_start, stop=t_stop, amplitude=0.5, offset=0.0, frequency=100.0, phase=0.0)
cells[0].inject(acsource)
acsource._record()

sim.run(simtime)
sim.end()

i_t_ac, i_amp_ac = acsource._get_data()
t_start_ind = numpy.argmax(i_t_ac >= t_start)
t_stop_ind = numpy.argmax(i_t_ac >= t_stop)
assert_true (all(val != val_next for val, val_next in zip(i_t_ac[t_start_ind:t_stop_ind-1], i_t_ac[t_start_ind+1:t_stop_ind])))
# note: exact matches not appropriate owing to floating point rounding errors
assert_true (( len(i_t_ac) - ((max(i_t_ac)-min(i_t_ac))/sim_dt + 1) )< 1e-9)


@register()
def issue451(sim):
"""
Modification of test: test_changing_electrode
Difference: incorporates a start and stop time for stimulus
Check that changing the values of the electrodes on the fly is taken into account
"""
repeats = 2
dt = 0.1
simtime = 100
sim.setup(timestep=dt, min_delay=dt)
v_rest = -60.0
p = sim.Population(1, sim.IF_curr_exp(v_rest=v_rest))
p.initialize(v=v_rest)
c = sim.DCSource(amplitude=0.0, start=25.0, stop=50.0)
c.inject_into(p)
p.record('v')

for i in range(repeats):
sim.run(simtime)
c.amplitude += 0.1

v = p.get_data().segments[0].filter(name="v")[0]
sim.end()
# check that the value of v is equal to v_rest throughout the simulation
# note: exact matches not appropriate owing to floating point rounding errors
assert_true (all( (val.item()-v_rest)<1e-9 for val in v[:, 0]))


if __name__ == '__main__':
from pyNN.utility import get_simulator
sim, args = get_simulator()
test_changing_electrode(sim)
ticket226(sim)
issue165(sim)
issue321(sim)
issue437(sim)
issue442(sim)
issue445(sim)
issue451(sim)

0 comments on commit e3ed9eb

Please sign in to comment.