From 6a936925851a0ef22b6a8199e6aca23aa933db6f Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Sun, 12 Jul 2020 15:01:59 +0200 Subject: [PATCH 01/13] Add function to intersect annotation dictionaries --- neo/core/baseneo.py | 31 ++++++++++++++++++++++ neo/test/coretest/test_base.py | 48 +++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/neo/core/baseneo.py b/neo/core/baseneo.py index a11c2ed62..37231640a 100644 --- a/neo/core/baseneo.py +++ b/neo/core/baseneo.py @@ -3,6 +3,7 @@ used by all :module:`neo.core` classes. """ +from copy import deepcopy from datetime import datetime, date, time, timedelta from decimal import Decimal import logging @@ -108,6 +109,36 @@ def merge_annotations(A, *Bs): logger.debug("Merging annotations: A=%s Bs=%s merged=%s", A, Bs, merged) return merged +def intersect_annotations(A, B): + """ + Identify common entries in dictionaries A and B + and return these in a separate dictionary. + + Entries have to share key as well as value to be + considered common. + + Parameters + ---------- + A, B : dict + Dictionaries to merge. + """ + + result = {} + + for key in A.keys() & B.keys(): + v1, v2 = A[key], B[key] + assert type(v1) == type(v2), 'type({}) {} != type({}) {}'.format(v1, type(v1), + v2, type(v2)) + if isinstance(v1, dict) and v1 == v2: + result[key] = deepcopy(v1) + elif isinstance(v1, str) and v1 == v2: + result[key] = A[key] + elif isinstance(v1, list) and v1 == v2: + result[key] = deepcopy(v1) + elif isinstance(v1, np.ndarray) and all(v1 == v2): + result[key] = deepcopy(v1) + return result + def _reference_name(class_name): """ diff --git a/neo/test/coretest/test_base.py b/neo/test/coretest/test_base.py index 91e948784..f57e17e51 100644 --- a/neo/test/coretest/test_base.py +++ b/neo/test/coretest/test_base.py @@ -20,7 +20,8 @@ HAVE_IPYTHON = True from neo.core.baseneo import (BaseNeo, _check_annotations, - merge_annotations, merge_annotation) + merge_annotations, merge_annotation, + intersect_annotations) from neo.test.tools import assert_arrays_equal @@ -1266,5 +1267,50 @@ def test__pretty(self): self.assertEqual(res, targ) +class Test_intersect_annotations(unittest.TestCase): + ''' + TestCase for intersect_annotations + ''' + + def setUp(self): + self.dict1 = {1:'1', 2:'2'} + self.dict2 = {1:'1'} + self.dict3 = {'list1': [1,2,3]} + self.dict4 = {'list1': [1,2,3], 'list2': [1,2,3]} + self.dict5 = {'list1': [1,2]} + self.dict6 = {'array1': np.array([1,2])} + self.dict7 = {'array1': np.array([1,2]), 'array2': np.array([1,2]), + 'array3': np.array([1,2,3])} + + self.all_simple_dicts = [self.dict1, self.dict2, self.dict3, + self.dict4, self.dict5, ] + + def test_simple(self): + result = intersect_annotations(self.dict1, self.dict2) + self.assertDictEqual(self.dict2, result) + + def test_intersect_self(self): + for d in self.all_simple_dicts: + result = intersect_annotations(d, d) + self.assertDictEqual(d, result) + + def test_list(self): + result = intersect_annotations(self.dict3, self.dict4) + self.assertDictEqual(self.dict3, result) + + def test_list_values(self): + result = intersect_annotations(self.dict4, self.dict5) + self.assertDictEqual({}, result) + + def test_keys(self): + result = intersect_annotations(self.dict1, self.dict4) + self.assertDictEqual({}, result) + + def test_arrays(self): + result = intersect_annotations(self.dict6, self.dict7) + self.assertEqual(self.dict6.keys(), result.keys()) + np.testing.assert_array_equal([1,2], result['array1']) + + if __name__ == "__main__": unittest.main() From e6a19ad63246603fcf6f046f6406d6f371e11618 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Sun, 12 Jul 2020 16:52:59 +0200 Subject: [PATCH 02/13] Add patch methods for signal objects --- neo/core/analogsignal.py | 98 ++++++++++++++++++- neo/core/basesignal.py | 46 ++++++++- neo/core/irregularlysampledsignal.py | 81 ++++++++++++++- neo/test/coretest/test_analogsignal.py | 82 ++++++++++++++++ .../coretest/test_irregularysampledsignal.py | 52 ++++++++++ 5 files changed, 356 insertions(+), 3 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index 04c89ba81..b7f80a3b1 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -29,7 +29,7 @@ import numpy as np import quantities as pq -from neo.core.baseneo import BaseNeo, MergeError, merge_annotations +from neo.core.baseneo import BaseNeo, MergeError, merge_annotations, intersect_annotations from neo.core.dataobject import DataObject from copy import copy, deepcopy @@ -657,3 +657,99 @@ def rectify(self, **kwargs): rectified_signal.array_annotations = self.array_annotations.copy() return rectified_signal + + def patch(self, other, overwrite=True): + ''' + Patch another signal to this one. + + The signal objects are concatenated vertically + (row-wise, :func:`np.vstack`). Patching can be + used to combine signals across segments. Signals + have to overlap by one sample in time, i.e. + self.t_stop == other.t_start + Note: Only common array annotations common to + both signals are attached to the patched signal. + + If the attributes of the two signal are not + compatible, an Exception is raised. + + Required attributes of the signal are used. + + Parameters + ---------- + other : neo.BaseSignal + The object that is merged into this one. + overwrite : bool + If False, samples of this signal are overwritten + by other signal. If True, samples of other signal + are overwritten by this signal. Default: True + + Returns + ------- + signal : neo.BaseSignal + Signal containing all non-overlapping samples of + both source signals. + + Raises + ------ + MergeError + If `other` object has incompatible attributes. + ''' + + for attr in self._necessary_attrs: + if 'signal' != attr[0]: + if getattr(self, attr[0], None) != getattr(other, attr[0], None): + if attr[0] in ['t_start','t_stop']: + continue + raise MergeError("Cannot patch these two signals as the %s differ." % attr[0]) + + if hasattr(self, "lazy_shape"): + if hasattr(other, "lazy_shape"): + if self.lazy_shape[-1] != other.lazy_shape[-1]: + raise MergeError("Cannot patch signals as they contain" + " different numbers of traces.") + merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) + else: + raise MergeError("Cannot patch a lazy object with a real object.") + if other.units != self.units: + other = other.rescale(self.units) + + if self.t_start > other.t_stop: + raise MergeError('Signals do not overlap.') + + # adjust overlapping signals + if self.t_stop + self.sampling_period >= other.t_start: + if not overwrite: # removing samples of other signal + slice_t_start = self.t_stop + self.sampling_period + sliced_other = other.time_slice(slice_t_start, None) + stack = np.vstack((self.magnitude, sliced_other.magnitude)) + else: # removing samples of this signal + slice_t_stop = other.t_start - other.sampling_period + sliced_self = self.time_slice(None, slice_t_stop) + stack = np.vstack((sliced_self.magnitude, other.magnitude)) + else: + raise MergeError("Cannot patch signals with non-overlapping times") + + kwargs = {} + for name in ("name", "description", "file_origin"): + attr_self = getattr(self, name) + attr_other = getattr(other, name) + if attr_self == attr_other: + kwargs[name] = attr_self + else: + kwargs[name] = "merge({}, {})".format(attr_self, attr_other) + merged_annotations = merge_annotations(self.annotations, other.annotations) + kwargs.update(merged_annotations) + + kwargs['array_annotations'] = intersect_annotations(self.array_annotations, + other.array_annotations) + + signal = self.__class__(stack, units=self.units, dtype=self.dtype, copy=False, + t_start=self.t_start, sampling_rate=self.sampling_rate, **kwargs) + signal.segment = None + signal.channel_index = None + + if hasattr(self, "lazy_shape"): + signal.lazy_shape = merged_lazy_shape + + return signal \ No newline at end of file diff --git a/neo/core/basesignal.py b/neo/core/basesignal.py index f03868897..ac412daaf 100644 --- a/neo/core/basesignal.py +++ b/neo/core/basesignal.py @@ -21,7 +21,7 @@ import numpy as np import quantities as pq -from neo.core.baseneo import BaseNeo, MergeError, merge_annotations +from neo.core.baseneo import MergeError, merge_annotations from neo.core.dataobject import DataObject, ArrayDict from neo.core.channelindex import ChannelIndex @@ -290,3 +290,47 @@ def merge(self, other): signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1])) return signal + + def time_slice(self, t_start, t_stop): + ''' + Creates a new AnalogSignal corresponding to the time slice of the + original Signal between times t_start, t_stop. + ''' + NotImplementedError('Needs to be implemented for subclasses.') + + + def patch(self, other): + ''' + Patch another signal to this one. + + The signal objects are concatenated vertically + (row-wise, :func:`np.vstack`). Patching can be + used to combine signals across segments. + Note: Only array annotations common to + both signals are attached to the patched signal. + + If the attributes of the two signal are not + compatible, an Exception is raised. + + Required attributes of the signal are used. + + Parameters + ---------- + other : neo.BaseSignal + The object that is merged into this one. + + Returns + ------- + signal : neo.BaseSignal + Signal containing all non-overlapping samples of + both source signals. + + Raises + ------ + MergeError + If `other` object has incompatible attributes. + ''' + + NotImplementedError('Patching need to be implemented in sublcasses') + + diff --git a/neo/core/irregularlysampledsignal.py b/neo/core/irregularlysampledsignal.py index 7dee88f3d..03f6a9b8f 100644 --- a/neo/core/irregularlysampledsignal.py +++ b/neo/core/irregularlysampledsignal.py @@ -31,7 +31,7 @@ import numpy as np import quantities as pq -from neo.core.baseneo import BaseNeo, MergeError, merge_annotations +from neo.core.baseneo import MergeError, merge_annotations, intersect_annotations from neo.core.basesignal import BaseSignal from neo.core.analogsignal import AnalogSignal from neo.core.channelindex import ChannelIndex @@ -514,3 +514,82 @@ def merge(self, other): signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1])) return signal + + def patch(self, other): + ''' + Patch another signal to this one. + + The signal objects are concatenated vertically + (row-wise, :func:`np.vstack`). Patching can be + used to combine signals across segments. + Note: Only array annotations common to + both signals are attached to the patched signal. + + If the attributes of the two signal are not + compatible, an Exception is raised. + + Required attributes of the signal are used. + + Parameters + ---------- + other : neo.BaseSignal + The object that is merged into this one. + + Returns + ------- + signal : neo.IrregularlySampledSignal + Signal containing all non-overlapping samples of + both source signals. + + Raises + ------ + MergeError + If `other` object has incompatible attributes. + ''' + + for attr in self._necessary_attrs: + if not (attr[0] in ['signal', 'times', 't_start', 't_stop', 'times']): + if getattr(self, attr[0], None) != getattr(other, attr[0], None): + raise MergeError("Cannot patch these two signals as the %s differ." % attr[0]) + + if hasattr(self, "lazy_shape"): + if hasattr(other, "lazy_shape"): + if self.lazy_shape[-1] != other.lazy_shape[-1]: + raise MergeError("Cannot patch signals as they contain" + " different numbers of traces.") + merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) + else: + raise MergeError("Cannot patch a lazy object with a real object.") + if other.units != self.units: + other = other.rescale(self.units) + + new_times = np.hstack((self.times, other.times)) + sorting = np.argsort(new_times) + new_samples = np.vstack((self.magnitude, other.magnitude)) + + kwargs = {} + for name in ("name", "description", "file_origin"): + attr_self = getattr(self, name) + attr_other = getattr(other, name) + if attr_self == attr_other: + kwargs[name] = attr_self + else: + kwargs[name] = "merge({}, {})".format(attr_self, attr_other) + merged_annotations = merge_annotations(self.annotations, other.annotations) + kwargs.update(merged_annotations) + + kwargs['array_annotations'] = intersect_annotations(self.array_annotations, + other.array_annotations) + t_start = min(self.t_start, other.t_start) + t_stop = max(self.t_start, other.t_start) + + signal = IrregularlySampledSignal(signal=new_samples[sorting], times=new_times[sorting], + units=self.units, dtype=self.dtype, copy=False, + t_start=t_start, t_stop=t_stop, **kwargs) + signal.segment = None + signal.channel_index = None + + if hasattr(self, "lazy_shape"): + signal.lazy_shape = merged_lazy_shape + + return signal diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index 608241960..1757ce0dd 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -28,6 +28,7 @@ HAVE_SCIPY = True from numpy.testing import assert_array_equal +from neo.core.baseneo import MergeError from neo.core.analogsignal import AnalogSignal, _get_sampling_rate from neo.core.channelindex import ChannelIndex from neo.core import Segment @@ -1572,6 +1573,87 @@ def test__merge(self): assert_arrays_equal(mergeddata23, targdata23) assert_arrays_equal(mergeddata24, targdata24) + def test_patch_simple(self): + signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + signal1.sampling_period) + + result = signal1.patch(signal2) + assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) + for attr in signal1._necessary_attrs: + self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) + + def test_patch_no_overlap(self): + signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + t_start=10*pq.s + signal1.sampling_period) + + with self.assertRaises(MergeError): + signal1.patch(signal2) + + def test_patch_multi_trace(self): + data1 = np.arange(4).reshape(2,2) + data2 = np.arange(4,8).reshape(2,2) + signal1 = AnalogSignal(data1*pq.s, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal(data2*pq.s, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + signal1.sampling_period) + + result = signal1.patch(signal2) + data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) + assert_array_equal(data_expected, result.magnitude) + for attr in signal1._necessary_attrs: + self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) + + def test_patch_overwrite_true(self): + signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop) + + result = signal1.patch(signal2, overwrite=True) + assert_array_equal(np.array([0,1,2,4,5,6]).reshape((-1, 1)), result.magnitude) + + def test_patch_overwrite_false(self): + signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop) + + result = signal1.patch(signal2, overwrite=False) + assert_array_equal(np.array([0,1,2,3,5,6]).reshape((-1, 1)), result.magnitude) + + def test_patch_array_annotations(self): + array_anno1 = {'first': ['a','b']} + array_anno2 = {'first': ['a','b'], + 'second': ['c','d']} + data1 = np.arange(4).reshape(2,2) + data2 = np.arange(4,8).reshape(2,2) + signal1 = AnalogSignal(data1*pq.s, sampling_rate=1*pq.Hz, + array_annotations=array_anno1) + signal2 = AnalogSignal(data2*pq.s, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + signal1.sampling_period, + array_annotations=array_anno2) + + result = signal1.patch(signal2) + assert_array_equal(array_anno1.keys(), result.array_annotations.keys()) + + for k in array_anno1.keys(): + assert_array_equal(np.asarray(array_anno1[k]), result.array_annotations[k]) + + def test_patch_complex(self): + signal1 = self.signal1 + assert_neo_object_is_compliant(self.signal1) + + signal2 = AnalogSignal(self.data1quant, sampling_rate=1 * pq.kHz, name='signal2', + description='test signal', file_origin='testfile.txt', + array_annotations=self.arr_ann1, + t_start=signal1.t_stop + signal1.sampling_period) + + patched12 = self.signal1.patch(signal2) + + for attr in signal1._necessary_attrs: + self.assertEqual(getattr(signal1, attr[0], None), getattr(patched12, attr[0], None)) + + assert_array_equal(np.vstack((signal1.magnitude, signal2.magnitude)), + patched12.magnitude) class TestAnalogSignalFunctions(unittest.TestCase): def test__pickle_1d(self): diff --git a/neo/test/coretest/test_irregularysampledsignal.py b/neo/test/coretest/test_irregularysampledsignal.py index 6b026878f..116c4e546 100644 --- a/neo/test/coretest/test_irregularysampledsignal.py +++ b/neo/test/coretest/test_irregularysampledsignal.py @@ -946,6 +946,58 @@ def test__merge(self): self.assertRaises(MergeError, signal1.merge, signal3) + def test_patch_simple(self): + signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=[1,10,11,20]*pq.s) + signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=[15,16,21]*pq.s) + + result = signal1.patch(signal2) + assert_array_equal(np.array([0,1,2,4,5,3,6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([1,10,11,15,16,20,21]), result.times) + for attr in signal1._necessary_attrs: + if attr[0] == 'times': + continue + self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) + + def test_patch_no_overlap(self): + signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=range(4)*pq.s) + signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=range(4,7)*pq.s) + + result = signal1.patch(signal2) + assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.arange(7), result.times) + + def test_patch_multi_trace(self): + data1 = np.arange(4).reshape(2,2) + data2 = np.arange(4,8).reshape(2,2) + n1 = len(data1) + n2 = len(data2) + signal1 = IrregularlySampledSignal(signal=data1*pq.s, times=range(n1)*pq.s) + signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s) + + result = signal1.patch(signal2) + data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) + assert_array_equal(data_expected, result.magnitude) + + def test_patch_array_annotations(self): + array_anno1 = {'first': ['a','b']} + array_anno2 = {'first': ['a','b'], + 'second': ['c','d']} + data1 = np.arange(4).reshape(2,2) + data2 = np.arange(4,8).reshape(2,2) + n1 = len(data1) + n2 = len(data2) + signal1 = IrregularlySampledSignal(signal=data1*pq.s, times=range(n1)*pq.s, + array_annotations=array_anno1) + signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s, + array_annotations=array_anno2) + + result = signal1.patch(signal2) + assert_array_equal(array_anno1.keys(), result.array_annotations.keys()) + + for k in array_anno1.keys(): + assert_array_equal(np.asarray(array_anno1[k]), result.array_annotations[k]) + + class TestAnalogSignalFunctions(unittest.TestCase): def test__pickle(self): signal1 = IrregularlySampledSignal(np.arange(10.0) / 100 * pq.s, np.arange(10.0), From 2d808839e2a9f496d39c3ae05a955cb9a45f9bda Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Thu, 16 Jul 2020 20:59:15 +0200 Subject: [PATCH 03/13] Redefine patch parameter `overwrite` and add padding feature --- neo/core/analogsignal.py | 93 +++++++++++++++++++------- neo/test/coretest/test_analogsignal.py | 74 ++++++++++++++++---- 2 files changed, 131 insertions(+), 36 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index b7f80a3b1..d673c1535 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -658,7 +658,7 @@ def rectify(self, **kwargs): return rectified_signal - def patch(self, other, overwrite=True): + def patch(self, other, overwrite=True, padding=False): ''' Patch another signal to this one. @@ -679,10 +679,24 @@ def patch(self, other, overwrite=True): ---------- other : neo.BaseSignal The object that is merged into this one. + The other signal needs cover a later time period than + this one, i.e. self.t_start < other.t_start overwrite : bool - If False, samples of this signal are overwritten - by other signal. If True, samples of other signal - are overwritten by this signal. Default: True + If True, samples of the earlier (smaller t_start) + signal are overwritten by the later signal. + If False, samples of the later (higher t_start) + are overwritten by earlier signal. + Default: False + padding : bool, scalar quantity + Sampling values to use as padding in case signals + do not overlap. + If False, do not apply padding. Signals have to align or + overlap. If True, signals will be padded using + np.NaN as pad values. If a scalar quantity is provided, this + will be used for padding. The other signal is moved + forward in time by maximum one sampling period to + align the sampling times of both signals. + Default: False Returns ------- @@ -696,11 +710,21 @@ def patch(self, other, overwrite=True): If `other` object has incompatible attributes. ''' + if other.units != self.units: + other = other.rescale(self.units) + + if self.t_start > other.t_start: + signal1, signal2 = other, self + else: + signal1, signal2 = self, other + # raise MergeError('Inconsistent timing of signals. Other signal needs to be later than' + # ' this signal') + for attr in self._necessary_attrs: - if 'signal' != attr[0]: + if attr[0] not in ['signal', 't_start', 't_stop']: if getattr(self, attr[0], None) != getattr(other, attr[0], None): - if attr[0] in ['t_start','t_stop']: - continue + # if attr[0] in ['t_start','t_stop']: + # continue raise MergeError("Cannot patch these two signals as the %s differ." % attr[0]) if hasattr(self, "lazy_shape"): @@ -711,24 +735,45 @@ def patch(self, other, overwrite=True): merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) else: raise MergeError("Cannot patch a lazy object with a real object.") - if other.units != self.units: - other = other.rescale(self.units) - if self.t_start > other.t_stop: - raise MergeError('Signals do not overlap.') - - # adjust overlapping signals - if self.t_stop + self.sampling_period >= other.t_start: - if not overwrite: # removing samples of other signal - slice_t_start = self.t_stop + self.sampling_period - sliced_other = other.time_slice(slice_t_start, None) - stack = np.vstack((self.magnitude, sliced_other.magnitude)) - else: # removing samples of this signal - slice_t_stop = other.t_start - other.sampling_period - sliced_self = self.time_slice(None, slice_t_stop) - stack = np.vstack((sliced_self.magnitude, other.magnitude)) + # in case of non-overlapping signals consider padding + if signal2.t_start > signal1.t_stop + signal1.sampling_period: + if padding != False: + logger.warning('Signals will be padded using {}.'.format(padding)) + pad_time = signal2.t_start-signal1.t_stop + n_pad_samples = int(((pad_time)*self.sampling_rate).rescale('dimensionless')) + if padding is True: + padding = np.NaN * self.units + if isinstance(padding, pq.Quantity): + padding = padding.rescale(self.units).magnitude + else: + raise ValueError('Invalid type of padding value. Please provide a bool value ' + 'or a quantities object.') + pad_data = np.full((n_pad_samples,) + signal1.shape[1:], padding) + + # create new signal 1 with extended data, but keep array_annotations + signal_tmp = signal1.duplicate_with_new_data(np.vstack((signal1.magnitude, pad_data))) + signal_tmp.array_annotations = signal1.array_annotations + signal1 = signal_tmp + else: + raise MergeError('Signals do not overlap, but no padding is provided.' + 'Please provide a padding mode.') + + # in case of overlapping signals slice according to overwrite parameter + elif signal2.t_start < signal1.t_stop + signal1.sampling_period: + n_samples = int(((signal1.t_stop - signal2.t_start)*signal1.sampling_rate).simplified) + logger.warning('Overwriting {} samples while patching signals.'.format(n_samples)) + if not overwrite: # removing samples second signal + slice_t_start = signal1.t_stop + signal1.sampling_period + signal2 = signal2.time_slice(slice_t_start, None) + else: # removing samples of the first signal + slice_t_stop = signal2.t_start - signal2.sampling_period + signal1 = signal1.time_slice(None, slice_t_stop) else: - raise MergeError("Cannot patch signals with non-overlapping times") + assert signal2.t_start == signal1.t_stop + signal1.sampling_period, \ + "Cannot patch signals with non-overlapping times" + + stack = np.vstack((signal1.magnitude, signal2.magnitude)) kwargs = {} for name in ("name", "description", "file_origin"): @@ -745,7 +790,7 @@ def patch(self, other, overwrite=True): other.array_annotations) signal = self.__class__(stack, units=self.units, dtype=self.dtype, copy=False, - t_start=self.t_start, sampling_rate=self.sampling_rate, **kwargs) + t_start=signal1.t_start, sampling_rate=self.sampling_rate, **kwargs) signal.segment = None signal.channel_index = None diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index 1757ce0dd..3327cdf89 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -1574,8 +1574,8 @@ def test__merge(self): assert_arrays_equal(mergeddata24, targdata24) def test_patch_simple(self): - signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) result = signal1.patch(signal2) @@ -1583,9 +1583,19 @@ def test_patch_simple(self): for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) + def test_patch_inverse_signals(self): + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + signal1.sampling_period) + + result = signal2.patch(signal1) + assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) + for attr in signal1._necessary_attrs: + self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) + def test_patch_no_overlap(self): - signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=10*pq.s + signal1.sampling_period) with self.assertRaises(MergeError): @@ -1594,8 +1604,8 @@ def test_patch_no_overlap(self): def test_patch_multi_trace(self): data1 = np.arange(4).reshape(2,2) data2 = np.arange(4,8).reshape(2,2) - signal1 = AnalogSignal(data1*pq.s, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal(data2*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal(data1*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal(data2*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) result = signal1.patch(signal2) @@ -1605,30 +1615,70 @@ def test_patch_multi_trace(self): self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) def test_patch_overwrite_true(self): - signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop) result = signal1.patch(signal2, overwrite=True) assert_array_equal(np.array([0,1,2,4,5,6]).reshape((-1, 1)), result.magnitude) def test_patch_overwrite_false(self): - signal1 = AnalogSignal([0,1,2,3]*pq.s, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop) result = signal1.patch(signal2, overwrite=False) assert_array_equal(np.array([0,1,2,3,5,6]).reshape((-1, 1)), result.magnitude) + def test_patch_padding_False(self): + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + t_start=10*pq.s) + + with self.assertRaises(MergeError): + result = signal1.patch(signal2, overwrite=False, padding=False) + + def test_patch_padding_True(self): + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + 3 * signal1.sampling_period) + + result = signal1.patch(signal2, overwrite=False, padding=True) + assert_array_equal(np.array([0,1,2,3,np.NaN,np.NaN,np.NaN,4,5,6]).reshape((-1, 1)), + result.magnitude) + + def test_patch_padding_quantity(self): + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + 3 * signal1.sampling_period) + + result = signal1.patch(signal2, overwrite=False, padding=-1*pq.mV) + assert_array_equal(np.array([0,1,2,3,-1e-3,-1e-3,-1e-3,4,5,6]).reshape((-1, 1)), + result.magnitude) + + def test_patch_padding_invalid(self): + signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) + signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + t_start=signal1.t_stop + 3 * signal1.sampling_period) + + with self.assertRaises(ValueError): + result = signal1.patch(signal2, overwrite=False, padding=1) + with self.assertRaises(ValueError): + result = signal1.patch(signal2, overwrite=False, padding=[1]) + with self.assertRaises(ValueError): + result = signal1.patch(signal2, overwrite=False, padding='a') + with self.assertRaises(ValueError): + result = signal1.patch(signal2, overwrite=False, padding=np.array([1,2,3])) + def test_patch_array_annotations(self): array_anno1 = {'first': ['a','b']} array_anno2 = {'first': ['a','b'], 'second': ['c','d']} data1 = np.arange(4).reshape(2,2) data2 = np.arange(4,8).reshape(2,2) - signal1 = AnalogSignal(data1*pq.s, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal(data1*pq.V, sampling_rate=1*pq.Hz, array_annotations=array_anno1) - signal2 = AnalogSignal(data2*pq.s, sampling_rate=1*pq.Hz, + signal2 = AnalogSignal(data2*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period, array_annotations=array_anno2) From aca119a6c332f044095c10260d7a287eab16c5c9 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Thu, 24 Sep 2020 15:01:11 +0200 Subject: [PATCH 04/13] Rename 'patch' to 'concatenate' --- neo/core/analogsignal.py | 14 ++--- neo/core/basesignal.py | 4 +- neo/core/irregularlysampledsignal.py | 10 ++-- neo/test/coretest/test_analogsignal.py | 58 +++++++++---------- .../coretest/test_irregularysampledsignal.py | 16 ++--- 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index d673c1535..ec2fc3a2a 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -658,7 +658,7 @@ def rectify(self, **kwargs): return rectified_signal - def patch(self, other, overwrite=True, padding=False): + def concatenate(self, other, overwrite=True, padding=False): ''' Patch another signal to this one. @@ -668,7 +668,7 @@ def patch(self, other, overwrite=True, padding=False): have to overlap by one sample in time, i.e. self.t_stop == other.t_start Note: Only common array annotations common to - both signals are attached to the patched signal. + both signals are attached to the concatenated signal. If the attributes of the two signal are not compatible, an Exception is raised. @@ -725,16 +725,16 @@ def patch(self, other, overwrite=True, padding=False): if getattr(self, attr[0], None) != getattr(other, attr[0], None): # if attr[0] in ['t_start','t_stop']: # continue - raise MergeError("Cannot patch these two signals as the %s differ." % attr[0]) + raise MergeError("Cannot concatenate these two signals as the %s differ." % attr[0]) if hasattr(self, "lazy_shape"): if hasattr(other, "lazy_shape"): if self.lazy_shape[-1] != other.lazy_shape[-1]: - raise MergeError("Cannot patch signals as they contain" + raise MergeError("Cannot concatenate signals as they contain" " different numbers of traces.") merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) else: - raise MergeError("Cannot patch a lazy object with a real object.") + raise MergeError("Cannot concatenate a lazy object with a real object.") # in case of non-overlapping signals consider padding if signal2.t_start > signal1.t_stop + signal1.sampling_period: @@ -762,7 +762,7 @@ def patch(self, other, overwrite=True, padding=False): # in case of overlapping signals slice according to overwrite parameter elif signal2.t_start < signal1.t_stop + signal1.sampling_period: n_samples = int(((signal1.t_stop - signal2.t_start)*signal1.sampling_rate).simplified) - logger.warning('Overwriting {} samples while patching signals.'.format(n_samples)) + logger.warning('Overwriting {} samples while concatenating signals.'.format(n_samples)) if not overwrite: # removing samples second signal slice_t_start = signal1.t_stop + signal1.sampling_period signal2 = signal2.time_slice(slice_t_start, None) @@ -771,7 +771,7 @@ def patch(self, other, overwrite=True, padding=False): signal1 = signal1.time_slice(None, slice_t_stop) else: assert signal2.t_start == signal1.t_stop + signal1.sampling_period, \ - "Cannot patch signals with non-overlapping times" + "Cannot concatenate signals with non-overlapping times" stack = np.vstack((signal1.magnitude, signal2.magnitude)) diff --git a/neo/core/basesignal.py b/neo/core/basesignal.py index ac412daaf..aacf955d1 100644 --- a/neo/core/basesignal.py +++ b/neo/core/basesignal.py @@ -299,7 +299,7 @@ def time_slice(self, t_start, t_stop): NotImplementedError('Needs to be implemented for subclasses.') - def patch(self, other): + def concatenate(self, other): ''' Patch another signal to this one. @@ -307,7 +307,7 @@ def patch(self, other): (row-wise, :func:`np.vstack`). Patching can be used to combine signals across segments. Note: Only array annotations common to - both signals are attached to the patched signal. + both signals are attached to the concatenated signal. If the attributes of the two signal are not compatible, an Exception is raised. diff --git a/neo/core/irregularlysampledsignal.py b/neo/core/irregularlysampledsignal.py index 03f6a9b8f..5516b3547 100644 --- a/neo/core/irregularlysampledsignal.py +++ b/neo/core/irregularlysampledsignal.py @@ -515,7 +515,7 @@ def merge(self, other): return signal - def patch(self, other): + def concatenate(self, other): ''' Patch another signal to this one. @@ -523,7 +523,7 @@ def patch(self, other): (row-wise, :func:`np.vstack`). Patching can be used to combine signals across segments. Note: Only array annotations common to - both signals are attached to the patched signal. + both signals are attached to the concatenated signal. If the attributes of the two signal are not compatible, an Exception is raised. @@ -550,16 +550,16 @@ def patch(self, other): for attr in self._necessary_attrs: if not (attr[0] in ['signal', 'times', 't_start', 't_stop', 'times']): if getattr(self, attr[0], None) != getattr(other, attr[0], None): - raise MergeError("Cannot patch these two signals as the %s differ." % attr[0]) + raise MergeError("Cannot concatenate these two signals as the %s differ." % attr[0]) if hasattr(self, "lazy_shape"): if hasattr(other, "lazy_shape"): if self.lazy_shape[-1] != other.lazy_shape[-1]: - raise MergeError("Cannot patch signals as they contain" + raise MergeError("Cannot concatenate signals as they contain" " different numbers of traces.") merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) else: - raise MergeError("Cannot patch a lazy object with a real object.") + raise MergeError("Cannot concatenate a lazy object with a real object.") if other.units != self.units: other = other.rescale(self.units) diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index 3327cdf89..a92cff0bd 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -1573,104 +1573,104 @@ def test__merge(self): assert_arrays_equal(mergeddata23, targdata23) assert_arrays_equal(mergeddata24, targdata24) - def test_patch_simple(self): + def test_concatenate_simple(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) - def test_patch_inverse_signals(self): + def test_concatenate_inverse_signals(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) - result = signal2.patch(signal1) + result = signal2.concatenate(signal1) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) - def test_patch_no_overlap(self): + def test_concatenate_no_overlap(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=10*pq.s + signal1.sampling_period) with self.assertRaises(MergeError): - signal1.patch(signal2) + signal1.concatenate(signal2) - def test_patch_multi_trace(self): + def test_concatenate_multi_trace(self): data1 = np.arange(4).reshape(2,2) data2 = np.arange(4,8).reshape(2,2) signal1 = AnalogSignal(data1*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal(data2*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) assert_array_equal(data_expected, result.magnitude) for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) - def test_patch_overwrite_true(self): + def test_concatenate_overwrite_true(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop) - result = signal1.patch(signal2, overwrite=True) + result = signal1.concatenate(signal2, overwrite=True) assert_array_equal(np.array([0,1,2,4,5,6]).reshape((-1, 1)), result.magnitude) - def test_patch_overwrite_false(self): + def test_concatenate_overwrite_false(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop) - result = signal1.patch(signal2, overwrite=False) + result = signal1.concatenate(signal2, overwrite=False) assert_array_equal(np.array([0,1,2,3,5,6]).reshape((-1, 1)), result.magnitude) - def test_patch_padding_False(self): + def test_concatenate_padding_False(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=10*pq.s) with self.assertRaises(MergeError): - result = signal1.patch(signal2, overwrite=False, padding=False) + result = signal1.concatenate(signal2, overwrite=False, padding=False) - def test_patch_padding_True(self): + def test_concatenate_padding_True(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) - result = signal1.patch(signal2, overwrite=False, padding=True) + result = signal1.concatenate(signal2, overwrite=False, padding=True) assert_array_equal(np.array([0,1,2,3,np.NaN,np.NaN,np.NaN,4,5,6]).reshape((-1, 1)), result.magnitude) - def test_patch_padding_quantity(self): + def test_concatenate_padding_quantity(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) - result = signal1.patch(signal2, overwrite=False, padding=-1*pq.mV) + result = signal1.concatenate(signal2, overwrite=False, padding=-1 * pq.mV) assert_array_equal(np.array([0,1,2,3,-1e-3,-1e-3,-1e-3,4,5,6]).reshape((-1, 1)), result.magnitude) - def test_patch_padding_invalid(self): + def test_concatenate_padding_invalid(self): signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) with self.assertRaises(ValueError): - result = signal1.patch(signal2, overwrite=False, padding=1) + result = signal1.concatenate(signal2, overwrite=False, padding=1) with self.assertRaises(ValueError): - result = signal1.patch(signal2, overwrite=False, padding=[1]) + result = signal1.concatenate(signal2, overwrite=False, padding=[1]) with self.assertRaises(ValueError): - result = signal1.patch(signal2, overwrite=False, padding='a') + result = signal1.concatenate(signal2, overwrite=False, padding='a') with self.assertRaises(ValueError): - result = signal1.patch(signal2, overwrite=False, padding=np.array([1,2,3])) + result = signal1.concatenate(signal2, overwrite=False, padding=np.array([1, 2, 3])) - def test_patch_array_annotations(self): + def test_concatenate_array_annotations(self): array_anno1 = {'first': ['a','b']} array_anno2 = {'first': ['a','b'], 'second': ['c','d']} @@ -1682,13 +1682,13 @@ def test_patch_array_annotations(self): t_start=signal1.t_stop + signal1.sampling_period, array_annotations=array_anno2) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) assert_array_equal(array_anno1.keys(), result.array_annotations.keys()) for k in array_anno1.keys(): assert_array_equal(np.asarray(array_anno1[k]), result.array_annotations[k]) - def test_patch_complex(self): + def test_concatenate_complex(self): signal1 = self.signal1 assert_neo_object_is_compliant(self.signal1) @@ -1697,13 +1697,13 @@ def test_patch_complex(self): array_annotations=self.arr_ann1, t_start=signal1.t_stop + signal1.sampling_period) - patched12 = self.signal1.patch(signal2) + concatenated12 = self.signal1.concatenate(signal2) for attr in signal1._necessary_attrs: - self.assertEqual(getattr(signal1, attr[0], None), getattr(patched12, attr[0], None)) + self.assertEqual(getattr(signal1, attr[0], None), getattr(concatenated12, attr[0], None)) assert_array_equal(np.vstack((signal1.magnitude, signal2.magnitude)), - patched12.magnitude) + concatenated12.magnitude) class TestAnalogSignalFunctions(unittest.TestCase): def test__pickle_1d(self): diff --git a/neo/test/coretest/test_irregularysampledsignal.py b/neo/test/coretest/test_irregularysampledsignal.py index 116c4e546..8a35e1bf4 100644 --- a/neo/test/coretest/test_irregularysampledsignal.py +++ b/neo/test/coretest/test_irregularysampledsignal.py @@ -946,11 +946,11 @@ def test__merge(self): self.assertRaises(MergeError, signal1.merge, signal3) - def test_patch_simple(self): + def test_concatenate_simple(self): signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=[1,10,11,20]*pq.s) signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=[15,16,21]*pq.s) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) assert_array_equal(np.array([0,1,2,4,5,3,6]).reshape((-1, 1)), result.magnitude) assert_array_equal(np.array([1,10,11,15,16,20,21]), result.times) for attr in signal1._necessary_attrs: @@ -958,15 +958,15 @@ def test_patch_simple(self): continue self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) - def test_patch_no_overlap(self): + def test_concatenate_no_overlap(self): signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=range(4)*pq.s) signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=range(4,7)*pq.s) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) assert_array_equal(np.arange(7), result.times) - def test_patch_multi_trace(self): + def test_concatenate_multi_trace(self): data1 = np.arange(4).reshape(2,2) data2 = np.arange(4,8).reshape(2,2) n1 = len(data1) @@ -974,11 +974,11 @@ def test_patch_multi_trace(self): signal1 = IrregularlySampledSignal(signal=data1*pq.s, times=range(n1)*pq.s) signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) assert_array_equal(data_expected, result.magnitude) - def test_patch_array_annotations(self): + def test_concatenate_array_annotations(self): array_anno1 = {'first': ['a','b']} array_anno2 = {'first': ['a','b'], 'second': ['c','d']} @@ -991,7 +991,7 @@ def test_patch_array_annotations(self): signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s, array_annotations=array_anno2) - result = signal1.patch(signal2) + result = signal1.concatenate(signal2) assert_array_equal(array_anno1.keys(), result.array_annotations.keys()) for k in array_anno1.keys(): From fe8012993080789b0caaa0098a50f59e1df21244 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Thu, 24 Sep 2020 16:03:38 +0200 Subject: [PATCH 05/13] Some PEP8 fixes --- neo/core/analogsignal.py | 14 +-- neo/core/basesignal.py | 13 ++- neo/core/irregularlysampledsignal.py | 3 +- neo/test/coretest/test_analogsignal.py | 97 ++++++++++--------- .../coretest/test_irregularysampledsignal.py | 53 +++++----- 5 files changed, 92 insertions(+), 88 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index ec2fc3a2a..50503e8ef 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -725,7 +725,8 @@ def concatenate(self, other, overwrite=True, padding=False): if getattr(self, attr[0], None) != getattr(other, attr[0], None): # if attr[0] in ['t_start','t_stop']: # continue - raise MergeError("Cannot concatenate these two signals as the %s differ." % attr[0]) + raise MergeError( + "Cannot concatenate these two signals as the %s differ." % attr[0]) if hasattr(self, "lazy_shape"): if hasattr(other, "lazy_shape"): @@ -740,8 +741,8 @@ def concatenate(self, other, overwrite=True, padding=False): if signal2.t_start > signal1.t_stop + signal1.sampling_period: if padding != False: logger.warning('Signals will be padded using {}.'.format(padding)) - pad_time = signal2.t_start-signal1.t_stop - n_pad_samples = int(((pad_time)*self.sampling_rate).rescale('dimensionless')) + pad_time = signal2.t_start - signal1.t_stop + n_pad_samples = int(((pad_time) * self.sampling_rate).rescale('dimensionless')) if padding is True: padding = np.NaN * self.units if isinstance(padding, pq.Quantity): @@ -752,7 +753,8 @@ def concatenate(self, other, overwrite=True, padding=False): pad_data = np.full((n_pad_samples,) + signal1.shape[1:], padding) # create new signal 1 with extended data, but keep array_annotations - signal_tmp = signal1.duplicate_with_new_data(np.vstack((signal1.magnitude, pad_data))) + signal_tmp = signal1.duplicate_with_new_data( + np.vstack((signal1.magnitude, pad_data))) signal_tmp.array_annotations = signal1.array_annotations signal1 = signal_tmp else: @@ -761,7 +763,7 @@ def concatenate(self, other, overwrite=True, padding=False): # in case of overlapping signals slice according to overwrite parameter elif signal2.t_start < signal1.t_stop + signal1.sampling_period: - n_samples = int(((signal1.t_stop - signal2.t_start)*signal1.sampling_rate).simplified) + n_samples = int(((signal1.t_stop - signal2.t_start) * signal1.sampling_rate).simplified) logger.warning('Overwriting {} samples while concatenating signals.'.format(n_samples)) if not overwrite: # removing samples second signal slice_t_start = signal1.t_stop + signal1.sampling_period @@ -797,4 +799,4 @@ def concatenate(self, other, overwrite=True, padding=False): if hasattr(self, "lazy_shape"): signal.lazy_shape = merged_lazy_shape - return signal \ No newline at end of file + return signal diff --git a/neo/core/basesignal.py b/neo/core/basesignal.py index aacf955d1..c814550de 100644 --- a/neo/core/basesignal.py +++ b/neo/core/basesignal.py @@ -282,10 +282,12 @@ def merge(self, other): # merge channel_index (move to ChannelIndex.merge()?) if self.channel_index and other.channel_index: signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]), - channel_ids=np.hstack( - [self.channel_index.channel_ids, other.channel_index.channel_ids]), - channel_names=np.hstack( - [self.channel_index.channel_names, other.channel_index.channel_names])) + channel_ids=np.hstack( + [self.channel_index.channel_ids, + other.channel_index.channel_ids]), + channel_names=np.hstack( + [self.channel_index.channel_names, + other.channel_index.channel_names])) else: signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1])) @@ -298,7 +300,6 @@ def time_slice(self, t_start, t_stop): ''' NotImplementedError('Needs to be implemented for subclasses.') - def concatenate(self, other): ''' Patch another signal to this one. @@ -332,5 +333,3 @@ def concatenate(self, other): ''' NotImplementedError('Patching need to be implemented in sublcasses') - - diff --git a/neo/core/irregularlysampledsignal.py b/neo/core/irregularlysampledsignal.py index 5516b3547..f0e74f73d 100644 --- a/neo/core/irregularlysampledsignal.py +++ b/neo/core/irregularlysampledsignal.py @@ -550,7 +550,8 @@ def concatenate(self, other): for attr in self._necessary_attrs: if not (attr[0] in ['signal', 'times', 't_start', 't_stop', 'times']): if getattr(self, attr[0], None) != getattr(other, attr[0], None): - raise MergeError("Cannot concatenate these two signals as the %s differ." % attr[0]) + raise MergeError( + "Cannot concatenate these two signals as the %s differ." % attr[0]) if hasattr(self, "lazy_shape"): if hasattr(other, "lazy_shape"): diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index a92cff0bd..80b0e6f90 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -371,16 +371,16 @@ def test__pretty(self): for i, signal in enumerate(self.signals): prepr = pretty(signal) targ = ( - ('AnalogSignal with %d channels of length %d; units %s; datatype %s \n' - '' % (signal.shape[1], signal.shape[0], - signal.units.dimensionality.unicode, signal.dtype)) - + ('annotations: %s\n' % signal.annotations) - + ('sampling rate: {} {}\n'.format(float(signal.sampling_rate), - signal.sampling_rate.dimensionality.unicode)) - + ('time: {} {} to {} {}'.format(float(signal.t_start), - signal.t_start.dimensionality.unicode, - float(signal.t_stop), - signal.t_stop.dimensionality.unicode)) + ('AnalogSignal with %d channels of length %d; units %s; datatype %s \n' + '' % (signal.shape[1], signal.shape[0], + signal.units.dimensionality.unicode, signal.dtype)) + + ('annotations: %s\n' % signal.annotations) + + ('sampling rate: {} {}\n'.format(float(signal.sampling_rate), + signal.sampling_rate.dimensionality.unicode)) + + ('time: {} {} to {} {}'.format(float(signal.t_start), + signal.t_start.dimensionality.unicode, + float(signal.t_stop), + signal.t_stop.dimensionality.unicode)) ) self.assertEqual(prepr, targ) @@ -1291,6 +1291,7 @@ def test_rectify(self): assert_arrays_equal(rectified_signal.array_annotations['anno2'], target_signal.array_annotations['anno2']) + class TestAnalogSignalEquality(unittest.TestCase): def test__signals_with_different_data_complement_should_be_not_equal(self): signal1 = AnalogSignal(np.arange(55.0).reshape((11, 5)), units="mV", @@ -1574,8 +1575,8 @@ def test__merge(self): assert_arrays_equal(mergeddata24, targdata24) def test_concatenate_simple(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) result = signal1.concatenate(signal2) @@ -1584,8 +1585,8 @@ def test_concatenate_simple(self): self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) def test_concatenate_inverse_signals(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) result = signal2.concatenate(signal1) @@ -1594,71 +1595,71 @@ def test_concatenate_inverse_signals(self): self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) def test_concatenate_no_overlap(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, - t_start=10*pq.s + signal1.sampling_period) + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, + t_start=10 * pq.s + signal1.sampling_period) with self.assertRaises(MergeError): signal1.concatenate(signal2) def test_concatenate_multi_trace(self): - data1 = np.arange(4).reshape(2,2) - data2 = np.arange(4,8).reshape(2,2) - signal1 = AnalogSignal(data1*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal(data2*pq.V, sampling_rate=1*pq.Hz, + data1 = np.arange(4).reshape(2, 2) + data2 = np.arange(4, 8).reshape(2, 2) + signal1 = AnalogSignal(data1 * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal(data2 * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + signal1.sampling_period) result = signal1.concatenate(signal2) - data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) + data_expected = np.array([[0, 1], [2, 3], [4, 5], [6, 7]]) assert_array_equal(data_expected, result.magnitude) for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) def test_concatenate_overwrite_true(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop) result = signal1.concatenate(signal2, overwrite=True) - assert_array_equal(np.array([0,1,2,4,5,6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([0, 1, 2, 4, 5, 6]).reshape((-1, 1)), result.magnitude) def test_concatenate_overwrite_false(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop) result = signal1.concatenate(signal2, overwrite=False) - assert_array_equal(np.array([0,1,2,3,5,6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([0, 1, 2, 3, 5, 6]).reshape((-1, 1)), result.magnitude) def test_concatenate_padding_False(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, - t_start=10*pq.s) + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, + t_start=10 * pq.s) with self.assertRaises(MergeError): result = signal1.concatenate(signal2, overwrite=False, padding=False) def test_concatenate_padding_True(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) result = signal1.concatenate(signal2, overwrite=False, padding=True) - assert_array_equal(np.array([0,1,2,3,np.NaN,np.NaN,np.NaN,4,5,6]).reshape((-1, 1)), + assert_array_equal(np.array([0, 1, 2, 3, np.NaN, np.NaN, np.NaN, 4, 5, 6]).reshape((-1, 1)), result.magnitude) def test_concatenate_padding_quantity(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) result = signal1.concatenate(signal2, overwrite=False, padding=-1 * pq.mV) - assert_array_equal(np.array([0,1,2,3,-1e-3,-1e-3,-1e-3,4,5,6]).reshape((-1, 1)), + assert_array_equal(np.array([0, 1, 2, 3, -1e-3, -1e-3, -1e-3, 4, 5, 6]).reshape((-1, 1)), result.magnitude) def test_concatenate_padding_invalid(self): - signal1 = AnalogSignal([0,1,2,3]*pq.V, sampling_rate=1*pq.Hz) - signal2 = AnalogSignal([4,5,6]*pq.V, sampling_rate=1*pq.Hz, + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) with self.assertRaises(ValueError): @@ -1671,14 +1672,14 @@ def test_concatenate_padding_invalid(self): result = signal1.concatenate(signal2, overwrite=False, padding=np.array([1, 2, 3])) def test_concatenate_array_annotations(self): - array_anno1 = {'first': ['a','b']} - array_anno2 = {'first': ['a','b'], - 'second': ['c','d']} - data1 = np.arange(4).reshape(2,2) - data2 = np.arange(4,8).reshape(2,2) - signal1 = AnalogSignal(data1*pq.V, sampling_rate=1*pq.Hz, + array_anno1 = {'first': ['a', 'b']} + array_anno2 = {'first': ['a', 'b'], + 'second': ['c', 'd']} + data1 = np.arange(4).reshape(2, 2) + data2 = np.arange(4, 8).reshape(2, 2) + signal1 = AnalogSignal(data1 * pq.V, sampling_rate=1 * pq.Hz, array_annotations=array_anno1) - signal2 = AnalogSignal(data2*pq.V, sampling_rate=1*pq.Hz, + signal2 = AnalogSignal(data2 * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + signal1.sampling_period, array_annotations=array_anno2) @@ -1700,11 +1701,13 @@ def test_concatenate_complex(self): concatenated12 = self.signal1.concatenate(signal2) for attr in signal1._necessary_attrs: - self.assertEqual(getattr(signal1, attr[0], None), getattr(concatenated12, attr[0], None)) + self.assertEqual(getattr(signal1, attr[0], None), + getattr(concatenated12, attr[0], None)) assert_array_equal(np.vstack((signal1.magnitude, signal2.magnitude)), concatenated12.magnitude) + class TestAnalogSignalFunctions(unittest.TestCase): def test__pickle_1d(self): signal1 = AnalogSignal([1, 2, 3, 4], sampling_period=1 * pq.ms, units=pq.S) diff --git a/neo/test/coretest/test_irregularysampledsignal.py b/neo/test/coretest/test_irregularysampledsignal.py index 8a35e1bf4..7d792e276 100644 --- a/neo/test/coretest/test_irregularysampledsignal.py +++ b/neo/test/coretest/test_irregularysampledsignal.py @@ -240,12 +240,12 @@ def test_IrregularlySampledSignal_repr(self): # see https://github.com/numpy/numpy/blob/master/doc/release/1.14.0-notes.rst#many # -changes-to-array-printing-disableable-with-the-new-legacy-printing-mode targ = ( - '') + '') else: targ = ( - '') + '') res = repr(sig) self.assertEqual(targ, res) @@ -461,7 +461,7 @@ def test__time_slice_deepcopy_array_annotations(self): length = self.signal1.shape[-1] params1 = {'test0': ['y{}'.format(i) for i in range(length)], 'test1': ['deeptest' for i in range(length)], - 'test2': [(-1)**i > 0 for i in range(length)]} + 'test2': [(-1) ** i > 0 for i in range(length)]} self.signal1.array_annotate(**params1) result = self.signal1.time_slice(None, None) @@ -479,7 +479,7 @@ def test__time_slice_deepcopy_array_annotations(self): == result.array_annotations['test2'])) # Change annotations of result - params3 = {'test0': ['z{}'.format(i) for i in range(1, result.shape[-1]+1)]} + params3 = {'test0': ['z{}'.format(i) for i in range(1, result.shape[-1] + 1)]} result.array_annotate(**params3) result.array_annotations['test1'][0] = 'shallow2' self.assertFalse(all(self.signal1.array_annotations['test0'] @@ -493,12 +493,12 @@ def test__time_slice_deepcopy_data(self): result = self.signal1.time_slice(None, None) # Change values of original array - self.signal1[2] = 7.3*self.signal1.units + self.signal1[2] = 7.3 * self.signal1.units self.assertFalse(all(self.signal1 == result)) # Change values of sliced array - result[3] = 9.5*result.units + result[3] = 9.5 * result.units self.assertFalse(all(self.signal1 == result)) @@ -945,50 +945,49 @@ def test__merge(self): self.assertRaises(MergeError, signal1.merge, signal3) - def test_concatenate_simple(self): - signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=[1,10,11,20]*pq.s) - signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=[15,16,21]*pq.s) + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=[1, 10, 11, 20] * pq.s) + signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=[15, 16, 21] * pq.s) result = signal1.concatenate(signal2) - assert_array_equal(np.array([0,1,2,4,5,3,6]).reshape((-1, 1)), result.magnitude) - assert_array_equal(np.array([1,10,11,15,16,20,21]), result.times) + assert_array_equal(np.array([0, 1, 2, 4, 5, 3, 6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([1, 10, 11, 15, 16, 20, 21]), result.times) for attr in signal1._necessary_attrs: if attr[0] == 'times': continue self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) def test_concatenate_no_overlap(self): - signal1 = IrregularlySampledSignal(signal=[0,1,2,3]*pq.s, times=range(4)*pq.s) - signal2 = IrregularlySampledSignal(signal=[4,5,6]*pq.s, times=range(4,7)*pq.s) + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=range(4) * pq.s) + signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=range(4, 7) * pq.s) result = signal1.concatenate(signal2) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) assert_array_equal(np.arange(7), result.times) def test_concatenate_multi_trace(self): - data1 = np.arange(4).reshape(2,2) - data2 = np.arange(4,8).reshape(2,2) + data1 = np.arange(4).reshape(2, 2) + data2 = np.arange(4, 8).reshape(2, 2) n1 = len(data1) n2 = len(data2) - signal1 = IrregularlySampledSignal(signal=data1*pq.s, times=range(n1)*pq.s) - signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s) + signal1 = IrregularlySampledSignal(signal=data1 * pq.s, times=range(n1) * pq.s) + signal2 = IrregularlySampledSignal(signal=data2 * pq.s, times=range(n1, n1 + n2) * pq.s) result = signal1.concatenate(signal2) - data_expected = np.array([[0,1],[2,3],[4,5],[6,7]]) + data_expected = np.array([[0, 1], [2, 3], [4, 5], [6, 7]]) assert_array_equal(data_expected, result.magnitude) def test_concatenate_array_annotations(self): - array_anno1 = {'first': ['a','b']} - array_anno2 = {'first': ['a','b'], - 'second': ['c','d']} - data1 = np.arange(4).reshape(2,2) - data2 = np.arange(4,8).reshape(2,2) + array_anno1 = {'first': ['a', 'b']} + array_anno2 = {'first': ['a', 'b'], + 'second': ['c', 'd']} + data1 = np.arange(4).reshape(2, 2) + data2 = np.arange(4, 8).reshape(2, 2) n1 = len(data1) n2 = len(data2) - signal1 = IrregularlySampledSignal(signal=data1*pq.s, times=range(n1)*pq.s, + signal1 = IrregularlySampledSignal(signal=data1 * pq.s, times=range(n1) * pq.s, array_annotations=array_anno1) - signal2 = IrregularlySampledSignal(signal=data2*pq.s, times=range(n1, n1+n2)*pq.s, + signal2 = IrregularlySampledSignal(signal=data2 * pq.s, times=range(n1, n1 + n2) * pq.s, array_annotations=array_anno2) result = signal1.concatenate(signal2) From 98f02e3e71818b2e495627ec511df5296c164df9 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Thu, 24 Sep 2020 16:09:05 +0200 Subject: [PATCH 06/13] Some PEP8 fixes --- neo/test/coretest/test_base.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/neo/test/coretest/test_base.py b/neo/test/coretest/test_base.py index f57e17e51..bd7b010db 100644 --- a/neo/test/coretest/test_base.py +++ b/neo/test/coretest/test_base.py @@ -1273,17 +1273,17 @@ class Test_intersect_annotations(unittest.TestCase): ''' def setUp(self): - self.dict1 = {1:'1', 2:'2'} - self.dict2 = {1:'1'} - self.dict3 = {'list1': [1,2,3]} - self.dict4 = {'list1': [1,2,3], 'list2': [1,2,3]} - self.dict5 = {'list1': [1,2]} - self.dict6 = {'array1': np.array([1,2])} - self.dict7 = {'array1': np.array([1,2]), 'array2': np.array([1,2]), - 'array3': np.array([1,2,3])} + self.dict1 = {1: '1', 2: '2'} + self.dict2 = {1: '1'} + self.dict3 = {'list1': [1, 2, 3]} + self.dict4 = {'list1': [1, 2, 3], 'list2': [1, 2, 3]} + self.dict5 = {'list1': [1, 2]} + self.dict6 = {'array1': np.array([1, 2])} + self.dict7 = {'array1': np.array([1, 2]), 'array2': np.array([1, 2]), + 'array3': np.array([1, 2, 3])} self.all_simple_dicts = [self.dict1, self.dict2, self.dict3, - self.dict4, self.dict5, ] + self.dict4, self.dict5, ] def test_simple(self): result = intersect_annotations(self.dict1, self.dict2) @@ -1309,7 +1309,7 @@ def test_keys(self): def test_arrays(self): result = intersect_annotations(self.dict6, self.dict7) self.assertEqual(self.dict6.keys(), result.keys()) - np.testing.assert_array_equal([1,2], result['array1']) + np.testing.assert_array_equal([1, 2], result['array1']) if __name__ == "__main__": From 936c466f0a24a285bf77751daa55c1b1fc35d70c Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Wed, 30 Sep 2020 21:18:35 +0200 Subject: [PATCH 07/13] Introduce 'allow_overlap' parameter for irregular signals This parameter allows to raise a ValueError by default when irregularlysampledsignals are overlapping --- neo/core/analogsignal.py | 2 +- neo/core/irregularlysampledsignal.py | 15 +++++++++-- .../coretest/test_irregularysampledsignal.py | 27 ++++++++++++++----- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index 50503e8ef..bac36e2ce 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -660,7 +660,7 @@ def rectify(self, **kwargs): def concatenate(self, other, overwrite=True, padding=False): ''' - Patch another signal to this one. + Combine this and another signal along the time axis. The signal objects are concatenated vertically (row-wise, :func:`np.vstack`). Patching can be diff --git a/neo/core/irregularlysampledsignal.py b/neo/core/irregularlysampledsignal.py index f0e74f73d..e0f5a79b0 100644 --- a/neo/core/irregularlysampledsignal.py +++ b/neo/core/irregularlysampledsignal.py @@ -515,9 +515,9 @@ def merge(self, other): return signal - def concatenate(self, other): + def concatenate(self, other, allow_overlap=False): ''' - Patch another signal to this one. + Combine this and another signal along the time axis. The signal objects are concatenated vertically (row-wise, :func:`np.vstack`). Patching can be @@ -534,6 +534,11 @@ def concatenate(self, other): ---------- other : neo.BaseSignal The object that is merged into this one. + allow_overlap : bool + If false, overlapping samples between the two + signals are not permitted and an ValueError is raised. + If true, no check for overlapping samples is + performed and all samples are combined. Returns ------- @@ -581,6 +586,12 @@ def concatenate(self, other): kwargs['array_annotations'] = intersect_annotations(self.array_annotations, other.array_annotations) + + if not allow_overlap: + if max(self.t_start, other.t_start) <= min(self.t_stop, other.t_stop): + raise ValueError('Can not combine signals that overlap in time. Allow for ' + 'overlapping samples using the "no_overlap" parameter.') + t_start = min(self.t_start, other.t_start) t_stop = max(self.t_start, other.t_start) diff --git a/neo/test/coretest/test_irregularysampledsignal.py b/neo/test/coretest/test_irregularysampledsignal.py index 7d792e276..b4fb77915 100644 --- a/neo/test/coretest/test_irregularysampledsignal.py +++ b/neo/test/coretest/test_irregularysampledsignal.py @@ -946,12 +946,12 @@ def test__merge(self): self.assertRaises(MergeError, signal1.merge, signal3) def test_concatenate_simple(self): - signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=[1, 10, 11, 20] * pq.s) + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=[1, 10, 11, 14] * pq.s) signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=[15, 16, 21] * pq.s) result = signal1.concatenate(signal2) - assert_array_equal(np.array([0, 1, 2, 4, 5, 3, 6]).reshape((-1, 1)), result.magnitude) - assert_array_equal(np.array([1, 10, 11, 15, 16, 20, 21]), result.times) + assert_array_equal(np.array([0, 1, 2, 3, 4, 5, 6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([1, 10, 11, 14, 15, 16, 21]), result.times) for attr in signal1._necessary_attrs: if attr[0] == 'times': continue @@ -961,9 +961,24 @@ def test_concatenate_no_overlap(self): signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=range(4) * pq.s) signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=range(4, 7) * pq.s) - result = signal1.concatenate(signal2) - assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) - assert_array_equal(np.arange(7), result.times) + for allow_overlap in [True, False]: + result = signal1.concatenate(signal2, allow_overlap=allow_overlap) + assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.arange(7), result.times) + + def test_concatenate_overlap_exception(self): + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=range(4) * pq.s) + signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=range(2, 5) * pq.s) + + self.assertRaises(ValueError, signal1.concatenate, signal2, allow_overlap=False) + + def test_concatenate_overlap(self): + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=range(4) * pq.s) + signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=range(2, 5) * pq.s) + + result = signal1.concatenate(signal2, allow_overlap=True) + assert_array_equal(np.array([0,1,2,4,3,5,6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([0,1,2,2,3,3,4]), result.times) def test_concatenate_multi_trace(self): data1 = np.arange(4).reshape(2, 2) From c6519a5672eed5526d808f0e5fa0cae2cd68c173 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Mon, 5 Oct 2020 12:08:38 +0200 Subject: [PATCH 08/13] Make concatenate more performant and accepting multiple signals --- neo/core/analogsignal.py | 214 +++++++++++++++++++-------------------- 1 file changed, 102 insertions(+), 112 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index bac36e2ce..70a03c615 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -658,38 +658,25 @@ def rectify(self, **kwargs): return rectified_signal - def concatenate(self, other, overwrite=True, padding=False): - ''' - Combine this and another signal along the time axis. - - The signal objects are concatenated vertically - (row-wise, :func:`np.vstack`). Patching can be - used to combine signals across segments. Signals - have to overlap by one sample in time, i.e. - self.t_stop == other.t_start - Note: Only common array annotations common to - both signals are attached to the concatenated signal. - - If the attributes of the two signal are not - compatible, an Exception is raised. + def concatenate(self, *signals, overwrite=False, padding=False): + """ + Concatenate multiple neo.AnalogSignal objects across time. - Required attributes of the signal are used. + Units, sampling_rate and number of signal traces must be the same + for all signals. Otherwise a ValueError is raised. Parameters ---------- - other : neo.BaseSignal - The object that is merged into this one. - The other signal needs cover a later time period than - this one, i.e. self.t_start < other.t_start + signals: neo.AnalogSignal objects + AnalogSignals that will be concatenated overwrite : bool - If True, samples of the earlier (smaller t_start) - signal are overwritten by the later signal. - If False, samples of the later (higher t_start) - are overwritten by earlier signal. + If True, samples of the earlier (lower index in `signals`) + signals are overwritten by that of later (higher index in `signals`) + signals. + If False, samples of the later are overwritten by earlier signal. Default: False padding : bool, scalar quantity - Sampling values to use as padding in case signals - do not overlap. + Sampling values to use as padding in case signals do not overlap. If False, do not apply padding. Signals have to align or overlap. If True, signals will be padded using np.NaN as pad values. If a scalar quantity is provided, this @@ -700,103 +687,106 @@ def concatenate(self, other, overwrite=True, padding=False): Returns ------- - signal : neo.BaseSignal - Signal containing all non-overlapping samples of - both source signals. - - Raises - ------ - MergeError - If `other` object has incompatible attributes. - ''' + signal: neo.AnalogSignal + concatenated output signal + """ - if other.units != self.units: - other = other.rescale(self.units) + # Sanity of inputs + if not hasattr(signals, '__iter__'): + raise TypeError('signals must be iterable') + if not all([isinstance(a, AnalogSignal) for a in signals]): + raise TypeError('Entries of anasiglist have to be of type neo.AnalogSignal') + if len(signals) == 0: + return self - if self.t_start > other.t_start: - signal1, signal2 = other, self + signals = [self] + list(signals) + + # Check required common attributes: units, sampling_rate and shape[-1] + shared_attributes = ['units', 'sampling_rate'] + attribute_values = [tuple((getattr(anasig, attr) for attr in shared_attributes)) + for anasig in signals] + # add shape dimensions that do not relate to time + attribute_values = [(attribute_values[i] + (signals[i].shape[1:],)) + for i in range(len(signals))] + if not all([attrs == attribute_values[0] for attrs in attribute_values]): + raise MergeError( + f'AnalogSignals have to share {shared_attributes} attributes to be concatenated.') + units, sr, shape = attribute_values[0] + + # find gaps between Analogsignals + combined_time_ranges = self._concatenate_time_ranges([(s.t_start, s.t_stop) for s in signals]) + missing_time_ranges = self._invert_time_ranges(combined_time_ranges) + if len(missing_time_ranges): + diffs = np.diff(np.asarray(missing_time_ranges), axis=1) else: - signal1, signal2 = self, other - # raise MergeError('Inconsistent timing of signals. Other signal needs to be later than' - # ' this signal') - - for attr in self._necessary_attrs: - if attr[0] not in ['signal', 't_start', 't_stop']: - if getattr(self, attr[0], None) != getattr(other, attr[0], None): - # if attr[0] in ['t_start','t_stop']: - # continue - raise MergeError( - "Cannot concatenate these two signals as the %s differ." % attr[0]) - - if hasattr(self, "lazy_shape"): - if hasattr(other, "lazy_shape"): - if self.lazy_shape[-1] != other.lazy_shape[-1]: - raise MergeError("Cannot concatenate signals as they contain" - " different numbers of traces.") - merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1]) + diffs = [] + + if padding is False and any(diffs > signals[0].sampling_period): + raise MergeError(f'Signals are not continuous. Can not concatenate signals with gaps. ' + f'Please provide a padding value.') + if padding is not False: + logger.warning('Signals will be padded using {}.'.format(padding)) + if padding is True: + padding = np.NaN * units + if isinstance(padding, pq.Quantity): + padding = padding.rescale(units).magnitude else: - raise MergeError("Cannot concatenate a lazy object with a real object.") - - # in case of non-overlapping signals consider padding - if signal2.t_start > signal1.t_stop + signal1.sampling_period: - if padding != False: - logger.warning('Signals will be padded using {}.'.format(padding)) - pad_time = signal2.t_start - signal1.t_stop - n_pad_samples = int(((pad_time) * self.sampling_rate).rescale('dimensionless')) - if padding is True: - padding = np.NaN * self.units - if isinstance(padding, pq.Quantity): - padding = padding.rescale(self.units).magnitude - else: - raise ValueError('Invalid type of padding value. Please provide a bool value ' - 'or a quantities object.') - pad_data = np.full((n_pad_samples,) + signal1.shape[1:], padding) - - # create new signal 1 with extended data, but keep array_annotations - signal_tmp = signal1.duplicate_with_new_data( - np.vstack((signal1.magnitude, pad_data))) - signal_tmp.array_annotations = signal1.array_annotations - signal1 = signal_tmp - else: - raise MergeError('Signals do not overlap, but no padding is provided.' - 'Please provide a padding mode.') - - # in case of overlapping signals slice according to overwrite parameter - elif signal2.t_start < signal1.t_stop + signal1.sampling_period: - n_samples = int(((signal1.t_stop - signal2.t_start) * signal1.sampling_rate).simplified) - logger.warning('Overwriting {} samples while concatenating signals.'.format(n_samples)) - if not overwrite: # removing samples second signal - slice_t_start = signal1.t_stop + signal1.sampling_period - signal2 = signal2.time_slice(slice_t_start, None) - else: # removing samples of the first signal - slice_t_stop = signal2.t_start - signal2.sampling_period - signal1 = signal1.time_slice(None, slice_t_stop) - else: - assert signal2.t_start == signal1.t_stop + signal1.sampling_period, \ - "Cannot concatenate signals with non-overlapping times" + raise MergeError('Invalid type of padding value. Please provide a bool value ' + 'or a quantities object.') - stack = np.vstack((signal1.magnitude, signal2.magnitude)) + t_start = min([a.t_start for a in signals]) + t_stop = max([a.t_stop for a in signals]) + n_samples = int(np.rint(((t_stop - t_start) * sr).rescale('dimensionless').magnitude)) + shape = (n_samples,) + shape + # Collect attributes and annotations across all concatenated signals kwargs = {} + common_annotations = signals[0].annotations + common_array_annotations = signals[0].array_annotations + for anasig in signals[1:]: + common_annotations = intersect_annotations(common_annotations, anasig.annotations) + common_array_annotations = intersect_annotations(common_array_annotations, + anasig.array_annotations) + + kwargs['annotations'] = common_annotations + kwargs['array_annotations'] = common_array_annotations + for name in ("name", "description", "file_origin"): - attr_self = getattr(self, name) - attr_other = getattr(other, name) - if attr_self == attr_other: - kwargs[name] = attr_self + attr = [getattr(s, name) for s in signals] + if all([a == attr[0] for a in attr]): + kwargs[name] = attr[0] else: - kwargs[name] = "merge({}, {})".format(attr_self, attr_other) - merged_annotations = merge_annotations(self.annotations, other.annotations) - kwargs.update(merged_annotations) + kwargs[name] = f'concatenation ({attr})' + + conc_signal = AnalogSignal(np.full(shape=shape, fill_value=padding, dtype=signals[0].dtype), + sampling_rate=sr, t_start=t_start, units=units, **kwargs) + + if not overwrite: + signals = signals[::-1] + while len(signals) > 0: + conc_signal.splice(signals.pop(0), copy=False) + + return conc_signal + + def _concatenate_time_ranges(self, time_ranges): + time_ranges = sorted(time_ranges) + new_ranges = time_ranges[:1] + for t_start, t_stop in time_ranges[1:]: + # time range are non continuous -> define new range + if t_start > new_ranges[-1][1]: + new_ranges.append((t_start, t_stop)) + # time range is continuous -> extend time range + elif t_stop > new_ranges[-1][1]: + new_ranges[-1] = (new_ranges[-1][0], t_stop) + return new_ranges - kwargs['array_annotations'] = intersect_annotations(self.array_annotations, - other.array_annotations) + def _invert_time_ranges(self, time_ranges): + i = 0 + new_ranges = [] + while i < len(time_ranges)-1: + new_ranges.append((time_ranges[i][1], time_ranges[i+1][0])) + i += 1 + return new_ranges - signal = self.__class__(stack, units=self.units, dtype=self.dtype, copy=False, - t_start=signal1.t_start, sampling_rate=self.sampling_rate, **kwargs) - signal.segment = None - signal.channel_index = None - if hasattr(self, "lazy_shape"): - signal.lazy_shape = merged_lazy_shape - return signal From 7fd7fc8e51e04f6dfea288b1d8ddc6ca72f35109 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Mon, 5 Oct 2020 12:09:35 +0200 Subject: [PATCH 09/13] Update signal concatenation tests to cover multiple signals --- neo/test/coretest/test_analogsignal.py | 48 ++++++++++++++++++-------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index 80b0e6f90..2b99e3ee4 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -1577,17 +1577,21 @@ def test__merge(self): def test_concatenate_simple(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop + signal1.sampling_period) + t_start=signal1.t_stop) result = signal1.concatenate(signal2) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) for attr in signal1._necessary_attrs: self.assertEqual(getattr(signal1, attr[0], None), getattr(result, attr[0], None)) - def test_concatenate_inverse_signals(self): + def test_concatenate_no_signals(self): + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + self.assertIs(signal1, signal1.concatenate()) + + def test_concatenate_reverted_order(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop + signal1.sampling_period) + t_start=signal1.t_stop) result = signal2.concatenate(signal1) assert_array_equal(np.arange(7).reshape((-1, 1)), result.magnitude) @@ -1596,8 +1600,7 @@ def test_concatenate_inverse_signals(self): def test_concatenate_no_overlap(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) - signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, - t_start=10 * pq.s + signal1.sampling_period) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=10 * pq.s) with self.assertRaises(MergeError): signal1.concatenate(signal2) @@ -1607,7 +1610,7 @@ def test_concatenate_multi_trace(self): data2 = np.arange(4, 8).reshape(2, 2) signal1 = AnalogSignal(data1 * pq.V, sampling_rate=1 * pq.Hz) signal2 = AnalogSignal(data2 * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop + signal1.sampling_period) + t_start=signal1.t_stop) result = signal1.concatenate(signal2) data_expected = np.array([[0, 1], [2, 3], [4, 5], [6, 7]]) @@ -1618,7 +1621,7 @@ def test_concatenate_multi_trace(self): def test_concatenate_overwrite_true(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop) + t_start=signal1.t_stop - signal1.sampling_period) result = signal1.concatenate(signal2, overwrite=True) assert_array_equal(np.array([0, 1, 2, 4, 5, 6]).reshape((-1, 1)), result.magnitude) @@ -1626,7 +1629,7 @@ def test_concatenate_overwrite_true(self): def test_concatenate_overwrite_false(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop) + t_start=signal1.t_stop - signal1.sampling_period) result = signal1.concatenate(signal2, overwrite=False) assert_array_equal(np.array([0, 1, 2, 3, 5, 6]).reshape((-1, 1)), result.magnitude) @@ -1662,13 +1665,13 @@ def test_concatenate_padding_invalid(self): signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, t_start=signal1.t_stop + 3 * signal1.sampling_period) - with self.assertRaises(ValueError): + with self.assertRaises(MergeError): result = signal1.concatenate(signal2, overwrite=False, padding=1) - with self.assertRaises(ValueError): + with self.assertRaises(MergeError): result = signal1.concatenate(signal2, overwrite=False, padding=[1]) - with self.assertRaises(ValueError): + with self.assertRaises(MergeError): result = signal1.concatenate(signal2, overwrite=False, padding='a') - with self.assertRaises(ValueError): + with self.assertRaises(MergeError): result = signal1.concatenate(signal2, overwrite=False, padding=np.array([1, 2, 3])) def test_concatenate_array_annotations(self): @@ -1680,7 +1683,7 @@ def test_concatenate_array_annotations(self): signal1 = AnalogSignal(data1 * pq.V, sampling_rate=1 * pq.Hz, array_annotations=array_anno1) signal2 = AnalogSignal(data2 * pq.V, sampling_rate=1 * pq.Hz, - t_start=signal1.t_stop + signal1.sampling_period, + t_start=signal1.t_stop, array_annotations=array_anno2) result = signal1.concatenate(signal2) @@ -1696,7 +1699,7 @@ def test_concatenate_complex(self): signal2 = AnalogSignal(self.data1quant, sampling_rate=1 * pq.kHz, name='signal2', description='test signal', file_origin='testfile.txt', array_annotations=self.arr_ann1, - t_start=signal1.t_stop + signal1.sampling_period) + t_start=signal1.t_stop) concatenated12 = self.signal1.concatenate(signal2) @@ -1707,6 +1710,23 @@ def test_concatenate_complex(self): assert_array_equal(np.vstack((signal1.magnitude, signal2.magnitude)), concatenated12.magnitude) + def test_concatenate_multi_signal(self): + signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) + signal2 = AnalogSignal([4, 5, 6] * pq.V, sampling_rate=1 * pq.Hz, + t_start=signal1.t_stop + 3 * signal1.sampling_period) + signal3 = AnalogSignal([40] * pq.V, sampling_rate=1 * pq.Hz, + t_start=signal1.t_stop + 3 * signal1.sampling_period) + signal4 = AnalogSignal([30, 35] * pq.V, sampling_rate=1 * pq.Hz, + t_start=signal1.t_stop - signal1.sampling_period) + + concatenated = signal1.concatenate(signal2, signal3, signal4, padding=-1 * pq.V, + overwrite=True) + for attr in signal1._necessary_attrs: + self.assertEqual(getattr(signal1, attr[0], None), + getattr(concatenated, attr[0], None)) + assert_arrays_equal(np.array([0, 1, 2, 30, 35, -1, -1, 40, 5, 6]).reshape((-1, 1)), + concatenated.magnitude) + class TestAnalogSignalFunctions(unittest.TestCase): def test__pickle_1d(self): From dc183cd2dc31b406606ee75cd291389c2c664b32 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Mon, 5 Oct 2020 15:42:16 +0200 Subject: [PATCH 10/13] Fix documentation --- neo/core/basesignal.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/neo/core/basesignal.py b/neo/core/basesignal.py index c814550de..7d271b676 100644 --- a/neo/core/basesignal.py +++ b/neo/core/basesignal.py @@ -300,31 +300,29 @@ def time_slice(self, t_start, t_stop): ''' NotImplementedError('Needs to be implemented for subclasses.') - def concatenate(self, other): + def concatenate(self, *signals): ''' - Patch another signal to this one. + Concatenate multiple signals across time. The signal objects are concatenated vertically - (row-wise, :func:`np.vstack`). Patching can be + (row-wise, :func:`np.vstack`). Concatenation can be used to combine signals across segments. - Note: Only array annotations common to + Note: Only (array) annotations common to both signals are attached to the concatenated signal. - If the attributes of the two signal are not + If the attributes of the signals are not compatible, an Exception is raised. - Required attributes of the signal are used. - Parameters ---------- - other : neo.BaseSignal - The object that is merged into this one. + signals : multiple neo.BaseSignal objects + The objects that is concatenated with this one. Returns ------- signal : neo.BaseSignal Signal containing all non-overlapping samples of - both source signals. + the source signals. Raises ------ From 904ca1f62f63d8f1717af83ea7a9c0b0655d1dbc Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Fri, 6 Nov 2020 22:34:18 +0100 Subject: [PATCH 11/13] Pep8 fixes --- neo/core/analogsignal.py | 12 +++++------- neo/core/baseneo.py | 1 + neo/test/coretest/test_analogsignal.py | 16 ++++++++-------- .../coretest/test_irregularysampledsignal.py | 13 ++++++------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index 70a03c615..b26e8c0ba 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -714,7 +714,8 @@ def concatenate(self, *signals, overwrite=False, padding=False): units, sr, shape = attribute_values[0] # find gaps between Analogsignals - combined_time_ranges = self._concatenate_time_ranges([(s.t_start, s.t_stop) for s in signals]) + combined_time_ranges = self._concatenate_time_ranges( + [(s.t_start, s.t_stop) for s in signals]) missing_time_ranges = self._invert_time_ranges(combined_time_ranges) if len(missing_time_ranges): diffs = np.diff(np.asarray(missing_time_ranges), axis=1) @@ -759,7 +760,7 @@ def concatenate(self, *signals, overwrite=False, padding=False): kwargs[name] = f'concatenation ({attr})' conc_signal = AnalogSignal(np.full(shape=shape, fill_value=padding, dtype=signals[0].dtype), - sampling_rate=sr, t_start=t_start, units=units, **kwargs) + sampling_rate=sr, t_start=t_start, units=units, **kwargs) if not overwrite: signals = signals[::-1] @@ -783,10 +784,7 @@ def _concatenate_time_ranges(self, time_ranges): def _invert_time_ranges(self, time_ranges): i = 0 new_ranges = [] - while i < len(time_ranges)-1: - new_ranges.append((time_ranges[i][1], time_ranges[i+1][0])) + while i < len(time_ranges) - 1: + new_ranges.append((time_ranges[i][1], time_ranges[i + 1][0])) i += 1 return new_ranges - - - diff --git a/neo/core/baseneo.py b/neo/core/baseneo.py index 37231640a..beaef3148 100644 --- a/neo/core/baseneo.py +++ b/neo/core/baseneo.py @@ -109,6 +109,7 @@ def merge_annotations(A, *Bs): logger.debug("Merging annotations: A=%s Bs=%s merged=%s", A, Bs, merged) return merged + def intersect_annotations(A, B): """ Identify common entries in dictionaries A and B diff --git a/neo/test/coretest/test_analogsignal.py b/neo/test/coretest/test_analogsignal.py index 2b99e3ee4..de16b9ede 100644 --- a/neo/test/coretest/test_analogsignal.py +++ b/neo/test/coretest/test_analogsignal.py @@ -370,18 +370,17 @@ def test__repr(self): def test__pretty(self): for i, signal in enumerate(self.signals): prepr = pretty(signal) - targ = ( - ('AnalogSignal with %d channels of length %d; units %s; datatype %s \n' + targ = (('AnalogSignal with %d channels of length %d; units %s; datatype %s \n' '' % (signal.shape[1], signal.shape[0], signal.units.dimensionality.unicode, signal.dtype)) + ('annotations: %s\n' % signal.annotations) - + ('sampling rate: {} {}\n'.format(float(signal.sampling_rate), - signal.sampling_rate.dimensionality.unicode)) + + ('sampling rate: {} {}\n'.format( + float(signal.sampling_rate), + signal.sampling_rate.dimensionality.unicode)) + ('time: {} {} to {} {}'.format(float(signal.t_start), signal.t_start.dimensionality.unicode, float(signal.t_stop), - signal.t_stop.dimensionality.unicode)) - ) + signal.t_stop.dimensionality.unicode))) self.assertEqual(prepr, targ) @@ -1648,8 +1647,9 @@ def test_concatenate_padding_True(self): t_start=signal1.t_stop + 3 * signal1.sampling_period) result = signal1.concatenate(signal2, overwrite=False, padding=True) - assert_array_equal(np.array([0, 1, 2, 3, np.NaN, np.NaN, np.NaN, 4, 5, 6]).reshape((-1, 1)), - result.magnitude) + assert_array_equal( + np.array([0, 1, 2, 3, np.NaN, np.NaN, np.NaN, 4, 5, 6]).reshape((-1, 1)), + result.magnitude) def test_concatenate_padding_quantity(self): signal1 = AnalogSignal([0, 1, 2, 3] * pq.V, sampling_rate=1 * pq.Hz) diff --git a/neo/test/coretest/test_irregularysampledsignal.py b/neo/test/coretest/test_irregularysampledsignal.py index b4fb77915..ecce9c7ec 100644 --- a/neo/test/coretest/test_irregularysampledsignal.py +++ b/neo/test/coretest/test_irregularysampledsignal.py @@ -239,12 +239,10 @@ def test_IrregularlySampledSignal_repr(self): if np.__version__.split(".")[:2] > ['1', '13']: # see https://github.com/numpy/numpy/blob/master/doc/release/1.14.0-notes.rst#many # -changes-to-array-printing-disableable-with-the-new-legacy-printing-mode - targ = ( - '') else: - targ = ( - '') res = repr(sig) self.assertEqual(targ, res) @@ -946,7 +944,8 @@ def test__merge(self): self.assertRaises(MergeError, signal1.merge, signal3) def test_concatenate_simple(self): - signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, times=[1, 10, 11, 14] * pq.s) + signal1 = IrregularlySampledSignal(signal=[0, 1, 2, 3] * pq.s, + times=[1, 10, 11, 14] * pq.s) signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=[15, 16, 21] * pq.s) result = signal1.concatenate(signal2) @@ -977,8 +976,8 @@ def test_concatenate_overlap(self): signal2 = IrregularlySampledSignal(signal=[4, 5, 6] * pq.s, times=range(2, 5) * pq.s) result = signal1.concatenate(signal2, allow_overlap=True) - assert_array_equal(np.array([0,1,2,4,3,5,6]).reshape((-1, 1)), result.magnitude) - assert_array_equal(np.array([0,1,2,2,3,3,4]), result.times) + assert_array_equal(np.array([0, 1, 2, 4, 3, 5, 6]).reshape((-1, 1)), result.magnitude) + assert_array_equal(np.array([0, 1, 2, 2, 3, 3, 4]), result.times) def test_concatenate_multi_trace(self): data1 = np.arange(4).reshape(2, 2) From 9508e1e7fcb291ac7b1d7ed0ae1cfa9b8dc254de Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Mon, 9 Nov 2020 14:06:03 +0100 Subject: [PATCH 12/13] Add note about potential timestamp shifts. --- neo/core/analogsignal.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/neo/core/analogsignal.py b/neo/core/analogsignal.py index b26e8c0ba..476b732c2 100644 --- a/neo/core/analogsignal.py +++ b/neo/core/analogsignal.py @@ -664,6 +664,8 @@ def concatenate(self, *signals, overwrite=False, padding=False): Units, sampling_rate and number of signal traces must be the same for all signals. Otherwise a ValueError is raised. + Note that timestamps of concatenated signals might shift in oder to + align the sampling times of all signals. Parameters ---------- From 2933091fcb3b7735e2d76f6370533726ec432b83 Mon Sep 17 00:00:00 2001 From: Julia Sprenger Date: Mon, 9 Nov 2020 14:30:01 +0100 Subject: [PATCH 13/13] Safer usage of '&' operator via sets. --- neo/core/baseneo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neo/core/baseneo.py b/neo/core/baseneo.py index beaef3148..f9e2120d6 100644 --- a/neo/core/baseneo.py +++ b/neo/core/baseneo.py @@ -126,7 +126,7 @@ def intersect_annotations(A, B): result = {} - for key in A.keys() & B.keys(): + for key in set(A.keys()) & set(B.keys()): v1, v2 = A[key], B[key] assert type(v1) == type(v2), 'type({}) {} != type({}) {}'.format(v1, type(v1), v2, type(v2))