Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 134 additions & 1 deletion neo/core/analogsignal.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import numpy as np
import quantities as pq

from neo.core.baseneo import BaseNeo, MergeError, merge_annotations
from neo.core.baseneo import BaseNeo, MergeError, merge_annotations, intersect_annotations
from neo.core.dataobject import DataObject
from copy import copy, deepcopy

Expand Down Expand Up @@ -657,3 +657,136 @@ def rectify(self, **kwargs):
rectified_signal.array_annotations = self.array_annotations.copy()

return rectified_signal

def concatenate(self, *signals, overwrite=False, padding=False):
"""
Concatenate multiple neo.AnalogSignal objects across time.

Units, sampling_rate and number of signal traces must be the same
for all signals. Otherwise a ValueError is raised.
Note that timestamps of concatenated signals might shift in oder to
align the sampling times of all signals.

Parameters
----------
signals: neo.AnalogSignal objects
AnalogSignals that will be concatenated
overwrite : bool
If True, samples of the earlier (lower index in `signals`)
signals are overwritten by that of later (higher index in `signals`)
signals.
If False, samples of the later are overwritten by earlier signal.
Default: False
padding : bool, scalar quantity
Sampling values to use as padding in case signals do not overlap.
If False, do not apply padding. Signals have to align or
overlap. If True, signals will be padded using
np.NaN as pad values. If a scalar quantity is provided, this
will be used for padding. The other signal is moved
forward in time by maximum one sampling period to
align the sampling times of both signals.
Default: False

Returns
-------
signal: neo.AnalogSignal
concatenated output signal
"""

# Sanity of inputs
if not hasattr(signals, '__iter__'):
raise TypeError('signals must be iterable')
if not all([isinstance(a, AnalogSignal) for a in signals]):
raise TypeError('Entries of anasiglist have to be of type neo.AnalogSignal')
if len(signals) == 0:
return self

signals = [self] + list(signals)

# Check required common attributes: units, sampling_rate and shape[-1]
shared_attributes = ['units', 'sampling_rate']
attribute_values = [tuple((getattr(anasig, attr) for attr in shared_attributes))
for anasig in signals]
# add shape dimensions that do not relate to time
attribute_values = [(attribute_values[i] + (signals[i].shape[1:],))
for i in range(len(signals))]
if not all([attrs == attribute_values[0] for attrs in attribute_values]):
raise MergeError(
f'AnalogSignals have to share {shared_attributes} attributes to be concatenated.')
units, sr, shape = attribute_values[0]

# find gaps between Analogsignals
combined_time_ranges = self._concatenate_time_ranges(
[(s.t_start, s.t_stop) for s in signals])
missing_time_ranges = self._invert_time_ranges(combined_time_ranges)
if len(missing_time_ranges):
diffs = np.diff(np.asarray(missing_time_ranges), axis=1)
else:
diffs = []

if padding is False and any(diffs > signals[0].sampling_period):
raise MergeError(f'Signals are not continuous. Can not concatenate signals with gaps. '
f'Please provide a padding value.')
if padding is not False:
logger.warning('Signals will be padded using {}.'.format(padding))
if padding is True:
padding = np.NaN * units
if isinstance(padding, pq.Quantity):
padding = padding.rescale(units).magnitude
else:
raise MergeError('Invalid type of padding value. Please provide a bool value '
'or a quantities object.')

t_start = min([a.t_start for a in signals])
t_stop = max([a.t_stop for a in signals])
n_samples = int(np.rint(((t_stop - t_start) * sr).rescale('dimensionless').magnitude))
shape = (n_samples,) + shape

# Collect attributes and annotations across all concatenated signals
kwargs = {}
common_annotations = signals[0].annotations
common_array_annotations = signals[0].array_annotations
for anasig in signals[1:]:
common_annotations = intersect_annotations(common_annotations, anasig.annotations)
common_array_annotations = intersect_annotations(common_array_annotations,
anasig.array_annotations)

kwargs['annotations'] = common_annotations
kwargs['array_annotations'] = common_array_annotations

