# PROG022 - Build Your Own Analysis: CODI CSV Analysis Tool

#### Box 1: Imports and Installs

In [None]:
%pip install ipywidgets==8.1.5
import csv
import json
from natsort import natsorted
import numpy as np
import os
import pandas as pd
import ipywidgets as widgets
from rich.progress import track

#### Box 2: Set Parameters

In [None]:
# The folder below has to hold the Loc CSVs exported from CODI.
# Workflow: set the parameters, run this box to generate the checkboxes,
# tick the columns to summarise, then run Box 3.

folder = "/Volumes/guttman/Guoming_Gao-Resnick/Data_BIF/DEFAULT_USER/20250424_ONIdemo_Guoming/Praneeth_data"  # String: Path to folder containing data
drift_correction = True  # Boolean: True or False (To drift correct CSVs using CODI drift correction)
filter_locs = False  # Boolean: True or False (To filter CSVs using CODI filters)
save_ints = True  # Boolean: True or False (To save intermediate CSVs)
keyword = "dSTORM7"  # String: Filter files in the folder for only those containing this string

# Point files: CSVs whose name contains the keyword and none of the substrings
# used for drift tables or for files written by earlier analysis runs.
points_list = natsorted(
    [
        name
        for name in os.listdir(folder)
        if name.endswith(".csv")
        and keyword in name
        and not any(
            tag in name
            for tag in ("drift", "output", "result", "SNR", "densities")
        )
    ]
)

# The filter definition is the first .json file that os.listdir reports.
if filter_locs:
    filters_file = [
        entry for entry in os.listdir(folder) if entry.endswith(".json")
    ][0]

# Drift tables are recognised by their file-name suffix.
if drift_correction:
    drift_files = [
        entry
        for entry in os.listdir(folder)
        if entry.endswith("drift_correction.csv")
    ]

# The first point file serves as the model for column names and channels.
model_csv = pd.read_csv(os.path.join(folder, points_list[0]))
channelTitle = [
    title
    for title in model_csv.columns
    if "Name" not in title and ("channel" in title or "Channel" in title)
][0]
channels = np.unique(model_csv[channelTitle])

# One unticked checkbox per column of the model CSV, stacked in a single column
items = [
    widgets.Checkbox(value=False, description=title) for title in model_csv.columns
]
ui = widgets.GridBox(
    items, layout=widgets.Layout(grid_template_columns="repeat(1,100px)")
)
display(ui)

#### Box 3: Run Analysis


In [None]:
# Filtering
def filter(points, filters_file, channels, channelTitle):
    """Keep only the localizations that pass the min/max filters of their channel.

    The filter definitions are read from the JSON file ``filters_file`` inside
    the global ``folder``. For each channel, every filter whose name is a
    substring of a column title of ``points`` is applied to the first such
    column; filters without a matching column are ignored. The surviving rows
    of all channels are concatenated in the order of ``channels``.

    Args:
        points: DataFrame of localizations.
        filters_file: Name of the JSON file in ``folder``. It is indexed as
            ``["filters"][<channel key>][<filter name>]["min" / "max"]``.
        channels: Iterable of channel values. The first character of
            ``str(channel)`` is used as the channel key in the JSON.
        channelTitle: Title of the column of ``points`` holding the channel.

    Returns:
        DataFrame with the rows that passed, keeping the columns of
        ``points``. It is empty when ``channels`` is empty.

    Raises:
        KeyError: If the JSON has no entry for a channel key, or a matched
            filter lacks ``"min"`` or ``"max"``.

    Side effects:
        When the global ``save_ints`` is true, the result is written to
        ``<folder>/<folder name>_filtered.csv``.
    """
    # Context manager so the JSON handle is closed even if parsing fails.
    with open(os.path.join(folder, filters_file)) as f:
        filters = json.load(f)

    kept = []
    for c in channels:
        channel_array = points.loc[points[channelTitle] == c]
        channel_filters = filters["filters"][str(c)[0]]
        for name, limits in channel_filters.items():
            column = [j for j in points.keys() if name in j]
            if column:
                values = channel_array[column[0]]
                # Both bounds are inclusive; NaN values fail the comparison
                # and are dropped.
                channel_array = channel_array.loc[
                    (values >= limits["min"]) & (values <= limits["max"])
                ]
        kept.append(channel_array)

    if kept:
        points_filtered = pd.concat(kept)
    else:
        # No channels requested: return an empty frame with the same columns
        # instead of failing on an unassigned result.
        points_filtered = points.iloc[0:0].copy()

    if save_ints:
        # This branch used an undefined name `path` and raised NameError;
        # `folder` is the only directory the notebook defines.
        # NOTE(review): the file name depends only on the folder, so every
        # call overwrites the same CSV -- confirm that this is intended.
        out_name = "{}_filtered.csv".format(os.path.split(folder)[-1])
        points_filtered.to_csv(os.path.join(folder, out_name))

    return points_filtered


