<a href="https://colab.research.google.com/github/dhynasah/dg_coding_interview/blob/main/roc_analysis_dash_app_testing_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pandas as pd
import numpy as np
from typing import Union


class ADFCalculations:
    """
    A class used to parse and store data related to a NeuMoDx ADF RawDataExport File
    """

    settings_df = pd.DataFrame

    def __init__(
        self,
        sample_key: pd.DataFrame,
        adf_df: pd.DataFrame,
        parameter: str,
        channel: str,
        specimen_type: str,
        increment_list: list,
    ) -> None:
        self.sample_key = pd.DataFrame(sample_key)
        self.adf_df = pd.DataFrame(adf_df)
        self.parameter = parameter
        self.specimen_type = specimen_type
        self.channel = channel
        self.increment_list = increment_list
        self.increment_calculations()

    @staticmethod
    def check_cutoffs(sk_group_df: pd.DataFrame, adf_df: pd.DataFrame) -> pd.DataFrame:
        """This function takes 2 dataframes, sample_key and adf_df and

        Args:
            sk_group_df: a dataframe created from group_by where all the rows have the
             same target name and specimen type
            adf_df: a dataframe that has been modified by the adfparser class
        Returns:
            a pandas dataframe with a new column, simulated target result

        """
        sk_group_df = sk_group_df.copy()
        target_name_list = list(sk_group_df["Target Name"].unique())
        target_name_str = ""
        target_name = target_name_str.join(target_name_list)
        specimen_type_list = list(sk_group_df["Target Setting Specimen Type"].unique())
        specimen_type_str = ""
        specimen_type = specimen_type_str.join(specimen_type_list)

        min_peak_height = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["Minimum Peak Height"].values[0]

        min_EPF = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["Minimum End Point Fluorescence"].values[0]

        peak_max_cycle = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["Peak Maximum Cycle"].values[0]

        overall_threshold = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["Overall EPR Threshold"].values[0]

        EPR_check_ct_threshold = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["EPR Check Ct Threshold"].values[0]

        EPR_threshold = adf_df[
            (adf_df["TargetName"] == target_name)
            & (adf_df["SpecimenType"] == specimen_type)
        ]["EPR Threshold"].values[0]
        sk_group_df["Max Peak Height"] = sk_group_df["Max Peak Height"].replace(
            "", np.nan, regex=True
        )
        sk_group_df["End Point Fluorescence"] = sk_group_df[
            "End Point Fluorescence"
        ].replace("", np.nan, regex=True)
        sk_group_df["Ct"] = sk_group_df["Ct"].replace("", np.nan, regex=True)
        sk_group_df["EPR"] = sk_group_df["EPR"].replace("", np.nan, regex=True)

        sk_group_df["Observed Result"] = np.where(
            (
                (sk_group_df["Max Peak Height"] > min_peak_height)
                & (sk_group_df["End Point Fluorescence"] > min_EPF)
                & (sk_group_df["Ct"] <= peak_max_cycle + 0.5)
                & (sk_group_df["EPR"] >= overall_threshold)
                & (
                    (sk_group_df["Ct"] > EPR_check_ct_threshold)
                    | (
                        (sk_group_df["Ct"] <= EPR_check_ct_threshold)
                        & (sk_group_df["EPR"] > EPR_threshold)
                    )
                )
            ),
            "POS",
            "NEG",
        )
        return sk_group_df

    def group_by_cutoffs(
        self, sample_key: pd.DataFrame, adf_df: pd.DataFrame
    ) -> pd.DataFrame:
        """

        Args:
            sample_key: dataframe with all samples
            adf_df:a dataframe that has been modified by the adfparser class

        Returns:

        """
        sample_df = sample_key.groupby(
            ["Target Name", "Target Setting Specimen Type"], group_keys=False
        ).apply(self.check_cutoffs, adf_df)

        return sample_df

    @staticmethod
    def classification(expected_result: str, observed_result: str) -> Union[str, None]:
        """This function takes the value from the expected result and the simulated target results column
        and uses them to determine if that sample row is TP, TN, FN, FP

        Args:
            expected_result: a string that is either "POS" or "NEG"
            observed_result: as string that is either "POS" or "NEG"

        Returns: String either True positive (TP), True Negative(TN), False Positive(FP) or False Negative(FN)

        """
        positive_result = "POS"
        negative_result = "NEG"
        if expected_result == positive_result and observed_result == positive_result:
            return "TP"
        elif expected_result == negative_result and observed_result == positive_result:
            return "FP"
        elif expected_result == negative_result and observed_result == negative_result:
            return "TN"
        else:
            return "FN"

    def vectorize_classification(self, sample_key_df: pd.DataFrame) -> pd.DataFrame:
        """This function preforms the classification function on all rows in the dataframe comparing the expected results
         column to the observed result column.

        Args:
            sample_key_df: sample data

        Returns: sample key dataframe with a new classification column

        """
        sample_key_df["Classification"] = np.vectorize(self.classification)(
            sample_key_df["Expected Result"], sample_key_df["Observed Result"]
        )
        return sample_key_df

    def sensitivity_specificity_calculations(
        self, vectorized_sample_key: pd.DataFrame, setting: int
    ) -> dict:
        """This function takes a vectorized sample key dataframe. That means the sample key has a classification column,
        it creates a slice of that dataframe that has only the channel of interest and specimen type. It then places the
        counts of the values in the classification column into a series. the series is turned into a dictionary.
        Sensitivity and specificity are set to zero. and if any of the necessary values (TN,FN,TP,FP) are not in the
        dictionary they are added and set to zero. if TP is not zero then sensitivity is calculated. If TN is not
        zero specificity is calculated. The youden index is  next. All calculations are values are in a dictionary.

        Args:
            vectorized_sample_key:
            setting:

        Returns:

        """
        channel_specimen_type_df = vectorized_sample_key.loc[
            (vectorized_sample_key["Channel"] == self.channel)
            & (
                vectorized_sample_key["Target Setting Specimen Type"]
                == self.specimen_type
            )
        ]
        class_counts = channel_specimen_type_df["Classification"].value_counts()
        class_counts_dict = class_counts.to_dict()
        sensitivity = 0
        specificity = 0
        if "TP" not in class_counts_dict:
            class_counts_dict["TP"] = 0
        if "TN" not in class_counts_dict:
            class_counts_dict["TN"] = 0
        if "FN" not in class_counts_dict:
            class_counts_dict["FN"] = 0
        if "FP" not in class_counts_dict:
            class_counts_dict["FP"] = 0

        if class_counts_dict["TP"] != 0:
            sensitivity = class_counts_dict["TP"] / (
                class_counts_dict["TP"] + class_counts_dict["FN"]
            )
        if class_counts_dict["TN"] != 0:
            specificity = class_counts_dict["TN"] / (
                class_counts_dict["TN"] + class_counts_dict["FP"]
            )

        youden_index = sensitivity + specificity - 1
        settings_dict = {
            "setting": setting,
            "True Negatives": class_counts_dict["TN"],
            "True Positives": class_counts_dict["TP"],
            "False Negatives": class_counts_dict["FN"],
            "False Positives": class_counts_dict["FP"],
            "Analytical Sensitivity": sensitivity,
            "Analytical Specificity": specificity,
            "1-Analytical Specificity": (1 - specificity),
            "Youden Index": youden_index,
        }
        return settings_dict

    def increment_calculations(self) -> None:
        """increment_calculations takes the increment list and parses through it. for each value in the increment list,
        the parameter of choice is set to the value, and group_by_cutoffs, vectorize_classification, and
         sensitivity_specificity_calc is applied to the dataset. the dictionary that is returned from the last function
         is placed in a list. The list is turned into a dataframe.
        """
        sample_key = self.sample_key.copy()
        adf_df = self.adf_df.copy()
        target_name = sample_key.loc[
            sample_key["Channel"] == self.channel, "Target Name"
        ].values[0]

        sample_key.replace("", np.nan)
        settings_dict_list = []
        for i in self.increment_list:
            adf_df.loc[
                (
                    (adf_df["TargetName"] == target_name)
                    & (adf_df["SpecimenType"] == self.specimen_type)
                ),
                self.parameter,
            ] = i
            grouped_sample_key = self.group_by_cutoffs(sample_key, self.adf_df)
            vectorized_sample_key = self.vectorize_classification(grouped_sample_key)
            setting_dict = self.sensitivity_specificity_calculations(
                vectorized_sample_key, i
            )
            settings_dict_list.append(setting_dict)

        self.settings_df = pd.DataFrame(settings_dict_list)
        adf_df[self.parameter] = self.adf_df[self.parameter]


