In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm.auto import tqdm
import yaml

In [None]:
import matplotlib.pyplot as plt
# plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['font.family'] = 'Arial'

import anndata as ad
import scanpy as sc
# sc.settings.verbosity = 3
# sc.logging.print_versions()
# Output directories for figures and exported tables (created if missing)
Path("results/figures").mkdir(parents=True, exist_ok=True)
Path("results/data").mkdir(parents=True, exist_ok=True)
# NOTE(review): figure_type is not referenced anywhere in the visible notebook — confirm it is still needed
figure_type = 'svg'
# Figures saved through scanpy are written to results/figures
sc.settings.figdir = "results/figures"
# Global figure defaults: 80 dpi on screen, 1000 dpi when a figure is saved
sc.settings.set_figure_params(fontsize=12, color_map='RdYlGn', dpi=80, dpi_save=1000)

In [None]:
import sys
sys.path.extend(['../../mylibs'])

import scAnalysis_util

In [None]:
# Sequencing libraries to load: library id -> data directory and the sample label
# that is used for that library throughout the notebook.
# NOTE(review): the trailing comment on each "name" line lists a different label than the
# one assigned — presumably an earlier mapping; confirm which library/time-point pairing is correct.
samples = {
    "ZT-410": {
        "path": Path("../../data/ZT-410-velocyto/").absolute(),
        "name": "no-infection",
    },
    "ZT-485": {
        "path": Path("../../data/ZT-485-velocyto/").absolute(),
        "name": "9h-infection", #no-infection-1
    },
    "ZT-486": {
        "path": Path("../../data/ZT-486-velocyto/").absolute(),
        "name": "12h-infection", #1h-infection
    },
    "ZT-487": {
        "path": Path("../../data/ZT-487-velocyto/").absolute(),
        "name": "1h-infection", #3h-infection
    },
    "ZT-488": {
        "path": Path("../../data/ZT-488-velocyto/").absolute(),
        "name": "3h-infection", #6h-infection
    },
    "ZT-490": {
        "path": Path("../../data/ZT-490-velocyto/").absolute(),
        "name": "6h-infection", #9h-infection
    },
    "ZT-491": {
        "path": Path("../../data/ZT-491-velocyto/").absolute(),
        "name": "7.5h-infection", #12h-infection
    },
}

# Load the filtered GeneFull matrix of every sample and merge them into one AnnData.
adatas = {}
for sample_info in samples.values():
    sample_path = sample_info["path"]
    sample_name = sample_info["name"]
    solo_out = sample_path / "starsolo_outputs/Solo.out/GeneFull/filtered"

    sample_adata = sc.read_h5ad(solo_out / "matrix.stats.velocyto.h5ad")
    sample_adata.X = sample_adata.X.astype('float64')
    # Use the gene symbol as variable name; fall back to the gene id when the symbol is blank/missing
    sample_adata.var_names = sample_adata.var['gene_name'].apply(lambda x: x if x and str(x).strip() else None).fillna(sample_adata.var['gene_ids'])
    sample_adata.var_names_make_unique()
    adatas[sample_name] = sample_adata

# The dict keys become the values of adata.obs["sample"]
adata = ad.concat(adatas, label="sample", join="outer", merge="first")
adata.obs_names_make_unique()

# Put the sample categories in chronological order. Without this the category order
# follows the insertion order of `samples`, which is not chronological, and every
# grouped plot/table downstream would list the time points out of sequence.
time_course = [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]
loaded_samples = list(adata.obs["sample"].cat.categories)
ordered_samples = (
    [name for name in time_course if name in loaded_samples]
    + [name for name in loaded_samples if name not in time_course]  # unknown labels go last
)
adata.obs["sample"] = adata.obs["sample"].cat.reorder_categories(ordered_samples)

print(adata.obs["sample"].value_counts())
adata

In [None]:
adata

In [None]:
# Distinct sample labels present after merging (call the method; without the
# parentheses the cell only displays the bound method object)
adata.obs['sample'].unique()

In [None]:
sample_names = [
    'no-infection',
    '9h-infection',
    '12h-infection',
    '1h-infection',
    '3h-infection',
    '6h-infection',
    '7.5h-infection',
]

In [None]:
# The 11 viral genes of interest
genes_of_interest = [
    'VP1', 'VP2', 'VP3', 'VP4', 'NSP1', 'VP6',
    'NSP3', 'NSP2', 'VP7', 'NSP4', 'NSP5/6'
]

# Sample labels, in the order the rows are reported
sample_names = [
    'no-infection',
    '9h-infection',
    '12h-infection',
    '1h-infection',
    '3h-infection',
    '6h-infection',
    '7.5h-infection'
]

