In [1]:
import pandas as pd
from io import StringIO
import regex
from pathlib import Path
from os import path, listdir, mkdir

In [3]:
def drop_hidden_columns(initial_df):
    """
    Return a copy of a Quant Excel Results tab DataFrame without its hidden and empty columns.

    "Unnamed: 2" through "Unnamed: 4" are removed, and of "Unnamed: 5" ... "Unnamed: 76"
    only every third column (5, 8, ..., 74) is kept. The input frame is not modified.
    A KeyError is raised if one of the columns to remove is absent.
    """
    leading_hidden = ["Unnamed: 2", "Unnamed: 3", "Unnamed: 4"]
    # A position n survives only when (n + 4) is a multiple of 3.
    interleaved_hidden = [f"Unnamed: {n}" for n in range(5, 77) if (n + 4) % 3]
    return initial_df.drop(columns=leading_hidden).drop(columns=interleaved_hidden)

def split_plate_into_rows(results_df):
    """
    Split a Quant Excel Results tab DataFrame into 16 consecutive pieces, one per plate row (A-P).

    Each piece covers seven index labels: 0-6, 7-13, ..., 105-111. The pieces are
    returned as a list in plate-row order.
    """
    plate_rows = 16
    lines_per_plate_row = 7
    # .loc slices by label and includes both end points, hence "first + 6".
    return [
        results_df.loc[first:first + lines_per_plate_row - 1]
        for first in range(0, plate_rows * lines_per_plate_row, lines_per_plate_row)
    ]


def insert_row(row_number, df, row_value):
    """
    Return a new DataFrame with one extra row inserted at a given position.

    Parameters:
        row_number: position (0 .. len(df)) the new row will occupy in the result.
        df: the DataFrame to insert into. It is NOT modified; the work is done on a copy.
        row_value: the values for the new row (one per column).

    Returns:
        A DataFrame whose index is 0 .. len(df), with `row_value` at `row_number`
        and the original rows, in their original order, around it. Any index the
        input carried is discarded and replaced by these positions.

    Raises:
        ValueError: if `row_number` is outside 0 .. len(df).
    """
    row_count = df.shape[0]
    if not 0 <= row_number <= row_count:
        raise ValueError(
            f"row_number must be between 0 and {row_count}, got {row_number}"
        )

    # Copy first: the caller's frame (often a slice of a larger frame) must not
    # have its index rewritten or gain a row as a side effect.
    shifted = df.copy()

    # Rows before the insertion point keep positions 0..row_number-1; rows from
    # the insertion point onwards move down by one, leaving `row_number` free.
    shifted.index = [*range(row_number), *range(row_number + 1, row_count + 1)]

    # Setting a missing label appends the row; sorting then moves it into place.
    shifted.loc[row_number] = row_value
    return shifted.sort_index()


def new_letter_row(plate_row_letter):
    """
    Build a 26-element list in which every element is the given plate row letter.
    """
    columns_in_row = 26
    return [plate_row_letter] * columns_in_row

