In [7]:
# Written by Mengzhan Liufu at Yu Lab, UChicago

In [3]:
import numpy as np
import math
from scipy import signal
import scipy

In [3]:
def bandpass_filter(filter_name, flattened_array, sampling_freq, order, lowcut, highcut):
    '''
    Filter a signal array with bandpass
    @param filter_name: the type of filter to use
    @param flattened_array: 1D signal array to be filtered
    @param sampling_freq: the sampling frequency with which the signal is collected
    @param order:
    @param lowcut: lower bound of the frequency band of interest
    @param highcut: higher bound of the frequency band of interest
    
    @return y: the signal in frequency band [lowcut, highcut]
    '''
    if filter_name == 'elliptical':
        sos = scipy.signal.ellip(order, 0.01, 120, [lowcut, highcut], btype='bp', output='sos', fs=sampling_freq)
    if filter_name == 'butterworth':
        sos = scipy.signal.butter(order, [lowcut, highcut], 'bp', fs=sampling_freq, output='sos')
    if filter_name == 'cheby1':
        sos = scipy.signal.cheby1(order, 1, [lowcut, highcut], 'bp', fs=sampling_freq, output='sos')
    if filter_name == 'cheby2':
        sos = signal.cheby2(order, 15, [lowcut, highcut], 'bp', fs=sampling_freq, output='sos')

    y = scipy.signal.sosfiltfilt(sos, flattened_array)

    return y

In [3]:
def convert_time(time_file):
    '''
    Convert trodes timestamps into seconds
    @param time_file: trodes timestamps.dat source file
    
    @return np.array(time_data): numpy array of timestamps in seconds 
    '''
    start_time = int(time_file['first_timestamp'])
    time_data = []
    for i in time_file['data']:
        time_data.append((i[0]-start_time)/30000)
        
    return np.array(time_data)

In [6]:
def hilbert_processing(*power_data):
    '''
    Apply Hilbert transformation to a power data series to get is magnitude envelope
    
    @param power_data: raw power data arrays of consistent lengths
    
    @return hilbert_magnitude: magnitude envelope of power_data after hilbert transformation
    '''
    length = len(power_data[0])
    power_sum = np.array([0.]*length)
    for i in power_data:
        if(len(i)!=length):
            raise ValueError('Power arrays must have consistent length')
    
        power_sum += np.array(i)
                    
    return np.abs(scipy.signal.hilbert(power_sum/len(power_data)))

In [6]:
def denoise(power_data, time_data, noise_label, threshold):
    '''
    Remove data samples that are labeled as global noise from power series 
    @param power_data: data sample array
    @param time_data: timestamp array in seconds, match power_data in length
    @param noise_label: T/F array of each sample labeled as noise or not, match power_data in length
    @param threshold: a maximum value of power to be considered normal activity (instead of noise)
    
    @return denoised: array of samples without noise samples
    @retun denoised_time: timestamp array in seconds, match denoised in length
    '''
    if((len(power_data)!=len(time_data)) or (len(power_data))!=len(noise_label)):
        raise ValueError('Power array, time array and noise label array should all have the same length')
    
    denoised = []
    denoised_time = []
    for i in range(0,len(power_data)):
        if ((not noise_label[i]) and (power_data[i]<=threshold)):
            denoised.append(power_data[i])
            denoised_time.append(time_data[i])
    
    return denoised, denoised_time

In [6]:
def detect(magnitude, num_std, num_wait):
    '''
    Detect threshold crossing events in a given power series
    @param magnitude: power array for threshold-crossing events detection
    @param num_std: number of standard deviation above average (of power array) for threshold to be
    @param num_wait: the number of samples to wait before stimulating. i.e num_wait consecutive Trues then stimulate
    
    @return stimulation: numpy array of stimulation status (T/F) at each sample point in power array
    '''
    if(len(magnitude)==0 or (len(magnitude)<num_wait)):
        raise ValueError('Length of array magnitude is 0, or its length is smaller than wait buffer')
    
    avg = np.mean(magnitude)
    std = np.std(magnitude)
    threshold = avg + std*num_std
    decision = [False]*(num_wait-1)
    stimulation = []
    
    for i in range(0,len(magnitude)):
        decision.append(magnitude[i]>threshold)
        current_stimulation = True
        for m in range(len(decision)-num_wait,len(decision)):
            if not decision[m]:
                current_stimulation = False
        stimulation.append(current_stimulation)
    
    return np.array(stimulation)