# Keep only the genes that are actually present in adata.var_names
genes_exist = [g for g in genes_of_interest if g in adata.var_names]

# Total UMI count of every gene within every sample, computed in one pass:
# densify only the (cells x selected genes) slice, then sum the rows per sample label.
virus_counts = adata[:, genes_exist].to_df()
sample_labels = adata.obs['sample'].astype(str).to_numpy()
result = (
    virus_counts.groupby(sample_labels).sum()
    .reindex(sample_names, fill_value=0.0)  # a sample without cells sums to 0
)

print(result)

In [None]:
# Viral genes of interest and the sample labels (chronological order)
genes_of_interest = [
    'VP1', 'VP2', 'VP3', 'VP4', 'NSP1', 'VP6',
    'NSP3', 'NSP2', 'VP7', 'NSP4', 'NSP5/6'
]
sample_names = [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]


# Keep only the genes that are actually present in adata
genes_exist = [g for g in genes_of_interest if g in adata.var_names]

# Per-cell flag: the summed expression of the target genes is > 0.
# np.asarray(...).ravel() handles both dense arrays and sparse matrices, so the
# `.A` attribute (unavailable on newer SciPy sparse types) is not needed.
target_X = adata[:, genes_exist].X
has_target_gene = np.asarray(target_X.sum(axis=1)).ravel() > 0

# Count the flagged cells per sample
sample_labels = adata.obs['sample'].astype(str).to_numpy()
count_dict = {
    sample: int(has_target_gene[sample_labels == sample].sum())
    for sample in sample_names
}

# Present the counts as a one-column DataFrame
cell_count_df = pd.DataFrame.from_dict(count_dict, orient='index', columns=['cell_number_with_target_genes'])
print(cell_count_df)

In [None]:
#### Drop genes whose summed count over all cells is zero
# Column sums come back as a 1 x n_genes matrix for sparse X, so flatten to 1-D first
count = np.asarray(adata.X.sum(axis=0)).ravel()
index = np.flatnonzero(count > 0)
adata = adata[:, index].copy()

In [None]:
sc.pl.highest_expr_genes(adata, n_top=40)

In [None]:
# Remove MT-RNR1 and MT-RNR2 genes from adata
genes_to_remove = ['MT-RNR1', 'MT-RNR2']
mask = ~adata.var_names.isin(genes_to_remove)
# Report how many genes were really dropped, not how many were requested:
# a requested gene may already be absent from adata.var_names.
n_removed = int((~mask).sum())
adata = adata[:, mask].copy()
print(f"Removed {n_removed} genes. New shape: {adata.shape}")

In [None]:
#### Quality Control
# Add boolean columns to adata.var that flag gene families by (lower-cased) gene name.
# mitochondrial genes
adata.var["mt"] = adata.var_names.str.lower().str.startswith((
    "mt-"
))
# ribosomal genes
adata.var["ribo"] = adata.var_names.str.lower().str.startswith((
    "rps", "rpl"
))
# hemoglobin genes
# NOTE(review): the pattern is anchored on both ends, so it only matches names that are
# exactly "hb" followed by one of a/b/g/d/e/z; longer symbols (e.g. with a trailing
# digit) are not flagged — confirm this is intended.
adata.var["hb"] = adata.var_names.str.lower().str.contains('^hb[abgdez]$')

In [None]:
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt", "ribo", "hb"], log1p=True, inplace=True)

In [None]:
sc.pl.violin(
    adata,
    ["n_genes_by_counts", "total_counts", "pct_counts_mt"],
    jitter=0.4,
    multi_panel=True
)

In [None]:
sc.pp.filter_cells(adata, min_genes=100)
sc.pp.filter_genes(adata, min_cells=3)
adata

In [None]:
adata_raw = adata.copy()

In [None]:
# Restart from the snapshot so the thresholds can be changed and this cell re-run
adata = adata_raw.copy()
cell_qc = adata.obs
# Keep cells inside every QC bound (genes detected, total counts, mitochondrial share)
passes_qc = (
    (cell_qc.n_genes_by_counts > 0)
    & (cell_qc.n_genes_by_counts < 6000)
    & (cell_qc.total_counts < 30000)
    & (cell_qc.pct_counts_mt < 30)
)
adata = adata[passes_qc, :].copy()

In [None]:
adata

In [None]:
# The 11 viral genes of interest
genes_of_interest = [
    'VP1', 'VP2', 'VP3', 'VP4', 'NSP1', 'VP6',
    'NSP3', 'NSP2', 'VP7', 'NSP4', 'NSP5/6'
]

