# Imports

In [None]:
import abc
from collections.abc import Iterator
from pathlib import Path

import numpy as np
import pandas as pd
from pydantic import BaseModel, Field
from tqdm import tqdm

## Paths

In [None]:
# Root folder of the cross-docking run analyzed in this notebook.
data_path = Path("/Users/alexpayne/Scientific_Projects/mers-drug-discovery/sars2-retrospective-analysis/20240424_multi_pose_docking_cross_docking/")
# Sub-folder holding the result tables, and the combined table that is loaded below.
csvs_path = data_path.joinpath("results_csvs")
result_csv = csvs_path.joinpath("20240503_combined_results_with_data.csv")

In [None]:
# Folder (relative to the working directory) that receives the summary CSVs.
output_data_path = Path("analyzed_data")
# DataFrame.to_csv does not create missing parent directories, so make sure the
# folder exists before any summary table is written into it.
output_data_path.mkdir(parents=True, exist_ok=True)

In [None]:
# Sanity check: evaluates to True only if the combined-results CSV is present at the expected path.
result_csv.exists()

## Load Data

In [None]:
# Load the combined results table; the first CSV column becomes the DataFrame index.
df = pd.read_csv(result_csv, index_col=0)

In [None]:
# Number of distinct values in each column of the loaded table.
df.nunique()

# Explanation

By this point I would like to have some library code to point to for this kind of analysis. But that takes a while to write, and I'm realizing that having to look in another package just to figure out what I'm doing is kind of annoying. So instead, I'm going to write out everything I'm doing here; if it ends up in library code someday (probably in harbor), that's great.

## Code Path

# Base Code

## utils

In [None]:
class ModelBase(BaseModel):
    """Abstract base shared by the split, sorter and evaluation models in this notebook.

    Concrete subclasses provide a ``plot_name`` label and a ``get_records``
    dictionary describing their settings.
    """

    @property
    @abc.abstractmethod
    def plot_name(self) -> str:
        """Short label identifying this configuration.

        Declared as a property because ``Evaluator.plot_name`` reads it as an
        attribute (``model.plot_name``) from its components.
        """

    @abc.abstractmethod
    def get_records(self) -> dict:
        """Return this object's settings as a flat ``{column: value}`` dict.

        ``Results.df_from_results`` turns these dicts into DataFrame rows.
        """

In [None]:
class SplitBase(ModelBase):
    """Base class for strategies that cut the input table into subsets.

    Subclasses implement :meth:`run`; the label and record helpers are shared.
    """

    # Annotated explicitly so the attribute is an ordinary, typed pydantic field.
    name: str = 'SplitBase'
    # ``...`` marks the field as required instead of relying on the missing default.
    variable: str = Field(..., description="Name of variable used to split the data")
    n_splits: int = Field(1, description="number of splits to generate")
    n_per_split: int = Field(..., description="Number of values per split to generate")

    @abc.abstractmethod
    def run(self, df: pd.DataFrame) -> list[pd.DataFrame]:
        """Return the list of sub-tables produced from ``df`` by this split."""

    @property
    def plot_name(self) -> str:
        """Label of the form ``<name>_<n_per_split>``."""
        return f"{self.name}_{self.n_per_split}"

    def get_records(self) -> dict:
        """Return the split name and the number of values per split."""
        return {"Split": self.name,
                "N_Per_Split": self.n_per_split}
        
        

# Dataset Splits

In [None]:
class RandomSplit(SplitBase):
    """
    Randomly assign the unique values of ``variable`` to splits.

    The unique values are shuffled and cut into ``n_splits`` consecutive,
    non-overlapping chunks of ``n_per_split`` values; each split is made of
    the rows of ``df`` whose ``variable`` value falls in the matching chunk.
    """
    name: str = 'RandomSplit'

    def run(self, df: pd.DataFrame) -> list[pd.DataFrame]:
        """Return ``n_splits`` row subsets of ``df``.

        If fewer unique values are available than requested, the slicing below
        simply yields shorter (possibly empty) chunks; no error is raised.
        """
        from random import shuffle

        # Work on a plain list so the in-place shuffle never touches the array
        # object handed back by pandas.
        values = list(df[self.variable].unique())
        shuffle(values)

        splits = []
        for i in range(self.n_splits):
            start = i * self.n_per_split
            chosen = values[start:start + self.n_per_split]
            splits.append(df[df[self.variable].isin(chosen)])
        return splits
        

