In [None]:
# default_exp core

In [None]:
from nbdev.showdoc import *

In [None]:
#export
import numpy as np
from collections import namedtuple 
from typing import Dict, Tuple, Sequence, Union
import itertools

import matplotlib.pyplot as plt
from matplotlib.patches import Patch

class DataChunk(np.ndarray):
    """Base brick of data: a numpy array tagged with its position in a record.

    On top of the array content, a DataChunk carries:
        idx: index of its first bin in the enclosing record
        group: name of the group it belongs to
        fill: value used to fill the bins not covered by the chunk
        attrs: free-form dictionary of additional attributes

    `slice` and `range` are derived from `idx` and the current length of the
    array, so they are also available (and up to date) on arrays derived from
    a DataChunk (arithmetic results, copies, views).
    """
    def __new__(cls, data, idx, group, fill=0):
        """Create a DataChunk.

        Parameters:
            data (array-like): Content of the chunk, first axis being the bins
            idx (int): Index of the first bin of the chunk
            group: Group the chunk belongs to
            fill: Filling value for the bins outside of the chunk

        Raises:
            TypeError: if data has no dimension (a length is needed)
        """
        # See https://docs.scipy.org/doc/numpy-1.11.0/user/basics.subclassing.html#basics-subclassing
        # for explanation on subclassing numpy arrays
        obj = np.asarray(data).view(cls)
        if obj.ndim == 0:
            # slice/range need a length; fail at construction like len() would
            raise TypeError("DataChunk data must have at least one dimension")
        obj.idx = idx
        obj.group = group
        obj.fill = fill
        obj.attrs = {}

        return obj

    def __array_finalize__(self, obj):
        # Called for every new instance (explicit construction, view casting
        # and new-from-template); obj is the array the instance derives from.
        self.idx = getattr(obj, 'idx', None)
        self.group = getattr(obj, 'group', None)
        self.fill = getattr(obj, 'fill', 0)
        # Shallow copy so a derived array does not share its parent's dict
        parent_attrs = getattr(obj, 'attrs', None)
        self.attrs = dict(parent_attrs) if isinstance(parent_attrs, dict) else {}

    @property
    def range(self):
        """Range of the bins covered by the chunk."""
        return self._range()

    @property
    def slice(self):
        """Slice of the bins covered by the chunk."""
        return self._get_slice()

    def _range(self):
        return range(self.idx, self.idx + len(self))

    def _get_slice(self):
        return slice(self.idx, self.idx + len(self))

    def __str__(self):
        return ("Group: "+str(self.group)
                +"\nStarting index: "+str(self.idx)
                +"\nFilling value: "+str(self.fill)
                +"\n"+super().__str__())

    def __repr__(self):
        return "DataChunk(%s,%s,%s,%s)"%(self.shape, self.idx, self.group, self.fill)