In [7]:
def calculate_rms(buffer):
    '''
    return the root mean-squared of a given array
    @param buffer: any array or list of number

    @return: the root mean-squared value of the array as a proxy for its power
    '''
    if(len(buffer)==0):
        raise ValueError('Array has length zero')
    
    square_summed = 0
    for k in buffer:
        square_summed += (k**2)
    
    return math.sqrt(square_summed/len(buffer))

In [18]:
def online_rms_processing(sampling_rate, time_data, buffer_length, lower_bound, upper_bound, threshold, *power_data):
    '''
    Process a given power array with stepwise (i.e online) RMS estimation
    
    @param sampling_rate: 
    @param time_data: array of power_data's timestamps in seconds
    @param buffer_length: number of data samples for RMS estimation
    @param lower_bound: lower bound of target frequency range 
    @param upper_bound: upper bound of target frequency range
    @param threshold: maximum RMS value to be considered
    @param *power_data: unknown number of raw power data arrays. these arrays should have the same length
    
    @return rms_magnitude: array of 1D RMS-processed power array
    @return rms_time: array of rms_magnitude's timestamps in seconds
    '''
    for i in power_data:
        if(len(i)!=len(time_data)):
            raise ValueError('Power array and time array should have the same length')
    if(len(power_data[0])<buffer_length):
        raise ValueError('Power array can\'t be shorted than buffer length')
    
    rms_magnitude = []
    rms_time = []
    for i in range(buffer_length-1,len(power_data[0])):
        current_filtered = np.array([0.]*buffer_length)
        for j in power_data:
            current_buffer = j[i-buffer_length+1:i+1]
            current_filtered += np.array(bandpass_filter('butterworth', current_buffer,\
                                                         sampling_rate, 1, lower_bound, upper_bound))
            
        current_rms = calculate_rms(current_filtered/len(power_data))
        if(current_rms<=threshold):
            rms_magnitude.append(current_rms)
            rms_time.append(time_data[i])
    
    return rms_magnitude, rms_time

In [19]:
def offline_rms_processing(time_data, buffer_length, threshold, *power_data):
    '''
    Filter the whole given power array, then perform RMS estimation on buffer at each sample
    
    @param power_data: array of raw power across time
    @param time_data: array of power_data's timestamps in seconds
    @param buffer_length: number of data samples for RMS estimation
    @param threshold: maximum RMS value to be considered
    
    @return rms_magnitude: array of RMS-processed power series
    @return rms_time: array of rms_magnitude's timestamps in seconds
    '''
    for i in power_data:
        if(len(i)!=len(time_data)):
            raise ValueError('Power array and time array should have the same length')
    if(len(power_data[0])<buffer_length):
        raise ValueError('Power array can\'t be shorted than buffer length')
    
    rms_magnitude = []
    rms_time = []
    for i in range(buffer_length-1,len(power_data[0])):
        current_buffer = np.array([0.]*buffer_length)
        for j in power_data:
            current_buffer += np.array(j[i-buffer_length+1:i+1])
            
        current_rms = calculate_rms(current_buffer/len(power_data))
        if(current_rms<=threshold):
            rms_magnitude.append(current_rms)
            rms_time.append(time_data[i])
    
    return rms_magnitude, rms_time