# Sample labels (chronological order), used as the row order of the table
sample_names = [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]

# Keep only the genes that are actually present in adata.var_names
genes_exist = [g for g in genes_of_interest if g in adata.var_names]

# Total UMI count of every gene within every sample, computed in one pass:
# densify only the (cells x selected genes) slice, then sum the rows per sample label.
virus_counts = adata[:, genes_exist].to_df()
sample_labels = adata.obs['sample'].astype(str).to_numpy()
result = (
    virus_counts.groupby(sample_labels).sum()
    .reindex(sample_names, fill_value=0.0)  # a sample without cells sums to 0
)

print(result)

In [None]:
# Viral genes of interest and the sample labels (chronological order)
genes_of_interest = [
    'VP1', 'VP2', 'VP3', 'VP4', 'NSP1', 'VP6',
    'NSP3', 'NSP2', 'VP7', 'NSP4', 'NSP5/6'
]
sample_names = [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]


# Keep only the genes that are actually present in adata
genes_exist = [g for g in genes_of_interest if g in adata.var_names]

# Per-cell flag: the summed expression of the target genes is > 0.
# np.asarray(...).ravel() handles both dense arrays and sparse matrices, so the
# `.A` attribute (unavailable on newer SciPy sparse types) is not needed.
target_X = adata[:, genes_exist].X
has_target_gene = np.asarray(target_X.sum(axis=1)).ravel() > 0

# Count the flagged cells per sample
sample_labels = adata.obs['sample'].astype(str).to_numpy()
count_dict = {
    sample: int(has_target_gene[sample_labels == sample].sum())
    for sample in sample_names
}

# Present the counts as a one-column DataFrame
cell_count_df = pd.DataFrame.from_dict(count_dict, orient='index', columns=['cell_number_with_target_genes'])
print(cell_count_df)

In [None]:
print(adata.obs["sample"].value_counts())

In [None]:
sc.pl.scatter(adata, x="total_counts", y="pct_counts_mt", color="pct_counts_mt")
sc.pl.scatter(adata, x="total_counts", y="n_genes_by_counts", color="pct_counts_mt")

In [None]:
#### Normalization
adata.layers["counts"] = adata.X.copy()  # Keep the un-normalized counts in a layer before X is overwritten below
sc.pp.normalize_total(adata)  # Normalizing to median total counts
sc.pp.log1p(adata)  # Logarithmize the data

In [None]:
#### Highly Variable Genes
sc.pp.highly_variable_genes(adata, flavor='cell_ranger', n_top_genes=2000, batch_key="sample")
sc.pl.highly_variable_genes(adata)

In [None]:
# Keep a snapshot of the current (log-normalized, all genes) data in adata.raw.
# NOTE(review): no regress_out / scale / variable-gene subsetting step appears in the visible
# notebook, and the rank_genes_groups calls pass use_raw=False — confirm this snapshot is still needed.
adata.raw = adata

In [None]:
adata

In [None]:
sc.tl.pca(adata, n_comps=80)

In [None]:
sc.pl.pca_variance_ratio(adata, n_pcs=80, log=True, show=True)
sc.pl.pca(
    adata,
    color=["sample", "sample", "pct_counts_mt", "pct_counts_mt"],
    dimensions=[(0, 1), (2, 3), (0, 1), (2, 3)],
    ncols=2,
    size=2,
)

In [None]:
# Run two alternative batch-integration methods, using "sample" as the batch key.
# NOTE(review): presumably each stores its corrected embedding in adata.obsm
# (X_pca_harmony / X_scanorama) — confirm which one downstream steps should read.
sc.external.pp.harmony_integrate(adata, "sample")
sc.external.pp.scanorama_integrate(adata, "sample")
# Checkpoint of the integrated object; this overwrites the adata_raw snapshot taken
# before the QC threshold filter, so that filter cell can no longer be re-run from it.
adata_raw = adata.copy()

In [None]:
# Restart from the checkpoint so clustering parameters can be changed and this cell re-run
adata = adata_raw.copy()

# NOTE(review): no use_rep is passed, so neighbors presumably falls back to the default
# (uncorrected) PCA instead of the Harmony/Scanorama embeddings computed earlier —
# confirm whether use_rep="X_pca_harmony" (or "X_scanorama") was intended.
sc.pp.neighbors(adata, n_pcs=15, n_neighbors=20, metric="manhattan")
sc.tl.leiden(adata, flavor="igraph", key_added="leiden", resolution=0.3)
sc.tl.umap(adata, min_dist=0.3, spread=3.0)
sc.pl.umap(adata, color=["sample", "leiden"])