In [None]:
#export
class ContiguousRecord():
    """Representation of a contiguous recording session to store DataChunk
    of various sources under a single time reference. DataChunk are stored
    under a name in one of the groups "sync","stim","data" and "cell".

    A name can contain multiple DataChunk if those are not overlapping in time.

    Each ContiguousRecord contains in the group "sync" two master DataChunk,
    one for signals to be recorded across acquisition devices to synchronize them,
    one for timepoints of these signals for the main device. They are called
    respectively "signals" and "main_tp".

    """
    MAIN_TP = "main_tp"
    SIGNALS = "signals"
    # Order in which the groups are visited when iterating over the record
    GROUP_ORDER = ("sync", "stim", "data", "cell")

    def __init__(self, length:int, signals:"DataChunk", main_tp:"DataChunk"):
        """Instantiate a ContiguousRecord.

        Parameters:
            length (int): Number of bins of this record
            signals (DataChunk): Signals for this record
            main_tp (DataChunk): Timepoints of the signals for the main device
        """
        self.length = length
        self._data_dict = {}

        self[self.SIGNALS] = signals
        self[self.MAIN_TP] = main_tp

    def dataset_intersect(self, existing_datachunk:list, new_datachunk:"DataChunk"):
        """Tell if new_datachunk overlaps any DataChunk of existing_datachunk.

        The `range` of each chunk is treated as a contiguous interval, so the
        overlap is tested on the bounds instead of enumerating every index.

        Returns:
            bool: True if at least one bin is shared with an existing chunk
        """
        new_range = new_datachunk.range
        return any(max(new_range.start, chunk.range.start) < min(new_range.stop, chunk.range.stop)
                   for chunk in existing_datachunk)

    def keys(self):
        """Return the names under which data are stored."""
        return self._data_dict.keys()

    def get_slice(self, datachunk_name:str) -> list:
        """Return the slices covered by the chunks stored under datachunk_name
        (empty list if the name is unknown)."""
        return [chunk.slice for chunk in self._data_dict.get(datachunk_name, [])]

    def get_names_group(self, group_name:str) -> list:
        """Return the names whose first stored chunk belongs to group_name."""
        return [key for key, dChunk_l in self._data_dict.items()
                if dChunk_l and dChunk_l[0].group == group_name]

    def __len__(self):
        return self.length

    def __setitem__(self, key, value:"DataChunk"):
        """Store value under the name key.

        Raises:
            KeyError: if key is not a string
            ValueError: if value overlaps a chunk already stored under key
        """
        if not isinstance(key, str):
            raise KeyError("Cannot set data with an integer index, it needs a name")

        # Validate before touching the dictionary, so that a failure never
        # leaves an empty entry behind for a new name.
        if self.dataset_intersect(self._data_dict.get(key, []), value):
            raise ValueError("Data with the same name already exists and intersect with the one provided")
        self._data_dict.setdefault(key, []).append(value)

    def __getitem__(self, key):
        """Return the data stored under key as one array spanning the record.

        The bins not covered by any chunk hold the filling value of the first
        chunk stored under that name.

        Raises:
            TypeError: if key is not a string
            KeyError: if no data is stored under key
        """
        if not isinstance(key, str):
            raise TypeError("ContiguousRecord can only be indexed with a data name (str)")

        l_datachunk = self._data_dict[key]
        first = l_datachunk[0]
        full_sequence = np.full(shape=(self.length, *first.shape[1:]),
                                fill_value=first.fill,
                                dtype=first.dtype)
        for datachunk in l_datachunk:
            # np.asarray gives the array content; ndarray.data is only the
            # raw memory buffer of the array.
            full_sequence[datachunk.slice] = np.asarray(datachunk)

        return full_sequence

    def _ordered_keys(self) -> list:
        """Names sorted by group (GROUP_ORDER first, then any other group in
        order of first appearance) and by starting index of their first chunk."""
        known = {group_name: [] for group_name in self.GROUP_ORDER}
        others = {}
        for key, dChunk_l in self._data_dict.items():
            if not dChunk_l:
                continue
            first = dChunk_l[0]
            if first.group in known:
                bucket = known[first.group]
            else:
                bucket = others.setdefault(first.group, [])
            bucket.append((first.idx, key))

        order = []
        for bucket in list(known.values()) + list(others.values()):
            order.extend(key for _, key in sorted(bucket, key=lambda e: e[0]))
        return order

    def __iter__(self):
        """Iterate over (name, list of DataChunk) pairs, ordered by group then
        by starting index. Each call returns an independent iterator, so the
        record can be iterated in nested loops."""
        # State kept only for code calling next() directly on the record
        self._iter_order = self._ordered_keys()
        self._n = 0

        return iter([(key, self._data_dict[key]) for key in self._iter_order])

    def __next__(self):
        """Legacy cursor-based iteration, advanced by next(record)."""
        if not hasattr(self, "_iter_order"):
            self._iter_order = self._ordered_keys()
            self._n = 0
        if self._n < len(self._iter_order):
            key = self._iter_order[self._n]
            dChunk_l = self._data_dict[key]
            self._n += 1
            return (key, dChunk_l)
        else:
            raise StopIteration

    def __delitem__(self, key):
        del self._data_dict[key]

    def __str__(self):
        res = "ContiguousRecord:\n"
        for k,v in self._data_dict.items():
            res += k+" : "+" ".join([str(dc.shape) for dc in v]) +"\n"
        return res

    def __repr__(self):
        return self._data_dict.__repr__()

