In [7]:
import pandas as pd
import numpy as np
import subprocess
import sys

### 调用R脚本，对输入的gene list进行富集分析，并将结果保存为tsv文件

In [8]:
def run_kegg_enrich(input_path, output_path, species, pvalue, script_path=None, rscript='Rscript'):
    """
    Run the KEGG enrichment R script on a gene list.

    The script is invoked as::

        Rscript kegg_enrich.R --input <input_path> --output <output_path> \
            --species <species> --pvalue <pvalue>

    Args:
        input_path (str): Path of the input gene-list file.
        output_path (str): Path handed to the script as its ``--output`` file.
        species (str): Species name, e.g. "Myceliophthora thermophila".
        pvalue (float): P-value threshold forwarded to the script.
        script_path (str, optional): Location of ``kegg_enrich.R``. When omitted,
            the original machine-specific default path is used, so callers on
            other machines should pass this explicitly.
        rscript (str): Name or path of the Rscript executable.

    Returns:
        str: The script's stdout on success, or its stderr when it exits with a
        non-zero status (a notice is also printed to ``sys.stderr`` in that case).

    Raises:
        FileNotFoundError: If the ``rscript`` executable cannot be found.
    """
    if script_path is None:
        # Historical default; only valid on the original author's machine.
        script_path = '/Users/dongjiacheng/Desktop/coder/mtd/code/analysis-module/enrichment-analysis/kegg_enrich.R'

    cmd = [
        rscript, script_path,
        '--input', input_path,
        '--output', output_path,
        '--species', species,
        '--pvalue', str(pvalue),
    ]

    # Run the script and capture both streams as text.
    try:
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        # Keep the historical contract (return stderr) but make the failure
        # visible: otherwise the first symptom is a missing output file later on.
        print(
            f"run_kegg_enrich: script exited with status {e.returncode}; returning its stderr",
            file=sys.stderr,
        )
        return e.stderr
    return result.stdout

# Example invocation
output_kegg = run_kegg_enrich(
    input_path="input-file/gene_list.txt",
    output_path="output-file/enrich_kegg.tsv",
    species="Myceliophthora thermophila",
    pvalue=0.05,
)

In [10]:
def run_go_enrich(input_path, output_path, species, pvalue, script_path=None, rscript='Rscript'):
    """
    Run the GO enrichment R script on a gene list.

    The script is invoked as::

        Rscript go_enrich.R --input <input_path> --output <output_path> \
            --species <species> --pvalue <pvalue>

    Args:
        input_path (str): Path of the input gene-list file.
        output_path (str): Path handed to the script as its ``--output`` file.
        species (str): Species name, e.g. "Myceliophthora thermophila".
        pvalue (float): P-value threshold forwarded to the script.
        script_path (str, optional): Location of ``go_enrich.R``. When omitted,
            the original machine-specific default path is used, so callers on
            other machines should pass this explicitly.
        rscript (str): Name or path of the Rscript executable.

    Returns:
        str: The script's stdout on success, or its stderr when it exits with a
        non-zero status (a notice is also printed to ``sys.stderr`` in that case).

    Raises:
        FileNotFoundError: If the ``rscript`` executable cannot be found.
    """
    if script_path is None:
        # Historical default; only valid on the original author's machine.
        script_path = '/Users/dongjiacheng/Desktop/coder/mtd/code/analysis-module/enrichment-analysis/go_enrich.R'

    cmd = [
        rscript, script_path,
        '--input', input_path,
        '--output', output_path,
        '--species', species,
        '--pvalue', str(pvalue),
    ]

    # Run the script and capture both streams as text.
    try:
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        # Keep the historical contract (return stderr) but make the failure
        # visible: otherwise the first symptom is a missing output file later on.
        print(
            f"run_go_enrich: script exited with status {e.returncode}; returning its stderr",
            file=sys.stderr,
        )
        return e.stderr
    return result.stdout
    
# Example invocation
output_go = run_go_enrich(
    input_path="input-file/gene_list.txt",
    output_path="output-file/enrich_go.tsv",
    species="Myceliophthora thermophila",
    pvalue=0.05,
)

### 基于Python的Plotly包，对富集分析结果进行可视化

In [5]:
# Load the two tab-separated enrichment result tables.
# These are the same paths passed as output_path to the enrichment calls earlier in the notebook.
df_kegg = pd.read_table('./output-file/enrich_kegg.tsv', sep='\t')
df_go = pd.read_table('./output-file/enrich_go.tsv', sep='\t')