In [None]:
sc.tl.leiden(adata, flavor="igraph", key_added="leiden_0_2", resolution=0.2)
sc.pl.umap(adata, color=["leiden_0_2"])

In [None]:
gene_list = ['VP1', 'VP2', 'VP3', 'VP4', 'NSP1', 'VP6', 'NSP3', 'NSP2', 'VP7', 'NSP4', 'NSP5/6']

# A viral gene may have been removed by the earlier gene filters; selecting a
# missing name would raise KeyError, so keep only the genes still in adata.
virus_genes_present = [g for g in gene_list if g in adata.var_names]

# Densify only the (cells x viral genes) slice instead of the whole expression matrix
expr = adata[:, virus_genes_present].to_df()

# New obs column: 1 if any viral gene is > 0 in the cell, otherwise 0
adata.obs['virus_nonzero'] = (expr > 0).any(axis=1).astype(int)

In [None]:
sc.pl.umap(adata, color=['virus_nonzero'], cmap='coolwarm', vcenter=0)

In [None]:
def plot_sample_highlight(adata, sample, highlight_color="#3a4dc0", background_color="#d7dbe3"):
    """Draw the UMAP coloured by sample with one sample highlighted.

    Parameters
    ----------
    adata : AnnData with a UMAP embedding and an ``obs['sample']`` column.
    sample : label of the sample to highlight.
    highlight_color : colour of the highlighted sample (default is blue).
    background_color : colour given to every other sample (default is light grey).

    Returns
    -------
    dict mapping every sample label to the colour that was used.
    """
    palette = {
        sample: highlight_color,
        **{cat: background_color for cat in adata.obs['sample'].unique() if cat != sample},
    }
    sc.pl.umap(
        adata,
        color='sample',
        palette=palette,
        title=f"{sample} highlighted",  # tells the otherwise identical panels apart
        show=True
    )
    return palette


# One UMAP per sample, in chronological order; each call replaces seven
# copy-pasted cells that differed only in the highlighted label.
for highlighted_sample in [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]:
    sample_palette = plot_sample_highlight(adata, highlighted_sample)

注释 leiden_0_2

In [None]:
adata_raw = adata.copy()

In [None]:
adata = adata_raw.copy()
select_leiden = "leiden_0_2"

In [None]:
# Rank marker genes of every cluster in `select_leiden` against the rest (Wilcoxon)
sc.tl.rank_genes_groups(adata, select_leiden, use_raw=False, method="wilcoxon")
sc.pl.rank_genes_groups(adata, n_genes=15, sharey=False)
# One column per cluster, top 15 gene names each
top_markers = pd.DataFrame(adata.uns['rank_genes_groups']['names']).head(15)
# Derive the labels from the actual group names instead of assuming exactly 10
# clusters, which raised a length-mismatch error for any other cluster count.
top_markers.columns = [f'cluster{name}' for name in top_markers.columns]
top_markers = (
    top_markers.transpose()
    .reset_index()
    .rename(columns={"index": "cluster"})
)
print(top_markers)

In [None]:
top_markers.to_csv('top_markers.csv', index=False)

In [None]:
# 将结果转换为 DataFrame.
marker_genes_df = sc.get.rank_genes_groups_df(adata, group=None, key='rank_genes_groups')
print(marker_genes_df.head())

In [None]:
#筛选显著性的结果
significant_markers = marker_genes_df[(marker_genes_df['pvals_adj'] < 0.05) & (marker_genes_df['logfoldchanges'] > 1)]
significant_markers

In [None]:

# 筛选names列中以IF开头的基因，并用逗号分隔加上单引号
if_genes = significant_markers[significant_markers['names'].str.startswith('IF')]['names']
output = ','.join([f"'{gene}'" for gene in if_genes])
print(output)


In [None]:
# names_line = ",".join([f"'{gene}'" for gene in significant_markers['names'].drop_duplicates()])
# print(names_line)

In [None]:
sc.pl.umap(adata, color= ['IFI44L','IFIT2','IFFO2','IFIT1'], cmap='coolwarm', vcenter=0)

In [None]:
sc.pl.umap(adata, color= ['RSAD2','OAS2','HERC5'], cmap='coolwarm', vcenter=0)

In [None]:
sc.pl.umap(adata, color= ['IFNL1','IFNL2','IFNL3','IFNB1'], cmap='coolwarm', vcenter=0)

In [None]:
sc.pl.umap(adata, color= ['ARHGEF38'], cmap='coolwarm', vcenter=0)

以时间做差异分析

In [None]:
adata

In [None]:
# Rank marker genes of every sample (time point) against the rest (Wilcoxon)
sc.tl.rank_genes_groups(adata, groupby='sample', use_raw=False, method="wilcoxon")
sc.pl.rank_genes_groups(adata, n_genes=15, sharey=False)

