In [1]:
import numpy as np # линейная алгебра
import pandas as pd # для работы с данными
import seaborn as sns # для визуализации
import matplotlib.pyplot as plt # тоже для визуализации
plt.rcParams.update({'figure.max_open_warning': 0}) # убираем ошибку о визуализации слишком большого числа графиков
import os
import plotly.express as ex
from skimage import restoration # для вычитания базовой линии

<h3> Функция нелинейного фильтра

In [None]:
def non_lin_filter(data, mm = 4, p = 1, windows = [2,4,8,16,32,64]):
    pi = 1/(2*len(windows))
    max_window = max(windows)
    backward = []
    forward = []
    b = []
    f = []
    bf = [0]*len(data)
    result = [0]*len(data)
    ind = data.index
    data = data.values

    for i in windows:
        temp_back = [0]*len(data)
        temp_forward = [0]*len(data)
        for t in range(max_window,len(data)-max_window):
            temp_value_b = 0
            temp_value_f = 0
            for step in range(i):
                temp_value_f+=data[t-step-1]
                temp_value_b+=data[t+step+1]
            temp_back[t] = temp_value_b/i
            temp_forward[t] = temp_value_f/i
        backward.append(temp_back)
        forward.append(temp_forward)


    for i in range(len(windows)):
        fi = [0]*len(data)
        bi = [0]*len(data)
        for t in range(max_window,len(data)-max_window):
            temp_fi = 0
            temp_bi = 0
            for j in range(mm):
                temp_fi+=(data[t-j] - forward[i][t-j])**2
                temp_bi+=(data[t+j] - backward[i][t+j])**2
            fi[t] = pi*temp_fi**(-p)
            bi[t] = pi*temp_bi**(-p)
        f.append(fi)
        b.append(bi)

    for t in range(max_window,len(data)-max_window):
        for i in range(len(windows)):
            bf[t] += f[i][t]+b[i][t]
        for i in range(len(windows)):
            f[i][t] = f[i][t]/bf[t]
            b[i][t] = b[i][t]/bf[t]
            result[t] += f[i][t]*forward[i][t]+b[i][t]*backward[i][t]
    return pd.Series(result, index = ind)

<h3>Функция вычета базовой линии

In [None]:
def remove_baseline(data, radius = 15000):
    # Применяем алгоритм катящегося шара
    return data-restoration.rolling_ball(data, radius = radius)

<h3>Решаем в какой последовательности применять фильтр и вычет базовой линии

In [None]:
# fb_signal = signal.copy()
# fb_signal['filter-baseline'] = non_lin_filter(fb_signal['pA'])
# fb_signal = fb_signal[64:-64]
# fb_signal['filter-baseline'] = remove_baseline(fb_signal['filter-baseline'])
# fb_signal = fb_signal.drop(columns = ['pA'])

# bf_signal = signal.copy()
# bf_signal['baseline-filter'] = remove_baseline(bf_signal['pA'])
# bf_signal['baseline-filter'] = non_lin_filter(bf_signal['baseline-filter'])
# bf_signal = bf_signal[64:-64]
# bf_signal = bf_signal.drop(columns = ['pA'])

# fbf_signal = signal.copy()
# fbf_signal['filter-baseline-filter'] = non_lin_filter(fbf_signal['pA'])
# fbf_signal = fbf_signal[64:-64]
# fbf_signal['filter-baseline-filter'] = remove_baseline(fbf_signal['filter-baseline-filter'])
# fbf_signal['filter-baseline-filter'] = non_lin_filter(fbf_signal['filter-baseline-filter'])
# fbf_signal = fbf_signal[64:-64]
# fbf_signal = fbf_signal.drop(columns = ['pA'])

# plot_df = pd.DataFrame(fb_signal)
# plot_df = plot_df.merge(bf_signal, on = 't', how = 'left')
# plot_df = plot_df.merge(fbf_signal, on = 't', how = 'left')

# plt.figure(figsize = (15,8))
# ax = plt.gca()
# ax.set_facecolor('black')
# sns.lineplot(data = pd.melt(plot_df.iloc[:5000],'t'),x = 't', y = 'value', hue = 'variable')
# plt.grid(False)
# legend = plt.legend(title = 'Treatment combination', labelcolor = 'white', frameon = False)
# plt.setp(legend.get_title(), color = 'white');

<h5> Лучший результат дало применение фильтра, вычитание базовой линии, а затем повторное применение фильтра

<h3>Функция фильтрации и обрезки сигнала