In [3]:
from typing import Any
def target_results_calculations(
    sample_key: pd.DataFrame,
    adf_df: pd.DataFrame,
    specimen_type: str,
    channel: str,
    parameter: str,
    start: int,
    end: int,
    increment: int,
) -> Any:
    """
    target_results_calculations takes 2 dataframes (sample_key file and ADF), specimen type, channel, parameter, start,
    end and increment as parameters. The start, end and increment are used to make a list of settings using np.arange.
    Then an adf_calc_engine object is created with that list as well as all the other parameters. This object is used
    to return a pandas dataframe that has True Negatives counts, True Positives, False Negatives, False Positives,
    Analytical Sensitivity, Analytical Specificity, 1-Analytical Specificity, Youden Index.
    Args:
        sample_key: sample_key dataframe
        adf_df: adf dataframe
        parameter:a parameter from the adf
        channel: includes yellow, red, far_red, green orange.
        specimen_type: either option is UserSpecified1 or TransportMedium
        start: an integer
        increment: an integer
        end: and integer

    Returns: a dataframe that has sensitivity specificity calculations for each setting

    """

    increment_list = list(np.arange(int(start), int(end), int(increment)))

    calc_obj = ADFCalculations(
        sample_key=sample_key,
        adf_df=adf_df,
        parameter=parameter,
        specimen_type=specimen_type,
        channel=channel,
        increment_list=increment_list,
    )

    return calc_obj

