In [1]:
# =========================
# 0. 环境与路径配置
# =========================
from pathlib import Path

# ——★ 根据自己文件实际位置修改 ★——
DATA_DIR  = Path('.')                       # 元数据所在目录
GCTX_FILE = DATA_DIR / 'level5_beta_trt_cp_n720216x12328.gctx'
INSTINFO  = DATA_DIR / 'instinfo_beta.txt'
COMPOUND  = DATA_DIR / 'compoundinfo_beta.txt'
GENEINFO  = DATA_DIR / 'geneinfo_beta.txt'

# ——★ 16 GB 机器推荐参数 ★——
CHUNK_SIZE  = 2_000        # 每批读取列数
DTYPE_EXPR  = 'float32'    # 累加矩阵精度
DTYPE_COUNT = 'uint16'     # 计数矩阵精度
MIN_REPLICA = 1            # ≥1 次重复才保留

print("✔️ 路径与参数已配置")

✔️ 路径与参数已配置


In [2]:
# =========================
# 1. 导入依赖
# =========================
import pandas as pd, numpy as np, re, gc, time, math
from tqdm import tqdm               # 进度条
from cmapPy.pandasGEXpress import parse

print("pandas 版本:", pd.__version__)

pandas 版本: 1.3.5


In [5]:
# =========================
# 2. plate:well → pert_id   /  pid → SMILES   /  trt_cp 集合   (兼容 Py<3.10)
# =========================
from typing import Union        # <—— 新增

def clean_plate(p: Union[str, float]) -> Union[str, None]:
    """
    仅保留板主名   AICHI001_BJAB_24H
    ----------------------------------------------------------
    去掉尾部批次标记（可选）：
      · _X1/_X2…            重复编号
      · _F1B10/_F3B7…       子板块
      · _DUO52HI53LO…       双孔标签
    """
    if not isinstance(p, str):
        return None
    return re.sub(r'(_X\d+)?(_F\d+B\d+)?(_DUO.*)?$', '', p)

print("▶ 读取 instinfo …")
use_cols = ['det_plate', 'det_well', 'rna_plate', 'rna_well',
            'sample_id', 'pert_id', 'pert_type']
inst = pd.read_csv(INSTINFO, sep='\t', usecols=use_cols)

# ---------- plate:well → pert_id ----------
pairs = []
for pc, wc in (('det_plate', 'det_well'),
               ('rna_plate', 'rna_well')):
    tmp = inst[[pc, wc, 'pert_id']].dropna()
    tmp.columns = ['plate', 'well', 'pid']
    pairs += tmp.itertuples(index=False)

sid = inst[['sample_id', 'pert_id']].dropna()
pairs += zip(sid.sample_id.str.split(':').str[0],
             sid.sample_id.str.split(':').str[1],
             sid.pert_id)

plate2pid = {}
for plate, well, pid in pairs:
    key = f"{clean_plate(plate)}:{well}"
    if key not in plate2pid:
        plate2pid[key] = pid
print(f"plate:well 键数 = {len(plate2pid):,}")

# ---------- trt_cp 集合 ----------
trt_cp_set = set(inst.loc[inst.pert_type == 'trt_cp', 'pert_id'])
print("trt_cp pert_id 数 =", len(trt_cp_set))

# ---------- pid → SMILES ----------
pid2smi = (pd.read_csv(COMPOUND, sep='\t',
                       usecols=['pert_id', 'canonical_smiles'])
             .set_index('pert_id')['canonical_smiles']
             .dropna()
             .to_dict())
print("含 SMILES 的化合物 =", len(pid2smi))


▶ 读取 instinfo …
plate:well 键数 = 3,920,132
trt_cp pert_id 数 = 34419
含 SMILES 的化合物 = 28634


In [6]:
# =========================
# 3. 读取 geneinfo
# =========================
genes = pd.read_csv(GENEINFO, sep='\t')
n_gene = len(genes)
landmark_mask = genes['feature_space'] == 'landmark'
print(f"基因总数 {n_gene}，其中 landmark = {landmark_mask.sum()}")

基因总数 12328，其中 landmark = 978


In [7]:
# =========================
# 4. Pass-1  扫描 cid → SMILES
# =========================
print("▶ 读取 GCTX 列元数据 (col_meta_only=True)…")
meta = parse.parse(str(GCTX_FILE), col_meta_only=True)
cid_all = (meta.index.tolist()
           if isinstance(meta, pd.DataFrame)
           else meta.col_metadata_df.index.tolist())
print("列总数 =", len(cid_all))

cid2sidx = {}           # cid → SMILES 索引
smiles2idx, smiles_list = {}, []
unmatched = 0

for cid in tqdm(cid_all, desc="scan", unit="cid"):
    parts = cid.split(':')
    if len(parts) - 1 >= 2:        # 2/3-colon
        pid = parts[1]
    elif len(parts) - 1 == 1:      # 1-colon
        pid = plate2pid.get(cid)
    else:
        pid = None

    if pid and pid in trt_cp_set and pid in pid2smi:
        smi = pid2smi[pid]
        if smi not in smiles2idx:
            smiles2idx[smi] = len(smiles_list)
            smiles_list.append(smi)
        cid2sidx[cid] = smiles2idx[smi]
    else:
        unmatched += 1

