## 05_1. Clustering and DEG Analysis

<div style="text-align: left;">
    <p style="text-align: left;">Updated Time: 2025-02-11</p>
</div>

##### Load libraries

In [None]:
import os
import sys
import warnings
import numpy as np
import pandas as pd

import anndata as ad
import scanpy as sc
import omicverse as ov
from pyclustree import clustree

# Needed for some plotting
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.pyplot import rc_context
# Presumably applies omicverse's global plotting style to all later figures.
ov.plot_set()

# NOTE(review): `warnings` is already imported at the top of this cell; the
# re-import is redundant but harmless.
import warnings
# Silence Future/Deprecation/User warnings for the rest of the session so the
# cell outputs stay readable.
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=DeprecationWarning)
warnings.simplefilter(action="ignore", category=UserWarning)

##### Set working directory for analysis

In [None]:
# Absolute path of the project root on the analysis machine. All relative paths
# used later in this notebook are resolved against it after the chdir below.
working_dir = '/media/bio/Disk/Research Data/EBV/omicverse'
os.chdir(working_dir)
updated_dir = os.getcwd()
print("Updated working directory: ", updated_dir)

from pathlib import Path
# Output folder for the tables and figures of this notebook; created
# (including parents) if it does not exist yet.
saving_dir = Path('Results/05.celltype_annotation')
saving_dir.mkdir(parents=True, exist_ok=True)

##### Reading in integrated AnnData object

In [None]:
# Load the integrated AnnData object (path is relative to the current working directory).
adata = sc.read("Processed Data/scRNA_Batch_All.h5ad")
# Display the object summary as the cell output.
adata

In [None]:
# Print the value range of adata.X as a quick check of what kind of values
# (e.g. counts vs. transformed data) the matrix currently holds.
print(np.min(adata.X), np.max(adata.X))

### Re-perform UMAP and re-do clustering with different resolutions using integrated data

##### Data has been integrated previously
Integration benchmarking indicates that Harmony outperformed other integration methods in terms of batch correction and biological preservation, making it the preferred choice for downstream analysis.

In [None]:
# Keep a full copy of the current object in adata.raw so that it can be
# restored with adata.raw.to_adata() before saving.
# NOTE(review): whether X holds raw counts at this point depends on the
# upstream notebook — confirm with the printed min/max of adata.X.
adata.raw = adata.copy()

In [None]:
# Build the neighbour graph on the Harmony embedding stored in 'X_harmony'
# (15 neighbours, 20 components), then compute the UMAP.
ov.pp.neighbors(adata, n_neighbors=15, n_pcs=20, use_rep='X_harmony')
ov.pp.umap(adata)

In [None]:
# Run Leiden clustering at resolution 1, which in our experience is adequate
# for first-round clustering. Labels are stored in adata.obs['leiden_1_0'].
ov.pp.leiden(adata,resolution=1, key_added='leiden_1_0')

In [None]:
# Plot the UMAP embedding coloured by the Leiden cluster labels.
ov.pl.embedding(adata,
                basis='X_umap',
                color=['leiden_1_0'],
                ncols=1,
                frameon='small')

### Cluster-specific Differentially Expressed Genes analysis

In [None]:
# Compute the dendrogram of the Leiden clusters (used by the dot plots below).
# NOTE(review): assumes the 'scaled|original|X_pca' representation exists in
# the loaded object — presumably created by omicverse preprocessing upstream; verify.
sc.tl.dendrogram(adata,'leiden_1_0',use_rep='scaled|original|X_pca')

Let us compute a ranking of the most differentially expressed genes in each cluster. By default, `sc.tl.rank_genes_groups` uses the `.raw` attribute of AnnData if it has been initialized; in the cell below `use_raw=False` is passed, so the test is run on `adata.X` instead. The simplest and fastest method to do so is the t-test.

In [None]:
# Rank genes per Leiden cluster with a t-test on adata.X (use_raw=False);
# results are stored in adata.uns['leiden_1_0_ttest'].
# NOTE(review): `use_rep` is not a documented parameter of
# sc.tl.rank_genes_groups; presumably it is ignored for the t-test — confirm.
sc.tl.rank_genes_groups(adata, 'leiden_1_0', use_rep='scaled|original|X_pca',
                        method='t-test',use_raw=False,key_added='leiden_1_0_ttest')
# Dot plot of the 3 top-ranked genes of every cluster, scaled per gene.
sc.pl.rank_genes_groups_dotplot(adata,groupby='leiden_1_0',
                                cmap='Spectral_r',key='leiden_1_0_ttest',
                                standard_scale='var',n_genes=3)

