In [1]:
# from the portal bin folder the utils.py
import os
import random
import json
import pandas as pd
import numpy as np

from flask import  Response,jsonify
from typing  import List
from datetime import datetime



def handle_Na_imputation(imputestatement:str) -> bool:
    """
    Returns a boolean of wheter to do or not to do the imputation 
    :imputestatement: could be 'impute' or 'noimpute'
    """
    if imputestatement == 'impute':
        return True
    else:
        return False


def count_df_to_density_plot_df(temp_df,identifier):
    # background = pd.DataFrame(temp_fpkm.filter(regex = cn.REGEX).to_numpy().flatten())
    background = temp_df.filter(regex = cn.REGEX).to_numpy().flatten()
    backgournd_count_df = data_to_count_df(background,color='blue',opacity= 0.15)
    abundances = temp_df[temp_df.index == identifier]
    abundances =  abundances.filter(regex=cn.REGEX).to_numpy()
    abundances_count_df = data_to_count_df(abundances,color='red',opacity=0.8,density=False)
    final_count_df = pd.concat([backgournd_count_df,abundances_count_df])
    final_count_df = final_count_df.sort_values(by='X')
    return final_count_df


def data_to_count_df(data:np.array, color='red',bins = 50, opacity = 0.3, density = True):
    inp = data[~np.isnan(data)]
    inp = data[np.isfinite(data)]
    hs = np.histogram(inp, bins= bins, density=density)
    counts = np.interp(hs[0], (hs[0].min(), hs[0].max()), (0, +1))
    edges = np.round(hs[1][0:len(hs[1])-1],decimals=2)
    df = pd.DataFrame(list(zip(edges, counts)))
    df.columns = ['X','Y']
    df['color'] = color
    df['opacity'] = opacity
    df['ind'] = df['X']
    return df


def ranodom_color_genetator(num:int) :
    """
    Generates a list with  num random colors
    
    """
    no_of_colors = num
    colors=["#"+''.join([random.choice('0123456789ABCDEF') for i in range(6)]) for j in range(10000)]
    color = random.sample(list(set(colors)),no_of_colors)
    return(color)

def unnest_proteingroups(df:pd.DataFrame) -> pd.DataFrame:
    """
    Unnest the protein_groups A;B as two separate rows with the same values
    the protein groups are the index of the the pandas dataframe df
    """
    temp_df = df
    temp_df['index'] = temp_df.index.str.split(';')
    temp_df = temp_df.explode('index')
    temp_df = temp_df.set_index('index')
    return temp_df


def get_index_cols(data_type: str) -> List[str]:
    index_cols = ['Gene names']
    if data_type == 'pp':
        index_cols = ['Gene names', 'Modified sequence', 'Proteins']
    return index_cols


def keep_only_sample_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Filter for patient expression columns, sample IDs have the format 
    # - "I007-031-108742" (INFORM)
    # - "H021-7AAYWW-T1" (MASTER/Chordoma)
    # - "P024567" (Glioma)
    # - "Reporter intensity corrected" (QC channels)
    # TODO: move regex to config file
    return df.filter(regex = r'(^\S+-.+-\S)|(Reporter intensity corrected)|(^P\d{6}$)')


def calculate_confidence_score(df:pd.DataFrame) -> pd.DataFrame:
    # print(df[['num_pep',z_or_intensity]])
    try:
        df_temp = df[['num_pep','Z-score']].copy().apply(pd.to_numeric, errors='coerce')
        df_temp['confidence_score'] = df_temp['num_pep']* df_temp['Z-score']
        #df_temp['confidence_score'] = df_temp['confidence_score'].apply(np.ceil)
        df['confidence_score'] = df_temp['confidence_score'].fillna('n.d.')
        return df
    except:
        print('some thing wrong with confidence scoring')
        return df


def post_process_for_front_end(df:pd.DataFrame,defaultcolor = 'grey',defaultsize=2):
    if isinstance(df,pd.DataFrame):
        abundances_table = df.copy()
        abundances_table['colorID'] = defaultcolor  # default value for the color on the frontend
        abundances_table['sizeR'] = defaultsize        # default value for the point size on the frontend
        abundances_table = abundances_table.reset_index()
        abundances_table['index'] = abundances_table.index
        return abundances_table
    else:
        return df



