<a href="https://colab.research.google.com/github/dtabuena/EphysLib/blob/main/ABF_Quality_Control.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
##################################  Quality Control Filtering ##################################
def QC_full_dataset(abf_recordings_df,to_plot=False,verbose=False,VC_prot=[],IC_prot=[],MT_prot=[]):
    'Loop through all abfs to look for signs of a poor recording including leak current, unstable holding V/I,'
    'high noise, and in appropriate holidng potential'

    # abf_recordings_df['QC_checks'] = None
    # abf_recordings_df['QC_values'] = None
    abf_recordings_df['passing_sweeps'] = None

    print('Voltage Clamp Protocols')
    VC_idx = [p in VC_prot for p in abf_recordings_df['protocol']]
    for fn in tqdm( abf_recordings_df[VC_idx].index ):
        if verbose: print(np.where(fn==abf_recordings_df.index)[0])
        try:
            pass_rate, passing_sweeps, QC_check_df, QC_val_df = Vclamp_QC(fn,to_plot=to_plot,verbose=verbose)
            abf_recordings_df.at[fn,'passing_sweeps'] = np.array(passing_sweeps)
        except AssertionError:
            if verbose: print_assert()
            abf_recordings_df.at[fn,'passing_sweeps'] = []

    print('Current Clamp Protocols')
    IC_idx = [p in IC_prot for p in abf_recordings_df['protocol']]
    for fn in tqdm( abf_recordings_df[IC_idx].index ):
        if verbose: print(np.where(fn==abf_recordings_df.index)[0])
        try:
            pass_rate, passing_sweeps, QC_check_df, QC_val_df = Iclamp_QC(fn,to_plot=to_plot,verbose=verbose)
            abf_recordings_df.at[fn,'passing_sweeps'] = np.array(passing_sweeps)
        except AssertionError:
            if verbose: print_assert()
            abf_recordings_df.at[fn,'passing_sweeps'] = []
       
    return abf_recordings_df


def Vclamp_QC(file_name, max_leak=200,max_high_freq_noise = 10, max_low_freq_noise = 15,
              Vhold_range = 8, to_plot=False,verbose=False):
    'Look for signs of a poor recording for Voltage Clamp recordings'
    is_VC = np.nan
    is_Ic = np.nan
    abf = pyabf.ABF( file_name )
    if 'mV' in abf.sweepLabelY:
        is_IC = True
        is_VC = False
    if 'pA' in abf.sweepLabelY:
        is_IC = False
        is_VC = True


    try:
        assert is_VC==True, 'Wrong clamp mode for protocol. Voltage protocol used during current clamp!'
    except AssertionError as e:
        print(e)
        return 0, [], [], []



    # assert is_VC==True, 'Wrong clamp mode for protocol. Voltage protocol used during current clamp!'
    

    theta, command_offset, correct_ch1 =  predict_telegraph(abf)


    QC_check_list = []
    QC_val_list = []
    for s in abf.sweepList:
        abf.setSweep(s,0)
        QC_checks, QC_values, windows = qc_sweep(abf.sweepX,abf.sweepC,abf.sweepY,command_offset,is_IC,is_VC,abf.sampleRate,                                                 
                                                 max_leak=max_leak,
                                                 max_high_freq_noise = max_high_freq_noise,
                                                 max_low_freq_noise = max_low_freq_noise,
                                                 Vhold_range = Vhold_range, to_plot=False)
        QC_check_list.append(QC_checks)
        QC_val_list.append(QC_values)
    
    QC_check_df = pd.DataFrame({'sweep' : np.arange(len(QC_val_list))}).set_index('sweep')
    for i in np.arange(len(QC_check_list)):
        d = QC_check_list[i]
        for (k,v) in d.items():
            QC_check_df.at[i,k]=v

    QC_val_df = pd.DataFrame({'sweep' : np.arange(len(QC_val_list))}).set_index('sweep')
    for i in np.arange(len(QC_val_list)):
        d = QC_val_list[i]
        for (k,v) in d.items():
            QC_val_df.at[i,k]=v

    pass_rate = {}
    for c in QC_check_df.columns:
        pass_rate[c] = np.round(np.mean(np.array(QC_check_df[c].values).astype(int))*100,2)

    if to_plot:
        fig, ax, theta = plot_sweeps_and_command(abf,windows=windows)
        plt.show()

    passing_sweeps = [s for s in QC_check_df.index if all(QC_check_df.loc[s,:])]
    if verbose: print('\n','pass_rate:',pass_rate)
    if verbose: print('passing_sweeps:',passing_sweeps)
    return pass_rate, passing_sweeps, QC_check_df, QC_val_df

