In [None]:
import math
import os
import pandas as pd
import numpy as np


In [None]:
def well_namer(row, col):
    """Build a well label such as ``A01`` from a row number and a column label.

    The row number is turned into a capital letter (1 -> A, 2 -> B, ...). The
    column is converted to text and padded on the left with zeros to a width
    of two characters.
    """
    row_letter = chr(ord("@") + row)  # '@' is the character just before 'A'
    column_text = str(col).rjust(2, "0")
    return f"{row_letter}{column_text}"

import re

def extract_number(string):
    """Return the first run of digits found in ``string`` as an int.

    Returns None when ``string`` contains no digits.
    """
    digits = re.search(r'\d+', string)
    return int(digits.group()) if digits else None




In [None]:
# Plate layouts are 96-well plate CSV files exported from Benchling/Notion.
# NOTE: don't initialize an empty df; collect rows in a list and convert afterwards.

# Folder that is walked for plate layout CSVs.
# Previously: '/home/mattiazzilab/Documents/Allie_Scripts/May 8 seeding.csv'
plate_path = "plate_metadata/"

# Initial export folder (the export loops reassign this per directory).
# Previously: '/mnt/bigdisk1/Allie_S/Replicative_Age_Project/Data Mining/metadata/'
export_path = "plate_metadata/"


def load_plate_df(path):
    """Read a plate-layout CSV and keep only its populated part.

    The first line of the file is used as the header and only the columns at
    positions 1-12 are read (position 0 is skipped). Columns that are entirely
    empty are dropped first, then rows with fewer than two non-missing values.
    The shape of the result is printed as a quick sanity check.

    Returns the trimmed DataFrame; surviving rows keep their original index.
    """
    plate_columns = range(1, 13)
    raw_plate_df = pd.read_csv(path, header=0, usecols=plate_columns)
    populated_columns = raw_plate_df.dropna(axis=1, how="all")
    plate_df = populated_columns.dropna(axis=0, thresh=2)

    # Checks if correct number of rows and columns
    print(plate_df.shape)
    return plate_df


def make_map_df(condition_cols, aux_cols):
    """
    Return an empty DataFrame with the plate-map columns.

    The columns are the fixed well/field/staining columns, followed by
    ``condition_cols`` and then ``aux_cols``. The resulting column index is
    printed before the frame is returned.
    """
    fixed_cols = [
        "Metadata_Well",
        "Metadata_WellRow",
        "Metadata_WellColumn",
        "Metadata_Field",
        "Metadata_RowColFieldCode",
        "Staining",
    ]
    empty_map = pd.DataFrame(columns=fixed_cols + condition_cols + aux_cols)
    print(empty_map.columns)
    return empty_map


# Columns filled from the leading tokens of each well's text (batch, age group, passage).
condition_cols = ["SerialPassage_BatchNumber", "AgeGroup", "PassageNumber"]

# Additional columns derived from the same well text.
aux_cols = ["Drug", "FlaggedBatch", "TimepointName"]

# Show the (empty) plate-map layout these column lists produce.
make_map_df(condition_cols, aux_cols)

In [None]:
def conditional_ops(condition_cols, well_metadata, aux_cols=[]):
    """Parse a well's text into values for the requested columns.

    ``well_metadata`` is split on single spaces. Each name in
    ``condition_cols + aux_cols`` that has a parser below gets an entry in the
    returned dict, in that order; names without a parser are skipped.

    Parsers only run for the names that are requested, so a well text only
    needs the tokens those particular parsers read.
    """
    tokens = well_metadata.split(" ")

    def drug_name():
        # An underscore in the second token marks a drug-treated well; the
        # drug name is the piece right after the first underscore.
        return tokens[1].split("_")[1] if "_" in tokens[1] else "None"

    def short_staining():
        # Strip the long antigen suffixes, drop the "+" separators, and join
        # whatever comes after the condition tokens.
        shortened = [
            token.removesuffix("INK4A").removesuffix("CIP1WAF1") for token in tokens
        ]
        kept = [token for token in shortened if token != "+"]
        return "_".join(kept[len(condition_cols):])

    parsers = {
        # Text after the first "B" of the first token
        "SerialPassage_BatchNumber": lambda: tokens[0].split("B")[1],
        "TreatmentGroup": lambda: tokens[0],
        # First number in the second token ("time" is the deprecated name for age group)
        "AgeGroup": lambda: extract_number(tokens[1]),
        # First number in the third token
        "PassageNumber": lambda: extract_number(tokens[2]),
        "Drug": drug_name,
        # True when the first token contains "Flagged"
        "FlaggedBatch": lambda: "Flagged" in tokens[0],
        "TimepointName": lambda: f"{tokens[0]} {tokens[1]} {tokens[2]}",
        "ShortStaining": short_staining,
    }

    parsed = {}
    for column in condition_cols + aux_cols:
        parser = parsers.get(column)
        if parser is not None:
            parsed[column] = parser()
    return parsed