def merge_with_sample_annotation_df(scores_df:pd.DataFrame,sample_annotation_df:pd.DataFrame):  
    try:  
        if isinstance(sample_annotation_df,pd.DataFrame):
            if 'Entity' in sample_annotation_df.columns:
                sample_annotation_df = sample_annotation_df.drop('Entity', axis=1)
            merged_table = scores_df.merge(sample_annotation_df, on='Sample name', how='left')
            return merged_table
        else:
            return scores_df
    except Exception as err:
        print(err)
        return scores_df


def merge_with_patients_meta_df(scores_df:pd.DataFrame,patients_df:pd.DataFrame):  
    try:  
        if isinstance(patients_df,pd.DataFrame):
            new_patients_df = patients_df.copy()
            scores_table = scores_df.copy()
            scores_table['Sample_name_rep_truncated'] = scores_table['Sample name'].str.replace(r'-R[0-9]$', '', regex=True)
            new_patients_df = new_patients_df.rename(columns={'Sample name': 'patient_id'})
            scores_table = scores_table.dropna(subset = ["Sample_name_rep_truncated"])
            new_patients_df = new_patients_df.dropna(subset = ['patient_id'])
            if len(scores_table)>0 and len(new_patients_df)>0:
                merged_table = scores_table.merge( new_patients_df, left_on= 'Sample_name_rep_truncated',right_on='patient_id', how='left')
                merged_table = merged_table.drop(['patient_id'],axis=1)
                return merged_table
            else:
                return scores_df
        else:
            return scores_df
    except Exception as err:
        print(err)
        return scores_df



def time_now():
    now = datetime.now()
    current_time = now.strftime("%D:%H:%M")
    return(str(current_time) + ': ')


def log_delete(fileName):
    with open(fileName,'r+') as file:
        file.truncate(0)

def write_log(msg,fileName):
    f = open(fileName, "a")
    f.write(time_now())
    f.write(msg)
    f.write(' nneewwlliinnee ')
    f.close()


def read_log(fileName):
    f = open(fileName,'r')
    lines = f.readline()
    f.close()
    return lines


def config_reader(config_path):
    with open(config_path, 'r') as f:
        config = json.load(f)
    return config


def df_to_json(df):
    """
    Makes flask JSON response from a dataframe
    """
    return Response(json.dumps(df.to_dict(orient='records')),  mimetype='application/json')


def get_the_list_of_disease_from_config(config_path):
    """
    returns the list of cohorts in the config file
    """
    config = config_reader(config_path)
    all_diseases = []
    disease_lists = list(config['patient_annotation_path'].keys())
    for disease in disease_lists:
        all_diseases.append(disease)
    return all_diseases



def intersection(lst1, lst2):
    return list(set(lst1) & set(lst2))


def setdiff(lst1,lst2):
    return(list(set(lst1) - set(lst2)))



def check_complete_or_fp_or_pp(fp_df_patients:pd.DataFrame,pp_df_patients:pd.DataFrame) -> str:
    """ Check if the the cohort is complete or its only fp or pp"""
    cohort_type = ''
    if isinstance(fp_df_patients,pd.DataFrame) & (pp_df_patients,pd.DataFrame):
        cohort_type = 'complete'
    else:
        if isinstance(fp_df_patients,pd.DataFrame):
            cohort_type = 'fp_only'
        else:
            cohort_type = 'pp_only'
    return cohort_type



def merged_df_to_json(data_type:str, merged_df:pd.DataFrame, title = 'Scores'):
    if data_type == 'plot':
        merged_df = merged_df.sort_values(['Batch_No','TMT_channel'],ascending=False)
        merged_df.index = merged_df['Sample name'] + '_' + merged_df['TMT_channel'].astype(str)
        plot_df = merged_df.drop(list(cn.SAMPLE_ANNOTATION.values()),axis = 1)
        return plotlyprepare.get_simple_heatmap(plot_df , title)
    else:
        meta_data = merged_df[list(cn.SAMPLE_ANNOTATION.values())].sort_values(['Batch_No','TMT_channel'],ascending=True)
        return df_to_json(meta_data)











