In [None]:
import numpy as np
import scanpy as sc
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
adata = sc.read('/home/zfeng/ssr/various_model/GSE194122_openproblems_neurips2021_multiome_BMMC_processed.h5ad')

adata.X=adata.layers['counts'].copy()
rna = adata[:, adata.var["feature_types"] == "GEX"].copy()
rna.X = rna.layers['counts'].copy()
sc.pp.normalize_total(rna, target_sum=1e4)
sc.pp.log1p(rna)
sc.pp.highly_variable_genes(rna, n_top_genes=2000, batch_key='batch')


rna_hvg = rna.var_names[rna.var['highly_variable']]
gene_peak_pd = pd.read_csv("/home/zfeng/ssr/genes and peaks/gene_peak_df.csv")
ocr_position_pd = pd.read_csv("/home/zfeng/ssr/genes and peaks/ocr_position.csv")

GAP_data=adata[:, gene_peak_pd['TMEM259'].dropna()].X
rna_data = rna[:, rna.var['highly_variable']].copy()
rna_data.X=rna_data.layers['counts'].copy()

vocab=np.unique(rna_data.X.data).astype(int)
mask_id = vocab[-1]+1

pos={}
for i in rna_hvg:
    if i in gene_peak_pd.columns:
        gene_pos =(gene_peak_pd[i].dropna() == i).idxmax()
        pos[i]=gene_pos

pos_code = {}
for i in rna_hvg:
    result=ocr_position_pd[i].dropna().str.split(r"[-]")
    start=result.map(lambda x: x[1]).astype(int).values
    end=result.map(lambda x: x[2]).astype(int).values
    coo=np.column_stack((start, end))
    pos_code[i]=coo



In [None]:
from sklearn.model_selection import train_test_split
from OCRBert import OCRBDataset
train_dataset, test_dataset = train_test_split(GAP_data, test_size=0.2)

batch_size = 256
train_loader = DataLoader(OCRBDataset(train_dataset, vocab, mask_id),
                          batch_size=batch_size)
test_loader = DataLoader(OCRBDataset(test_dataset, vocab, mask_id),
                          batch_size=batch_size)

result=ocr_position_pd['TMEM259'].dropna().str.split(r"[-]")
start=result.map(lambda x: x[1]).astype(int).values
end=result.map(lambda x: x[2]).astype(int).values
positions =  np.column_stack((start, end))

In [None]:
from OCRBert import OCRBTrainer
model=OCRBTrainer(lr=1e-5,vocab_size = 7324, hidden=12,positions=positions,train_dataloader= train_loader, test_dataloader=test_loader,cuda_devices='cuda:1',warmup_steps=150)
for epoch in range(3):
    model.train(epoch)
    torch.cuda.empty_cache()
    model.test(epoch)
    torch.cuda.empty_cache()

In [None]:
data = DataLoader(OCRBDataset(GAP_data),
                          batch_size=1024)

# peak to gene 分析

In [None]:
a=model.get_attn(data,model.positions)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
# attn = attn.cpu().numpy()
seq_len=210
# 选择一个目标词，提取该词的注意力列（假设是第 15 个词）
target_word_idx = 113
target_attention = a[0][target_word_idx].cpu()
target_attention

# 准备绘图
fig, ax = plt.subplots(figsize=(12, 6))

# 定义节点的 x 轴位置和固定的 y 轴高度
x_positions = np.arange(seq_len)
y_position = 1.0  # 目标词的 y 轴位置
bar_base = 0.8  # 其他词的 y 轴位置
# 曲线的控制参数
curve_amplitude = 0.3  # 曲线的最大弯曲幅度
linewidth_scale = 10  # 连线粗细放大系数
# 绘制连线（目标词到其他词）

for i, attn_weight in enumerate(np.array(target_attention)):
    if attn_weight > 0.01:  # 过滤掉非常小的注意力值
        # 曲线的起点、终点和控制点
        start = (target_word_idx, y_position)  # 目标词的位置
        end = (i, y_position)  # 上下文词的位置
        control = ((target_word_idx + i) / 2, y_position + curve_amplitude)  # 控制点，位于中间位置，y 值向上弯曲

        # 生成贝塞尔曲线的点
        t = np.linspace(0, 1, 100)  # 曲线采样点
        bezier_x = (1 - t)**2 * start[0] + 2 * (1 - t) * t * control[0] + t**2 * end[0]
        bezier_y = (1 - t)**2 * start[1] + 2 * (1 - t) * t * control[1] + t**2 * end[1]

        # 绘制曲线
        ax.plot(
            bezier_x, bezier_y,
            alpha=min(attn_weight * 0.5, 1.0),  # 曲线透明度由注意力值决定
            color="purple",
            linewidth=attn_weight * linewidth_scale  # 曲线粗细由注意力值决定
        )
        
        ax.scatter(i, y_position, color="blue", s=10)  # 其他词的点大小和颜色

        ax.bar(
            i,  # 柱子的 x 轴位置
            attn_weight * 0.5,  # 柱子的高度，根据注意力值缩放
            color="blue",
            alpha=0.6,
            width=0.4,
            bottom=bar_base  # 柱子的底部设置为 bar_base（远低于点的 y_others）
        )
        
        ax.bar(
            i,  # 柱子的 x 轴位置
            GAP_data[0].toarray()[0][i] * 0.02,  # 原始peak值
            color="blue",
            alpha=0.6,
            width=0.4,
            bottom=0.6  # 柱子的底部设置为 bar_base（远低于点的 y_others）
        )