def transform_plate_row(row_df, plate_row_letter):
    """
    Reshape one plate-row slice of the Results grid into a table with one record per grid column.

    Steps performed:
        1. Insert a row filled with `plate_row_letter` at position 1.
        2. Drop the "Unnamed: 0" column.
        3. Write "Labels", "ROW" and "SOURCE WELL" into the first column at
           positions 0, 1 and 7.
        4. Promote the first row to column headers (presumably the sample IDs -
           confirm against the Excel layout) and use the "Labels" column as row labels.
        5. Transpose, so every former column becomes a record, and expose the
           former headers as a "Sample ID" column.

    Parameters:
        row_df: one element of the list returned by split_plate_into_rows.
        plate_row_letter: the letter written into the "ROW" field of every record.

    Returns:
        The transposed DataFrame with a fresh 0..n-1 index and a "Sample ID" column.
    """
    new_row = new_letter_row(plate_row_letter)
    row_df = insert_row(1, row_df, new_row)
    row_df = row_df.drop(columns=["Unnamed: 0"])

    # Single-step .iloc[row, col] assignment: the chained form .iloc[r][c] = v
    # writes to a temporary object and is silently lost under Copy-on-Write.
    row_df.iloc[0, 0] = "Labels"
    row_df.iloc[1, 0] = "ROW"
    row_df.iloc[7, 0] = "SOURCE WELL"

    # The first row supplies the new column names; the remaining rows are the data.
    headers = row_df.iloc[0]
    row_df = row_df.iloc[1:].rename(columns=headers)

    # The "Labels" column supplies the row names, after which it is redundant.
    new_row_labels = row_df.loc[:7, "Labels"]
    row_df = row_df.rename(index=new_row_labels).drop(columns=["Labels"])

    row_df = row_df.transpose()
    row_df.index.name = "Sample ID"
    return row_df.reset_index()

def create_binary_results(row_df):
    """
    Turn the gene result cells of a plate row DataFrame into two-valued labels.

    Missing values in the "MS2", "N gene", "ORF1ab", "RP-Cy5" and "S gene" columns
    become "NON-AMPLIFIED". Any cell of the frame whose value is one of those five
    gene names becomes "AMPLIFIED". All other values are returned unchanged, and
    the input frame itself is not modified.
    """
    gene_columns = ("MS2", "N gene", "ORF1ab", "RP-Cy5", "S gene")
    missing_fill = {column: "NON-AMPLIFIED" for column in gene_columns}
    name_to_flag = {column: "AMPLIFIED" for column in gene_columns}
    return row_df.fillna(missing_fill).replace(name_to_flag)

def add_well_number(joined_df):
    """
    Return a copy of a rejoined, transformed Results DataFrame with a leading "well" column.

    Wells are numbered 1, 2, 3, ... in row order, so a full 384-row plate gets
    1..384. The numbering follows the length of the frame rather than a fixed
    plate size, so partial or smaller plates are numbered as well. The existing
    index is discarded and replaced by a fresh 0..n-1 index.

    Raises:
        ValueError: if the frame already has a column named "well".
    """
    well_number_df = joined_df.reset_index(drop=True)
    well_number_df.insert(0, "well", list(range(1, len(well_number_df) + 1)))
    return well_number_df

def transform_results(excel_file):
    """
    Convert the "Results" sheet of an auto-generated Quant 12K Excel file into a flat table.

    The sheet is read with its first two rows skipped, stripped of hidden columns,
    cut into the 16 plate rows A-P, and each plate row is reshaped and converted to
    AMPLIFIED / NON-AMPLIFIED labels. The pieces are stacked, numbered with a
    "well" column and the columns are renamed to snake_case.

    Returns:
        A pandas DataFrame with one record per well.
    """
    plate_grid = pd.read_excel(excel_file, sheet_name="Results", skiprows=2)
    per_row_frames = split_plate_into_rows(drop_hidden_columns(plate_grid))
    plate_row_labels = list("ABCDEFGHIJKLMNOP")

    tidy_rows = [
        create_binary_results(
            transform_plate_row(per_row_frames[position], plate_row_labels[position])
        )
        for position in range(16)
    ]

    snake_case_names = {
        "Sample ID": "sample_id",
        "ROW": "row",
        "SOURCE WELL": "source_well",
        "N gene": "n_gene_result",
        "S gene": "s_gene_result",
        "MS2": "ms2_result",
        "ORF1ab": "orf1ab_result",
        "RP-Cy5": "rp_cy5_result",
    }
    return add_well_number(pd.concat(tidy_rows)).rename(columns=snake_case_names)