Output the marker list as pandas dataframe:

In [None]:
# Ranked gene names of the t-test as a table: one column per cluster.
ttest_marker_gene=pd.DataFrame(adata.uns['leiden_1_0_ttest']['names'])
ttest_marker_gene.head()

In [None]:
# Keep only the first 50 rows (top-ranked genes per cluster) and export them.
# Note: this overwrites ttest_marker_gene with the truncated table.
ttest_marker_gene=ttest_marker_gene.head(50)
ttest_marker_gene.to_csv('Results/05.celltype_annotation/ttest_marker_gene_leiden_1_0.csv', index=False)

COSG is also considered to be a better algorithm for finding marker genes. omicverse provides a COSG implementation (`ov.single.cosg`), which is used here.

Paper: Accurate and fast cell marker gene identification with COSG

Code: https://github.com/genecell/COSG

In [None]:
import scipy.sparse
# Convert X to a dense array if it is stored as a sparse matrix.
# NOTE(review): presumably required by the marker-gene step that follows;
# densifying can need a lot of memory on large datasets.
if scipy.sparse.issparse(adata.X):
    adata.X = adata.X.toarray()

# NOTE(review): looks like the usual workaround for scanpy's KeyError: 'base'
# on objects reloaded from h5ad — confirm it is still needed.
adata.uns['log1p'] = {'base': None}  

In [None]:
# t-test ranking without key_added, i.e. written to scanpy's default result key.
# NOTE(review): `use_rep` is not a documented parameter of
# sc.tl.rank_genes_groups; presumably ignored for the t-test — confirm.
sc.tl.rank_genes_groups(adata, groupby='leiden_1_0', 
                        method='t-test',use_rep='scaled|original|X_pca',)
# COSG marker-gene ranking per Leiden cluster, stored under 'leiden_1_0_cosg'.
ov.single.cosg(adata, key_added='leiden_1_0_cosg', groupby='leiden_1_0')
# Dot plot of the 3 top COSG genes of every cluster, using values from adata.raw.
sc.pl.rank_genes_groups_dotplot(adata,groupby='leiden_1_0',
                                use_raw=True,
                                cmap='Spectral_r',key='leiden_1_0_cosg',
                                standard_scale='var',n_genes=3)

Output the marker list as pandas dataframe:

In [None]:
# Ranked gene names of the COSG result as a table: one column per cluster.
cosg_marker_gene=pd.DataFrame(adata.uns['leiden_1_0_cosg']['names'])
cosg_marker_gene.head()

In [None]:
# Keep only the first 50 rows (top-ranked genes per cluster) and export them.
cosg_marker_gene=cosg_marker_gene.head(50)
cosg_marker_gene.to_csv('Results/05.celltype_annotation/cosg_marker_gene_leiden_1_0.csv', index=False)

#### Automatic cell type annotation with GPT/Other

In [None]:
# Turn the t-test marker table into {cluster label (str): [gene, ...]},
# i.e. one gene list per column of ttest_marker_gene.
top_genes_cluster_lists = {
    str(cluster): genes.tolist()
    for cluster, genes in ttest_marker_gene.items()
}
# Display the dictionary as the cell output.
top_genes_cluster_lists


In [None]:
import requests
import os
import numpy as np
import pandas as pd