In [None]:
#export
class RecordMaster(list):
    """
    One Timeserie to rule them all, One Timeserie to find them,
    One Timeserie to bring them all and in the darkness bind them

    The RecordMaster class is the top level object managing all
    timeseries. It uses a list of ContiguousRecord to represent
    possibly discontinuous data records.

    The main aim of the RecordMaster is to store the various data
    streams of an experiment under a unique time reference, to ease
    the processing of the data.

    NOTE(review): the records are stored in `self._sequences`, not in the
    underlying list. Inherited list methods that are not overridden here
    (e.g. pop, extend, index) act on the empty underlying list.
    """
    # Bar color of each DataChunk group in plot()
    GROUP_COLORS = {"sync":"cornflowerblue", "stim":"orange", "data":"yellowgreen", "cell":"plum"}

    def __init__(self, reference_data_list: Sequence[Tuple["DataChunk", "DataChunk"]], frame_time=1/60, sep_size=1000):
        """Instantiate a RecordMaster with one ContiguousRecord per reference pair.

        Parameters:
            reference_data_list: Sequence of (ref_timepoints, ref_signals) pairs,
                one per contiguous record. The length of ref_timepoints sets
                the length of the record.
            frame_time (float): Duration of one bin, used to convert bin
                counts to seconds
            sep_size (int): Gap, in bins, drawn between two records by plot()
        """
        super().__init__()
        self.frame_time = frame_time
        self.sep_size   = sep_size
        self._sequences = []
        for ref_timepoints, ref_signals in reference_data_list:
            cs = ContiguousRecord(len(ref_timepoints), ref_signals, ref_timepoints)
            self._sequences.append(cs)

    def set_datachunk(self, dc:"DataChunk", name:str, sequence_idx=0):
        """Set the given DataChunk dc for the sequence at sequence_idx under name."""
        self._sequences[sequence_idx][name] = dc

    def append(self, ref_timepoints:"DataChunk", ref_signals:"DataChunk"):
        """Add a new ContiguousRecord, built from the reference pair, at the end."""
        cs = ContiguousRecord(len(ref_timepoints), ref_signals, ref_timepoints)
        self._sequences.append(cs)

    def insert(self, idx:int, ref_timepoints:"DataChunk", ref_signals:"DataChunk"):
        """Insert a new ContiguousRecord, built from the reference pair, at position idx."""
        cs = ContiguousRecord(len(ref_timepoints), ref_signals, ref_timepoints)
        self._sequences.insert(idx, cs)

    def __setitem__(self, key, value:"DataChunk"):
        """Setting an item directly to the record_master place it in the first sequence

        Raises:
            TypeError: if key is not a string
        """
        if not isinstance(key, str):
            # Previously ignored silently, which made the assignment a no-op
            raise TypeError("Data can only be set under a name (str); use set_datachunk to target another sequence")
        self._sequences[0][key] = value

    def __getitem__(self, key):
        """Return the record at position key (int), or the list of the data
        stored under the name key (str) in every record.

        Raises:
            TypeError: if key is neither an integer nor a string
        """
        if isinstance(key, (int, np.integer)):
            return self._sequences[key]
        elif isinstance(key, str):
            #We want all the data of that name
            return [seq[key] for seq in self._sequences]

        raise TypeError("Indexing not understood")

    def __iter__(self):
        """Iterate over the records. Each call returns an independent
        iterator, so the RecordMaster can be iterated in nested loops."""
        self._n = 0 # cursor kept only for code calling next() directly on the object
        return iter(self._sequences)

    def __next__(self):
        """Legacy cursor-based iteration, advanced by next(record_master)."""
        n = getattr(self, "_n", 0)
        if n < len(self):
            self._n = n + 1
            return self._sequences[n]
        else:
            raise StopIteration

    def __len__(self):
        return len(self._sequences)

    def plot(self):
        """Draw every DataChunk of every record as an horizontal bar.

        One row per data name, one color per group. Records are laid out one
        after the other along x, separated by sep_size bins. Each bar is
        annotated with its start and end time within its own record.
        """
        colors = self.GROUP_COLORS
        cursor = 0       # x offset of the record being drawn
        y_pos_dict = {}  # row of each name, in order of first appearance
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.invert_yaxis()
        ax.xaxis.set_visible(False)
        for seq in self._sequences:
            for name, dChunk_l in seq:
                for dChunk in dChunk_l:
                    pos = dChunk.idx + cursor
                    ax.barh(name, len(dChunk), left=pos, height=0.8,
                            color=colors.get(dChunk.group, "lightgray"), label=dChunk.group)
                    x = pos + len(dChunk)/2
                    text = "{0} -> {1} ".format(self.to_time_str(dChunk.idx), self.to_time_str(dChunk.idx+len(dChunk)))
                    y_pos = y_pos_dict.setdefault(name, len(y_pos_dict))
                    ax.text(x, y_pos, text, ha='center', va='center')
            cursor += len(seq) + self.sep_size

        legend_elements = [Patch(facecolor=colors["sync"],label='Synchro'),
                           Patch(facecolor=colors["data"],label='Data'),
                           Patch(facecolor=colors["stim"],label='Stimulus'),
                           Patch(facecolor=colors["cell"],label='Cell'),]
        ax.legend(handles=legend_elements, ncol=5, bbox_to_anchor=(0, 1), loc='lower left', fontsize='small')

        ax.set_xlim(-100,cursor)

    def to_s(self, n_frame):
        """Convert a number of bins to seconds, rounded to two decimals."""
        return round(self.frame_time*n_frame,2)

    def to_time_str(self, n_frame):
        """Convert a number of bins to a "hh:mm:ss" string (seconds truncated)."""
        total_s = int(self.to_s(n_frame))
        m, s = divmod(total_s, 60)
        h, m = divmod(m, 60)
        return "{0:02d}:{1:02d}:{2:02d}".format(h, m, s)

    def __str__(self):
        return "["+",\n".join([repr(seq) for seq in self._sequences])+"]"

    def __repr__(self):
        return "["+", ".join([repr(seq) for seq in self._sequences])+"]"

In [None]:
# nbdev footer: convert the project's notebooks to Python modules
# (presumably from the cells tagged `#export`, into the module named by `default_exp`).
from nbdev.export import *
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