# Sorter

In [None]:
class SorterBase(ModelBase):
    """Rank rows by one column and keep the leading rows of every group."""

    name: str = Field(..., description="Name of sorting method")
    category: str = Field(..., description="Category of sort (i.e. why is sorting necessary here")
    variable: str = Field(..., description="Variable used to sort the data")
    higher_is_better: bool = Field(True, description="Higher values are better. Defaults True")
    # Explicit None default: "return everything" no longer depends on how the
    # installed pydantic version treats an Optional field without a default.
    number_to_return: None | int = Field(None, description="Number of values to return. Returns all values if None.")

    def run(self, df: pd.DataFrame, groupby: list[str]) -> pd.DataFrame:
        """Sort ``df`` by ``variable`` and keep the first rows of each group.

        Args:
            df: Table to rank.
            groupby: Column names defining the groups.

        Returns:
            The sorted rows, limited to ``number_to_return`` per group (all
            rows of every group when ``number_to_return`` is None).
        """
        ranked = df.sort_values(self.variable, ascending=not self.higher_is_better)
        # GroupBy.head documents an integer ``n``; no group can hold more rows
        # than the whole table, so len(ranked) means "keep all" without
        # handing None to pandas.
        n_rows = len(ranked) if self.number_to_return is None else self.number_to_return
        return ranked.groupby(groupby).head(n_rows)

    @property
    def plot_name(self) -> str:
        """Label of the form ``<name>_Choose_<N or All>``."""
        # ``is None`` rather than truthiness, so 0 is not reported as 'All'.
        return f"{self.name}_Choose_{'All' if self.number_to_return is None else self.number_to_return}"

    def get_records(self) -> dict:
        """Return the sorter name and its choose-N setting, keyed by ``category``."""
        return {self.category: self.name, f"{self.category}_Choose_N": 'All' if self.number_to_return is None else self.number_to_return}

In [None]:
class StructureChoice(SorterBase):
    """Sorter whose records are filed under the 'StructureChoice' category."""

    # Annotated so the override stays a typed pydantic field default; a bare
    # assignment over an annotated parent field is rejected by pydantic v2.
    category: str = 'StructureChoice'

In [None]:
class Scorer(SorterBase):
    """Sorter whose records are filed under the 'Score' category."""

    # Annotated so the override stays a typed pydantic field default; a bare
    # assignment over an annotated parent field is rejected by pydantic v2.
    category: str = 'Score'

# Poses

# Scoring

# Evaluation

