### Data preprocessing of scBERT (knowing that scGPT is required for variable gene selection)

In [None]:
import os
import scanpy as sc
import numpy as np
import pandas as pd
import anndata
import random

In [2]:
def make_kfold(anndata_give,name_column,n_fold): 
    '''
    Is a function that takes an Anndata as input, and creates a new column containing a group number. Equally distributed within each cell type.

    Keyword arguments:
    anndata_give -- Choose the anndata.
    name_column -- Choose a name for the column with the fold group name
    n_fold -- Choose the nuber of fold group.
    override -- Autorise or not to overwrite a column existing and replace it by the new column (default False)
    '''

    # Check if the anndata provided is a real anndata.
    assert type(anndata_give) == anndata._core.anndata.AnnData, "Error : Type of '" + anndata_give + "' is not an anndata." 
    #Check if the name_column is a string
    assert type(name_column) == str, "Error : Column name is not a string."
    # Check whether the n_fold supplied is a int.
    assert type(n_fold) == int, "Error : Type of n_fold is incorrect. Change it to a int."
    
    # Create a new column, which only have "None" inside
    anndata_give.obs[name_column] = None 

    # Loop over the cell types to create fold groups in each of them
    for cell_type in anndata_give.obs.celltype.unique():
        # Retrieve the list of cells of the current cell type
        cell_name_list = list(anndata_give.obs.index[anndata_give.obs.celltype == cell_type])
        # Monitors whether the number of cells within the cell type is sufficient
        if len(cell_name_list) < 20 :
            print("Error : Not enought cell in :", cell_type, "\nIt won't be taken into account for training and testing.")
            continue
        # Shuffle the list of cells to change their order
        random.shuffle(cell_name_list)
        # Split the list of cells into the required number of groups
        array_fold = np.array_split(np.array(cell_name_list),n_fold)
        # Assign to each fold group the index of their group
        for fold_index in range(len(array_fold)):
            anndata_give.obs.loc[array_fold[fold_index] , name_column] = fold_index

In [3]:
def run_finetuning(anndata_give, name_column, n_fold, list_type_cell=None):
    '''
    run_finetuning is a function that takes as input 1 anndata , a column and a fold number or a list of fold numbers, with a list of cell types to be excluded for the training phase.
    On output, the function provides 2 anndata, one for the test with the fold numbers chosen beforehand. And an anndata for training with the anndata not selected for the test phase and without the cell type list, if this has been supplied.
    
    Keyword arguments:
    anndata_give -- Choose the anndata.
    name_column -- Name of column with folds (group) name
    n_fold -- Provide (by a value, or a list) the folds for the test anndata
    list_type_cell -- List of cell types to be excluded from the training anndata. (default None)
    '''

    # Check if the anndata provided is a real anndata.
    assert type(anndata_give) == anndata._core.anndata.AnnData, "Error : Type of '" + anndata_give + "' is not an anndata." 
    #Check if the name_column is a string
    assert type(name_column) == str, "Error : Column name is not a string."
    # Check that the given column name exists in the anndata
    assert name_column in anndata_give.obs.columns , "Error : Column name don't exists."
    # Check whether the n_fold supplied can be read by the function.
    assert type(n_fold) == int or type(n_fold) == list , "Error : Type of n_fold is incorrect. Change it to a int or a list."
    # Check if "list_type_cell" is not none and if it's a list
    assert list_type_cell is None or type(list_type_cell) == list , "Error : " + list_type_cell + " is not a list."
 
    # Anndata creation block in case of a list.
    if type(n_fold) == list :
        # Go through the values in the list, and if one of them is not present in the column provided, display the error.
        for index in n_fold :
            assert anndata_give.obs[name_column].isin([index]).any() == True, "Error: The fold number in your list is not in " + name_column + " column."   
        # The values present in the list are scanned, separating the cells containing or not containing the fold number supplied as input. Either in the test or in the training.
        anndata_test = anndata_give[[k in n_fold for k in anndata_give.obs[name_column]]]
        # If no cell type list has been provided, then anndata_training returns only the groups that have not been given as input.
        if list_type_cell is None :
            anndata_training = anndata_give[[index not in n_fold for index in anndata_give.obs[name_column]]]
        # If a list of cell types has been provided, then anndata_training returns only the groups that have not been given as input and without the cell types present in the list.
        else :
            anndata_training = anndata_give[[index not in list_type_cell for index in anndata_give.obs.celltype]] and anndata_give[[index not in n_fold for index in anndata_give.obs[name_column]]]
            
    
    # Anndata creation block in case of a int.
    if type(n_fold) == int :
        # Check if the n_fold are not in colum provided, display the error.
        assert anndata_give.obs[name_column].isin([n_fold]).any() == True , "Error : The fold number is not present in " + name_column + " column."   
        # Separate the cells contain in name_column the n_fold in a new anndata name 'anndata_test' and the other in a anndata_training.
        anndata_test = anndata_give[anndata_give.obs[name_column] == n_fold]
        # If no cell type list has been provided, then anndata_training returns only the groups that have not been given as input.
        if list_type_cell is None :
            anndata_training = anndata_give[anndata_give.obs[name_column] != n_fold]
        # If a list of cell types has been provided, then anndata_training returns only the groups that have not been given as input and without the cell types present in the list.    
        else :
            anndata_training = anndata_give[[index not in list_type_cell for index in anndata_give.obs.celltype]] and anndata_give[anndata_give.obs[name_column] != n_fold] 
            
    return anndata_test, anndata_training