def get_quant_data_from(file):
    """
    Split a Quant export text file into its bracketed sections.

    Lines before the first "[Section]" line are collected under the key 'Header'.
    Every line that starts with a bracketed name opens a new section whose key is
    that line with all '[' and ']' characters removed. Empty lines are skipped.

    Parameters:
        file: path of the text file to read.

    Returns:
        A dict mapping section name to content. Sections that were followed by
        another section are stored as one newline-joined string; the LAST section
        in the file is left as a list of lines (callers in this notebook rely on
        that for 'Amplification Data').
    """
    header = 'Header'
    data = {'Header': []}

    # The context manager closes the handle even if parsing raises.
    with open(file, 'r') as handle:
        for raw_line in handle:
            line = raw_line.replace('\n', '')
            if line == '':
                continue

            if regex.match(r"\[(\\.|[^]])*]", line):
                # Finalise the section that just ended, then start the new one.
                data[header] = '\n'.join(data[header])
                header = line.replace('[', '').replace(']', '')
                data[header] = []
                continue

            data[header].append(line)
    return data

def generate_target(gene):
    """
    Map a snake_case gene key to the target name used in the Quant files.

    "ms2", "n_gene", "s_gene" and "orf1ab" map to "MS2", "N gene", "S gene" and
    "ORF1ab"; every other value yields "RP-Cy5".
    """
    known_targets = (
        ("ms2", "MS2"),
        ("n_gene", "N gene"),
        ("s_gene", "S gene"),
        ("orf1ab", "ORF1ab"),
    )
    for key, display_name in known_targets:
        if gene == key:
            return display_name
    return "RP-Cy5"

def get_sample_data(txt_file):
    """
    Read the per-cycle amplification curves of every well from a Quant export text file.

    The 'Amplification Data' section is parsed as a tab-separated table that is
    expected to provide the columns 'Well', 'Cycle', 'Target Name', 'Rn' and
    'Delta Rn'. Thousands separators (',') in 'Rn' and 'Delta Rn' are removed
    before the values are converted to float.

    Parameters:
        txt_file: path of the export text file.

    Returns:
        A dict keyed by well. Wells are those that have an 'MS2' row at cycle 1.
        For every target seen at cycle 1 the per-well dict holds two lists in file
        order: '<target>' with the Rn values and '<target>_delta' with the Delta Rn
        values, where <target> is the target name lower-cased with '-' and ' '
        replaced by '_'. A well without rows for a target gets empty lists.
    """
    section = get_quant_data_from(txt_file)['Amplification Data']
    # get_quant_data_from leaves the final section as a list of lines but joins
    # earlier sections into one string; accept both so section order is irrelevant.
    if isinstance(section, str):
        data_string = section + "\n"
    else:
        data_string = "".join(f"{line}\n" for line in section)

    df = pd.read_csv(StringIO(data_string), delimiter='\t') \
        .rename(columns={'Well': 'well', 'Cycle': 'cycle', 'Target Name': 'target', 'Rn': 'rn', 'Delta Rn': 'delta_rn'})

    # A column without any thousands separator is already parsed as float and has
    # no .str accessor, so normalise through str before stripping the commas.
    for column in ('rn', 'delta_rn'):
        df[column] = df[column].astype(str).str.replace(',', '', regex=False).astype(float)

    first_cycle = df[df['cycle'] == 1]
    wells = first_cycle.loc[first_cycle['target'] == 'MS2', 'well'].values
    targets = pd.unique(first_cycle['target']).tolist()

    # One pass over the table instead of one filtered scan per well and target.
    curves = {
        key: (group['rn'].tolist(), group['delta_rn'].tolist())
        for key, group in df.groupby(['well', 'target'], sort=False)
    }

    data = {}
    for well in wells:
        per_well = data.setdefault(well, {})
        for target in targets:
            dict_target_name = target.replace('-', '_').replace(' ', '_').lower()
            rn_values, delta_values = curves.get((well, target), ([], []))
            per_well[dict_target_name] = rn_values
            per_well[f'{dict_target_name}_delta'] = delta_values

    return data