In [None]:
def filter_and_crop (signal):
    filtered_signal = signal.copy()
    # Последовательно применяем нелинейный фильтр и вычет базовой линии
    filtered_signal['pA'] = non_lin_filter(filtered_signal['pA'])
    filtered_signal = filtered_signal[64:-64]
    filtered_signal['pA'] = remove_baseline(filtered_signal['pA'])
    # Визуализируем отфтльтрованный сигнал
    plt.figure(figsize = (15,7))
    plot = sns.lineplot(x = filtered_signal['t'], y = filtered_signal['pA'])
    plot.set_title('Filtered signal')
    plt.show()

    # Даем пользователю возможность обрезки сигнала
    while True:
        user_input = input('Input start and end time (x10000): ')
        if user_input == '':
            return '0'
        elif user_input == 'stop':
            return 'stop'
        else:
            start_time, end_time = [int(x) for x in user_input.split()]
        cropped_signal = filtered_signal.iloc[start_time:end_time+1]
        # Визуализируем обрезанный сигнал
        plt.figure(figsize = (15,5))
        plot = sns.lineplot(x = cropped_signal['t'], y = cropped_signal['pA'])
        plot.set_title('Cropped signal')
        plt.show()
        # Удостоверяемся в том, что пользователь доволен выбранной обрезкой
        satisfied = input('Are you satisfied with a result (y/n)')
        if satisfied == 'y':
            break
    return cropped_signal

<h3>Функция поиска пиков и их характеризации

In [None]:
def find_peaks(signal):
    thr = float(input('Input threshold value:'))
    signal.reset_index(drop=True, inplace = True)
    # Создаем пустой список, в который будут вносится данные о найденных пиках
    peaks = []
    # Проходим по всем временным точкам сигнала через одну (нужно чтобы пик не регистрировался дважды)
    # Вносим временные точки вокруг которых происходит переход порогового значения в массив
    # с индексом "1", если сигнал поднялся выше порогового, и "-1" - если опустился ниже
    for i in range(1, len(signal)-1,2):
        if signal['pA'][i-1] < thr and signal['pA'][i+1] > thr:
            peaks.append([signal['t'][i],1])
        elif signal['pA'][i-1] > thr and signal['pA'][i+1] < thr:
            peaks.append([signal['t'][i],-1])

    # Переводим список в таблицу для удобства работы с ним, называем столбцы соответсвующе
    peaks = pd.DataFrame(peaks, columns = ['time','up/down'])
    peaks = peaks[['time', 'up/down']]

    result = pd.DataFrame(columns = ['peak time up', 'peak time down', 'time open',
                                 'time since last opening', 'current inside', 'current outside', 'amplitude'])

    for i in range(peaks.shape[0]-1): # Идем по списку найденных пиков кроме последнего (если он является пиком вверх, то мы все равно его не считаем)
        if peaks.iloc[i]['up/down'] == 1: # Смотрим только на пики вверх

            time_up = peaks.iloc[i]['time'] # Находим время открытия и закрытия канала
            time_down = peaks.iloc[i+1]['time']

            # Находим медианное значение тока между временем открытия и закрытия канала
            current_inside = signal[(signal['t']>=time_up)&(signal['t']<time_down)]['pA'].median()

            # Если пик не первый, то находим время прошлого пика вниз
            last_time_down = 0 if i == 0 else peaks.iloc[i-1]['time']
            # Если пик не последний вверх, то ищем время следущего пика вверх
            next_time_up = signal['t'].max() if i > peaks.shape[0]-3 else peaks.iloc[i+2]['time']
            # Медианный ток снаружи ищется как среднее знаение тока между прошлым пиком вниз и рассматриваемым пиком вверх (ток слева от открытого канала)
            current_outside = signal[((signal['t']>=last_time_down)&(signal['t']<time_up))]['pA'].median()

            # Создаем словарь, в который вносится интересующая нас информация
            temp = {'peak time up':time_up,
                    'peak time down':time_down,
                    'time open':time_down-time_up,
                    'time since last opening':time_up-last_time_down,
                    'current inside':current_inside,
                    'current outside':current_outside,
                    'amplitude': current_inside-current_outside
                   }
            # Новая строка с данными вносится в результирующую таблицу
            result = pd.concat([result,pd.DataFrame.from_records(temp, index = [0])])

    # Приводим таблицу в порядок переназначая индексные значения и удаляя временный индекс
    result = result.reset_index().drop(columns = ['index'])
    print('\n'+str(result.shape[0]) + ' peaks found.')
    return result

<h3> Функция моделирования сигнала

