In [None]:
import os
import sys
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import pandas as pd
from tqdm import tqdm

sys.path.append('../..')
from data.constants import BASE_PATH_EXPERIMENTS, BASE_PATH_DATA

In [None]:
# Global matplotlib text defaults for this notebook (Arial, 14 pt; PDF font type 42).
plt.rcParams.update({
    'pdf.fonttype': 42,
    'font.family': 'sans-serif',
    'font.sans-serif': 'Arial',
    'font.size': 14,
})

In [None]:
# Experiment results live below BASE_PATH_EXPERIMENTS; figures go to a `plots` sub-folder.
root_exp_dir = Path(BASE_PATH_EXPERIMENTS) / 'control_genes_selection/mean_var_per_gene_scores'
storing_path = root_exp_dir.joinpath('plots')

# Make sure the output directory (and any missing parents) exists.
storing_path.mkdir(parents=True, exist_ok=True)

The following cells simply check whether the data produced in `control_bias` has the correct shape.

In [None]:
# # Define a function to process files
# def _check_file(file_path):
#     df = pd.read_csv(file_path)
#     return df.shape
    
    
# def _process_folder(folder_path):
#     df_shape = []
#     i=0
#     for file in folder_path.glob('*.csv'):
#         df_shape.append(_check_file(file))
#         i+=1
        
#     return all(item == df_shape[0] for item in df_shape) and i==5

In [None]:
# for curr_path in root_exp_dir.rglob('*'):
#     if curr_path.is_dir():
#         res = _process_folder(curr_path)
#         if res:
#             print(f'All files are ok for subtype {curr_path.name}')
#         else:
#             print(f'Some files are not ok for subtype {curr_path.name}')

Create the visualization for one folder and then scale up to all folders.

In [None]:
def _create_and_store_plot(folder_path, storing_path, show=True):
    # define name mapping 
    sc_method_name_mapping= {
        'adjusted_neighborhood_scoring':'ANS',
        'seurat_scoring':'Seurat',
        'seurat_ag_scoring':'Seurat_AG',
        'seurat_lvg_scoring':'Seurat_LVG',
        'scanpy_scoring':'Scanpy',
    }
    hue_order = list(sc_method_name_mapping.values())
    
    # read all .csv files containing cell scores 
    dfs = []
    for file in folder_path.glob('*.csv'):
        df = pd.read_csv(file)
        dfs.append(df.copy())
    all_scores = pd.concat(dfs, axis=0)
    all_scores.columns = ['sample_id']+list(all_scores.columns)[1:]
    
    # melt all scores to make it ready for seaborn lineplot
    melted_all_scores = pd.melt(all_scores, id_vars=['sample_id', 'scoring_method'],
                                var_name='gene', value_name='score')
    
    melted_all_scores.scoring_method = melted_all_scores.scoring_method.map(sc_method_name_mapping)
    
    # plot score lines 
    cm = 1/2.54  # centimeters in inches
    plt.figure(figsize=(12*cm, 6*cm))
    ax = sns.lineplot(data=melted_all_scores, x="gene", y="score", hue='scoring_method', hue_order=hue_order)
    
    ax.set_title(f'Control selection bias for subtype {folder_path.name}', fontsize=10)
    plt.xlabel('Genes in last two expression bins.', fontsize=10)
    plt.ylabel('Score', fontsize=10)
    
    plt.xticks(rotation=90, fontsize=8)
    plt.yticks(fontsize=8)
    
    all_xticks = ax.get_xticks()
    subset_x_ticks = all_xticks[::50]
    ax.set_xticks(subset_x_ticks)
    
    # store figure
#     plt.savefig(storing_path/f"bias_{folder_path.name.replace(" ", "_")}.pdf", format=pdf)
    print(storing_path/f"bias_{folder_path.name.replace(' ', '_')}.pdf")

    # show or close figure
    if show:
        plt.show()
    else:
        plt.close()


In [None]:
# Use a smaller base font (10 pt) for the figure below.
plt.rcParams.update({
    'pdf.fonttype': 42,
    'font.family': 'sans-serif',
    'font.sans-serif': 'Arial',
    'font.size': 10,
})

In [None]:
# Result folder of a single subtype, used to try the plotting helper.
test_folder = root_exp_dir.joinpath('NK_1')

In [None]:
# Draw and display the figure for the single test folder.
_create_and_store_plot(folder_path=test_folder, storing_path=storing_path, show=True)