def create_gene_df(joined_df, gene):
    """
    Reduce a joined results DataFrame to the identifying columns plus one gene column.

    Keeps "well", "sample_id", "well_position" and the column named by `gene`
    (those that exist), and renames the gene column to "result".
    """
    gene_column = f"{gene}"
    wanted = ["well", "sample_id", "well_position", gene_column]
    return joined_df.filter(items=wanted).rename(columns={gene_column: "result"})

def get_sample_answers(excel_file):
    """
    Build a per-well dictionary of result labels and well positions from a Quant Excel file.

    The "Results" sheet is converted with transform_results, reduced to the well,
    sample ID and the five gene result columns, and rows whose sample ID is 'neg',
    'PC', '' or 0 are removed before the sample IDs are converted to strings.
    The "QUANT DATA" sheet (read with its first 42 rows skipped) supplies the well
    position of each sample, taken from its 'MS2' rows; here the sample IDs are
    converted to strings first and the same exclusion is applied afterwards.

    Returns:
        The inner join of both tables on the sample ID, as
        {well: {column name: value}}.
    """
    undesirables = "['neg', 'PC', '', 0]"
    keep_real_samples = f'sample_id != {undesirables}'
    result_columns = ['well', 'sample_id', 'ms2_result', 'n_gene_result',
                      'orf1ab_result', 'rp_cy5_result', 's_gene_result']

    answers_df = transform_results(excel_file).filter(items=result_columns).query(keep_real_samples)
    answers_df['sample_id'] = answers_df['sample_id'].apply(str)

    quant_sheet = pd.read_excel(excel_file, sheet_name='QUANT DATA', skiprows=42)
    samples_df = quant_sheet.query("`Target Name` == 'MS2'")
    samples_df = samples_df.filter(items=['Well Position', 'Sample Name'])
    samples_df = samples_df.rename(columns={'Sample Name': 'sample_id', 'Well Position': 'well_position'})
    samples_df['sample_id'] = samples_df['sample_id'].apply(str)
    samples_df = samples_df.reset_index(drop=True).query(keep_real_samples)

    joined = pd.merge(answers_df, samples_df, how='inner', on='sample_id')
    return joined.set_index('well').to_dict('index')

def merge_dicts(a, b, path=None):
    """
    Recursively merge dictionary `b` into dictionary `a` and return `a`.

    Keys only present in `b` are copied over. When both sides hold a dict under
    the same key the two are merged recursively. Equal non-dict values are kept;
    unequal ones raise an Exception naming the dotted path of the conflict.
    `path` carries the keys walked so far and is meant for the recursion.
    """
    trail = [] if path is None else path
    for key in b:
        incoming = b[key]
        if key not in a:
            a[key] = incoming
            continue

        existing = a[key]
        if isinstance(existing, dict) and isinstance(incoming, dict):
            merge_dicts(existing, incoming, trail + [str(key)])
            continue
        if existing == incoming:
            # Same leaf value on both sides: nothing to do.
            continue
        raise Exception('Conflict at %s' % '.'.join(trail + [str(key)]))
    return a