In [None]:
from pydantic import confloat
class FractionGood(ModelBase):
    """Fraction of "good" outcomes, optionally with replicate fractions.

    ``replicates`` holds the per-replicate fractions from which the min, max
    and the confidence-interval bounds are read. When there are no replicates
    these four statistics are NaN instead of raising.
    """

    name: str = 'FractionGood'
    total: int = Field(..., description='Total number of items being evaluated')
    fraction: confloat(ge=0, le=1) = Field(..., description='Fraction of "good" values returned')
    replicates: list[float] = Field([], description='List of "good" fractions for error bar analysis')

    def _replicate_quantile(self, q: float) -> float:
        """Return the sorted replicate at index ``int(q * n)``; NaN if there are none."""
        if not self.replicates:
            return np.nan
        # sorted() leaves the stored replicate order untouched.
        ordered = sorted(self.replicates)
        return ordered[int(q * len(ordered))]

    @property
    def min(self) -> float:
        """Smallest replicate fraction (NaN without replicates)."""
        if not self.replicates:
            return np.nan
        return np.array(self.replicates).min()

    @property
    def max(self) -> float:
        """Largest replicate fraction (NaN without replicates)."""
        if not self.replicates:
            return np.nan
        return np.array(self.replicates).max()

    @property
    def ci_upper(self) -> float:
        """Upper bound: sorted replicate at index ``int(0.975 * n)``."""
        return self._replicate_quantile(0.975)

    @property
    def ci_lower(self) -> float:
        """Lower bound: sorted replicate at index ``int(0.025 * n)``."""
        return self._replicate_quantile(0.025)

    @classmethod
    def from_replicates(cls, reps: list['FractionGood']) -> 'FractionGood':
        """Combine per-replicate results into one summary.

        Args:
            reps: Results of the individual replicates.

        Returns:
            An instance whose ``fraction`` is the mean replicate fraction,
            whose ``total`` is the mean replicate total truncated to an int,
            and whose ``replicates`` are the individual fractions.

        Raises:
            ValueError: If ``reps`` is empty.
        """
        if not reps:
            raise ValueError("from_replicates requires at least one replicate")
        all_fracs = np.array([rep.fraction for rep in reps])
        totals = np.array([rep.total for rep in reps])
        # ``total`` is an int field: truncate the mean explicitly instead of
        # handing a float to the validator.
        return cls(total=int(totals.mean()), fraction=all_fracs.mean(), replicates=all_fracs.tolist())

    def get_records(self) -> dict:
        """Return the summary statistics as a flat dict."""
        mydict = {"Min": self.min,
                  "Max": self.max,
                  "CI_Upper": self.ci_upper,
                  "CI_Lower": self.ci_lower,
                  "Total": self.total,
                  "Fraction": self.fraction}
        return mydict

    def plot_name(self) -> str:
        """Return the fixed label ``"Fraction"``."""
        return "Fraction"

In [None]:
class BinaryEvaluation(ModelBase):
    """Classify each row as good or bad by comparing ``variable`` with ``cutoff``."""

    name: str = 'BinaryEvaluation'
    variable: str = Field(..., description="Variable used to evaluate the results")
    cutoff: float = Field(..., description="Cutoff used to determine if a result is good")
    below_cutoff_is_good: bool = Field(True, description='Whether values below or above the cutoff are good. Defaults to below.')

    def run(self, df: pd.DataFrame, groupby: list[str] | None = None) -> FractionGood:
        """Return the number of good rows divided by the number of groups.

        Args:
            df: Table to evaluate.
            groupby: Column names defining the groups that are counted as the
                total. When omitted or empty, every row counts as its own item.

        Returns:
            A ``FractionGood`` holding the total and the fraction.

        Note:
            The numerator counts rows while the denominator counts groups, so
            ``df`` is expected to hold one row per group; otherwise the ratio
            can exceed 1, which ``FractionGood`` does not accept.
        """
        # df.groupby([]) raises, so fall back to the row count without keys.
        total = len(df.groupby(groupby)) if groupby else len(df)
        values = df[self.variable]
        if self.below_cutoff_is_good:
            n_good = (values <= self.cutoff).sum()
        else:
            n_good = (values >= self.cutoff).sum()
        return FractionGood(total=total, fraction=n_good / total)

    def get_records(self) -> dict:
        """Return the evaluated column and the cutoff applied to it."""
        return {"EvaluationMetric": self.variable, "EvaluationMetric_Cutoff": self.cutoff}

    def plot_name(self) -> str:
        """Label of the form ``<name>_<variable>_<cutoff>``."""
        # str.join only accepts strings; the float cutoff must be converted first.
        return "_".join([self.name, self.variable, str(self.cutoff)])
        