In [2]:
# retrieved from the wp3 pipeline
from typing import Union
from pathlib import Path
def read_basket_scores(results_folder: Union[str, Path],
                       data_type: str = 'fp') -> pd.DataFrame:
    """
    Retrieved from the WP3 pipeline for the PCA plots
    Read basket score results for report creation
    Requires one of [basket_scores_4th_gen.tsv|basket_scores.tsv]
    """
    basket_scores_df = []
    if os.path.exists(os.path.join(results_folder, cn.BASKET_SCORES_FILE)):
        try:
            basket_scores = os.path.join(results_folder, cn.BASKET_SCORES_FILE)
            basket_scores_df = pd.read_csv(basket_scores, sep='\t', index_col='Sample')
        except PermissionError:
            raise PermissionError(f'Cannot open basket scores file, check if you have it open in Excel. {basket_scores}')
    elif os.path.exists(os.path.join(results_folder, 'basket_scores.tsv')):
        try:
            basket_scores = os.path.join(results_folder, cn.BASKET_SCORES_FILE_2nd_GENERATION)
            basket_scores_df = pd.read_csv(basket_scores, sep='\t', index_col='Sample')
        except PermissionError:
            raise PermissionError(f'Cannot open basket scores file, check if you have it open in Excel. {basket_scores}')
        # For 3rd gen basket scheme we take just the main baskets
        # subset to data type
        basket_scores_df = basket_scores_df.iloc[:, basket_scores_df.columns.str.startswith(data_type.upper())]
        # remove drug scores
        basket_scores_df = basket_scores_df.iloc[:, ~basket_scores_df.columns.str.contains('TOPAS')]
        # remove RTK baskets
        basket_scores_df = basket_scores_df.iloc[:, ~basket_scores_df.columns.str.contains('RTK')]
        
    else:
        raise FileNotFoundError("No baskets score file found")
    return basket_scores_df

In [3]:
# PCA pipeline with some modifications: retrieved from the wp3 topas pipeline
import os
import re
#import sys
#import argparse
from pathlib import Path
#import warnings
#import logging
import math
from itertools import product
from typing import Dict, List, Union

import pandas as pd
import numpy as np
#import seaborn as sns
#import matplotlib.pyplot as plt


from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from ppca import PPCA
from umap import UMAP

no_components = 8

#import bin.config as config
# logger = logging.getLogger(__package__ + "." + __file__)

ALL_COLORS = ['silver', 'dodgerblue', 'darkorange', 'limegreen', 'mediumorchid', 'sienna', 'darkturquoise', 'darkkhaki',
              'blueviolet', 'seagreen', 'orange', 'tan', 'slategrey', 'olive', 'lightpink', 'darkslategray',
              'mediumvioletred', 'khaki', 'powderblue', 'lightsalmon', 'olivedrab', 'firebrick',
              'lawngreen', 'steelblue', 'indigo', 'linen', 'springgreen', 'gold', 'darkred', 'lightgreen', 'pink',
              'yellowgreen']
ALL_SYMBOLS = ['o', 'v', '^', 's', '+', 'd', 'x', 'p', 'P', '<', '>']