FileNotFoundError: [Errno 2] No such file or directory: './output-file/enrich_kegg.tsv'

In [3]:
import plotly.io as pio
import plotly.express as px
import plotly.graph_objects as go

def plot_kegg_chart(df_kegg, pic_type='bubble', color='Geyser', p_adjust=0.05, font_size=15, bubble_num=30, bubble_size=30, width=1200, height=600, png_path="./output-file/kegg.png"):
    """Plot KEGG enrichment results as a bubble or bar chart and return it as JSON.

    Args:
        df_kegg (pandas.DataFrame): Enrichment table containing at least the
            columns 'ID', 'Description', 'GeneRatio', 'p.adjust' and 'Count'.
            'GeneRatio' entries are parsed as "numerator/denominator" strings
            (a plain number is accepted as well). The frame is not modified.
        pic_type (str): 'bubble' (GeneRatio on x, marker size = Count) or
            'bar' (Count on x).
        color (str): Name of the Plotly continuous colour scale.
        p_adjust (float): Rows are kept when their adjusted p-value (rounded to
            6 decimals) is strictly below this threshold.
        font_size (int): Font size used for the whole figure.
        bubble_num (int): Maximum number of pathways shown, taken after sorting
            by Count in descending order.
        bubble_size (int): Maximum marker size (bubble chart only).
        width (int): Figure width in pixels.
        height (int): Figure height in pixels.
        png_path (str or None): Where the static PNG (scale=4) is written.
            Pass None to skip the export.

    Returns:
        str: The figure serialised with ``plotly.io.to_json``.

    Raises:
        ValueError: If ``pic_type`` is neither 'bubble' nor 'bar'.
    """
    # Fail fast: previously an unknown pic_type surfaced as an UnboundLocalError on `fig`.
    if pic_type not in ('bubble', 'bar'):
        raise ValueError(f"pic_type must be 'bubble' or 'bar', got {pic_type!r}")

    def _ratio_to_float(value):
        # Parse "k/n" without eval(): the values come from a file on disk.
        numerator, _, denominator = str(value).partition('/')
        if denominator:
            return float(numerator) / float(denominator)
        return float(numerator)

    # Pre-processing. Copy *after* selecting columns so the assignments below
    # work on an independent frame (no SettingWithCopyWarning, caller untouched).
    table = df_kegg[['ID', 'Description', 'GeneRatio', 'p.adjust', 'Count']].copy()
    table.columns = ["ID", "Pathway", "GeneRatio", "P.adjust", 'Count']
    table["GeneRatio"] = table["GeneRatio"].map(lambda r: round(_ratio_to_float(r), 3))
    table['P.adjust'] = table['P.adjust'].round(6)

    # Filtering: significance threshold, then the bubble_num largest by Count.
    table = table[table['P.adjust'] < p_adjust]
    table = table.sort_values(by='Count', ascending=False)
    table = table.iloc[:bubble_num]

    # Layout shared by both chart types.
    layout_args = {
        'title': "KEGG Enrichment Analysis",
        'yaxis_title': "Pathway",
        'yaxis': dict(autorange="reversed"),  # largest Count at the top
        'font': dict(family="Arial", size=font_size),
        'template': "plotly_white",
        'width': width,
        'height': height
    }
    # Colour bar settings.
    color_axis_args = {
        'colorbar_title': "P.adjust",
        'colorbar_tickformat': ".3f",
        'colorbar': dict(dtick=0.005)
    }

    if pic_type == 'bubble':
        fig = px.scatter(
            table,
            x='GeneRatio',
            y='Pathway',
            size='Count',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=["ID", 'P.adjust', 'Count'],
            size_max=bubble_size
        )
    else:  # 'bar'
        fig = px.bar(
            table,
            x='Count',
            y='Pathway',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=['ID', 'P.adjust', 'Count']
        )

    fig.update_layout(**layout_args)
    fig.update_coloraxes(**color_axis_args)

    # Static export at 4x resolution.
    if png_path is not None:
        fig.write_image(png_path, scale=4)

    # The figure is returned as JSON. Alternatives considered by the author:
    # return an HTML <div> string, or save a standalone file with fig.write_html(...).
    return pio.to_json(fig)

# Example: KEGG bar chart with the reversed red-blue colour scale.
plot_kegg_chart(df_kegg, pic_type='bar', color='RdBu_r')

NameError: name 'df_kegg' is not defined