In [4]:
import numpy as np
import pandas as pd
from typing import Union, TextIO
from io import BytesIO
from pathlib import Path

pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)


def import_sample_key(sample_key: Union[str, Path, TextIO, BytesIO]) -> pd.DataFrame:
    """Read in and validate a  dataset
    Args:
        file path string
    Returns:
        pandas Dataframe
    Raises:
        ValueError: If data_set is missing required columns.
    """
    sample_key_cols = (
        "Target Setting Specimen Type",
        "Sample ID",
        "Channel",
        "Target Name",
        "Assay Name",
        "Assay Version",
        "Localized Result",
        "Expected Result",
        "Ct",
        "End Point Fluorescence",
        "EPR",
        "Max Peak Height",
        "Flags",
    )

    sample_data: pd.DataFrame = pd.read_excel(
        sample_key,
    )
    sample_data.astype(
        {
            "Ct": float,
            "End Point Fluorescence": float,
            "EPR": float,
            "Max Peak Height": float,
        }
    )
    sample_data = sample_data.replace(np.nan, "", regex=True)
    #roc.validate_required_columns(required_columns=sample_key_cols, df=sample_data)
    return sample_data

In [8]:
from typing import Union, Any
from pathlib import Path
from io import BytesIO
import pandas as pd


class ADFParser:
    """
    A class used to parse and store data related to a NeuMoDx ADF RawDataExport File
    """

    adf_clean_dataframes: dict[Any, Any]
    adf_tabs: list[Any]
    adf_raw_dataframes: dict[Any, Any]

    def __init__(self, filename: Union[str, Path, BytesIO]) -> None:
        self.filename = filename
        self.adf_tabs = []
        self.adf_raw_dataframes = {}
        self.adf_clean_dataframes = {}
        self.get_all_adf_data()

    @staticmethod
    def get_adf_tabs(filepath: Union[str, Path, BytesIO]) -> list[str]:
        """
        A function used to retrieve the ADF tabs from a NeuMoDx RawDataExport File.

        Args:
          filepath (str): Name of the NeuMoDx RawDataExport File to read from.

        Returns:
          list[str]: A list of sheet names that are the ADF tabs found in the File.
        """
        # Create Excel file Object from the filepath
        xls_file = pd.ExcelFile(filepath)

        # Get the sheet names that correspond to ADF Tabs
        adf_sheet_names = [x for x in xls_file.sheet_names if "ADF" in x]
        if len(adf_sheet_names) == 0:
            raise ValueError("File uploaded must include ADFs")

        return adf_sheet_names

    @staticmethod
    def read_adf_tab(
        filepath: Union[str, Path, BytesIO], sheet_name: str
    ) -> pd.DataFrame:
        """
        A function used to read the data contained in ADF Tab into a pandas DataFrame.

        Args:
         filepath (str): Name of the NeuMoDx RawDataExport File to read from.
         sheet_name (str): Name of sheet containing ADF Data.

        Returns:
         pd.Dataframe: DataFrame Representation of the ADF Tab
        """
        # Read data associated with the adf_tab into a Dataframe
        adf_raw_dataframe = pd.read_excel(filepath, sheet_name=sheet_name)[
            ["Key", "Value"]
        ]
        return adf_raw_dataframe

    def clean_adf_data(self, adf_raw_dataframe: pd.DataFrame) -> pd.DataFrame:
        """
        A function used to clean the raw data from the ADF tab into a more usable Data

        Args:
          adf_raw_dataframe (pd.DataFrame): adf_raw_dataframe to be cleaned.

        Returns:
          pd.DataFrame: A cleaned representation of the adf dataframe.
        """
        #
        adf_clean_dataframe = drop_excess_rows(adf_raw_dataframe)
        # Create a new column corresponding to the TargetName:
        adf_clean_dataframe["TargetName"] = self.get_series_substring(
            input_series=adf_clean_dataframe["Key"].values,
            start_substring="XPCR Target ",
            end_substring=" Setting",
        )

        # Create a new column corresponding to the SpecimenType:
        adf_clean_dataframe["SpecimenType"] = self.get_series_substring(
            input_series=adf_clean_dataframe["Key"].values,
            start_substring="SpecimenType ",
            end_substring=" - ",
        )

        # Create a new column corresponding to the Parameter name:
        adf_clean_dataframe["Parameter"] = self.get_series_substring(
            input_series=adf_clean_dataframe["Key"].values, start_substring=" - "
        )

        # Set Index to the individual properties parsed from the Key Column
        adf_clean_dataframe.set_index(
            ["TargetName", "SpecimenType", "Parameter"], inplace=True
        )

        # Drop the Key Column
        adf_clean_dataframe.drop("Key", axis=1, inplace=True)

        # Pivot DataFrame to new format
        adf_clean_dataframe = adf_clean_dataframe.reset_index().pivot(
            index=["TargetName", "SpecimenType"], columns=["Parameter"], values="Value"
        )
        adf_clean_dataframe.reset_index(inplace=True)
        return adf_clean_dataframe

    @staticmethod
    def get_series_substring(
        input_series: pd.Series,
        start_substring: str | None = None,
        end_substring: str | None = None,
    ) -> list[Any]:
        """
        A function used to create a new column based on a string found between
         a start_substring and end_substring contained in the pandas Series

        Args:
          input_series (pd.Series): Series to be parsed.
          start_substring (str|None): Text to serve as the minimum boundary.
          end_substring (str|None): Text substring to server as the maximum boundary.
        Returns:
          pd.Series: A series corresponding the values between the start and end boundaries


        """

        # Case handling for both start_string and end_substring defined.
        new_series_values = []
        if start_substring and end_substring:
            new_series_values = [
                x[
                    x.find(start_substring)
                    + len(start_substring) : x.find(end_substring)
                ]
                for x in input_series
            ]

        # Case handling for start_string defined and end_substring not defined.
        elif start_substring and not end_substring:
            new_series_values = [
                x[x.find(start_substring) + len(start_substring) :]
                for x in input_series
            ]

        # Case handling for start_string not defined and end_substring defined.
        elif not start_substring and end_substring:
            new_series_values = [x[: x.find(end_substring)] for x in input_series]

        return new_series_values

    def get_all_adf_data(self) -> None:
        """
        A function used to parse and clean all ADF data associated with a NeuMoDx Raw Data Export File.
        Returns:

        """

        self.adf_tabs = self.get_adf_tabs(filepath=self.filename)

        for adf in self.adf_tabs:
            self.adf_raw_dataframes[adf] = self.read_adf_tab(
                filepath=self.filename, sheet_name=adf
            )
            self.adf_clean_dataframes[adf] = self.clean_adf_data(
                adf_raw_dataframe=self.adf_raw_dataframes[adf]
            )