def do_pca(selected_proteins: List[str],
           results_folder: str,
           plot_types: List[str],
           sample_annot: str,
           metadata_annot: str,
           data_type: str,
           min_sample_occurrence_ratio: float = 0.5,
           umap: bool = False,
           include_reference_channels: bool = False,
           include_replicates: bool = False
           ):
    """
    data_type = fp or pp
    """
    # load sample and metadata
    sample_annot_df = pd.read_csv(sample_annot)
    sample_annot_df = sample_annot_df[sample_annot_df['QC'].isin(['passed', 'shaky'])]
    if not include_replicates:
        if 'Replicate' in sample_annot_df.columns:
            sample_annot_df = sample_annot_df[sample_annot_df['Replicate'] != 'replicate']
    if 'Material issue' in sample_annot_df.columns:
        sample_annot_df = sample_annot_df[sample_annot_df['Material issue'] != '+']

    sample_annot_df.columns = sample_annot_df.columns.str.strip()
    meta_annot_df = pd.read_excel(metadata_annot)
    meta_annot_df = whitespace_remover(meta_annot_df)

    if include_reference_channels:
        sample_annot_df = create_ref_sample_annot(results_folder, sample_annot_df)

    # check error
    metadata_df = merge_sample_and_metadata_annots(sample_annot_df,
                                                   meta_annot_df,
                                                   remove_qc_failed=True,
                                                   keep_reference=include_reference_channels,
                                                   keep_replicates=include_replicates)

    all_principal_dfs = []
    all_principal_variances = []
    imputed_data = []
   
    # for data_type in data_types:
    for plot_type in plot_types:  # MT: plot types are actually different data inputs, change name
        # logger.info('plot type: ', plot_type)
        df = load_pca_data(results_folder, metadata_df['Sample name'], data_type, plot_type, include_reference_channels)
        
        if len(selected_proteins) > 2:
            final_selected = [f for f in selected_proteins if f in df.index]
            if len(final_selected) > 2:
                df = df[df.index.isin(final_selected)]

        principal_df, pca,imputed_df = metadata_pca(df,
                                         metadata_df,
                                         plot_type,
                                         umap,
                                         min_sample_occurrence_ratio)
       
        

        imputed_data.append(imputed_df)
        all_principal_dfs.append(principal_df)
        if umap:
            all_principal_variances.append([])
        else:
            all_principal_variances.append(pca.var_exp)
           
        
    return all_principal_dfs, all_principal_variances, imputed_data, metadata_df


def get_figure_name_and_title(metadata_column: str,
                              plot_folder: str,
                              plot_type: str,
                              umap: bool,
                              include_reference_channels: bool,
                              include_replicates: bool,
                              min_sample_occurrence_ratio: float):
    pca_type = 'PCA'
    if umap:
        pca_type = 'UMAP'

    extra = '_'
    if include_replicates:
        extra += 'rep_'
    if include_reference_channels:
        extra += 'ref'
    figure_name = os.path.join(plot_folder,
                               f'{plot_type}_{pca_type}_{min_sample_occurrence_ratio}_{data_type}_{metadata_column}{extra}.png')
    figure_title = f'{pca_type}_{data_type}_{metadata_column}'

    return figure_name, figure_title


def load_pca_data(results_folder, samples, data_type: str, plot_type: str = 'Basket', include_reference_channels: bool = False):
    if plot_type == 'Basket':
        df = read_basket_scores(results_folder, data_type)
    if plot_type == 'Kinase':
        df = read_kinase_score_file(results_folder)
    if plot_type == 'Phosphoprotein':
        df = read_phosphoprotein_score_file(results_folder)
    if plot_type in ['Intensity', 'Intensity_subset']:  # MT: move into a function
        file = f'annot_{data_type}.csv'
        if include_reference_channels:
            file = f'annot_{data_type}_with_ref.csv'

        index_col = get_index_cols(data_type)
        df = pd.read_csv(os.path.join(results_folder, file), index_col=index_col)

        df.columns = df.columns.str.strip()
        if plot_type == 'Intensity_subset':
            # Subset remove where both basket and rtk is empty
            if 'rtk' in df.columns:
                df = df.dropna(subset=['basket', 'rtk'], how='all')
            elif 'sub_basket' in df.columns:
                df = df.dropna(subset=['basket', 'sub_basket'], how='all')
            else:
                print('Error. Wrong basket scoring file. cannot find baskets of type basket & rtk or basket & sub_basket')

        df = keep_only_sample_columns(df)
        

    # prepare data
    if plot_type in ['Basket', 'Kinase', 'Phosphoprotein']:  # MT: include transpose as a parameter in the loading function
        df = df.transpose()

    if not include_reference_channels:  # but then also replicates are kept
        df = df.loc[:, df.columns.isin(samples)]  # MT: not sure what this does

    return df


# MT: split data loading from performing PCA
def metadata_pca(df: pd.DataFrame,
                 metadata_df: pd.DataFrame,
                 plot_type: str = 'Basket',
                 umap: bool = False,
                 min_sample_occurrence_ratio: float = 0.5):

    normalized_data, transposed_df = filter_and_normalize(df, min_sample_occurrence_ratio)
    
    # calculate ppca
    pca,imputed_data = calculate_pca(normalized_data, plot_type, umap)
    imputed_data.index = df.columns
    # get principal df
    principal_df = get_pca_df(pca, transposed_df, transposed_df.index, plot_type, umap)
    principal_df['Sample_name_rep_truncated'] = principal_df['Sample'].str.replace(r'-R[0-9]$', '', regex=True)
    # merge principal components with metadata
    principal_df = principal_df.merge(metadata_df, left_on='Sample', right_on='Sample name')
    
    #print(principal_df.columns)
    # principal_df = principal_df.drop(['Sample_name_rep_truncated'])
    return principal_df, pca,imputed_data