def create_ml_gene_df(df, gene):
    """
    Build the machine-learning table for a single gene from the per-well plate DataFrame.

    Parameters:
        df: the transposed DataFrame created from the full plate dictionary. It
            must contain the column f"{gene}_delta"; of the remaining columns only
            f"{gene}_result", "sample_id" and "well_position" are kept.
        gene: one of 'n_gene', 's_gene', 'ms2', 'orf1ab' or 'rp_cy5'.

    Returns:
        A DataFrame indexed by "well" whose leading columns
        f"{gene}_delta_cycle1", f"{gene}_delta_cycle2", ... hold the elements of
        the delta list, one column per cycle, followed by the kept columns. Every
        cycle is named, however many cycles the run has. `df` is not modified.
    """
    result_column = f"{gene}_result"
    delta_column = f"{gene}_delta"
    wanted = (result_column, delta_column, "sample_id", "well_position")
    df_gene = df.drop(columns=[column for column in df.columns if column not in wanted])

    # Spread each well's list of per-cycle values over columns 0, 1, 2, ...
    cycle_values = df_gene[delta_column].apply(pd.Series)

    # Name every cycle column that was produced instead of a fixed first 40, so
    # longer runs do not keep bare integer column labels.
    cycle_names = {
        position: f"{gene}_delta_cycle{position + 1}"
        for position in cycle_values.columns
        if isinstance(position, int)
    }

    df_gene = cycle_values.rename(columns=cycle_names) \
        .merge(df_gene, left_index=True, right_index=True) \
        .drop(columns=[delta_column])

    # rename_axis returns a new object, so an index shared with `df` is not renamed.
    return df_gene.rename_axis('well')

def get_gene_dfs(excel_file, txt_file):
    """
    Produce the five gene-specific machine-learning DataFrames for one plate.

    The amplification curves from the Quant txt file are merged into the
    per-well answers from the Quant Excel file, the merged dictionary is turned
    into a DataFrame with one row per well, and one table per gene is derived
    from it.

    Returns:
        A tuple of DataFrames in the order (n_gene, s_gene, ms2, orf1ab, rp_cy5).
    """
    curves_by_well = get_sample_data(txt_file)
    answers_by_well = get_sample_answers(excel_file)
    combined = merge_dicts(answers_by_well, curves_by_well)

    # from_dict puts the wells in columns; transpose to get one row per well.
    per_well_df = pd.DataFrame.from_dict(combined).transpose()

    build_order = ("n_gene", "s_gene", "orf1ab", "ms2", "rp_cy5")
    frames = {f"{gene}": create_ml_gene_df(per_well_df, gene) for gene in build_order}

    return frames['n_gene'], frames['s_gene'], frames['ms2'], frames['orf1ab'], frames['rp_cy5']

def make_gene_csv(gene_df, gene, plate_id):
    """
    Write a gene DataFrame, tagged with its plate ID, to "<gene>.csv" in the working directory.

    The "plate_id" column is added to (or overwritten in) the passed DataFrame
    itself. An existing file of the same name is replaced.
    """
    output_name = f"{gene}.csv"
    gene_df['plate_id'] = plate_id
    gene_df.to_csv(output_name)
    
def add_to_csv(csv_file, gene_df, plate_id):
    """
    Append a gene DataFrame, tagged with its plate ID, to an existing CSV file.

    The "plate_id" column is added to (or overwritten in) the passed DataFrame
    itself. Rows are appended without a header line; if the file does not exist
    yet it is created, also without a header.
    """
    gene_df['plate_id'] = plate_id
    append_options = {"header": None, "mode": 'a'}
    gene_df.to_csv(csv_file, **append_options)
    
    
def check_plate_matches(excel_directory, txt_directory):
    """
    Report plates whose Excel file and txt file do not pair up one-to-one.

    The plate ID of a file is the part of its name before the first '.'. An Excel
    file matches a txt file when the Excel plate ID occurs inside the txt plate ID.
    A line is printed for every Excel file that does not have exactly one txt
    match and for every txt file that does not have exactly one Excel match,
    followed by "Completed Checking". Nothing is returned.

    Entries with an empty plate ID (names starting with '.', such as ".DS_Store")
    are ignored: an empty string is contained in every name and would otherwise
    count as a match for every file.
    """
    def plate_ids(directory):
        # List each directory once, in a stable order, and drop empty IDs.
        stems = (name.split('.')[0] for name in sorted(listdir(directory)))
        return [stem for stem in stems if stem]

    excel_plates = plate_ids(excel_directory)
    txt_plates = plate_ids(txt_directory)

    # Check each Excel for a matching txt file
    for plate_name in excel_plates:
        matches = sum(plate_name in txt_plate_name for txt_plate_name in txt_plates)
        if matches != 1:
            print(f"The Excel file with the plate ID {plate_name} has {matches} txt matches")

    # Check each txt file for a matching Excel file
    for plate_name in txt_plates:
        matches = sum(excel_plate_name in plate_name for excel_plate_name in excel_plates)
        if matches != 1:
            print(f"The txt file with the plate ID {plate_name} has {matches} Excel matches")
    print("Completed Checking")
    