In [10]:
def extract_events(stimulation, timestamps):
    '''
    Extract timestamps of event onset, offset and duration array from stimulation status array
    @param stimulation: array of stimulation status (T/F) at each timestamp
    @param timestamps: array of timestamps in seconds that match the stimulation array
    
    @return np.array(changedtime): numpy array of timestamps of event onset and offset (alternative) in seconds
    @return np.array(eventduration): numpy array of event durations in milliseconds
    @return np.array(truetime): numpy array of timestamps in seconds of event onset only 
    '''
    if(len(stimulation)!=len(timestamps)):
        raise ValueError('stimulation array and timestamps array must have the same length')
    
    changedtime = []
    eventduration = []
    truetime = []
    for i in range(1,len(stimulation)):
        previous = stimulation[i-1]
        if previous != stimulation[i]:
            changedtime.append(timestamps[i])
            
    for i in range(0,len(changedtime)-1,2):
        eventduration.append((changedtime[i+1]-changedtime[i])*1000)
        
    for i in range(0,len(changedtime),2):
        truetime.append(changedtime[i])
    
    return np.array(changedtime), np.array(eventduration), np.array(truetime)

In [11]:
def deblip_with_duration(truetime,eventduration,threshold):
    '''
    Remove the onset timestamps that belong to noise events based on event duration
    @param truetime: array of timestamps of event onset
    @param eventduration: array of event duration in milliseconds, should match array truetime in length
    @param threshold: minimum event duration to be considered a real event
    
    @return np.array(deblipped): array of timestamps in seconds without "blips"
    '''
    if(len(truetime)!=len(eventduration)):
        raise ValueError('truetime array and eventduration must have the same length')
        
    deblipped = []
    for i in range(0,len(truetime)):
        if(eventduration[i]>=threshold):
            deblipped.append(truetime[i])
    
    return np.array(deblipped)

In [12]:
def deblip_with_frequency(truetime,threshold):
    '''
    Remove the onset timestamps that belong to noise events based on event frequency
    @param truetime: array of timestamps of event onset
    @param threshold: minimum time gap between two events for the second one to be considered a real event
    
    @return np.array(deblipped): array of timestamps in seconds without "blips"
    '''
    deblipped = []
    deblipped.append(truetime[0])
    for i in range(1,len(truetime)):
        if((truetime[i]-truetime[i-1])>threshold):
            deblipped.append(truetime[i])
            
    return np.array(deblipped)

In [2]:
def accuracy_precision_calculation(detection_series_1, detection_series_2, window_width):
    '''
    detection_series_2 is used as ground truth reference to calculate the accuracy and precision of detection_series_1. 
    For a particular event in detection_series_2, if there exists an event in detection_series_1 that matches the one
    in series_2, we mark the one in series_2 (ground truth) as detected and the one in series_1 as truepositive (TP). 
    If an event in series_1 doesn't match any series_2 event, it's marked as falsepositive(FP).
    
    @param detection_series_1: timestamp (of the starts of events) arrays in seconds 
    @param detection_series_2: timestamp (of the starts of events) arrays in seconds
    @param window_width: range of detection time difference allowed in seconds
    
    @return accuracy: percentage of series_2 events detected in series_1
    @return precision: percentage of correct detections (TP) in series_1
    @return (len(detection_series_2)-len(truepositive)): number of series_2 events unmatched by any series_1 event
    '''
    truepositive = []
    falsepositive = []
    for i in range(0,len(detection_series_1)):
        cnt = 0
        for j in range(0,len(detection_series_2)):
            if(abs(detection_series_1[i]-detection_series_2[j])<window_width):
                break
            cnt += 1
        if(cnt<len(detection_series_2)):
            truepositive.append(detection_series_1[i])
        else:
            falsepositive.append(detection_series_1[i])

    try:
        accuracy = (len(truepositive)/len(detection_series_2))*100
    except ZeroDivisionError:
        accuracy = 0
    try:
        precision = (len(truepositive)/(len(truepositive)+len(falsepositive)))*100
    except ZeroDivisionError:
        precision = 0
    
    return accuracy, precision, (len(detection_series_2)-len(truepositive))

In [None]:
def lag_distribution(time_series_1,time_series_2):
    '''
    Calculate general time lag distribution between two time series with cross correlation
    @param time_series_1: array of timestamps in seconds
    @param time_series_2: array of timestamps in seconds
    
    @return lag_distribution: array of cross-correlated time lags in milliseconds
    '''
    lag_distribution = []
    for i in range(len(time_series_1)):
        for j in range(len(time_series_2)):
            lag_distribution.append(round(time_series_1[i]-time_series_2[j])*1000)
            
    return lag_distribution