In [None]:
import logging
import os
import sys

import dash
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
import yaml
from dash import dcc, html
from dash.dependencies import Input, Output
from scipy import stats

pd.options.mode.chained_assignment = None

# Project-local helper modules live in ../python/, so the path must be extended before importing them.
sys.path.append('../python/')
from plotly_figure_parameters import dict_y_axis_parameters, dict_font_parameters, dict_x_axis_parameters_categorical
from dashboard_notebook_util import (
    read_estimated_concentrations, 
    read_plate_data_with_calibration_concentrations,
    read_quality_control_concentrations
)

logging.basicConfig(level=logging.DEBUG, filename="dash_logs.log")

In [None]:
# Parse the YAML parameter file that drives every later cell. The context manager
# guarantees the file handle is closed once parsing finishes (or fails).
with open("../parameters/july_2024_data_parameters.yaml", "r") as file_parameters:
    dict_parameters = yaml.safe_load(file_parameters)

In [None]:
# Load the two tables used by the rest of the notebook through the project helpers
# imported from dashboard_notebook_util; both helpers receive the parsed parameter dictionary.
pd_df_plate_data = read_plate_data_with_calibration_concentrations(dict_parameters)
pd_df_estimated_concentrations = read_estimated_concentrations(dict_parameters)

In [None]:
# Build the CSV location from the configured base directory, data directory and file name.
str_quality_control_concentrations_path = os.path.join(
    dict_parameters["base directory path"],
    dict_parameters["data directory"],
    dict_parameters["quality control concentrations file name"],
)
# Hand the path (not an already-open handle) to pandas so that pandas opens and
# closes the file itself; the previous open(..., "rb") handle was never closed.
pd_df_quality_control_concentrations = pd.read_csv(str_quality_control_concentrations_path)

In [None]:
# Show the plate data table for a visual check.
pd_df_plate_data

In [None]:
# Show the quality control concentrations read from the CSV file.
pd_df_quality_control_concentrations

In [None]:
# Show the estimated concentrations table for a visual check.
pd_df_estimated_concentrations

In [None]:
def perform_t_test_on_paired_wells(pd_group):
    """Return the p-value of a t-test between the two wells in ``pd_group``.

    The test is computed from summary statistics only: the columns
    "IFN-gamma Trimmed Mean", "IFN-gamma Trimmed Standard Deviation" and
    "IFN-gamma Count" supply the mean, standard deviation and number of
    observations of each well. Equal variances are not assumed
    (``equal_var=False``).

    Args:
        pd_group: DataFrame holding the rows of one group.

    Returns:
        The two-sided p-value, or NaN when the group does not contain
        exactly two rows.
    """
    # Only a complete pair can be compared.
    if len(pd_group) != 2:
        return np.nan

    first_mean, second_mean = pd_group["IFN-gamma Trimmed Mean"].values
    first_std, second_std = pd_group["IFN-gamma Trimmed Standard Deviation"].values
    first_count, second_count = pd_group["IFN-gamma Count"].values

    # The statistic itself is not needed, only its p-value.
    _, p_value = stats.ttest_ind_from_stats(
        mean1=first_mean,
        std1=first_std,
        nobs1=first_count,
        mean2=second_mean,
        std2=second_std,
        nobs2=second_count,
        equal_var=False,
    )
    return p_value

In [None]:
# Columns that identify a pair of wells, and the per-well statistics carried along.
list_well_pair_keys = ["sample name plate", "plate number"]
list_well_statistic_columns = [
    "IFN-gamma Trimmed Mean",
    "IFN-gamma Trimmed Standard Deviation",
    "IFN-gamma Count",
]

# One p-value per (sample, plate) group.
pd_df_p_values = (
    pd_df_plate_data
    .groupby(list_well_pair_keys)
    .apply(perform_t_test_on_paired_wells, include_groups=False)
    .reset_index()
)
pd_df_p_values.columns = list_well_pair_keys + ['t test p value']

# Attach the per-well statistics again; the left join repeats each p-value
# once for every well of its group.
pd_df_tested = pd_df_p_values.merge(
    pd_df_plate_data[list_well_pair_keys + list_well_statistic_columns],
    on=list_well_pair_keys,
    how="left",
)

In [None]:
# Show the p-values joined with the per-well statistics.
pd_df_tested

In [None]:
# Keep a single row per (sample name plate, plate number) pair. drop_duplicates keeps the
# first row of each pair, so the per-well statistic columns describe only that first well.
pd_df_tested_unique = pd_df_tested.drop_duplicates(subset = ["sample name plate", "plate number"])

In [None]:
# Show the de-duplicated table (one row per sample/plate pair).
pd_df_tested_unique

In [None]:
# Histogram of the per-pair t-test p-values (seaborn is imported with the other
# dependencies in the notebook's import cell). The trailing semicolon suppresses
# the textual Axes repr so only the figure is shown.
sns.histplot(pd_df_tested_unique, x = "t test p value", bins = 20);