# MT: move each of the reading functions to the corresponding file, here e.g. kinase_scoring.py
def read_kinase_score_file(results_folder):
    df = pd.read_csv(os.path.join(results_folder, 'kinase_results', f'kinase_scores.tsv'), sep='\t', index_col='PSP Kinases')
    df = df.transpose()
    df = df.drop('No. of total targets')
    return df


# MT: move each of the reading functions to the corresponding file, here e.g. protein_phosphorylation_scoring.py
def read_phosphoprotein_score_file(results_folder):
    df = pd.read_csv(os.path.join(results_folder, 'protein_results', f'protein_scores.tsv'), sep='\t', index_col='Gene names')
    df = df.transpose()
    df = df.drop(['mean', 'stdev'])
    return df



def get_pca_df(pca, df, samples, plot_type: str = 'Basket', umap: bool = False, target_dict: Dict = None,
               principal_df: pd.DataFrame = None, target_name: str = None):
    if not umap:
        if plot_type in ['Intensity', 'Intensity_subset', 'Phosphoprotein']: # MT: don't understand what this does
            pca = pca.transform()
        else:
            pca = pca.principal_components
    if target_dict is None:
        colnames = [f'Principal component {x+1}' for x in range(no_components)]
        principal_df = pd.DataFrame(data=pca[:, [x for x in range(no_components)]], columns=colnames )
        principal_df = pd.concat([principal_df, pd.Series(samples)], axis=1)
        colnames.append('Sample')
        principal_df.columns = colnames
    else:
        # targets = get_pca_targets(df, plot_type, target_dict)
        targets = get_pca_targets(df, plot_type, target_dict)
        new_columns = principal_df.columns.append(pd.Index([f'{target_name}']))
        principal_df = pd.concat([principal_df, pd.Series(targets)], axis=1)
        principal_df.columns = new_columns
    return principal_df


# MT: rename function to something more descriptive
def filter_and_normalize(df, min_sample_occurrence_ratio: float = 0.5):
    df = df.transpose()
    x = df.loc[:, df.count(axis=0) >= df.shape[0] * min_sample_occurrence_ratio]
    x = StandardScaler().fit_transform(x)  # MT: turn into dataframe?
    return x, df


# MT: rename function, it's confusing that this function can do an umap
def calculate_pca(x, plot_type, umap: bool = False):
    ppca = PPCA()
    
    ppca.fit(data=x, d=2, verbose=False)
    imputed_data = pd.DataFrame(ppca.data)
    
    if umap:
        pca = do_umap(x)
    else:
        if plot_type in ['Basket', 'Kinase']: # MT: check for presence of missing values rather than check for plot_type
            pca = PCA(n_components=no_components)
            pca.principal_components = pca.fit_transform(x)
            pca.var_exp = pca.explained_variance_ratio_
        else:
            pca = PPCA()
            pca.fit(data=x, d=8, verbose=False)
            pca.var_exp[1] = pca.var_exp[1] - pca.var_exp[0]
    return pca,imputed_data


# MT: rename function
def do_umap(x, n_comp: int = 30, n_neigh: int = 5, n_epochs: int = 1000, metric: str = 'euclidean'):
    # UMAP needs complete data (no missing values) so first PPCA is used for imputation
    ppca = PPCA()
    ppca.fit(data=x, d=2, verbose=False)
    imputed_data = pd.DataFrame(ppca.data)
    reducer = UMAP(n_components=n_comp, n_epochs=n_epochs, low_memory=True, n_neighbors=n_neigh, metric=metric)
    x_trans = reducer.fit_transform(imputed_data)
    return x_trans