for name in ("name", "description", "file_origin"):
attr = [getattr(s, name) for s in signals]
if all([a == attr[0] for a in attr]):
kwargs[name] = attr[0]
else:
kwargs[name] = f'concatenation ({attr})'

conc_signal = AnalogSignal(np.full(shape=shape, fill_value=padding, dtype=signals[0].dtype),
sampling_rate=sr, t_start=t_start, units=units, **kwargs)

if not overwrite:
signals = signals[::-1]
while len(signals) > 0:
conc_signal.splice(signals.pop(0), copy=False)

return conc_signal

def _concatenate_time_ranges(self, time_ranges):
time_ranges = sorted(time_ranges)
new_ranges = time_ranges[:1]
for t_start, t_stop in time_ranges[1:]:
# time range are non continuous -> define new range
if t_start > new_ranges[-1][1]:
new_ranges.append((t_start, t_stop))
# time range is continuous -> extend time range
elif t_stop > new_ranges[-1][1]:
new_ranges[-1] = (new_ranges[-1][0], t_stop)
return new_ranges

def _invert_time_ranges(self, time_ranges):
i = 0
new_ranges = []
while i < len(time_ranges) - 1:
new_ranges.append((time_ranges[i][1], time_ranges[i + 1][0]))
i += 1
return new_ranges
32 changes: 32 additions & 0 deletions neo/core/baseneo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
used by all :module:`neo.core` classes.
"""

from copy import deepcopy
from datetime import datetime, date, time, timedelta
from decimal import Decimal
import logging
Expand Down Expand Up @@ -109,6 +110,37 @@ def merge_annotations(A, *Bs):
return merged


def intersect_annotations(A, B):
"""
Identify common entries in dictionaries A and B
and return these in a separate dictionary.

Entries have to share key as well as value to be
considered common.

Parameters
----------
A, B : dict
Dictionaries to merge.
"""

result = {}

for key in set(A.keys()) & set(B.keys()):
v1, v2 = A[key], B[key]
assert type(v1) == type(v2), 'type({}) {} != type({}) {}'.format(v1, type(v1),
v2, type(v2))
if isinstance(v1, dict) and v1 == v2:
result[key] = deepcopy(v1)
elif isinstance(v1, str) and v1 == v2:
result[key] = A[key]
elif isinstance(v1, list) and v1 == v2:
result[key] = deepcopy(v1)
elif isinstance(v1, np.ndarray) and all(v1 == v2):
result[key] = deepcopy(v1)
return result


def _reference_name(class_name):
"""
Given the name of a class, return an attribute name to be used for
Expand Down
51 changes: 46 additions & 5 deletions neo/core/basesignal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import numpy as np
import quantities as pq

from neo.core.baseneo import BaseNeo, MergeError, merge_annotations
from neo.core.baseneo import MergeError, merge_annotations
from neo.core.dataobject import DataObject, ArrayDict
from neo.core.channelindex import ChannelIndex

Expand Down Expand Up @@ -282,11 +282,52 @@ def merge(self, other):
# merge channel_index (move to ChannelIndex.merge()?)
if self.channel_index and other.channel_index:
signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]),
channel_ids=np.hstack(
[self.channel_index.channel_ids, other.channel_index.channel_ids]),
channel_names=np.hstack(
[self.channel_index.channel_names, other.channel_index.channel_names]))
channel_ids=np.hstack(
[self.channel_index.channel_ids,
other.channel_index.channel_ids]),
channel_names=np.hstack(
[self.channel_index.channel_names,
other.channel_index.channel_names]))
else:
signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]))

return signal

def time_slice(self, t_start, t_stop):
'''
Creates a new AnalogSignal corresponding to the time slice of the
original Signal between times t_start, t_stop.
'''
NotImplementedError('Needs to be implemented for subclasses.')