def Iclamp_QC(file_name, max_leak=250, to_plot=False,verbose=False):
    'Look for signs of a poor recording for Current Clamp recordings'
    abf = abf_or_name(file_name)
    # abf = pyabf.ABF( file_name )
    if 'mV' in abf.sweepLabelY:
        is_IC = True
        is_VC = False
    if 'pA' in abf.sweepLabelY:
        is_IC = False
        is_VC = True
    try:
        assert is_IC==True, 'Wrong clamp mode for protocol. IC protocol used during voltage clamp!'
    except AssertionError as e:
        print(e)

    # assert is_IC==True, 'Wrong clamp mode for protocol. IC protocol used during voltage clamp!'

    theta, command_offset, correct_ch1 =  predict_telegraph(abf)
    
    QC_check_list = []
    QC_val_list = []
    for s in abf.sweepList:
        abf.setSweep(s,0)
        QC_checks, QC_values, windows = qc_sweep(abf.sweepX,abf.sweepC,abf.sweepY,command_offset,is_IC,is_VC,
                                                 abf.sampleRate,to_plot=to_plot, max_high_freq_noise = .05, max_low_freq_noise = 0.4, Vhold_range=3)
        QC_check_list.append(QC_checks)
        QC_val_list.append(QC_values)
    
    QC_check_df = pd.DataFrame({'sweep' : np.arange(len(QC_val_list))}).set_index('sweep')
    for i in np.arange(len(QC_check_list)):
        d = QC_check_list[i]
        for (k,v) in d.items():
            QC_check_df.at[i,k]=v

    QC_val_df = pd.DataFrame({'sweep' : np.arange(len(QC_val_list))}).set_index('sweep')
    for i in np.arange(len(QC_val_list)):
        d = QC_val_list[i]
        for (k,v) in d.items():
            QC_val_df.at[i,k]=v

    pass_rate = {}
    mean_values = {}
    for c in QC_check_df.columns:
        pass_rate[c] = np.round(np.mean(np.array(QC_check_df[c].values).astype(int))*100,2)
        mean_values[c] = np.round(np.mean(np.array(QC_val_df[c].values)),3)
    passing_sweeps = [s for s in QC_check_df.index if all(QC_check_df.loc[s,:])]
    
    if verbose==True: 
        print('')
        print('pass_rate:',pass_rate)
        print('mean_values:',mean_values)
    if verbose==2:
        print('\n','pass_rate:',pass_rate)
        print('passing_sweeps:',passing_sweeps, str(len(passing_sweeps)/len(QC_check_df)*100)+'%')
        print(QC_val_df)

    if to_plot:
        fig, ax, theta = plot_sweeps_and_command(abf,windows=windows)
        plt.show()

    return [pass_rate,passing_sweeps,QC_check_df,QC_val_df] # return pass_rate, passing_sweeps, QC_check_df, QC_val_df

def qc_sweep(sweepX,sweepC,sweepY,command_offset,is_IC,is_VC,sampleRate,
             max_leak=100, 
             max_high_freq_noise = 10,
             max_low_freq_noise = 10,
             Vhold_range = 5, to_plot=False):
    'Recieves a sweep and and calculates leak, noise and holding potential'
    'returns a dict of calculated values and a dict of boolean indicating'
    'pass/fail, (True/False)'


    stim_buffer_time = 250 #ms
    filtered_command = movmean((sweepC==sweepC[0])*1, stim_buffer_time/1000*sampleRate)
    ss_no_stim_bool = filtered_command==1
    ss_no_stim_idx = np.arange(len(ss_no_stim_bool))[ss_no_stim_bool]
    no_stim_sig = sweepY[ss_no_stim_bool]
    no_stim_t = sweepX[ss_no_stim_idx]
    baseline = np.mean( no_stim_sig )

    QC_checks = {}
    QC_values = {}
    if is_IC:
        QC_values['V_hold'] = baseline
        QC_values['I_leak'] = command_offset
    if is_VC:
        QC_values['V_hold'] = command_offset
        QC_values['I_leak'] = baseline
    
    QC_checks['V_hold'] = abs(QC_values['V_hold'] - -70)< Vhold_range
    QC_checks['I_leak'] = QC_values['I_leak']<max_leak


    
    HF_noise_idx = ss_no_stim_idx[:int(0.0015*sampleRate)]
    HF_noise_signal = sweepY[ HF_noise_idx ] 
    HF_noise = rms_noise(HF_noise_signal)     
    
    QC_checks['HF_noise'] = HF_noise<max_high_freq_noise
    QC_values['HF_noise'] = HF_noise
    
    LF_noise_idx = ss_no_stim_idx[-int(0.1*sampleRate):] 
    if int(0.1*sampleRate)>len(LF_noise_idx): LF_noise_idx = np.random.choice(LF_noise_idx, size=int(0.1*sampleRate))
    LF_noise_signal = sweepY[ LF_noise_idx ] 
    LF_noise = rms_noise(LF_noise_signal)
    QC_checks['LF_noise'] = LF_noise<max_low_freq_noise
    QC_values['LF_noise'] = LF_noise

    HF_noise_time = sweepX[HF_noise_idx]
    LF_noise_time = sweepX[LF_noise_idx]
    LF_noise_time = np.sort(np.array(list(set(LF_noise_time))))

    return QC_checks, QC_values, [HF_noise_time, LF_noise_time]