# MT: rename function to get_pca_colors_and_symbols
def get_pca_colors_symbols(targets):
    marker = False # MT: no need for this variable
    if len(targets.unique()) > 8:
        marker = True

    num_combinations = math.ceil(np.sqrt(len(targets.unique())))
    colors = ALL_COLORS[0:num_combinations]
    symbols = ALL_SYMBOLS[0]
    if marker:
        symbols = ALL_SYMBOLS[0:num_combinations]
        color_symbol_tuples = []
        for shape, color in product(colors, symbols):
            color_symbol_tuples.append((shape, color))
        color_symbol_tuples = color_symbol_tuples[0:len(targets.unique())]
    else:
        # todo: make colors to color marker tuple
        color_symbol_tuples = [(color, symbols) for color in colors]
    return color_symbol_tuples


# MT: what does this do?
def get_pca_targets(df: pd.DataFrame, plot_type,
                    sample_dict=None) -> List:
    # TODO: clean up
    """Requires that fp and pp is the same """
    targets = []
    for v in df.index.tolist(): # MT: do not use v as variable name
        if v in sample_dict.keys():
            targets.append(sample_dict[v])
        # replicates
        elif '-R1' in v or '-R2' in v or '-R3' in v or '-R4' in v: # MT: use regex in case R5, R6 come up later
            temp_v = '-'.join(v.split('-')[:-1]) # MT: turn into a function
            if temp_v in sample_dict.keys():
                targets.append(sample_dict[temp_v])
        else:
            continue
    return targets



def merge_sample_and_metadata_annots(sample_annotation_df: pd.DataFrame,
                                     meta_data: pd.DataFrame,
                                     remove_qc_failed: bool,
                                     keep_replicates: bool = False,
                                     keep_reference: bool = False) -> pd.DataFrame:
    
    if remove_qc_failed:
        if 'QC' in sample_annotation_df.columns:
            sample_annotation_df = sample_annotation_df[sample_annotation_df['QC'].isin(['passed', 'shaky'])]
        else:
            sample_annotation_df = sample_annotation_df[sample_annotation_df['Failed'] != 'x']

    meta_data_samples = meta_data['Sample name'].tolist()
    if keep_replicates:  # MT: simplify if statements
        sample_annotation_df = get_replicate_groups(sample_annotation_df, meta_data_samples)

    if not keep_replicates and keep_reference:
        sample_annotation_df = sample_annotation_df.loc[
                               (sample_annotation_df['Sample name'].isin(meta_data_samples)) | (
                                   sample_annotation_df['Sample name'].str.startswith('Reporter')), :]

    if not keep_replicates and not keep_reference:
        sample_annotation_df = meta_data.loc[meta_data['Sample name'].isin(sample_annotation_df['Sample name'].tolist()), :]

    # merge the two annotations to keep replicates+reference if any
    merged_annot = pd.merge(left=sample_annotation_df, right=meta_data, how='left', left_on='Sample name',
                            right_on='Sample name', suffixes=('', '_drop'))

    # drop columns present in both dataframes
    merged_annot = merged_annot.drop([col for col in merged_annot.columns if '_drop' in col], axis=1)

    # if null in merged annot retrieve batch from sample_annot
    for i, value in enumerate(merged_annot['Batch_No']):
        if type(value) != str and math.isnan(value):
            batch = sample_annotation_df.loc[
                sample_annotation_df['Sample name'] == merged_annot.loc[i, 'Sample name'], 'Batch Name']
            merged_annot.loc[merged_annot.index.tolist()[i], 'Batch_No'] = batch.values[0]
    #merged_annot['Batch_No'] = merged_annot['Batch_No'].astype(int)

    merged_annot['Tumor cell content'] = merged_annot['Tumor cell content'].fillna('missing')


    merged_annot['Tumor cell content'] = merged_annot['Tumor cell content'].apply(group_tumor_cell_content)
    merged_annot['Is reference channel'] = merged_annot['Sample name'].str.startswith('Reporter')
    merged_annot = merged_annot.replace(np.nan, 'nan')

    return merged_annot