print(f"可用 SMILES 数           = {len(smiles_list):,}")
print(f"可用列 (trt_cp + SMILES) = {len(cid2sidx):,}")
print(f"未用列                    = {unmatched:,}")

▶ 读取 GCTX 列元数据 (col_meta_only=True)…
列总数 = 720216


scan: 100%|██████████| 720216/720216 [00:01<00:00, 455825.78cid/s]

可用 SMILES 数           = 10,670
可用列 (trt_cp + SMILES) = 465,354
未用列                    = 254,862





In [8]:
# =========================
# 5. 初始化 memmap 缓冲
# =========================
n_smiles = len(smiles_list)
expr_sum = np.memmap('expr_sum.dat', dtype=DTYPE_EXPR, mode='w+',
                     shape=(n_gene, n_smiles))
expr_cnt = np.memmap('expr_cnt.dat', dtype=DTYPE_COUNT, mode='w+',
                     shape=(n_gene, n_smiles))
print(f"memmap 创建成功  | genes × SMILES = {expr_sum.shape}")

memmap 创建成功  | genes × SMILES = (12328, 10670)


In [9]:
# =========================
# 6. Pass-2  分块读取表达矩阵并累加
# =========================
def cid_chunks(seq, size):
    """简单的 list 分块生成器"""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

start = time.time()
for chunk in tqdm(list(cid_chunks(list(cid2sidx.keys()), CHUNK_SIZE)),
                  desc="accumulate", unit="chunk"):
    gct = parse.parse(str(GCTX_FILE), cid=chunk)
    block = gct.data_df.values.astype(DTYPE_EXPR)    # (genes × chunk)
    idxs  = [cid2sidx[c] for c in chunk]

    for j, s_idx in enumerate(idxs):
        expr_sum[:, s_idx] += block[:, j]
        expr_cnt[:, s_idx] += 1

    del gct, block
    gc.collect()

print("累加完成，耗时 %.1f 分钟" % ((time.time() - start) / 60))

accumulate: 100%|██████████| 233/233 [08:18<00:00,  2.14s/chunk]

累加完成，耗时 8.3 分钟





In [10]:
# =========================
# 6. Pass-2  分块读取表达矩阵并累加
# =========================
def cid_chunks(seq, size):
    """简单的 list 分块生成器"""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

start = time.time()
for chunk in tqdm(list(cid_chunks(list(cid2sidx.keys()), CHUNK_SIZE)),
                  desc="accumulate", unit="chunk"):
    gct = parse.parse(str(GCTX_FILE), cid=chunk)
    block = gct.data_df.values.astype(DTYPE_EXPR)    # (genes × chunk)
    idxs  = [cid2sidx[c] for c in chunk]

    for j, s_idx in enumerate(idxs):
        expr_sum[:, s_idx] += block[:, j]
        expr_cnt[:, s_idx] += 1

    del gct, block
    gc.collect()

print("累加完成，耗时 %.1f 分钟" % ((time.time() - start) / 60))

accumulate: 100%|██████████| 233/233 [08:01<00:00,  2.07s/chunk]

累加完成，耗时 8.0 分钟





In [11]:
# =========================
# 7. 计算均值并过滤
# =========================
valid = expr_cnt >= MIN_REPLICA
keep_mask = valid.any(axis=0)          # 至少有一个基因满足
print("满足重复 ≥1 的 SMILES =", keep_mask.sum())

expr_mean = np.divide(expr_sum[:, keep_mask],
                      expr_cnt[:, keep_mask],
                      where=expr_cnt[:, keep_mask] != 0,
                      out=np.full_like(expr_sum[:, keep_mask], np.nan))
final_smiles = [s for s, flag in zip(smiles_list, keep_mask) if flag]

满足重复 ≥1 的 SMILES = 10670


In [12]:
# =========================
# 8. 导出 12328 基因 × SMILES
# =========================
out12328 = 'expr_by_smiles_12328.csv'
print("▶ 写出", out12328)
with open(out12328, 'w', newline='') as f:
    header = ['gene_id', 'gene_symbol', 'feature_space', *final_smiles]
    f.write(','.join(header) + '\n')
    for i, row in enumerate(expr_mean):
        rec = [str(genes.at[i, 'gene_id']),
               genes.at[i, 'gene_symbol'],
               genes.at[i, 'feature_space'],
               *[f"{x:.4f}" if not math.isnan(x) else '' for x in row]]
        f.write(','.join(rec) + '\n')
print("✅ 12328 文件完成")

▶ 写出 expr_by_smiles_12328.csv
✅ 12328 文件完成


In [13]:
# =========================
# 9. 导出 978 Landmark 基因 × SMILES
# =========================
out978 = 'expr_by_smiles_978.csv'
print("▶ 写出", out978)
with open(out978, 'w', newline='') as f:
    header = ['gene_id', 'gene_symbol', 'feature_space', *final_smiles]
    f.write(','.join(header) + '\n')
    for i, row in zip(np.where(landmark_mask)[0], expr_mean[landmark_mask]):
        rec = [str(genes.at[i, 'gene_id']),
               genes.at[i, 'gene_symbol'],
               genes.at[i, 'feature_space'],
               *[f"{x:.4f}" if not math.isnan(x) else '' for x in row]]
        f.write(','.join(rec) + '\n')
print("✅ 978 文件完成")

▶ 写出 expr_by_smiles_978.csv
✅ 978 文件完成