In [None]:
def plot_go_chart(df_go, pic_type='bubble', color='Geyser', go_type='all', p_value=0.05, chart_num=30, chart_size=30, width=1000, height=800, font_size=14):
    """Plot GO enrichment results as a bubble or bar chart and return it as JSON.

    Args:
        df_go (pandas.DataFrame): Enrichment table containing at least the
            columns 'category', 'ID', 'Description', 'Count', 'GeneRatio' and
            'p.adjust'. 'GeneRatio' entries are parsed as
            "numerator/denominator" strings (a plain number is accepted as
            well). The frame is not modified.
        pic_type (str): 'bubble' (GeneRatio on x, marker size = Count) or
            'bar' (Count on x).
        color (str): Name of the Plotly continuous colour scale.
        go_type (str): 'BP', 'CC' or 'MF' keeps only rows whose 'category'
            contains that string; any other value (e.g. 'all') keeps every row.
        p_value (float): Rows are kept when their *adjusted* p-value (rounded
            to 6 decimals) is strictly below this threshold.
        chart_num (int): Maximum number of terms shown, taken after all
            filtering, from the rows sorted by Count in descending order.
        chart_size (int): Maximum marker size (bubble chart only).
        width (int): Figure width in pixels.
        height (int): Figure height in pixels.
        font_size (int): Font size used for the whole figure.

    Returns:
        str: The figure serialised with ``plotly.io.to_json``.

    Raises:
        ValueError: If ``pic_type`` is neither 'bubble' nor 'bar'.
    """
    # Fail fast: previously an unknown pic_type surfaced as an UnboundLocalError on `fig`.
    if pic_type not in ('bubble', 'bar'):
        raise ValueError(f"pic_type must be 'bubble' or 'bar', got {pic_type!r}")

    def _ratio_to_float(value):
        # Parse "k/n" without eval(): the values come from a file on disk.
        numerator, _, denominator = str(value).partition('/')
        if denominator:
            return float(numerator) / float(denominator)
        return float(numerator)

    # Pre-processing. Copy *after* selecting columns so the assignments below
    # work on an independent frame (no SettingWithCopyWarning, caller untouched).
    table = df_go[["category", "ID", "Description", "Count", 'GeneRatio', "p.adjust"]].copy()
    table.columns = ["Class", "ID", "Description", "Count", "GeneRatio", "P.adjust"]
    table["GeneRatio"] = table["GeneRatio"].map(lambda r: round(_ratio_to_float(r), 3))
    table['P.adjust'] = table['P.adjust'].round(6)

    table = table.sort_values(by='Count', ascending=False)
    table = table[table["P.adjust"] < p_value]

    # Restrict to one GO class *before* truncating; truncating first returned
    # fewer than chart_num terms whenever a single class was requested.
    if go_type in ["BP", "CC", "MF"]:
        table = table[table["Class"].str.contains(go_type, na=False)]

    table = table.iloc[:chart_num]

    # Layout shared by both chart types.
    layout_args = {
        'title': "GO Enrichment Analysis",
        'yaxis_title': "Description",
        'yaxis': dict(autorange="reversed"),  # largest Count at the top
        'font': dict(family="Arial", size=font_size),
        'template': "plotly_white",
        'width': width,
        'height': height
    }

    # Colour bar settings.
    color_axis_args = {
        'colorbar_title': "P.adjust",
        'colorbar_tickformat': ".3f",
        'colorbar': dict(dtick=0.005)
    }

    if pic_type == "bubble":
        fig = px.scatter(
            table,
            x="GeneRatio",
            y="Description",
            size="Count",
            color="P.adjust",
            color_continuous_scale=color,
            opacity=0.85,
            hover_name="Class",
            hover_data=["ID", "Description", "Count", "GeneRatio", "P.adjust"],
            size_max=chart_size,
        )
    else:  # 'bar'
        fig = px.bar(
            table,
            x='Count',
            y='Description',
            color='P.adjust',
            color_continuous_scale=color,
            opacity=0.85,
            hover_data=["Class", "ID", "GeneRatio", "P.adjust"],
        )

    fig.update_layout(**layout_args)
    fig.update_coloraxes(**color_axis_args)

    # The figure is returned as JSON. Alternatives considered by the author:
    # return an HTML <div> string, or save a standalone file with fig.write_html(...).
    return pio.to_json(fig)


# Example: GO bubble chart with the reversed red-blue colour scale.
plot_go_chart(df_go, color='RdBu_r', pic_type='bubble')