def get_replicate_groups(sample_annotation_df, meta_data_samples):
    replicates = sample_annotation_df[
        ~(sample_annotation_df['Sample name'].isin(meta_data_samples)) &
        ~(sample_annotation_df['Sample name'].str.startswith('Reporter'))]

    # split off the replicate identifier from the sample name, e.g. ABCD-R2 => ABCD
    replicates['Sample name'] = replicates['Sample name'].apply(lambda x: '-'.join(x.split('-')[:-1]))
    sample_annotation_df['Replicate group'] = np.nan

    for i, sample in enumerate(replicates['Sample name'].unique()):  # with one there is the one without and one with -R2
        group_indices = sample_annotation_df[sample_annotation_df['Sample name'].str.contains(sample)].index
        sample_annotation_df.loc[group_indices.values, 'Replicate group'] = i + 1

    return sample_annotation_df


def group_tumor_cell_content(value) -> str:
    try:
        new_value = int(float(value))
    except ValueError:
        if pd.isnull(value):
            new_value = 'missing'
        elif '-' in value:
            the_range = value.split('-')
            range_average = sum([float(j) for j in the_range]) / len(the_range)
            new_value = round(range_average)
        elif any(string in str(value) for string in
                 ['missing', 'n.d.', '?', 'nd', 'nan', 'ND (benigne)']):  # MT: check for this before trying to parse as float
            new_value = 'missing'
        else:
            print('ERROR, what is this value: ', value)

    if new_value != 'missing':
        if new_value < 10:
            new_value = '0-10%'
        elif new_value < 20:
            new_value = '10-20%'
        elif new_value < 40:
            new_value = '20-40%'
        elif new_value < 60:
            new_value = '40-60%'
        elif new_value < 80:
            new_value = '60-80%'
        else:
            new_value = '80-100%'
    return new_value


def create_ref_sample_annot(results_folder, sample_annot_df):
    df = pd.read_csv(os.path.join(results_folder, f'annot_fp_with_ref.csv'), index_col='Gene names')
    df = keep_only_sample_columns(df)
    for sample in df:
        if sample not in sample_annot_df['Sample name'].tolist():
            # retrieve tmt channel, batch
            tmt_channel = re.search('corrected \d{1,2}', sample).group().split(' ')[1]
            batch = re.search('Batch\d{1,2}', sample).group()
            batch = re.findall(r'\d+', batch)[0]
            cohort = re.search('\d [A-Z,a-z]+_', sample).group().split(' ')[1][:-1]
            new_sample = {'Sample name': sample, 'Cohort': cohort, 'Batch Name': batch, 'TMT Channel': tmt_channel,
                          'QC': 'passed'}
            sample_annot_df = sample_annot_df.append(new_sample, ignore_index=True)
            
    return sample_annot_df


def whitespace_remover(df):
    for col in df.columns:
        if df[col].dtype == 'object':
            # applying strip function on column
            df[col] = df[col].astype(str).map(str.strip)
        else:
            pass
    return df


def confidence_ellipse(x, y, ax, n_std=3.0, facecolor='none', **kwargs):
    """
    Create a plot of the covariance confidence ellipse of `x` and `y`

    Parameters
    ----------
    x, y : array_like, shape (n, )
        Input data.

    ax : matplotlib.axes.Axes
        The axes object to draw the ellipse into.

    n_std : float
        The number of standard deviations to determine the ellipse's radiuses.

    Returns
    -------
    matplotlib.patches.Ellipse

    Other parameters
    ----------------
    kwargs : `~matplotlib.patches.Patch` properties
    """
    if x.size != y.size:
        raise ValueError("x and y must be the same size")

    cov = np.cov(x, y)
    pearson = cov[0, 1] / np.sqrt(cov[0, 0] * cov[1, 1])
    # Using a special case to obtain the eigenvalues of this two-dimensionl dataset
    ell_radius_x = np.sqrt(1 + pearson)
    ell_radius_y = np.sqrt(1 - pearson)
    ellipse = matplotlib.patches.Ellipse((0, 0),
                                         width=ell_radius_x * 2,
                                         height=ell_radius_y * 2,
                                         facecolor=facecolor,
                                         **kwargs)

    # Calculating the stdandard deviation of x from the squareroot of the variance and multiplying
    # with the given number of standard deviations
    scale_x = np.sqrt(cov[0, 0]) * n_std
    mean_x = np.mean(x)

    # calculating the stdandard deviation of y ...
    scale_y = np.sqrt(cov[1, 1]) * n_std
    mean_y = np.mean(y)

    transf = matplotlib.transforms.Affine2D() \
        .rotate_deg(45) \
        .scale(scale_x, scale_y) \
        .translate(mean_x, mean_y)

    ellipse.set_transform(transf + ax.transData)
    return ax.add_patch(ellipse)




  from .autonotebook import tqdm as notebook_tqdm