# Analysis
def analyze(
    points,
    items,
    channels,
    filter_locs,
    points_original,
    channelTitle,
    frameTitle,
    row_titles,
):
    """Summarise one localization table as a 1-D array of per-channel values.

    For each channel, in order, the array receives:
      * the row count of that channel in ``points_original`` (only when
        ``filter_locs`` is true),
      * the row count of that channel in ``points``,
      * the median of every column whose checkbox in ``items`` is ticked.

    A column whose median raises ``TypeError`` is reported with a printed
    message and stored as 0.

    Args:
        points: DataFrame to summarise.
        items: Objects exposing ``value`` (ticked or not) and ``description``
            (column title).
        channels: Sequence of channel values to report.
        filter_locs: Whether to also report counts from ``points_original``.
        points_original: DataFrame to count from when ``filter_locs`` is true;
            not touched otherwise.
        channelTitle: Title of the channel column.
        frameTitle: Not used by this function.
        row_titles: Only its length is used, to size the result.

    Returns:
        numpy array of floats with ``len(row_titles)`` entries.
    """
    summary = np.zeros(len(row_titles))

    # The ticked checkboxes are the same for every channel.
    ticked = [box for box in items if box.value]

    slot = 0
    for channel in channels:
        if filter_locs:
            in_original = points_original.loc[
                points_original[channelTitle] == channel
            ]
            summary[slot] = len(in_original)
            slot += 1

        in_channel = points.loc[points[channelTitle] == channel]
        summary[slot] = len(in_channel)
        slot += 1

        for box in ticked:
            try:
                summary[slot] = np.median(in_channel[str(box.description)])
            except TypeError:
                summary[slot] = 0
                print(
                    "error in column",
                    box.description,
                    "-- might be text, not a number",
                )
            slot += 1

    return summary


# ----------------------------------MAIN--------------------------------------
# For every CSV in points_list: load it, derive the localization-precision
# columns, optionally drift correct and filter it, then store the per-channel
# summary returned by analyze() as one column of the output table. The table
# is written to `folder` after all files have been processed.

# The output rows depend only on the channels, the ticked checkboxes and
# filter_locs, so they are laid out once here rather than once per file. This
# also keeps row_titles defined when points_list is empty.
numChannels = len(channels)
numItems = sum(bool(box.value) for box in items) + int(filter_locs) + 1
filter_channels = [str(c)[0] for c in channels]
selected_columns = [box.description for box in items if box.value]

row_titles = []
for channel in channels:
    channel_label = str(channel)[0]
    row_titles.append("Channel {} Points".format(channel_label))
    if filter_locs:
        row_titles.append("Channel {} Points Post-Filtering".format(channel_label))
    for column_name in selected_columns:
        row_titles.append("Channel {} {}".format(channel_label, column_name))

# Rows follow row_titles; there is one column per CSV.
data_total = np.zeros((numItems * numChannels, len(points_list)))

for col, csv_name in enumerate(natsorted(points_list)):
    # Open File
    print(csv_name)
    points = pd.read_csv(os.path.join(folder, csv_name))

    # Identify column titles
    channelTitle = [
        p
        for p in points.columns
        if ("channel" in p or "Channel" in p) and ("Name" not in p)
    ][0]
    frameTitle = [p for p in points.columns if ("frame" in p or "Frame" in p)][0]

    # Make sure a "localization precision (nm)" column exists:
    #   * if it is already there, CRLBXPosition is filled in as its square
    #     when that column is missing;
    #   * else, if CRLBXPosition exists, it is sqrt(CRLBX + CRLBY);
    #   * otherwise it is the mean of the X and Y precision columns.
    if "localization precision (nm)" in points.columns:
        if "CRLBXPosition" not in points.columns:
            points["CRLBXPosition"] = (
                points["localization precision (nm)"]
                * points["localization precision (nm)"]
            )
    elif "CRLBXPosition" in points.columns:
        points["localization precision (nm)"] = np.sqrt(
            points["CRLBXPosition"] + points["CRLBYPosition"]
        )
    else:
        points["localization precision (nm)"] = (
            points["X precision (nm)"] + points["Y precision (nm)"]
        ) / 2

    if drift_correction:
        # Identify more column titles and perform drift correction
        xTitle = [p for p in points.columns if p in ("x (nm)", "X (nm)", "x")][0]
        yTitle = [p for p in points.columns if p in ("y (nm)", "Y (nm)", "y")][0]
        # The drift table is the first one whose name contains this CSV's
        # name without its 4-character extension.
        drift_file = [k for k in drift_files if (csv_name[:-4] in k)][0]
        drift = pd.read_csv(os.path.join(folder, drift_file))

        # The frame number is used as the row position in the drift table.
        # All rows are looked up in one array operation instead of a per-row
        # Python loop. Frames without a row in the table stay uncorrected.
        frame_idx = points[frameTitle].astype(int).to_numpy()
        has_drift = (frame_idx >= 0) & (frame_idx < len(drift))
        for coord_title, drift_title in (
            (xTitle, "x-drift (nm)"),
            (yTitle, "y-drift (nm)"),
        ):
            shift = np.zeros(len(points))
            shift[has_drift] = drift[drift_title].to_numpy()[frame_idx[has_drift]]
            points[coord_title] = points[coord_title] - shift

        if save_ints:
            points.to_csv(
                os.path.join(folder, "{}_driftcorrected.csv".format(csv_name))
            )

    if filter_locs:
        # Filter then analyze
        points_filtered = filter(points, filters_file, channels, channelTitle)
        data_column = analyze(
            points_filtered,
            items,
            channels,
            filter_locs,
            points,
            channelTitle,
            frameTitle,
            row_titles,
        )
    else:
        # Analyze
        data_column = analyze(
            points,
            items,
            channels,
            filter_locs,
            "",
            channelTitle,
            frameTitle,
            row_titles,
        )
    data_total[:, col] = data_column

# Write the summary table: one row per title, one column per CSV.
df = pd.DataFrame(data_total, columns=natsorted(points_list))
df.index = row_titles
df_name = "{}_{}_drifted{}_filter{}_output.csv".format(
    os.path.split(folder)[-1], keyword, drift_correction, filter_locs
)
df.to_csv(os.path.join(folder, df_name))