In [4]:
# Input path of the reference/raw data
PATH_PROJECT = "/mnt/DOSI/PLATEFORMES/BIOINFORMATIQUE/04_PROJECT/scLLM"
PATH_EXPERIMENT = os.path.join( PATH_PROJECT, "cross_tissue_immune_cell")
PATH_EXPERIMENT_REFERENCE = os.path.join( PATH_EXPERIMENT, "01_Reference")
PATH_EXPERIMENT_REFERENCE_EXTRA = os.path.join( PATH_EXPERIMENT_REFERENCE, "00_Dataset")
PATH_EXPERIMENT_OUTPUT = os.path.join( PATH_EXPERIMENT, "05_Output")

#File H5AD
PATH_INPUT_FILE = os.path.join( PATH_EXPERIMENT_REFERENCE_EXTRA, "cross_tissue_immune_cell.h5ad")

# File Variable Gene (do with preprocess from scGPT)
PATH_INPUT_FILE_GENE = os.path.join( PATH_EXPERIMENT_OUTPUT, "02a_GlobalHeterogenity")
PATH_INPUT_FILE_GENE_EXTRA = os.path.join( PATH_INPUT_FILE_GENE, "Variable_Gene.csv")

# Output path of the pre processed dataset
ANALYSIS_NAME = os.path.join( PATH_EXPERIMENT_OUTPUT, "02b_FilterData")
ANALYSIS_NAME_EXTRA = os.path.join( ANALYSIS_NAME, "Preprocess_Anndata_File_scBERT")

os.makedirs(os.path.dirname(PATH_OUTPUT_FILE_TEST_FOLD_0_ANNDATA), exist_ok = True)

# Constant to filter the minimum number of cells per cell type for annotation
MINIMAL_NUMBER_CELL_BY_TYPE = 40

In [5]:
# Read the reference/raw data
dataset_anndata = sc.read_h5ad(PATH_INPUT_FILE)

In [6]:
# Read the variable gene file and save it as a data frame
df_variable_gene = pd.read_csv(PATH_INPUT_FILE_GENE_EXTRA)

# Delete the column containing the numbers (which is useless in our case).
df_variable_gene = df_variable_gene.drop('Unnamed: 0', axis=1)

# Creation of a list of variable genes from the previous data frame
list_variable_gene = df_variable_gene['x'].tolist()

In [7]:
# Rename the column cell_type by celltype (In this case : rename cell_type by celltype)
dataset_anndata.obs.rename(columns={"cell_type" : "celltype"}, inplace=True)

In [8]:
# If this is not the case, we copy the previous index and put it in a column to avoid losing information.
dataset_anndata.var["index_column"] = dataset_anndata.var.index

# We replace the var name
dataset_anndata.var.rename(columns={"feature_name" : "gene_name"}, inplace=True)

# Move the "gene_name" column to index. (This overwrites the previous column)
dataset_anndata.var = dataset_anndata.var.set_index("gene_name")

AnnData expects .var.index to contain strings, but got values like:
    ['MIR1302-2HG', 'FAM138A', 'OR4F5', 'ENSG00000238009.6', 'ENSG00000239945.1']

    Inferred to be: categorical

  value_idx = self._prep_dim_index(value.index, attr)


In [9]:
# Loop over the cell types to create fold groups in each of them
for celltype in dataset_anndata.obs.celltype.unique():
    # Retrieve the list of cells of the current cell type
    cell_name_list = list(dataset_anndata.obs.index[dataset_anndata.obs.celltype == celltype])
    # Monitors whether the number of cells within the cell type is sufficient
    if len(cell_name_list) < MINIMAL_NUMBER_CELL_BY_TYPE :
        # Remove the all the cell from the cell type
        print("Warning: The cell type ", celltype, " is removed from the dataset because it does not contain enough cells (", MINIMAL_NUMBER_CELL_BY_TYPE, ").", sep='')
        dataset_anndata = dataset_anndata[~dataset_anndata.obs.celltype.str.contains(celltype)]



In [10]:
# Only genes in the list are kept in the anndata
dataset_anndata = dataset_anndata[:,[gene for gene in list_variable_gene if gene in dataset_anndata.var.index]]

In [13]:
# We run the function to create multiple fold.
make_kfold(dataset_anndata, "K_Fold", 5)

  anndata_give.obs[name_column] = None


In [17]:
# The output of this second preprocess data is for scBERT. He get every file in output to use them one per one after.
for i in range (0,5):
    dataset_anndata_test, dataset_anndata_training = run_finetuning(dataset_anndata, "K_Fold", i)
    
    dataset_anndata_test.obs.drop(["K_Fold"], inplace=True, axis = 1)
    dataset_anndata_training.obs.drop(["K_Fold"], inplace=True, axis = 1)
    
    dataset_anndata_test.write_h5ad(os.path.join( ANALYSIS_NAME_EXTRA, "Cross_Tissue_test_FOLD_"+str(i)+"_Preprocess.h5ad"))
    dataset_anndata_training.write_h5ad(os.path.join( ANALYSIS_NAME_EXTRA, "Cross_Tissue_Training_FOLD_"+str(i)+"_Preprocess.h5ad"))