# Codes were modified from omicverse
def gpt4celltype(input_data, tissuename=None, speciename='human',
                provider='qwen', model='qwen-plus', topgenenumber=20,
                base_url=None, max_retries=5, timeout=60):
    """
    Annotate cell types from cluster marker genes with an OpenAI-compatible chat API.

    The API key is read from the ``AGI_API_KEY`` environment variable. When it
    is unset or empty, no request is sent and the prompt is returned instead.

    Arguments:
        input_data: dict with clusters as keys and lists of marker genes as
            values, or a pandas DataFrame with 'cluster', 'names' and
            'logfoldchanges' columns. The caller's object is not modified.
        tissuename: str, tissue name inserted into the prompt.
        speciename: str, species name inserted into the prompt. Default: 'human'.
        provider: str, provider of the model. Default: 'qwen', you can select
            from ['openai','kimi','qwen'] now. Only used when base_url is None.
        model: str, model name sent to the API. Default: 'qwen-plus'.
        topgenenumber: int, number of leading genes used per cluster. Default: 20.
        base_url: str, API base URL; overrides the provider default when given.
        max_retries: int, attempts per batch before giving up. Default: 5.
        timeout: float, timeout in seconds of every HTTP request. Default: 60.

    Returns:
        str: the prompt, when no API key is available.
        dict: cluster -> annotation, when the API was queried. Clusters whose
            gene list is empty are reported as 'Unknown'.

    Raises:
        ValueError: input_data is neither a dict nor a DataFrame, or provider
            is unknown while base_url is None.
        RuntimeError: a batch could not be annotated within max_retries attempts.
    """
    if isinstance(input_data, dict):
        # Clusters without marker genes are never sent to the model; they are
        # added back as 'Unknown' at the end.
        empty_clusters = [k for k, v in input_data.items() if len(v) == 0]
        markers = {k: ','.join(v[:topgenenumber])
                   for k, v in input_data.items() if len(v) != 0}
    elif isinstance(input_data, pd.DataFrame):
        empty_clusters = []
        # Keep genes with positive log fold change and take the top genes per cluster.
        upregulated = input_data[input_data['logfoldchanges'] > 0]
        per_cluster = upregulated.groupby('cluster')['names'].apply(
            lambda x: ','.join(x.iloc[:topgenenumber]))
        markers = dict(per_cluster.items())
    else:
        raise ValueError("Input must be either a dictionary of lists or a pandas DataFrame.")

    prompt_header = f'Identify the most probable cell types of {speciename} {tissuename} tissue using the following gene markers separately for each row, which were ranked by log-foldchange and p value. Please return the cell subtype annotation in unified singular form without any other information and blanks, such as NK cell, Macrophage, T cell, etc. Do not show numbers before the name. Some can be a mixture of multiple cell types or not specific to a single cell type and label them as Unknown.\n'

    # os.getenv returns None for an unset variable, so test truthiness rather
    # than comparing with "".
    api_key = os.getenv("AGI_API_KEY")
    if not api_key:
        print("Note: AGI API key not found: returning the prompt itself.")
        return prompt_header + '\n'.join([f'{k}: {v}' for k, v in markers.items()])

    if base_url is None:
        default_urls = {
            'openai': "https://api.openai.com/v1",
            'kimi': "https://api.moonshot.cn/v1",
            'qwen': "https://dashscope.aliyuncs.com/compatible-mode/v1",
        }
        if provider not in default_urls:
            raise ValueError(
                f"Unknown provider {provider!r}; choose from {sorted(default_urls)} or pass base_url.")
        base_url = default_urls[provider]
    endpoint = f"{base_url.rstrip('/')}/chat/completions"

    print("Note: AGI API key found: returning the cell type annotations.")
    headers = {
        "Authorization": f"Bearer {api_key}",
    }

    # Split the clusters into evenly sized batches of at most 30 rows per request.
    keys = list(markers)
    n_batches = int(np.ceil(len(keys) / 30))
    if n_batches > 1:
        batch_ids = np.digitize(range(len(keys)), bins=np.linspace(0, len(keys), n_batches + 1))
    else:
        batch_ids = np.ones(len(keys), dtype=int)

    allres = {}
    from tqdm import tqdm
    for i in tqdm(range(1, n_batches + 1)):
        batch_keys = [k for k, b in zip(keys, batch_ids) if b == i]
        payload = {
            "model": model,
            "messages": [{"role": "user",
                          "content": prompt_header + '\n'.join(markers[k] for k in batch_keys)}],
        }

        last_error = None
        # Bounded retries: an error response (e.g. invalid key) must not loop forever.
        for _ in range(max(1, max_retries)):
            try:
                response = requests.post(endpoint, headers=headers, json=payload,
                                         stream=False, timeout=timeout)
                res = response.json()
                content = res['choices'][0]['message']['content']
            except requests.RequestException as err:
                last_error = f"request failed: {err}"
                continue
            except (ValueError, KeyError, IndexError, TypeError):
                last_error = f"unexpected response (HTTP {response.status_code})"
                continue

            # One annotation per non-blank line, in the order the rows were sent.
            annotations = [line.strip() for line in content.split('\n') if line.strip()]
            if len(annotations) != len(batch_keys):
                last_error = f"expected {len(batch_keys)} annotations, got {len(annotations)}"
                continue

            for key, cell_type in zip(batch_keys, annotations):
                allres[key] = cell_type.strip(',')
            break
        else:
            raise RuntimeError(
                f"Cell type annotation failed for batch {i}/{n_batches} "
                f"after {max(1, max_retries)} attempts: {last_error}")

    print('Note: It is always recommended to check the results returned by GPT-4 in case of AI hallucination, before going to downstream analysis.')
    for k in empty_clusters:
        allres[k] = 'Unknown'
    return allres