# The columns of the 'names' record array are already the sample labels; keep them
# instead of renaming to cluster0..cluster6, which hid which time point each row
# belongs to and failed whenever the number of samples was not exactly 7.
top_markers = (
    pd.DataFrame(adata.uns['rank_genes_groups']['names'])
    .head(15)
    .transpose()
    .reset_index()
    .rename(columns={"index": "cluster"})
)
print(top_markers)
top_markers.to_csv('top_markers_timeline.csv', index=False)

In [None]:
# 将结果转换为 DataFrame.
marker_genes_df = sc.get.rank_genes_groups_df(adata, group=None, key='rank_genes_groups')
print(marker_genes_df.head())

In [None]:
marker_genes_df

In [None]:
significant_markers_up = marker_genes_df[(marker_genes_df['pvals_adj'] < 0.05) & (marker_genes_df['logfoldchanges'] > 1)]
significant_markers_up
significant_markers_up.to_csv("significant_markers_up.csv", index=False)

In [None]:
significant_markers_down = marker_genes_df[(marker_genes_df['pvals_adj'] < 0.05) & (marker_genes_df['logfoldchanges'] < -1)]
significant_markers_down
significant_markers_down.to_csv("significant_markers_down.csv", index=False)

In [None]:
significant_markers = marker_genes_df[(marker_genes_df['pvals_adj'] < 0.05) & (marker_genes_df['logfoldchanges'] > 1)]
significant_markers

In [None]:
# Keep the significantly up-regulated markers (adjusted p < 0.05, log fold change > 1)
significant_markers = marker_genes_df[
    (marker_genes_df['pvals_adj'] < 0.05) & (marker_genes_df['logfoldchanges'] > 1)
].reset_index(drop=True)

# Look the biotype up through adata.var's index: the 'names' column holds var_names
# (made unique at load time), which can differ from the 'gene_name' column, and the
# previous merge referenced a 'gene_name' column that the right-hand frame did not contain.
biotype_by_gene = adata.var['gene_biotype'].astype(object)

# Insert gene_biotype directly after the names column
significant_markers.insert(
    significant_markers.columns.get_loc('names') + 1,
    'gene_biotype',
    significant_markers['names'].map(biotype_by_gene),
)
significant_markers

In [None]:


# Requires significant_markers (with a 'group' column) from the cells above
sample_names = [
    'no-infection', '1h-infection', '3h-infection', '6h-infection',
    '7.5h-infection', '9h-infection', '12h-infection'
]

# grouped: sample label -> DataFrame with that sample's significant markers
grouped = {name: df for name, df in significant_markers.groupby('group', observed=True)}
# A sample without any significant marker has no group; give it an empty frame
# with the same columns so the look-ups below cannot raise KeyError.
for name in sample_names:
    grouped.setdefault(name, significant_markers.iloc[0:0])

# Individual frames, in chronological order
df_group1 = grouped['no-infection']
df_group2 = grouped['1h-infection']
df_group3 = grouped['3h-infection']
df_group4 = grouped['6h-infection']
df_group5 = grouped['7.5h-infection']
df_group6 = grouped['9h-infection']
df_group7 = grouped['12h-infection']

In [None]:
df_group7


In [None]:
significant_markers

In [None]:
# Collect gene_name / gene_biotype for the genes listed in significant_markers['names'].
# NOTE(review): the match is done on the 'gene_name' column, while 'names' presumably holds
# var_names — confirm the two agree for genes whose names were made unique.
gene_info = adata.var.loc[adata.var['gene_name'].isin(significant_markers['names']),
                          ['gene_name', 'gene_biotype']]
print(gene_info)
# Write one CSV per biotype into the current working directory

for biotype, group in gene_info.groupby("gene_biotype"):
    filename = f"significant_markers_info_{biotype}.csv"
    group.to_csv(filename, index=False)


In [None]:

# Expression of IFI44L, IFIT1 and OAS2 per sample.
# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(adata, ["IFI44L", "IFIT1", "OAS2"], groupby="sample", order=violin_order, stripplot=False, show=False)
# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()


In [None]:
#piRNA
# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(adata, ["piR-31985", "piR-34871", "piR-35548-1"], groupby="sample", order=violin_order, stripplot=False, show=False)
# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()

In [None]:
#tRNA (MT-TV, MT-TR), one snRNA (RNU4-2) and three genes identified only by Ensembl ID
# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(adata, ['MT-TV','MT-TR',
                     'RNU4-2', #snRNA
                     'ENSG00000280441',
                     'ENSG00000287862',
                     'ENSG00000278996'
                     ], groupby="sample", order=violin_order, stripplot=False, show=False)
# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()

In [None]:
#rRNA_pseudogene
# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(adata, ['RNA5SP149','RNA5SP216','RNA5SP389'], groupby="sample", order=violin_order, stripplot=False, show=False)

# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()

In [None]:
#unprocessed_pseudogene
# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(adata, [
                    'MTND2P28','MTCO1P12','MTATP6P1'], groupby="sample", order=violin_order, stripplot=False, show=False)

# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()

In [None]:
import csv

input_file = "significant_markers_info_lncRNA.csv"

# Read the gene_name column and wrap every (stripped) name in single quotes
with open(input_file, newline='', encoding='utf-8') as csvfile:
    genes = [f"'{row['gene_name'].strip()}'" for row in csv.DictReader(csvfile)]

# Comma-separated list, ready to paste into a Python list literal
result = ",".join(genes)

print(result)

In [None]:
# lncRNA-style gene symbols
# NOTE(review): this cell was headed "significant_markers_info_processed_pseudogene", but the
# symbols look like the lncRNA list — confirm which marker file they were taken from.
# dict.fromkeys drops the repeated entries (LINC00486, LINC01019) while keeping the order,
# so no gene is plotted twice.
lncrna_markers = list(dict.fromkeys(
    ['LINC00486', 'PVT1', 'NEAT1', 'LINC02964','DANT2','CLDN10-AS1','EOLA2-DT','WARS2-AS1','LINC01019',
     'PCAT1','LNC-LBCS','LINC00486', 'LINC01019'
    ]
))

# Tick labels are derived from the sample names and the plotting order is passed
# explicitly; a fixed label list only matches if the categories happen to be in
# chronological order, and plt.gca() would relabel just the last panel.
sample_tick_labels = {
    'no-infection': '0h', '1h-infection': '1h', '3h-infection': '3h', '6h-infection': '6h',
    '7.5h-infection': '7.5h', '9h-infection': '9h', '12h-infection': '12h',
}
samples_present = set(adata.obs['sample'].unique())
violin_order = [name for name in sample_tick_labels if name in samples_present]

violin_axes = sc.pl.violin(
    adata,
    lncrna_markers,
    groupby="sample",
    order=violin_order,
    stripplot=False,
    show=False
)

# One Axes is returned for a single gene, a list of Axes for several genes
for violin_ax in (violin_axes if isinstance(violin_axes, list) else [violin_axes]):
    violin_ax.set_xticks(range(len(violin_order)))
    violin_ax.set_xticklabels([sample_tick_labels[name] for name in violin_order])
plt.show()

In [None]:
# Heatmap of the lncRNA markers per sample. dict.fromkeys removes the repeated
# entries (LINC00486, LINC01019) while keeping the order, so no gene row is drawn twice.
heatmap_genes = list(dict.fromkeys(
    ['LINC00486', 'PVT1', 'NEAT1', 'LINC02964','DANT2','CLDN10-AS1','EOLA2-DT','WARS2-AS1','LINC01019',
     'PCAT1','LNC-LBCS','LINC00486', 'LINC01019']
))
sc.pl.heatmap(adata, heatmap_genes, groupby='sample', cmap='coolwarm', dendrogram=True, swap_axes=True, show_gene_labels=True, figsize=(10, 10))

In [None]:
# # # 小提琴图
# # sc.pl.rank_genes_groups_violin(adata, groups='6h-infection', n_genes=10)

# # # 条形图
# sc.pl.rank_genes_groups(adata, n_genes=20, sharey=False)

# # # 热图
sc.pl.rank_genes_groups_heatmap(adata, n_genes=20, groupby='sample')

# # # Dotplot
# sc.pl.rank_genes_groups_dotplot(adata, n_genes=10)

# # # Matrixplot
# sc.pl.rank_genes_groups_matrixplot(adata, n_genes=10)

In [None]:
# Names of all groups in the most recent rank_genes_groups result
groups = adata.uns['rank_genes_groups']['names'].dtype.names

# One result table per group, each tagged with its group name in a 'cluster' column
all_de = [
    sc.get.rank_genes_groups_df(adata, group=group).assign(cluster=group)
    for group in groups
]

# Stack the tables and drop rows lacking a fold change or an adjusted p-value
all_de_df = pd.concat(all_de, ignore_index=True).dropna(subset=["logfoldchanges", "pvals_adj"])
all_de_df

In [None]:
# -log10(padj)
# Smallest non-zero adjusted p-value in the data (NaN when every p-value is 0)
min_nonzero_pval = all_de_df['pvals_adj'][all_de_df['pvals_adj'] > 0].min()