def concatenate(self, *signals):
'''
Concatenate multiple signals across time.

The signal objects are concatenated vertically
(row-wise, :func:`np.vstack`). Concatenation can be
used to combine signals across segments.
Note: Only (array) annotations common to
both signals are attached to the concatenated signal.

If the attributes of the signals are not
compatible, an Exception is raised.

Parameters
----------
signals : multiple neo.BaseSignal objects
The objects that is concatenated with this one.

Returns
-------
signal : neo.BaseSignal
Signal containing all non-overlapping samples of
the source signals.

Raises
------
MergeError
If `other` object has incompatible attributes.
'''

NotImplementedError('Patching need to be implemented in sublcasses')
93 changes: 92 additions & 1 deletion neo/core/irregularlysampledsignal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import numpy as np
import quantities as pq

from neo.core.baseneo import BaseNeo, MergeError, merge_annotations
from neo.core.baseneo import MergeError, merge_annotations, intersect_annotations
from neo.core.basesignal import BaseSignal
from neo.core.analogsignal import AnalogSignal
from neo.core.channelindex import ChannelIndex
Expand Down Expand Up @@ -514,3 +514,94 @@ def merge(self, other):
signal.channel_index = ChannelIndex(index=np.arange(signal.shape[1]))

return signal

def concatenate(self, other, allow_overlap=False):
'''
Combine this and another signal along the time axis.

The signal objects are concatenated vertically
(row-wise, :func:`np.vstack`). Patching can be
used to combine signals across segments.
Note: Only array annotations common to
both signals are attached to the concatenated signal.

If the attributes of the two signal are not
compatible, an Exception is raised.

Required attributes of the signal are used.

Parameters
----------
other : neo.BaseSignal
The object that is merged into this one.
allow_overlap : bool
If false, overlapping samples between the two
signals are not permitted and an ValueError is raised.
If true, no check for overlapping samples is
performed and all samples are combined.

Returns
-------
signal : neo.IrregularlySampledSignal
Signal containing all non-overlapping samples of
both source signals.

Raises
------
MergeError
If `other` object has incompatible attributes.
'''

for attr in self._necessary_attrs:
if not (attr[0] in ['signal', 'times', 't_start', 't_stop', 'times']):
if getattr(self, attr[0], None) != getattr(other, attr[0], None):
raise MergeError(
"Cannot concatenate these two signals as the %s differ." % attr[0])

if hasattr(self, "lazy_shape"):
if hasattr(other, "lazy_shape"):
if self.lazy_shape[-1] != other.lazy_shape[-1]:
raise MergeError("Cannot concatenate signals as they contain"
" different numbers of traces.")
merged_lazy_shape = (self.lazy_shape[0] + other.lazy_shape[0], self.lazy_shape[-1])
else:
raise MergeError("Cannot concatenate a lazy object with a real object.")
if other.units != self.units:
other = other.rescale(self.units)

new_times = np.hstack((self.times, other.times))
sorting = np.argsort(new_times)
new_samples = np.vstack((self.magnitude, other.magnitude))

kwargs = {}
for name in ("name", "description", "file_origin"):
attr_self = getattr(self, name)
attr_other = getattr(other, name)
if attr_self == attr_other:
kwargs[name] = attr_self
else:
kwargs[name] = "merge({}, {})".format(attr_self, attr_other)
merged_annotations = merge_annotations(self.annotations, other.annotations)
kwargs.update(merged_annotations)

kwargs['array_annotations'] = intersect_annotations(self.array_annotations,
other.array_annotations)

if not allow_overlap:
if max(self.t_start, other.t_start) <= min(self.t_stop, other.t_stop):
raise ValueError('Can not combine signals that overlap in time. Allow for '
'overlapping samples using the "no_overlap" parameter.')

t_start = min(self.t_start, other.t_start)
t_stop = max(self.t_start, other.t_start)

signal = IrregularlySampledSignal(signal=new_samples[sorting], times=new_times[sorting],
units=self.units, dtype=self.dtype, copy=False,
t_start=t_start, t_stop=t_stop, **kwargs)
signal.segment = None
signal.channel_index = None

if hasattr(self, "lazy_shape"):
signal.lazy_shape = merged_lazy_shape

return signal
Loading