In [None]:
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import matplotlib.pylab as plt
import pandas as pd

import data
import recovery
import utils

In [None]:
parent_folder = utils.Configuration.RAW_DATA_PATH

# Keyword argument of data.Data -> name of the CSV file inside parent_folder.
csv_name_by_argument = {
    "quant_file": "results.csv",
    "is_correspondence_file": "is_std_table_correspondence.csv",
    "sample_properties_file": "sample_properties.csv",
    "qc_file": "qc.csv",
    "is_concentration_file": "is_std_table_concentration.csv",
}

# Resolve every file name against parent_folder and hand the paths to data.Data
# by keyword, exactly as the explicit call would.
df = data.Data(
    **{
        argument: parent_folder.joinpath(csv_name)
        for argument, csv_name in csv_name_by_argument.items()
    }
)

In [None]:
# Run the Data object's own validation before the tables are used in the cells
# below. The checks themselves are implemented in the `data` module (not shown
# in this notebook); whatever this call returns is displayed as the cell output.
df.validate_data()

In [None]:
# TODO test the recovery module
class Recovery:
    """Compute RRF values and internal-standard (IS) recoveries from peak areas.

    "IS" are the compounds listed in the ``internal_standard`` column of the
    correspondence table, "RS" is the compound listed in its
    ``external_standard`` column. RRF presumably stands for "relative response
    factor" (TODO confirm naming with the method owner).

    With ``A`` a peak area and ``m`` an amount from the concentration table::

        RRF        = (A_IS * m_RS) / (A_RS * m_IS)          # "isrs" samples
        recovery % = (A_IS * m_RS) / (A_RS * mean_RRF) / m_IS * 100   # others

    The calculations support exactly one RS; anything else raises
    ``ValueError`` instead of silently producing an all-NaN result.
    """

    def __init__(
        self,
        peak_areas,
        is_concentration_file,
        sample_properties_file,
        is_correspondence_file,
    ):
        """Store the input tables.

        Args:
            peak_areas: DataFrame with a ``type`` column, a ``name`` column and
                one column per sample. Only rows with ``type == 'area'`` are
                kept, indexed by ``name``.
            is_concentration_file: DataFrame with ``name`` and ``amount``
                columns; it is indexed by ``name``.
            sample_properties_file: DataFrame with ``sample_name`` and
                ``sample_type`` columns.
            is_correspondence_file: DataFrame with ``internal_standard`` and
                ``external_standard`` columns.
        """
        self.peak_areas = peak_areas.query("type == 'area'").set_index("name")
        self.is_concentration_file = is_concentration_file.set_index("name")
        self.sample_properties_file = sample_properties_file
        self.is_correspondence_file = is_correspondence_file

    @property
    def isrs_sample_names(self):
        """Array of sample names whose ``sample_type`` is ``"isrs"``."""
        properties = self.sample_properties_file
        return properties.loc[properties.sample_type == "isrs", "sample_name"].values

    @property
    def sample_names_not_isrs(self):
        """Array of sample names whose ``sample_type`` is not ``"isrs"``."""
        properties = self.sample_properties_file
        return properties.loc[properties.sample_type != "isrs", "sample_name"].values

    @property
    def is_identity(self):
        """Unique names found in the ``internal_standard`` column."""
        return self.is_correspondence_file.internal_standard.unique()

    @property
    def rs_identity(self):
        """Unique names found in the ``external_standard`` column."""
        return self.is_correspondence_file.external_standard.unique()

    def filter_peak_areas(self, index, columns):
        """Return the peak-area rows named in ``index`` for the ``columns`` given.

        Names that are absent from the peak-area table are ignored rather than
        raising, because the selection is done with ``isin`` masks.
        """
        return self.peak_areas.loc[
            self.peak_areas.index.isin(index),
            self.peak_areas.columns.isin(columns),
        ]

    def is_concentration_file_amount(self, index):
        """Return the ``amount`` values of the compounds named in ``index``."""
        return self.is_concentration_file.loc[index, "amount"]

    def _scaled_is_area(self, sample_names):
        """Return ``(IS area * RS amount) / RS area`` for ``sample_names``.

        The result has one row per IS and one column per sample.

        Raises:
            ValueError: if the correspondence table does not name exactly one
                RS, or the peak-area table does not hold exactly one area row
                for it. In both cases pandas would otherwise misalign the
                operands and return only NaN.
        """
        rs_names = self.rs_identity
        if len(rs_names) != 1:
            raise ValueError(
                "Expected exactly one external standard in the correspondence "
                f"table, found {len(rs_names)}: {list(rs_names)}"
            )

        rs_rows = self.filter_peak_areas(rs_names, sample_names)
        if len(rs_rows) != 1:
            raise ValueError(
                f"Expected exactly one peak-area row for {rs_names[0]!r}, "
                f"found {len(rs_rows)}"
            )

        # A Series indexed by sample name, so the division below is aligned on
        # the sample columns of the IS area table.
        rs_area = rs_rows.iloc[0]
        is_area = self.filter_peak_areas(self.is_identity, sample_names)
        rs_amount = self.is_concentration_file_amount(rs_names).squeeze()

        return (is_area * rs_amount) / rs_area

    def calculate_RRF(self):
        """Return the RRF of every IS (rows) in every "isrs" sample (columns).

        Raises:
            ValueError: see ``_scaled_is_area``.
        """
        is_amount = self.is_concentration_file_amount(self.is_identity)
        return self._scaled_is_area(self.isrs_sample_names).div(
            is_amount, axis="index"
        )

    def calculate_mean_RRF(self):
        """Return, for every IS, the mean RRF over all "isrs" samples."""
        return self.calculate_RRF().mean(axis="columns")

    def get_RRF_stats(self):
        """Return ``DataFrame.describe()`` of the RRF table (one summary per column)."""
        return self.calculate_RRF().describe()

    def get_RFF_stats(self):
        """Misspelled alias of ``get_RRF_stats``, kept for existing callers."""
        return self.get_RRF_stats()

    @staticmethod
    def _boxplot(values, title):
        """Draw ``values`` as boxplots (one box per column) on a new figure."""
        _, ax = plt.subplots()
        plot = values.boxplot(ax=ax, rot=90)
        ax.set_title(title)
        return plot

    def plot_RRF(self):
        """Boxplot the RRF table and return what ``DataFrame.boxplot`` returns."""
        return self._boxplot(self.calculate_RRF(), "Boxplots for RRF values")

    def plot_RFF(self):
        """Misspelled alias of ``plot_RRF``, kept for existing callers."""
        return self.plot_RRF()

    def calculate_recovery(self):
        """Return the recovery in percent of every IS (rows) in every
        non-"isrs" sample (columns).

        The IS mass is estimated from the peak areas with the mean RRF and
        then expressed relative to the IS amount in the concentration table.

        Raises:
            ValueError: see ``_scaled_is_area``.
        """
        is_amount = self.is_concentration_file_amount(self.is_identity)
        mean_rrf = self.calculate_mean_RRF()

        is_masses = self._scaled_is_area(self.sample_names_not_isrs).div(
            mean_rrf, axis="index"
        )

        return is_masses.div(is_amount, axis="index").mul(100)

    def get_recovery_stats(self):
        """Return ``DataFrame.describe()`` of the recovery table (one summary per column)."""
        return self.calculate_recovery().describe()

    def plot_recovery(self):
        """Boxplot the recovery table and return what ``DataFrame.boxplot`` returns."""
        return self._boxplot(
            self.calculate_recovery(), "Boxplots for recovery values"
        )

In [None]:
# Build the calculator from the imported `recovery` module. Arguments stay
# positional because the module's signature is not visible in this notebook.
module_recovery = recovery.Recovery(
    df.quant_file,
    df.is_concentration_file,
    df.sample_properties_file,
    df.is_correspondence_file,
)

# Last expression of the cell: draws the boxplots and displays the return value.
module_recovery.plot_recovery()

In [None]:
# Same four tables as the module-based call, this time fed to the Recovery
# class defined in this notebook; keywords spell out which table goes where.
Recovery(
    peak_areas=df.quant_file,
    is_concentration_file=df.is_concentration_file,
    sample_properties_file=df.sample_properties_file,
    is_correspondence_file=df.is_correspondence_file,
).plot_recovery()