In [None]:
class Evaluator(ModelBase):
    """Full pipeline: split the data, choose structures, score, then evaluate.

    Each replicate draws a fresh split, applies the structure choice and the
    scorer per group, and evaluates what is left.
    """

    name: str = 'Evaluator'
    dataset_split: SplitBase = Field(..., description='Dataset split')
    structure_choice: StructureChoice = Field(..., description="How to choose which structures to dock to")
    scorer: Scorer = Field(..., description="How to score and rank resulting poses")
    evaluator: BinaryEvaluation = Field(..., description="How to determine how good the results are")
    n_bootstraps: int = Field(1, description="Number of bootstrap replicates to run")
    groupby: list[str] = Field(..., description="List of variables that group the data")

    def run(self, df: pd.DataFrame) -> FractionGood:
        """Run ``n_bootstraps`` replicates on ``df`` and summarize them.

        Only the first split returned by ``dataset_split`` is used in each
        replicate. Grouping uses this instance's ``groupby`` field, not a
        module-level variable of the same name.
        """
        results = []
        for _ in range(self.n_bootstraps):
            split1 = self.dataset_split.run(df)[0]
            subset_df = self.structure_choice.run(split1, groupby=self.groupby)
            subset_df = self.scorer.run(subset_df, groupby=self.groupby)
            results.append(self.evaluator.run(subset_df, groupby=self.groupby))
        return FractionGood.from_replicates(results)

    @property
    def plot_name(self) -> str:
        """Join the split, structure-choice and scorer labels with the replicate count."""
        variables = [model.plot_name for model in [self.dataset_split, self.structure_choice, self.scorer]]
        variables += [f"{self.n_bootstraps}reps"]
        return "_".join(variables)

    def get_records(self) -> dict:
        """Merge the records of every component with the replicate count."""
        # Read the instance field; a global ``n_bootstraps`` may be absent or differ.
        mydict = {"Bootstraps": self.n_bootstraps}
        for container in [self.structure_choice,
                          self.scorer,
                          self.evaluator,
                          self.dataset_split]:
            mydict.update(container.get_records())
        return mydict

In [None]:
class Results(BaseModel):
    """Pairs an evaluator configuration with the result it produced."""

    evaluator: Evaluator
    fraction_good: FractionGood

    def get_records(self) -> dict:
        """Return the evaluator settings merged with the result statistics."""
        mydict = self.evaluator.get_records()
        mydict.update(self.fraction_good.get_records())
        return mydict

    @classmethod
    def calculate_results(cls, df: pd.DataFrame, evaluators: list['Evaluator']) -> Iterator['Results']:
        """Lazily run every evaluator on ``df``, showing a tqdm progress bar.

        This is a generator: nothing runs until it is iterated, and one
        ``Results`` is yielded per evaluator. Wrap the call in ``list(...)``
        to materialize all results.
        """
        for ev in tqdm(evaluators):
            result = ev.run(df)
            yield cls(evaluator=ev, fraction_good=result)

    @classmethod
    def df_from_results(cls, results: list['Results']) -> pd.DataFrame:
        """Build a DataFrame with one row per result from ``get_records``."""
        return pd.DataFrame.from_records([result.get_records() for result in results])
        

# Basic Analysis

In [None]:
# Number of distinct values in each column of the results table.
df.nunique()

In [None]:
# Distinct (reference ligand, query ligand) combinations present in the table.
refs, queries = df["Reference_Ligand"], df["Query_Ligand"]
pairs = set(zip(refs, queries))

In [None]:
# How many distinct (reference, query) pairs were collected.
len(pairs)

# Plotting

## Plot Variables

In [None]:
# Settings shared by the experiments below.
groupby = ["Query_Ligand"]  # columns passed to each Evaluator as its grouping
rmsd_cutoff = 2.0  # cutoff handed to BinaryEvaluation
n_bootstraps = 100  # replicates per Evaluator
# Values swept for n_per_split: 1, then 5 to 205 in steps of 50.
n_per_splits = np.array([1, *range(5, 206, 50)])

In [None]:
# Keep only the first row of every (Query_Ligand, Reference_Ligand) group, in the table's existing row order.
# NOTE(review): presumably the rows are ordered so that the first one is the pose of interest — confirm.
single_pose = df.groupby(["Query_Ligand", "Reference_Ligand"]).head(1)

# SinglePose - RandomSplit - POSIT Score - Structure Choices

## Exp 1 : SinglePose - RandomSplit - POSIT Score

