In [1]:
import json
import os
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from Bio.Blast.Applications import NcbiblastpCommandline
from Bio.Blast import NCBIXML
import glob
import unicodedata


Due to the on going maintenance burden of keeping command line application
wrappers up to date, we have decided to deprecate and eventually remove these
modules.

We instead now recommend building your command line and invoking it directly
with the subprocess module.


# 数据清洗

In [2]:
def readExcel(file_path, sheet_name):
    ganmeiNew1 = pd.read_excel(file_path,sheet_name=f"{sheet_name}")
    
    print(ganmeiNew1[:5])
    return ganmeiNew1

In [3]:
def dataRest(df):
    # 将第一行数据设为列名
    df.columns = df.iloc[0]

    # 删除原来的第一行，并重置索引
    df = df.drop(0).reset_index(drop=True)

    print(df[:5])
    return df

In [4]:
def dataClean(df,outpath):
    
    # 清洗 species 列：去除括号及后续内容（如 "Ectocarpus siliculosus（褐藻）" → "Ectocarpus siliculosus"）
    df["species"] = df["species"].str.split(r"[（(]").str[0].str.strip()

    # 合并 species 和 name 列，生成新名称，并剔除换行符
    df["name"] = (df["species"] + "_" + df["name"]).str.replace(r'[\n\r]', '', regex=True)

    # 提取需要的字段并重命名列
    result = df[["name", "Aa sequences"]].rename(columns={"Aa sequences": "seq"})

    # 剔除 seq 中的换行符（\n 或 \r）
    result["seq"] = result["seq"].str.replace(r'[\n\r]', '', regex=True)

    # 基于 name 和 seq 去重（保留第一条重复记录）
    result = result.drop_duplicates(subset=["name", "seq"], keep="first")

    # 转换为字典列表格式
    json_data = result.to_dict(orient="records")

    # 保存为 JSON 文件（格式化缩进）
    with open(outpath, "w", encoding="utf-8") as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)

    print("JSON 文件已保存成功！")

In [5]:
def sanitize_name(name):
    """替换希腊字母为英文单词并移除其他非ASCII字符"""
    replacements = {
        'α': 'alpha',
        'β': 'beta',
        'γ': 'gamma',
    }
    
    # 替换希腊字母
    for greek, eng in replacements.items():
        name = name.replace(greek, eng)
    
    # 移除其他非ASCII字符
    name = unicodedata.normalize('NFKD', name).encode('ascii', 'ignore').decode()
    return name

In [6]:
def create_blast_database(fasta_path, gene_name):
    """创建BLAST数据库并处理异常"""
    try:
        db_path = f"temp_{gene_name}_db"
        cmd = f"makeblastdb -in {fasta_path} -dbtype prot -out {db_path}"
        subprocess.run(cmd, check=True, shell=True)
        return db_path
    except subprocess.CalledProcessError as e:
        print(f"Error creating BLAST database: {str(e)}")
        raise

In [7]:
def run_blast_all_vs_all(records, output_dir):
    """执行全基因组两两BLAST比对"""
    # 准备临时文件（清洗名称）
    all_sequences = []
    for rec in records:
        sanitized_name = sanitize_name(rec['name'])
        all_sequences.append(f">{sanitized_name}\n{rec['seq']}")
    
    with open("temp_all.fasta", "w") as f:
        f.write("\n".join(all_sequences))
    
    # 创建BLAST数据库
    db_path = create_blast_database("temp_all.fasta", "all")
    
    # 执行BLASTP
    blast_cline = NcbiblastpCommandline(
        query="temp_all.fasta",
        db=db_path,
        outfmt=5,
        evalue=10,
        out=os.path.join(output_dir, "blast_results.xml")
    )
    stdout, stderr = blast_cline()
    
    return os.path.join(output_dir, "blast_results.xml")

In [8]:
def parse_blast_results(xml_path, original_records):
    """解析BLAST结果并构建相似度矩阵"""
    # 创建清洗后的名称映射
    name_mapping = {sanitize_name(rec['name']): rec['name'] for rec in original_records}
    # print(name_mapping)
    
    # 初始化矩阵
    species = [rec['name'] for rec in original_records]
    matrix = pd.DataFrame(0.0, index=species, columns=species)
    # print(matrix)
    np.fill_diagonal(matrix.values, 100.0)
    
    # 解析XML结果
    with open(xml_path) as blast_file:
        blast_records = NCBIXML.parse(blast_file)
        
        for blast_record in blast_records:
            query = name_mapping[blast_record.query]  # 转换回原始名称
            print(query)
            
            for alignment in blast_record.alignments:
                hit = name_mapping[alignment.hit_def]
                # print(hit)
                if query == hit:
                    continue
                
                max_sim = 0.0
                for hsp in alignment.hsps:
                    sim = hsp.identities / alignment.length * 100
                    max_sim = max(max_sim, sim)
                
                matrix.loc[query, hit] = max(max_sim, matrix.loc[query, hit])
    
    return matrix