In [None]:
def model_signal(signal, peaks_analysis_table):
    modeled_signal = signal.copy()
    modeled_signal['pA'] = 0

    # Проходимся по всем пикам
    for i in range(peaks_analysis_table.shape[0]):
        # В моделируемом сигнале выбираем времена соответствующие времени нахождения канала в открытом состоянии
        # и придаем значение равное среднему уровню тока в этом канале
        modeled_signal.loc[(modeled_signal['t'] >= peaks_analysis_table.iloc[i]['peak time up'])&
                      (modeled_signal['t'] <= peaks_analysis_table.iloc[i]['peak time down']),'pA'] = peaks_analysis_table.iloc[i]['current inside']
        # Далее выбираем времена от конца прошлого канала до начала этого и заменяем значения их тока на средний ток снаружи канала
        # для первого канала время берем от нуля
        if i == 0:
            modeled_signal.loc[(modeled_signal['t'] >= 0)&
                               (modeled_signal['t'] < peaks_analysis_table.iloc[i]['peak time up']),
                               'pA'] = peaks_analysis_table.iloc[i]['current outside']
        else:
            modeled_signal.loc[(modeled_signal['t'] > peaks_analysis_table.iloc[i-1]['peak time down'])&
                               (modeled_signal['t'] < peaks_analysis_table.iloc[i]['peak time up']),
                               'pA'] = peaks_analysis_table.iloc[i]['current outside']

    last_current_outside = signal[signal['t']>peaks_analysis_table['peak time down'].iloc[-1]]['pA'].median()
    modeled_signal.loc[modeled_signal['t']>peaks_analysis_table['peak time down'].iloc[-1],['pA']] = last_current_outside

    # Строим смоделированный сигнал вместе с оригинальным

    # Задаем размер холста
    plt.figure(figsize = (15,8))
    # Строим оригинальный сигнал
    sns.lineplot(x = signal['t'],y = signal['pA'], alpha = 0.3, label = 'original signal')
    # Строим смоделированный сигнал
    sns.lineplot(x = modeled_signal['t'],y = modeled_signal['pA'], label = 'modeled signal', color = 'red')
    # Включаем легенду
    plt.legend()
    # Задаем название графика
    plt.title('Comparison between original and modelled signals')
    plt.show()
    return modeled_signal

In [None]:
# Задаем директорию из которой берем сигналы и номер порции сигналов
directory  = './patchclamp-data/16_09'
patch_batch = '01_3'

# Создаем список миен файлов нужных нам сигналов и сортируем его по возрастанию
signals = os.listdir(directory)
signals = [signal for signal in signals if patch_batch in signal]
signals.sort()
tenth_signal = signals.pop(1)
signals.append(tenth_signal)

# Для каждого сигнала проводим обработку
for i in signals:
    # Производим чтение файла с сигналом
    signal = pd.read_csv(directory+'/'+i, header = None, sep = ' ',names = ['t', 'pA'])
    # Делаем так, чтобы отсчет времени велся с 0 (из-за особенностей данных это не всегда изначально так)
    signal['t'] = np.arange(0,2.9401, 0.0001)
    # Фильтруем сигнал и вычитаем его базовую линию, после чего обрезаем его
    filtered_signal = filter_and_crop(signal)
    if type(filtered_signal) == pd.DataFrame:
        # Повторяем поиск пиков и моделирование сигнала пока не будет выбрано подходящее пороговое значение
        while True:
            # Находим пики в сигнале и вносим в таблицу результатов информацию о них
            results = find_peaks(filtered_signal)
            # На основании таблицы с результатами воссоздаем исходный сигнал
            modeled_signal = model_signal(filtered_signal, results)
            satisfied = input('Are you satisfied with chosen threshold value (y/n)')
            if satisfied == 'y':
                break
          # Сохраняем результат
        savename = '/saved_data/'+i[:-4]+'_results.txt'
        results.to_csv(savename)
    else:
        if filtered_signal == '0':
            continue
        elif filtered_signal == 'stop':
            break

<h3>Для обработки отдельного сигнала

In [None]:
# # задаем директорию из которой берем сигнал
# filepath  = ''


# signal = pd.read_csv(filepath, header = None, sep = ' ',names = ['t', 'pA'])
# #делаем так, чтобы отсчет времени велся с 0 (из-за особенностей данных это не всегда изначально так)
# signal['t'] = np.arange(0,2.9401, 0.0001)
# #фильтруем сигнал и вычитаем его базовую линию, после чего обрезаем его
# filtered_signal = filter_and_crop(signal)
# #повторяем поиск пиков и моделирование сигнала пока не будет выбрано подходящее пороговое значение
# while True:
#     #находим пики в сигнале и вносим в таблицу результатов информацию о них
#     results = find_peaks(filtered_signal)
#     #на основании таблицы с результатами воссоздаем исходный сигнал
#     modeled_signal = model_signal(filtered_signal, results)
#     satisfied = input('Are you satisfied with chosen threshold value (y/n)')
#     if satisfied == 'y':
#         break


# savename = filepath[filepath.rfind('/')+1:-4]+'_result'+filepath[-4:]
# results.to_csv(savename)