# p-values used for plotting: exact zeros are replaced so that -log10 stays finite
pvals_adj_for_plot = all_de_df['pvals_adj'].copy()
# pd.notna is required here: `x is not np.nan` is an identity test that is true for
# the NaN returned by .min(), so the fallback below could never be reached before.
if pd.notna(min_nonzero_pval):
    pvals_adj_for_plot[pvals_adj_for_plot == 0] = min_nonzero_pval * 0.1
else:  # every p-value is 0: substitute a very small constant
    pvals_adj_for_plot[pvals_adj_for_plot == 0] = 1e-300

all_de_df['-log10(padj)'] = -np.log10(pvals_adj_for_plot)

# Significance thresholds
padj_thresh = 0.05
logfc_thresh = 1

def mark_sig(row):
    """Classify one differential-expression row as 'Up', 'Down' or 'NS'.

    A row is 'Up' / 'Down' when its adjusted p-value is below ``padj_thresh`` and its
    log fold change is above ``logfc_thresh`` / below ``-logfc_thresh``; any other
    row is 'NS' (not significant).
    """
    if row['pvals_adj'] < padj_thresh and row['logfoldchanges'] > logfc_thresh:
        return 'Up'
    elif row['pvals_adj'] < padj_thresh and row['logfoldchanges'] < -logfc_thresh:
        return 'Down'
    else:
        return 'NS'

all_de_df['sig'] = all_de_df.apply(mark_sig, axis=1)
all_de_df

In [None]:
significant_markers

In [None]:
# ... existing code ...

significant_markers.to_csv("significant_markers.csv", index=False)

# ... rest of code ...

In [None]:
n_groups = len(groups)
fig, axes = plt.subplots(1, n_groups, figsize=(5 * n_groups, 6), sharey=True)

padj_thresh = 0.05
logfc_thresh = 1

# Both directions are drawn in the same grey
sig_colors = {'Up': '#BBBBBB', 'Down': '#BBBBBB'}
label_box = dict(facecolor='#DDDDDD', edgecolor='none', boxstyle='round,pad=0.6')

# np.atleast_1d makes the single-panel case (a bare Axes) iterable like the multi-panel one
for i, (ax, group) in enumerate(zip(np.atleast_1d(axes), groups)):
    in_group = all_de_df['cluster'] == group
    # Only Up and Down genes are drawn
    df_sig = all_de_df[in_group & (all_de_df['sig'] != 'NS')]

    x = df_sig['-log10(padj)']
    y = df_sig['logfoldchanges']
    ax.scatter(x, y, c=df_sig['sig'].map(sig_colors), s=10, alpha=0.7, edgecolor='none')

    # Dashed guides at +/- the fold-change threshold
    for threshold in (logfc_thresh, -logfc_thresh):
        ax.axhline(threshold, color='black', linestyle='--', linewidth=2)

    # Group name in a rounded box above the panel
    ax.text(
        0.5, 1.05, group, ha='center', va='bottom', transform=ax.transAxes,
        fontsize=14, fontweight='bold',
        bbox=label_box
    )

    ax.set_xlabel('-log10(padj)')
    ax.set_ylabel('log2 Fold Change' if i == 0 else '')  # y label on the first panel only
    ax.set_title('')
    ax.set_xlim(0, min(350, x.max() + 10))
    ax.set_ylim(-10, 10)
    ax.grid(False)

plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from adjustText import adjust_text

# Number of top up-/down-regulated genes to highlight and label per group
n_top_genes = 5

# Seeded generator so the horizontal jitter, and therefore the figure, is reproducible
jitter_rng = np.random.default_rng(0)

fig, ax = plt.subplots(figsize=(12, 8))
x_positions = np.arange(len(groups))
bar_width = 0.6

# Standard deviation of the horizontal jitter, relative to the bar width
jitter_width = bar_width * 0.13


def _plot_direction(ax, df_dir, top_genes, x_center, point_color, top_color, text_offset, va):
    """Scatter one direction (Up or Down) of one group and label its top genes.

    ``df_dir`` holds all significant genes of that direction and ``top_genes`` the
    subset to highlight. The remaining genes are drawn as small points, the top
    genes as larger outlined points with their name next to them.
    Returns the list of created annotations.
    """
    others = df_dir.drop(top_genes.index, errors='ignore')
    x_others = jitter_rng.normal(x_center, jitter_width, size=len(others))
    ax.scatter(x_others, others['logfoldchanges'], c=point_color, alpha=0.5, s=10, zorder=2)

    x_top = jitter_rng.normal(x_center, jitter_width, size=len(top_genes))
    ax.scatter(x_top, top_genes['logfoldchanges'],
               c=top_color, marker='.', s=100, zorder=3, edgecolors='black', linewidth=0.5)

    return [
        ax.annotate(name,
                    xy=(x_pos, logfc),
                    xytext=text_offset, textcoords='offset points',
                    fontsize=8, ha='left', va=va)
        for name, x_pos, logfc in zip(top_genes['names'], x_top, top_genes['logfoldchanges'])
    ]