In [None]:
def export_platemap_csv(plate_df, plate_name, export_path, condition_cols, aux_cols=None, n_fields=40):
    """Expand a plate layout into a per-field plate map and write it as CSV.

    Every non-missing cell of ``plate_df`` is treated as one well whose text
    is a space-separated label (e.g. ``R1T0 EAA1-488 Tfn-647``). For each well,
    ``n_fields`` rows are produced (fields 1..n_fields); only the field number
    and the row/column/field code differ between them.

    Parameters
    ----------
    plate_df : DataFrame
        Plate layout. The index label + 1 is used as the plate row number and
        the column labels are used as the plate column numbers.
    plate_name : str
        Base name of the output file.
    export_path : str
        Directory the CSV is written to.
    condition_cols : list
        Condition columns to parse from each well's text. The first
        ``len(condition_cols)`` tokens are treated as conditions; the
        remaining tokens make up the ``Staining`` value.
    aux_cols : list, optional
        Additional columns to parse from each well's text. Defaults to none.
    n_fields : int, optional
        Number of fields emitted per well. Defaults to 40.

    Writes ``<export_path>/<plate_name>_map.csv`` and returns None.
    """
    aux_cols = [] if aux_cols is None else aux_cols

    # make_map_df defines the column order of the exported file.
    column_order = list(make_map_df(condition_cols, aux_cols).columns)
    column_labels = plate_df.columns
    n_conditions = len(condition_cols)

    # Collect plain dicts and build the DataFrame once at the end; growing a
    # frame with pd.concat per well is quadratic and trips pandas' deprecation
    # warning for concatenating empty frames.
    records = []
    for index, data in plate_df.iterrows():
        for position, well_text in enumerate(data.to_list()):
            if pd.isna(well_text):
                continue

            row_index = index + 1  # Make it 1-indexed
            if pd.isna(row_index):
                # No usable row label, so no well name can be built.
                continue
            column_index = column_labels[position]  # Use header row for column index

            # Everything below is identical for all fields of this well, so it
            # is computed once per well instead of once per field.
            well_name = well_namer(row_index, column_index)
            row_code = str(row_index).rjust(2, "0")
            column_code = str(column_index).rjust(2, "0")
            # Stains come after all the condition tokens.
            stains = " ".join(well_text.split(" ")[n_conditions:]).strip()
            conditional_entries = conditional_ops(condition_cols, well_text, aux_cols)

            for field in range(1, n_fields + 1):
                record = {
                    "Metadata_Well": well_name,
                    "Metadata_WellRow": row_index,
                    "Metadata_WellColumn": column_index,
                    "Metadata_Field": field,
                    "Staining": stains,
                    "Metadata_RowColFieldCode": f"r{row_code}c{column_code}f{str(field).rjust(2, '0')}",
                }
                record.update(conditional_entries)
                records.append(record)

    # dtype=object keeps ints as ints even when a column also holds None
    # (otherwise pandas would upcast to float and the CSV would show "3.0").
    plate_map_df = pd.DataFrame(records, columns=column_order, dtype=object)
    plate_map_df.to_csv(os.path.join(export_path, f"{plate_name}_map.csv"), index=False)

In [None]:

# Export a plate map next to every non-pilot plate layout CSV found under plate_path.
for root, dirs, files in os.walk(plate_path):
    for filename in files:
        # Only plate layouts: CSV files that are neither earlier "*_map.csv"
        # exports nor pilot plates.
        is_layout_csv = filename.endswith(".csv") and "map" not in filename
        if not is_layout_csv or "Pilot" in filename:
            continue

        file_path = os.path.join(root, filename)
        plate_df = load_plate_df(os.path.abspath(file_path))
        display(plate_df)
        export_path = os.path.abspath(root)
        print(f"Exporting {filename} to {export_path}")
        # splitext removes only the final ".csv"; split(".")[0] would cut
        # names like "plate.v2.csv" down to "plate" and make exports collide.
        plate_name = os.path.splitext(filename)[0]
        export_platemap_csv(plate_df, plate_name, export_path, condition_cols, aux_cols=aux_cols)
            

In [None]:
# Pilot plates: the well text starts with a single treatment-group token, so
# they are exported with their own condition/auxiliary columns.
pilot_conditions = ["TreatmentGroup"]
pilot_aux = ["ShortStaining"]

for root, dirs, files in os.walk(plate_path):
    for filename in files:
        # Only pilot plate layouts: CSV files with "Pilot" in the name that
        # are not earlier "*_map.csv" exports.
        is_layout_csv = filename.endswith(".csv") and "map" not in filename
        if not is_layout_csv or "Pilot" not in filename:
            continue

        file_path = os.path.join(root, filename)
        plate_df = load_plate_df(os.path.abspath(file_path))
        display(plate_df)
        export_path = os.path.abspath(root)
        print(f"Exporting {filename} to {export_path}")
        # splitext removes only the final ".csv"; split(".")[0] would cut
        # names like "Pilot.v2.csv" down to "Pilot" and make exports collide.
        plate_name = os.path.splitext(filename)[0]

        export_platemap_csv(
            plate_df, plate_name, export_path, condition_cols=pilot_conditions, aux_cols=pilot_aux
        )