In [None]:
def calculate_paired_intra_plate_cv(str_analyte, pd_group):
    """Coefficient of variation of one analyte across a pair of duplicate rows.

    The analyte's value is read from the column named by the configured
    estimated-concentration prefix followed by ``str_analyte``.

    Args:
        str_analyte: Analyte name appended to the column prefix.
        pd_group: DataFrame with the rows of one group.

    Returns:
        Standard deviation divided by the mean of the two values (a fraction,
        not a percentage), or NaN when the group does not hold exactly two rows.
    """
    if len(pd_group) != 2:
        return np.nan

    str_prefix = dict_parameters["column name prefix for estimated concentrations"]
    first_value, second_value = pd_group[f"{str_prefix}{str_analyte}"].values

    pair_mean = (first_value + second_value) / 2
    # With two observations the n - 1 denominator is 1, so the sample standard
    # deviation is simply the root of the summed squared deviations.
    sum_squared_deviations = (first_value - pair_mean)**2 + (second_value - pair_mean)**2
    return np.sqrt(sum_squared_deviations) / pair_mean


In [None]:
def calculate_paired_intra_plate_rel_abs_diff(str_analyte, pd_group):
    """Relative absolute difference of one analyte across a pair of duplicate rows.

    The analyte's value is read from the column named by the configured
    estimated-concentration prefix followed by ``str_analyte``.

    Args:
        str_analyte: Analyte name appended to the column prefix.
        pd_group: DataFrame with the rows of one group.

    Returns:
        ``|a - b| / mean(a, b)`` for the two values, or NaN when the group
        does not hold exactly two rows.
    """
    if len(pd_group) != 2:
        return np.nan

    str_prefix = dict_parameters["column name prefix for estimated concentrations"]
    first_value, second_value = pd_group[f"{str_prefix}{str_analyte}"].values

    pair_mean = (first_value + second_value) / 2
    absolute_difference = np.abs(first_value - second_value)
    return absolute_difference / pair_mean

In [None]:
def calculate_max_gradient(str_analyte, pd_group):
    """Largest calibration-curve gradient of one analyte within a group.

    The gradient is read from the column named by the configured
    calibration-curve-gradient prefix followed by ``str_analyte``.

    Args:
        str_analyte: Analyte name appended to the column prefix.
        pd_group: DataFrame with the rows of one group.

    Returns:
        The first (only) gradient when the group has fewer than two rows,
        otherwise the maximum of the group's gradients.
    """
    str_prefix = dict_parameters["column name prefix for calibration curve gradient"]
    array_gradients = pd_group[f"{str_prefix}{str_analyte}"].values
    return array_gradients[0] if len(pd_group) < 2 else np.max(array_gradients)

In [None]:
def get_table_of_duplicate_qc_checks(function_check, str_check_name):
    """Apply a per-group check to every configured analyte and combine the results.

    For each analyte in ``dict_parameters["list of analytes"]`` the estimated
    concentrations are grouped by sample name and plate number and
    ``function_check(str_analyte, group)`` is evaluated on every group.

    Args:
        function_check: Callable taking ``(str_analyte, pd_group)`` and
            returning one value per group.
        str_check_name: Label used as the prefix of each result column
            (``"<str_check_name> <analyte>"``).

    Returns:
        DataFrame with the two key columns plus one result column per analyte,
        built by outer-joining the per-analyte tables on the key columns.
    """
    list_key_columns = ["sample name annotations", "plate number"]

    def _check_one_analyte(str_analyte):
        # One row per (sample, plate) group holding the check value for this analyte.
        pd_df_one_analyte = (
            pd_df_estimated_concentrations
            .groupby(list_key_columns)
            .apply(lambda pd_group: function_check(str_analyte, pd_group), include_groups=False)
            .reset_index()
        )
        pd_df_one_analyte.columns = list_key_columns + [f"{str_check_name} {str_analyte}"]
        return pd_df_one_analyte

    list_tables = [
        _check_one_analyte(str_analyte)
        for str_analyte in dict_parameters["list of analytes"]
    ]

    # Fold the per-analyte tables together, keeping every key seen in any of them.
    pd_df_checked = list_tables[0]
    for pd_df_next in list_tables[1:]:
        pd_df_checked = pd_df_checked.merge(pd_df_next, on=list_key_columns, how="outer")
    return pd_df_checked

In [None]:
# Build one table per check; each has the key columns ("sample name annotations",
# "plate number") plus one column per analyte named "<check name> <analyte>".
pd_df_CV = get_table_of_duplicate_qc_checks(calculate_paired_intra_plate_cv, "CV")
pd_df_rel_abs_diff = get_table_of_duplicate_qc_checks(calculate_paired_intra_plate_rel_abs_diff, "rel. abs. diff.")
pd_df_gradients = get_table_of_duplicate_qc_checks(calculate_max_gradient, "max gradient")

In [None]:
# Show the relative absolute difference table.
pd_df_rel_abs_diff

In [None]:
# Show the CV table.
pd_df_CV