In [9]:
def plot_heatmap(matrix, gene_name, output_dir):
    """生成符合示例样式的热图"""
    # 动态调整画布尺寸
    num_species = len(matrix)
    plt.figure(figsize=(num_species*1.4, num_species*1.2))
    
    # 创建下三角mask
    mask = np.tril(np.ones_like(matrix, dtype=bool), k=-1)
    
    # 生成热图对象
    ax = sns.heatmap(
        matrix, 
        mask=mask,
        annot=True,
        annot_kws={"size": 16, "color": "black"},
        fmt=".1f",
        cmap="YlGnBu",
        vmin=0,
        vmax=100,
        linewidths=0.5,
        square=True,
        cbar_kws={
            "shrink": 0.8,
            "label": "Similarity (%)",
            "location": "left"
        },
        xticklabels=False,
        yticklabels=False
    )
    
    # 自定义标签系统
    # ----------------- 顶部标签 -----------------
    ax.set_xticks(np.arange(len(matrix.columns)) + 0.5)
    ax.set_xticklabels(
        matrix.columns,
        rotation=45,
        ha='left',
        va='bottom',
        rotation_mode='anchor',
        fontsize=16,
        fontstyle='italic',
        position=(0, 1.02)
    )
    
    # ----------------- 右侧标签 -----------------
    ax.set_yticks(np.arange(len(matrix.index)) + 0.5)
    ax.set_yticklabels(
        matrix.index,
        rotation=0,
        ha='left',
        va='center',
        fontsize=16,
        position=(1.02, 0),
        fontstyle='italic'
    )
    
    # ----------------- 样式优化 -----------------
    # 隐藏所有坐标轴线
    for spine in ax.spines.values():
        spine.set_visible(False)
    
    # 特别移除左边和底部残余边框
    ax.spines['left'].set_visible(False)
    ax.spines['bottom'].set_visible(False)
    
    # 添加主标题（与示例完全一致）
    plt.title(
        f"Sequence Similarity (%)",
        fontsize=20,
        pad=320,  # 精确控制标题间距
        fontweight='bold',
        color='#333333'
    )
    
    # 布局微调
    plt.subplots_adjust(
        left=0.15,
        right=0.85,
        top=0.85,  # 为标题留出空间
        bottom=0.1
    )
    
    # 保存图像
    plt.savefig(
        f"{output_dir}/{gene_name}_heatmap.png",
        dpi=300,
        bbox_inches='tight',
        facecolor='white'
    )
    plt.close()

In [10]:
def main(file_path, sheet, input_json, output_dir):

    # 数据预处理
    gamma = readExcel(file_path, sheet)
    print("*=-"*80)
    gamma = dataRest(gamma)
    print("*=-"*80)
    os.makedirs(os.path.dirname(input_json), exist_ok=True)
    dataClean(gamma, input_json)
    print("*=-"*80)

    os.makedirs(output_dir, exist_ok=True)

    # 读取数据
    with open(input_json) as f:
        records = json.load(f)
    try:
        # 执行BLAST比对
        blast_xml = run_blast_all_vs_all(records, output_dir)
    
        # 解析结果
        similarity_matrix = parse_blast_results(blast_xml, records)
    
        # 保存原始数据
        similarity_matrix.to_csv(os.path.join(output_dir, "similarity_matrix.csv"))
    
        # 生成热图
        plot_heatmap(similarity_matrix, 'sheet1' ,output_dir)

    finally:
        # 清理临时文件
        for f in ["temp_all.fasta"] + glob.glob("temp_all_db*"):
            if os.path.exists(f):
                os.remove(f)

    print(f"Analysis complete. Result Save Path:{output_dir}")

In [11]:
# 配置参数
file_path = "./data/UreC.xlsx"
sheet = "UreC"
input_json = "./cleanData/UreC.json"
output_dir = "./analysis/UreC"
main(file_path, sheet, input_json, output_dir)

   Unnamed: 0          Unnamed: 1  \
0         NaN                name   
1         NaN      pasteurii UreC   
2         NaN      aerogenes UreC   
3         NaN  leguminosarum UreC   
4         NaN             pyloriB   

                                          Unnamed: 2    Unnamed: 3  
0                                       Aa sequences       species  
1  MKINRQQYAESYGPTVGDQVRLADTDLWIEVEKDYTTYGDEANFGG...  Sporosarcina  
2  MSNISRQAYADMFGPTVGDKVRLADTELWIEVEDDLTTYGEEVKFG...    Klebsiella  
3  MPYKISRAAYAGMFGPTTGDKVRLADTELFIEIEKDFTTYGEEVKF...     Rhizobium  
4  MKKISRKEYVSMYGPTTGDKVRLGDTDLIAEVEHDYTIYGEELKFG...  Helicobacter  
*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-*=-
0  NaN                name                                       Aa sequences  \
0  NaN      pasteurii UreC  MKINRQQYAESYG