def drop_excess_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Args:
        df: pandas dataframe with key and value columns
    Returns:
        pd.Dataframe
    """
    df["sample_info"] = df["Key"].apply(key_string_bool)
    false_index_values = df[(df["sample_info"] == False)].index
    df.drop(false_index_values, inplace=True)
    df.drop(columns="sample_info", inplace=True)
    return df


def key_string_bool(adf_key_string: str) -> bool:
    """
    Args:
        adf_key_string: a string from the key column that
         contains information on specimen type and target
    Returns:
        this function returns True or False
    """

    if "xpcr target" and "specimentype" in adf_key_string.lower():
        return True
    else:
        return False


In [5]:
increment_list = list(np.arange(0,100,5))
parameter = "Peak Maximum Cycle"
Channel = "Yellow"
Specimen_type = "UserSpecified1"

In [7]:
sample_key = import_sample_key("/content/sample_data/Combined_Channels_sample_key.xlsx")

In [9]:
adf_obj = ADFParser("/content/sample_data/RawDataExport.96-3.96000003.2208021509.8EB890A6.xlsx")

In [10]:
adf_df = adf_obj.adf_clean_dataframes[adf_obj.adf_tabs[0]]

In [11]:
adf_calc = ADFCalculations(sample_key,adf_df,parameter,Channel,Specimen_type,increment_list)

In [12]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

In [21]:
settings_df = adf_calc.settings_df
settings_df.head(3)

Unnamed: 0,setting,True Negatives,True Positives,False Negatives,False Positives,Analytical Sensitivity,Analytical Specificity,1-Analytical Specificity,Youden Index
0,0,23,5,33,6,0.131579,0.793103,0.206897,-0.075318
1,5,23,5,33,6,0.131579,0.793103,0.206897,-0.075318
2,10,23,5,33,6,0.131579,0.793103,0.206897,-0.075318


In [None]:
fig = go.Figure()
fig.add_trace(
    go.Scatter(x=settings_df["setting"], y=settings_df["Analytical Sensitivity"],name="Senstivity",mode='lines+markers')
)
fig.add_trace(
    go.Scatter(x=settings_df["setting"],y=settings_df["Analytical Specificity"],name="Specificity",mode='lines+markers')
)
fig.update_layout(title='Variation of Analytical Sensitivity/ Specificity',
                   yaxis_title='Sensitivity/Specficity',
                   xaxis_title="{0}".format(parameter))



In [None]:
fig1 = go.Figure()
fig1.add_trace(
    go.Scatter(x=settings_df["1-Analytical Specificity"], y=settings_df["Analytical Sensitivity"],name="Senstivity (True Positive Rate)",mode='lines+markers')
)

fig1.update_layout(title='ROC Curve',
                   xaxis_title='1-Specificity(False Positive Rate)',
                   yaxis_title="Sensitivity(True Positive Rate)")

In [None]:
fig2 = go.Figure()
fig2.add_trace(
    go.Scatter(x=settings_df["setting"], y=settings_df["Youden Index"],name="Youden Index",mode='lines+markers')
)

fig2.update_layout(title='Youden Index',
                   yaxis_title='Youden Index(Sensitivity+Specificity-1)',
                   xaxis_title="{0}".format(parameter))