In [None]:
# SECURITY: the API key is a secret and must never be stored in the notebook.
# Export AGI_API_KEY in the environment that launches Jupyter. The key that was
# previously hard-coded in this cell should be considered leaked and be revoked.
if not os.getenv('AGI_API_KEY'):
    raise RuntimeError(
        "AGI_API_KEY is not set. Export it before starting Jupyter "
        "(e.g. `export AGI_API_KEY=...`) and re-run this cell."
    )

# Annotate every Leiden cluster from its top 50 t-test marker genes.
gpt_result = gpt4celltype(top_genes_cluster_lists, tissuename='nasopharyngeal', speciename='human',
                     model='qwen-max-2025-01-25', provider='qwen', # deepseek-v3 qwen-max-2025-01-25
                     topgenenumber=50)
gpt_result

In [None]:
# Strip leading/trailing whitespace from every annotation in the mapping.
gpt_result = {cluster: label.strip() for cluster, label in gpt_result.items()}

# Show the cleaned mapping as the cell output.
gpt_result

In [None]:
# Translate each cell's Leiden label into its annotation via gpt_result and
# store it as a categorical column; labels missing from gpt_result become NaN.
adata.obs['gpt_celltype'] = adata.obs['leiden_1_0'].map(gpt_result).astype('category')

In [None]:
# Number of cells per annotated cell type in `adata.obs['gpt_celltype']`.
adata.obs['gpt_celltype'].value_counts()

In [None]:
# Use a 5x5 inch figure for this and all later matplotlib plots.
plt.rcParams['figure.figsize'] = [5, 5]
# UMAP coloured by the automatically annotated cell type.
ov.pl.embedding(adata,
                basis='X_umap',
                color=['gpt_celltype'],
                ncols=1,
                frameon='small')
# Save the current figure as PDF before showing it.
plt.savefig('Results/05.celltype_annotation/05.X_umap_gpt_celltype.pdf', format='pdf', bbox_inches='tight')
plt.show()

Check leiden cluster and auto-annotated celltype

In [None]:
# Count cells per (annotated cell type, Leiden cluster): cell types as rows,
# clusters as columns, 0 where a combination does not occur.
# observed=False is passed explicitly because the default for categorical
# grouping keys differs between pandas versions; this pins the table layout.
crosstab_data = (
    adata.obs
    .groupby(['gpt_celltype', 'leiden_1_0'], observed=False)
    .size()
    .unstack(fill_value=0)
)
crosstab_data.to_csv('Results/05.celltype_annotation/automated_annotation_leiden_1_0_crosstab.csv', index=True)

#### Save AnnData object with automated celltype annotation

In [None]:
# Rebuild the object from the snapshot stored in adata.raw, so X is the matrix
# that was saved there before the analysis steps in this notebook.
# NOTE(review): confirm that annotations added since the snapshot
# (e.g. obs['gpt_celltype']) are carried over by to_adata().
adata = adata.raw.to_adata()

In [None]:
# Display the summary of the restored object.
adata

In [None]:
# Print the value range of the restored adata.X for comparison with the range
# printed right after loading.
print(np.min(adata.X), np.max(adata.X))

In [None]:
# Save the re-clustered, auto-annotated object (path relative to the working directory).
adata.write_h5ad("Processed Data/scRNA_Reclustering_AutoAnnotation.h5ad")


**<span style="font-size:16px;">Session information:</span>**

In [None]:
import sys
import platform

try:
    # importlib.metadata (Python >= 3.8) replaces the deprecated pkg_resources API.
    from importlib import metadata as importlib_metadata
except ImportError:
    importlib_metadata = None

# Get Python version information
python_version = sys.version
# Get operating system information
os_info = platform.platform()
# Get system architecture information
architecture = platform.architecture()[0]
# Get CPU information
cpu_info = platform.processor()
# Print Session information
print("Python version:", python_version)
print("Operating system:", os_info)
print("System architecture:", architecture)
print("CPU info:", cpu_info)

# Collect every distribution installed in the environment (not only the
# packages imported by this notebook) as {lower-case name: version}.
if importlib_metadata is not None:
    installed_packages = {
        (dist.metadata['Name'] or '').lower(): dist.version
        for dist in importlib_metadata.distributions()
    }
else:
    # Fallback for interpreters without importlib.metadata.
    import pkg_resources
    installed_packages = {
        package.key: package.version for package in pkg_resources.working_set
    }

# Print the packages sorted by name so that two sessions can be compared easily.
print("\nInstalled packages and their versions:")
for package_name in sorted(installed_packages):
    if package_name:
        print(package_name, installed_packages[package_name])