In [None]:
# Base font of 10 pt for the batch of figures created next.
plt.rcParams.update({
    'pdf.fonttype': 42,
    'font.family': 'sans-serif',
    'font.sans-serif': 'Arial',
    'font.size': 10,
})

In [None]:
# rglob('*') yields plain files and the `plots` output directory as well;
# only directories that actually contain score tables can be plotted.
# Materialising the list also gives tqdm a total for its progress bar.
subtype_folders = sorted(
    path for path in root_exp_dir.rglob('*')
    if path.is_dir() and any(path.glob('*.csv'))
)
for folder_path in tqdm(subtype_folders):
    _create_and_store_plot(folder_path, storing_path, show=False)

In [None]:
import argparse
import json
import os
import sys
from datetime import datetime

import pandas as pd
import scanpy as sc
from signaturescoring import score_signature
from signaturescoring.utils.utils import (
    get_bins_wrt_avg_gene_expression, get_mean_and_variance_gene_expression)

sys.path.append("../..")
from data.constants import DATASETS
from data.load_data import load_datasets
from data.preprocess_pbmc_helper import preprocess_dataset
from experiments.experiment_utils import (AttributeDict,
                                          get_scoring_method_params)


def load_pbmc_data():
    """Read the PBMC CITE-seq file and return its ``.raw`` content as AnnData.

    The file ``raw_data/pbmc_citeseq.h5ad`` below ``BASE_PATH_DATA`` is loaded,
    the ``.raw`` layer is converted to a standalone object, the ``_index``
    column of ``var`` becomes the (unnamed) ``var_names`` and the ``var``
    columns are relabelled to ``['gene_names']``. Boolean ``mt``, ``ribo`` and
    ``hb`` columns are added to ``var`` whenever they are not present yet.

    Returns
    -------
    The AnnData object built from ``adata.raw`` with the gene flags attached.
    """
    adata = sc.read_h5ad(os.path.join(BASE_PATH_DATA, 'raw_data/pbmc_citeseq.h5ad'))
    adata = adata.raw.to_adata()

    # use the stored gene index as variable names
    adata.var_names = adata.var['_index']
    adata.var_names.name = None
    adata.var.columns = ['gene_names']

    # column name -> rule deriving the flag from the gene names
    gene_flag_rules = (
        ("mt", lambda names: names.str.startswith("MT-")),                # mitochondrial genes
        ("ribo", lambda names: names.str.startswith(("RPS", "RPL"))),     # ribosomal genes
        ("hb", lambda names: names.str.contains(("^HB[^(P)]"))),          # hemoglobin genes
    )
    for flag_column, flag_rule in gene_flag_rules:
        if flag_column not in adata.var:
            adata.var[flag_column] = flag_rule(adata.var_names)

    return adata


# Number of bins passed to get_bins_wrt_avg_gene_expression; the cells below
# look at the bins labelled n_bins - 2 and n_bins - 1.
n_bins = 25

In [None]:
# Recompute the gene binning for a single subtype and collect the genes of the
# last two bins (labels n_bins - 2 and n_bins - 1).
subtype = 'NK_1'
print('Load entire PBMC dataset.')
adata = load_pbmc_data()
# keep only the cells whose 'celltype.l3' annotation equals `subtype`
adata = adata[adata.obs['celltype.l3']==subtype,:].copy()

# preprocess dataset
print(f'Preprocessing data with cell-type {subtype}.')
adata = preprocess_dataset(adata,
                           params_cell_filtering=dict(mad_tot_cnt=5, 
                                                      mad_ngenes_cnt=5, 
                                                      nr_top_genes=20,
                                                      mad_pct_cnt_top_genes=5,
                                                      mad_pct_mt=5,
                                                      min_pct_mt=9),
                            )

# per-gene statistics from the project helper, ordered by increasing mean
df_mean_var = get_mean_and_variance_gene_expression(adata)
df_mean_var = df_mean_var.sort_values(by="mean", ascending=True)
# presumably one bin label per gene, indexed by gene name -- TODO confirm against the helper
gene_bins = get_bins_wrt_avg_gene_expression(df_mean_var["mean"], n_bins)
genes_2nd_last = gene_bins[gene_bins == (n_bins - 2)].index.tolist()
genes_last = gene_bins[gene_bins == (n_bins - 1)].index.tolist()
# genes of both bins together, in the order in which they appear in `gene_bins`
genes_of_lat_2_bins = gene_bins[(gene_bins == (n_bins - 1)) | (gene_bins == (n_bins - 2))].index.tolist()