# All gene-name annotations, kept in case they are passed to adjust_text later
texts = []

for i, group in enumerate(groups):
    df_group = all_de_df[all_de_df['cluster'] == group]
    min_val = df_group['logfoldchanges'].min()
    max_val = df_group['logfoldchanges'].max()

    # Grey bar spanning the full log fold change range of the group
    ax.bar(
        x_positions[i],
        max_val - min_val,
        bottom=min_val,
        width=bar_width,
        color='#CCCCCC',
        alpha=0.5
    )

    df_up = df_group[df_group['sig'] == 'Up']
    df_down = df_group[df_group['sig'] == 'Down']

    # The top-N selection is computed once per direction and reused for both layers
    if len(df_up) > 0:
        top_up = df_up.nlargest(n_top_genes, 'logfoldchanges')
        texts += _plot_direction(ax, df_up, top_up, x_positions[i],
                                 point_color='#FF6666', top_color='red',
                                 text_offset=(5, 5), va='bottom')

    if len(df_down) > 0:
        top_down = df_down.nsmallest(n_top_genes, 'logfoldchanges')
        texts += _plot_direction(ax, df_down, top_down, x_positions[i],
                                 point_color='#6666FF', top_color='blue',
                                 text_offset=(5, -5), va='top')

# Optional: de-overlap the labels
# adjust_text(texts, ax=ax)

ax.set_ylim(-30, 30)
ax.set_xticks(x_positions)
ax.set_xticklabels(groups, rotation=45, ha="right")
ax.set_ylabel("log2FC")
ax.set_title(f"log2FC distribution by cluster (Top {n_top_genes} genes labeled)")
plt.tight_layout()
plt.show()

In [None]:
# Number of top up-/down-regulated genes per group to keep for enrichment
n_top_genes = 10

# group name -> {'up': [...gene names...], 'down': [...gene names...]}
enrich_dict = {}

for group in groups:
    df_group = all_de_df[all_de_df['cluster'] == group]

    df_up = df_group[df_group['sig'] == 'Up']
    df_down = df_group[df_group['sig'] == 'Down']

    # Always recompute both selections. They used to be assigned only when the group
    # had Up/Down genes, so a group without any silently reused the previous group's
    # genes. nlargest/nsmallest on an empty frame yield an empty list here.
    top_up = df_up.nlargest(n_top_genes, 'logfoldchanges')
    top_down = df_down.nsmallest(n_top_genes, 'logfoldchanges')

    enrich_dict[group] = {
        'up': top_up['names'].to_list(),
        'down': top_down['names'].to_list()
    }

    # enrich_dict[group] = {
    #     'up': df_up['names'].to_list(),
    #     'down': df_down['names'].to_list()
    # }

In [None]:
groups

In [None]:
gene_list = enrich_dict['12h-infection']['up']  ## enrich_dict['no-infection']['down']

In [None]:
# Enrichment query for gene_list, restricted to the GO:BP source
go_bp_results = sc.queries.enrich(
    gene_list,
    org='hsapiens',  # organism id, e.g. 'hsapiens' (human) or 'mmusculus' (mouse)
    gprofiler_kwargs={
        'sources': ['GO:BP'],
        'user_threshold': 0.05,
        'no_evidences': False,
        'all_results': True,
        'significance_threshold_method': 'fdr'  # multiple-testing correction method, e.g. 'fdr' or 'bonferroni'
    }
)
go_bp_results

In [None]:
go_bp_results.to_csv("go_bp_results_12h-infection_up.csv", index=False)

In [None]:
# Enrichment query for gene_list, restricted to the KEGG source
kegg_results = sc.queries.enrich(
    gene_list,
    org='hsapiens',  # organism id, e.g. 'hsapiens' (human) or 'mmusculus' (mouse)
    gprofiler_kwargs={
        'sources': ['KEGG'],
        'user_threshold': 0.05,
        'no_evidences': False,
        'all_results': True,
        'significance_threshold_method': 'fdr'  # multiple-testing correction method, e.g. 'fdr' or 'bonferroni'
    }
)
kegg_results

In [None]:
kegg_results.to_csv("kegg_results_12h-infection_up.csv", index=False)