In [None]:
# Exp 1: one Evaluator per reference-set size. Every structure is kept
# ("Dock_to_All"), then the single best row per group by POSIT confidence is
# evaluated against the RMSD cutoff.
evaluators = [Evaluator(
    dataset_split=RandomSplit(variable="Reference_Ligand", n_per_split=n_per_split),
    # number_to_return=None is spelled out: the "keep all" behavior should not
    # depend on an implicit default for a field declared without one.
    structure_choice=StructureChoice(name="Dock_to_All", variable="Tanimoto", higher_is_better=True, number_to_return=None),
    scorer=Scorer(name="POSIT_Probability", variable="docking-confidence-POSIT", higher_is_better=True, number_to_return=1),
    evaluator=BinaryEvaluation(variable="RMSD", cutoff=rmsd_cutoff),
    groupby=groupby,
    n_bootstraps=n_bootstraps,
) for n_per_split in n_per_splits]

In [None]:
# Run every evaluator on the single-pose table; list() drains the generator so all results are computed here.
results = list(Results.calculate_results(single_pose, evaluators))

In [None]:
# One row per evaluator, built from each result's get_records() dict.
df1 = Results.df_from_results(results)

In [None]:
# Save the Exp 1 summary table without the index column.
df1.to_csv(output_data_path / "20240503_random_posit.csv", index=False)

## Exp 2: ECFP4 choose n

In [None]:
# Exp 2 grid: every reference-set size combined with every "top N by Tanimoto"
# structure choice; the reference-set size varies slowest.
structure_choices = [1, 2, 5, 10]
evaluators = [
    Evaluator(
        groupby=groupby,
        n_bootstraps=n_bootstraps,
        dataset_split=RandomSplit(n_per_split=n_refs, variable="Reference_Ligand"),
        structure_choice=StructureChoice(
            name="ECFP4_Similarity",
            variable="Tanimoto",
            number_to_return=top_n,
            higher_is_better=True,
        ),
        scorer=Scorer(
            name="POSIT_Probability",
            variable="docking-confidence-POSIT",
            number_to_return=1,
            higher_is_better=True,
        ),
        evaluator=BinaryEvaluation(cutoff=rmsd_cutoff, variable="RMSD"),
    )
    for n_refs in n_per_splits
    for top_n in structure_choices
]

In [None]:
# Run the Exp 2 evaluators on the single-pose table; list() drains the generator.
results = list(Results.calculate_results(single_pose, evaluators))

In [None]:
# One row per evaluator, built from each result's get_records() dict.
df2 = Results.df_from_results(results)

In [None]:
# Save the Exp 2 summary table without the index column.
df2.to_csv(output_data_path / "20240503_random_posit_ecfp4_choose_n.csv", index=False)

## Exp 3: MCSS Choose N

In [None]:
# Exp 3 grid: every reference-set size combined with every "top N by MCS atom
# count" structure choice; the reference-set size varies slowest.
structure_choices = [1, 2, 5, 10]
evaluators = [
    Evaluator(
        groupby=groupby,
        n_bootstraps=n_bootstraps,
        dataset_split=RandomSplit(n_per_split=n_refs, variable="Reference_Ligand"),
        structure_choice=StructureChoice(
            name="MCSS_Similarity",
            variable="Num_Atoms_in_MCS",
            number_to_return=top_n,
            higher_is_better=True,
        ),
        scorer=Scorer(
            name="POSIT_Probability",
            variable="docking-confidence-POSIT",
            number_to_return=1,
            higher_is_better=True,
        ),
        evaluator=BinaryEvaluation(cutoff=rmsd_cutoff, variable="RMSD"),
    )
    for n_refs in n_per_splits
    for top_n in structure_choices
]

In [None]:
# Run the Exp 3 evaluators on the single-pose table; list() drains the generator.
results = list(Results.calculate_results(single_pose, evaluators))

In [None]:
# One row per evaluator, built from each result's get_records() dict.
df3 = Results.df_from_results(results)

In [None]:
# Save the Exp 3 summary table without the index column.
df3.to_csv(output_data_path / "20240503_random_posit_mcss_choose_n.csv", index=False)