In [None]:
# True when cutting the combined gene list in the middle reproduces both bins exactly.
split_idx = len(genes_of_lat_2_bins) // 2
(genes_of_lat_2_bins[:split_idx] == genes_2nd_last) and (genes_of_lat_2_bins[split_idx:] == genes_last)

In [None]:
# Load the full dataset once; `_check_bin_jump` reads this module-level
# `entire_adata` and takes a per-subtype copy of it.
print('Load entire PBMC dataset.')
entire_adata = load_pbmc_data()

In [None]:
def _splits_at(combined, first_part, second_part, idx):
    """Return True if cutting ``combined`` at ``idx`` yields exactly the two given lists."""
    return combined[:idx] == first_part and combined[idx:] == second_part


def _check_bin_jump(list_subtypes):
    """Print, per subtype, at which index the last two gene bins are separated.

    For every subtype the cells with a matching ``celltype.l3`` annotation are
    copied from the module-level ``entire_adata`` and preprocessed. The genes
    are then binned with the module-level ``n_bins``, and the genes of bins
    ``n_bins - 2`` and ``n_bins - 1`` are compared against the combined gene
    list cut at ``length // 2``, ``length // 2 + 1`` and ``length // 2 - 1``.

    Parameters
    ----------
    list_subtypes : iterable of str
        Values of ``celltype.l3`` to check.

    Returns
    -------
    None
        The outcome for each subtype is printed.
    """
    cell_filtering_params = dict(mad_tot_cnt=5,
                                 mad_ngenes_cnt=5,
                                 nr_top_genes=20,
                                 mad_pct_cnt_top_genes=5,
                                 mad_pct_mt=5,
                                 min_pct_mt=9)

    for subtype in list_subtypes:
        adata = entire_adata[entire_adata.obs['celltype.l3']==subtype,:].copy()
        adata = preprocess_dataset(adata, params_cell_filtering=cell_filtering_params)

        df_mean_var = get_mean_and_variance_gene_expression(adata)
        df_mean_var = df_mean_var.sort_values(by="mean", ascending=True)
        gene_bins = get_bins_wrt_avg_gene_expression(df_mean_var["mean"], n_bins)
        genes_2nd_last = gene_bins[gene_bins == (n_bins - 2)].index.tolist()
        genes_last = gene_bins[gene_bins == (n_bins - 1)].index.tolist()
        genes_of_last_2_bins = gene_bins[(gene_bins == (n_bins - 1)) | (gene_bins == (n_bins - 2))].index.tolist()

        mid = len(genes_of_last_2_bins) // 2
        # Branch on the comparison results, not on the candidate indices:
        # a non-zero index is always truthy and would hide the other cases.
        if _splits_at(genes_of_last_2_bins, genes_2nd_last, genes_last, mid):
            print(f'For subtype {subtype} the index of length//2 is separating the two bins')
        elif _splits_at(genes_of_last_2_bins, genes_2nd_last, genes_last, mid + 1):
            print(f'For subtype {subtype} the index of length//2 +1 is separating the two bins')
        elif _splits_at(genes_of_last_2_bins, genes_2nd_last, genes_last, mid - 1):
            print(f'For subtype {subtype} the index of length//2 -1 is separating the two bins')
        else:
            print('I AM LOST')
                

In [None]:
import random

# Subtypes from which a few are drawn for the bin-boundary check.
my_list = ['B memory kappa', 'B naive kappa', 'B naive lambda', 'CD14 Mono', 'CD16 Mono', 'CD4 CTL', 'CD4 Naive', 'CD4 TCM_1', 'CD4 TCM_3', 'CD4 TEM_1', 'CD4 TEM_3', 'CD8 Naive', 'CD8 TEM_1', 'CD8 TEM_2', 'CD8 TEM_4', 'CD8 TEM_5', 'MAIT', 'NK_1', 'NK_2', 'NK_3', 'Platelet', 'cDC2_2']

# Specify the number of elements you want to sample
sample_size = 5

# Fixed seed so that Restart & Run All checks the same subtypes every time;
# a dedicated generator leaves the global `random` state untouched.
sample_seed = 0

# Draw a random subset of the list without replacement
sampled_subset = random.Random(sample_seed).sample(my_list, sample_size)

print(sampled_subset)

In [None]:
# Run the bin-boundary check on the sampled subtypes.
_check_bin_jump(list_subtypes=sampled_subset)