In [None]:
# Combine the three check tables into the single frame used by the dashboard.
# Outer joins keep every (sample, plate) key present in any of the tables.
pd_df_data = (
    pd_df_CV
    .merge(
        pd_df_rel_abs_diff,
        on=["sample name annotations", "plate number"],
        how="outer",
    )
    .merge(
        pd_df_gradients,
        on=["sample name annotations", "plate number"],
        how="outer",
    )
)
pd_df_data

In [None]:
dash_app_object_boxes = dash.Dash(__name__)


def _build_labelled_dropdown(str_label, str_component_id, list_options, default_value):
    """Return an inline Div holding a label and a dropdown with the given id, options and default."""
    return html.Div(
        [
            html.Label(str_label),
            dcc.Dropdown(
                id=str_component_id,
                options=list_options,
                value=default_value,
            ),
        ],
        style={'width': '20%', 'display': 'inline-block'},
    )


# Page layout: a heading, three selectors side by side, and the figure they control.
dash_app_object_boxes.layout = html.Div(
    [
        html.H1("Plots of statistics comparing duplicate samples"),
        _build_labelled_dropdown(
            "Plot type:",
            'plot-type-dropdown',
            [
                {'label': 'box', 'value': 'box'},
                {'label': 'strip', 'value': 'strip'},
            ],
            'box',
        ),
        _build_labelled_dropdown(
            "Analyte:",
            'analyte-dropdown',
            [
                {'label': str_analyte_option, 'value': str_analyte_option}
                for str_analyte_option in dict_parameters["list of analytes"]
            ],
            dict_parameters["list of analytes"][0],
        ),
        _build_labelled_dropdown(
            "Statistic:",
            'statistic-dropdown',
            [
                {'label': 'CV', 'value': 'CV'},
                {'label': 'rel. abs. diff.', 'value': 'rel. abs. diff.'},
            ],
            'CV',
        ),
        dcc.Graph(id='scatter-plot-duplicates'),
    ],
    style={'backgroundColor': 'white', 'padding': '20px'},
)


@dash_app_object_boxes.callback(
    Output('scatter-plot-duplicates', 'figure'),
    Input('analyte-dropdown', 'value'),
    Input('plot-type-dropdown', 'value'),
    Input('statistic-dropdown', 'value'),
)
def update_graph(str_analyte, str_plot_type, str_statistic):
    """Redraw the duplicate-sample figure for the current dropdown selections.

    Args:
        str_analyte: Selected analyte; combined with ``str_statistic`` to form
            the plotted column name ``"<statistic> <analyte>"`` of ``pd_df_data``.
        str_plot_type: ``"box"`` for a box plot per plate, ``"strip"`` for a
            jittered scatter coloured by the analyte's "max gradient" column.
        str_statistic: Selected statistic (``"CV"`` or ``"rel. abs. diff."``).

    Returns:
        The plotly figure, or ``dash.no_update`` (leaving the current figure in
        place) when a selection is missing or the plot type is unknown.
    """
    # A cleared dropdown delivers None. Without this guard `fig` would be
    # unbound (or the column lookup would fail) and the callback would raise.
    if str_analyte is None or str_statistic is None or str_plot_type not in ("box", "strip"):
        return dash.no_update

    str_column_name = f"{str_statistic} {str_analyte}"

    if str_plot_type == "box":
        fig = px.box(
            pd_df_data,
            x="plate number",
            y=str_column_name,
            hover_name="sample name annotations",
        )
    else:
        # Horizontal jitter separates overlapping points of the same plate. A
        # fixed seed keeps every point in the same place between callbacks.
        array_jitter = np.random.default_rng(0).uniform(-0.1, 0.1, len(pd_df_data))

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=pd_df_data["plate number"] + array_jitter,
            y=pd_df_data[str_column_name],
            mode='markers',
            marker=dict(
                size=8,
                color=pd_df_data["max gradient " + str_analyte],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title='Value')
            ),
            # The hover template references %{text}; supply the un-jittered
            # plate number so the placeholder is actually filled in.
            text=pd_df_data["plate number"],
            hovertemplate='Category: %{text}<br>Value: %{y:.2f}<extra></extra>'
        ))
        fig.update_traces(marker=dict(opacity=0.75))

    # NOTE(review): the "CV" column holds std/mean as a fraction, yet the axis
    # is titled "%CV" — confirm whether the values should be scaled by 100.
    if str_statistic == "CV":
        str_y_axis_title = "%CV"
    else:
        str_y_axis_title = str_statistic

    fig.update_layout(
        xaxis=dict_x_axis_parameters_categorical,
        yaxis=dict_y_axis_parameters,
        font=dict_font_parameters,
        xaxis_title="Plate",
        yaxis_title=str_y_axis_title,
        plot_bgcolor='white',
        paper_bgcolor='white',
    )

    return fig

# Start the Dash app when this code runs as the main module, with
# jupyter_mode="inline", debug mode enabled, on port 3310.
if __name__ == '__main__':
    dash_app_object_boxes.run(jupyter_mode="inline", debug = True, port=3310)