# calculate the imputed data

In [4]:

do_umap =False
use_ref = False
use_replicate = True
basket_genes = []

#QC_PCS = ['pc1' ,'pc2']
#pc_cols = QC_PCS
reports_dir = "/media/kusterlab/internal_projects/active/TOPAS/WP31/Playground/Retrospective_study/2023.06.22_AhS_PAPER_COHORT"
sample_annotation_path = "/media/kusterlab/internal_projects/active/TOPAS/WP31/Searches/patient_annotation_paper_cohort_230626_final.csv" 
patient_annotation_path = "/media/kusterlab/internal_projects/active/TOPAS/WP31/Playground/Retrospective_MTBs_Evaluation/Metadata_Papercohort_230801.xlsx"

if os.path.exists(sample_annotation_path) and os.path.exists(patient_annotation_path):
    pcs = do_pca(basket_genes,
                    reports_dir,
                    ['Intensity'] ,
                    sample_annotation_path,
                    patient_annotation_path,
                    'fp',
                    min_sample_occurrence_ratio=0.6,
                    include_reference_channels=use_ref, 
                    umap=do_umap,
                    include_replicates=use_replicate)
    



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  replicates['Sample name'] = replicates['Sample name'].apply(lambda x: '-'.join(x.split('-')[:-1]))


# Silhouette_scores

In [5]:
meta_col = 'code_oncotree'

from sklearn.metrics import silhouette_samples

def calculate_silhouette_scores(silhoutte_input_df:pd.DataFrame,
                                meta_data_df:pd.DataFrame,
                                meta_col_silhoutte:str) -> pd.DataFrame:
    """
    calculates the silhoette score per sample 
    :silhoutte_input_df: an indexed data frame with the patients as the index
    :meta_data_df: an indexed data frame with the patient ids as the index
    :meta_col_silhouette: the column name used as the class labels
    """
    common_samples = intersection(meta_data_df.index,silhoutte_input_df.index)
    silhoutte_input_df = silhoutte_input_df.loc[common_samples,:].to_numpy()
    meta_data = meta_data_df.loc[common_samples,meta_col_silhoutte].to_numpy()
    silhouete_df = pd.DataFrame(list(zip(silhouette_samples(silhoutte_input_df,meta_data),meta_data)),
    columns=['silhouette_score','meta_data'])
    silhouete_df.index = common_samples
    return silhouete_df



#silhoutte_input_df = pcs[2][0] # imputed intensity data
silhoutte_input_df = pcs[0][0].set_index('Sample name').filter(regex='Principal') # PC level
meta_data_df = pcs[3].set_index('Sample name')



In [6]:
#meta_data_df = meta_data_df[meta_data_df['Input Material'] == 'Kryotissue']  # Lysate Kryotissue
count_dict = meta_data_df[meta_col].value_counts().to_dict()
meta_data_df['count'] = meta_data_df[meta_col].map(count_dict)
meta_data_df[meta_col][meta_data_df['count'] < 20] = 'others'
df = calculate_silhouette_scores(silhoutte_input_df,meta_data_df,meta_col)
df['sample'] = df.index
av_dict = df.groupby('meta_data').mean().to_dict()
df['average'] = df['meta_data'].map(av_dict['silhouette_score'])
df = df.sort_values(by=['average','meta_data','silhouette_score'],ascending=[False,True,False])

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  meta_data_df[meta_col][meta_data_df['count'] < 20] = 'others'


# seaborn visualization

In [7]:
"""
import seaborn as sns
sns.barplot(data=df,x='sample',y='silhouette_score',hue='meta_data')
"""

"\nimport seaborn as sns\nsns.barplot(data=df,x='sample',y='silhouette_score',hue='meta_data')\n"

# Plotly visualization

In [8]:
import plotly.express as px

fig = px.histogram(df, x="sample", y="silhouette_score",
             color='meta_data',
             orientation='v',
             height=400
             )
fig.show()