def add_all_data_to_csv(excel_directory, txt_directory):
    """
    Append the gene tables of every plate in a directory to the per-gene CSV files.

    An Excel file is processed when its whole name has the form
    "<6 digits>-COV<1-2 digits>-<capital letter>.xlsx". Its plate ID is the name
    without the extension, and its txt export must be named
    "<plate ID>_QuantStudio 12K Flex_export.txt" inside `txt_directory`. For each
    plate the five gene DataFrames are appended to "n_gene.csv", "s_gene.csv",
    "ms2.csv", "orf1ab.csv" and "rp_cy5.csv" in the working directory. Plates are
    processed in sorted file-name order.

    Raises:
        IndexError: if a plate has no txt export in `txt_directory`.
    """
    gene_list = ['n_gene', 's_gene', 'ms2', 'orf1ab', 'rp_cy5']
    # Escaped dot plus fullmatch: names such as "...-A.xlsx.bak" or "...-AXxlsx"
    # must not be picked up as plates.
    excel_pattern = r'\d{6}-COV\d{1,2}-[A-Z]\.xlsx'
    txt_files = set(listdir(txt_directory))

    for excel_file in sorted(listdir(excel_directory)):
        if not regex.fullmatch(excel_pattern, excel_file):
            continue

        plate_name = excel_file.split('.')[0]
        txt_file = f'{plate_name}_QuantStudio 12K Flex_export.txt'
        if txt_file not in txt_files:
            # Same exception type as the former "[...][0]" lookup, now with context.
            raise IndexError(
                f"No txt export named '{txt_file}' found in '{txt_directory}' for plate {plate_name}"
            )

        gene_dfs = get_gene_dfs(path.join(excel_directory, excel_file), path.join(txt_directory, txt_file))
        for gene, gene_df in zip(gene_list, gene_dfs):
            add_to_csv(f"{gene}.csv", gene_df, plate_name)



In [None]:
# Create the five per-gene CSV files from a single plate. make_gene_csv writes
# "<gene>.csv" into the working directory and replaces a file that already exists.
# The Excel and txt file names are given relative to the working directory.
n_gene, s_gene, ms2, orf1ab, rp_cy5 = get_gene_dfs("090120-COV1-P.xlsx", "090120-COV1-P_QuantStudio 12K Flex_export.txt")
# NOTE(review): here gene_list holds the DataFrames and genes holds the names;
# inside add_all_data_to_csv the name gene_list is used for the gene names instead.
gene_list = [n_gene, s_gene, ms2, orf1ab, rp_cy5]
genes = ["n_gene", "s_gene", "ms2", "orf1ab", "rp_cy5"]
for gene_df, gene in zip(gene_list, genes):
    make_gene_csv(gene_df, gene, "090120-COV1-P")

In [4]:
# Directories holding the Quant txt exports and the Quant Excel files.
# The values below are placeholders: replace them with real directory paths
# before running add_all_data_to_csv.
txt_directory = "[txt_directory path]"
excel_directory = "[excel_directory path]"

In [None]:
# Append the gene tables of every matching plate in the two directories to the
# per-gene CSV files ("n_gene.csv", "s_gene.csv", ...) in the working directory.
# Rows are appended without a header line and nothing is de-duplicated, so
# re-running this cell appends the same plates again.
add_all_data_to_csv(excel_directory, txt_directory)