# 标注目标词和其他词的位置
# ax.scatter(x_positions, [y_others] * seq_len, color="blue", s=50, label="Words")  # 其他词
# 添加柱子底部的水平连接线
ax.hlines(
    y=bar_base,  # 线的 y 轴位置，与柱子的底部一致
    xmin=-0.5,  # 线的起始 x 位置
    xmax=seq_len - 0.5,  # 线的结束 x 位置
    colors="black",  # 线的颜色
    linewidth=0.5,  # 线的宽度
    linestyles="-"  # 线的样式（虚线）
)
# 在目标词位置绘制点
ax.scatter([target_word_idx], [y_position], color="red", s=10, label="Target Word")  # 目标词

# 图形美化
ax.set_xlim(-1, seq_len)  # x 轴范围
ax.set_ylim(0.5, 1.5)  # y 轴范围
# ax.set_xticks(x_positions)  # x 轴标注每个词的位置
ax.set_yticks([])  # 不需要 y 轴刻度
ax.set_title(f"Attention Relationships for Target Word {target_word_idx} (Curved Connections)", fontsize=14)
ax.legend()

# 显示图形
plt.tight_layout()
plt.show()


In [None]:
row=adata.obs[adata.obs['cell_type'] == 'B1 B'].index.tolist()
row_index=[adata.obs_names.get_loc(i) for i in row]
w=torch.zeros([210])
for i in row_index:
    w=w+a[i][113].cpu()
w=w/len(row_index)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

seq_len=210
# 选择一个目标词，提取该词的注意力列（假设是第 15 个词）
target_word_idx = 113
target_attention = w
target_attention

# 准备绘图
fig, ax = plt.subplots(figsize=(12, 6))

# 定义节点的 x 轴位置和固定的 y 轴高度
x_positions = np.arange(seq_len)
y_position = 1.0  # 目标词的 y 轴位置
bar_base = 0.8  # 其他词的 y 轴位置
# 曲线的控制参数
curve_amplitude = 0.3  # 曲线的最大弯曲幅度
linewidth_scale = 40  # 连线粗细放大系数
# 绘制连线（目标词到其他词）

for i, attn_weight in enumerate(np.array(target_attention)):
    if attn_weight > 0.015:  # 过滤掉非常小的注意力值
        # 曲线的起点、终点和控制点
        start = (target_word_idx, y_position)  # 目标词的位置
        end = (i, y_position)  # 上下文词的位置
        control = ((target_word_idx + i) / 2, y_position + curve_amplitude)  # 控制点，位于中间位置，y 值向上弯曲

        # 生成贝塞尔曲线的点
        t = np.linspace(0, 1, 100)  # 曲线采样点
        bezier_x = (1 - t)**2 * start[0] + 2 * (1 - t) * t * control[0] + t**2 * end[0]
        bezier_y = (1 - t)**2 * start[1] + 2 * (1 - t) * t * control[1] + t**2 * end[1]

        # 绘制曲线
        ax.plot(
            bezier_x, bezier_y,
            alpha=min(attn_weight * 10, 1.0),  # 曲线透明度由注意力值决定
            color="purple",
            linewidth=attn_weight * linewidth_scale  # 曲线粗细由注意力值决定
        )
        
        ax.scatter(i, y_position, color="blue", s=10)  # 其他词的点大小和颜色

        ax.bar(
            i,  # 柱子的 x 轴位置
            attn_weight * 0.5,  # 柱子的高度，根据注意力值缩放
            color="blue",
            alpha=0.6,
            width=0.4,
            bottom=bar_base  # 柱子的底部设置为 bar_base（远低于点的 y_others）
        )


# 标注目标词和其他词的位置
# ax.scatter(x_positions, [y_others] * seq_len, color="blue", s=50, label="Words")  # 其他词
# 添加柱子底部的水平连接线
ax.hlines(
    y=bar_base,  # 线的 y 轴位置，与柱子的底部一致
    xmin=-0.5,  # 线的起始 x 位置
    xmax=seq_len - 0.5,  # 线的结束 x 位置
    colors="black",  # 线的颜色
    linewidth=0.5,  # 线的宽度
    linestyles="-"  # 线的样式（虚线）
)
# 在目标词位置绘制点
ax.scatter([target_word_idx], [y_position], color="red", s=10, label="Target Word")  # 目标词

# 图形美化
ax.set_xlim(-1, seq_len)  # x 轴范围
ax.set_ylim(0.5, 1.5)  # y 轴范围
# ax.set_xticks(x_positions)  # x 轴标注每个词的位置
ax.set_yticks([])  # 不需要 y 轴刻度
ax.set_title(f"cell_type: 'B1 B'", fontsize=14)
ax.legend()

# 显示图形
plt.tight_layout()
plt.show()
