In [4]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
import glob

# === Load one sample and align it to the shared gene set ===
def load_sample(args):
    """Read one sample TSV and return its TPM column aligned to a shared gene list.

    Parameters
    ----------
    args : tuple
        ``(path, common_genes)``; packed into a single argument so the function can
        be mapped directly with ``Pool.imap``.

    Returns
    -------
    tuple or None
        ``(sample_name, series)``.

        - ``sample_name`` is the file name without its trailing ``.tsv``.
        - ``series`` is the ``tpm_unstranded`` column, indexed by the gene ID with
          its last ``.<suffix>`` removed, and reindexed to ``common_genes`` (absent
          genes become NaN).

        Returns ``None`` if the file cannot be read or parsed; a warning naming the
        file is emitted in that case.
    """
    import warnings

    path, common_genes = args
    try:
        # After the 6 skipped lines every remaining row is data. Without header=None,
        # pandas would promote the first gene row to a header and drop it. That gene
        # would then be NaN here, even though the gene scan in preprocess (which reads
        # with header=None) counts it as common.
        df = pd.read_csv(path, sep="\t", skiprows=6, header=None)
        df.columns = ["gene_id", "gene_name", "gene_type", "unstranded", "stranded_first",
                      "stranded_second", "tpm_unstranded", "fpkm_unstranded", "fpkm_uq_unstranded"]
        # Drop the last ".<suffix>" (e.g. a version number) to get the base gene ID.
        df["gene_id"] = df["gene_id"].str.rsplit(".", n=1).str[0]
        # Stripping can make IDs collide; keep the first non-null value per column.
        df = df.groupby("gene_id").first()
        series = df["tpm_unstranded"].reindex(common_genes)
        sample_name = os.path.basename(path)
        if sample_name.endswith(".tsv"):
            sample_name = sample_name[:-len(".tsv")]
        return sample_name, series
    except Exception as exc:
        # Best effort: one bad file must not abort the whole pool, but it should not
        # vanish silently either.
        warnings.warn(f"load_sample: skipping {path!r}: {exc!r}")
        return None

# === Align one parquet part file to a shared column list ===
def load_and_align(path_and_columns):
    """Read a parquet file and conform it to a given column layout.

    ``path_and_columns`` is a ``(path, all_columns)`` pair. Every name in
    ``all_columns`` that the file lacks is appended as an all-null column. The
    table is then projected to exactly ``all_columns``, in that order, so any
    file column not listed is dropped.

    Note: ``preprocess`` below does not call this helper as written.
    """
    path, wanted = path_and_columns
    aligned = pq.read_table(path)
    # Snapshot the file's own columns before appending, so membership is tested
    # against the original schema only.
    present = set(aligned.column_names)
    n_rows = len(aligned)
    for name in wanted:
        if name not in present:
            aligned = aligned.append_column(name, pa.nulls(n_rows))
    return aligned.select(wanted)

# === Main pipeline ===
def _collect_tsv_paths(cancer_folder, input_folder):
    """Return absolute paths of every ``.tsv`` under ``input_folder/<name>`` for each name in ``cancer_folder``."""
    tsv_paths = []
    for folder in cancer_folder:
        root_dir = os.path.join(input_folder, folder)
        for dirpath, _, filenames in os.walk(root_dir):
            tsv_paths.extend(
                os.path.abspath(os.path.join(dirpath, filename))
                for filename in filenames
                if filename.endswith(".tsv")
            )
    return tsv_paths


def _find_common_genes(tsv_paths):
    """Return the sorted intersection of gene IDs (last ``.<suffix>`` removed) over ``tsv_paths``.

    Only the first column is read. With ``header=None``, the row right after the
    6 skipped lines counts as data. Stripping the suffix mirrors what
    ``load_sample`` does to its ``gene_id`` column.
    """
    common_genes = None
    for path in tqdm(tsv_paths, desc="計算共通基因中..."):
        ids = pd.read_csv(path, sep="\t", skiprows=6, usecols=[0],
                          names=["gene_id"], header=None)["gene_id"]
        genes = {gene.rsplit(".", 1)[0] for gene in ids}
        common_genes = genes if common_genes is None else common_genes & genes
    return sorted(common_genes) if common_genes is not None else []


def _write_batch(batch, sample_names, common_genes, parquet_dir, batch_id):
    """Write one batch of per-sample series as a genes x samples parquet file and return its path."""
    df_batch = pd.DataFrame(batch).T
    df_batch.columns = sample_names
    df_batch.index = common_genes
    out_path = os.path.join(parquet_dir, f"batch_{batch_id:05}.parquet")
    pq.write_table(pa.Table.from_pandas(df_batch), out_path, compression="snappy")
    return out_path


def preprocess(cancer_folder, input_folder, batch_size=50, n_jobs=8):
    """Build one genes x samples TPM matrix from all sample TSVs and save it as parquet.

    Steps:
      1. Collect every ``.tsv`` under ``input_folder/<folder>`` for each entry of
         ``cancer_folder``.
      2. Intersect the gene IDs (last ``.<suffix>`` removed) across all files.
      3. Load samples in parallel with ``load_sample``. Write them in batches of
         ``batch_size`` to ``input_folder/final_data/batch_XXXXX.parquet``.
      4. Concatenate the batch files written by *this* run column-wise into
         ``input_folder/final_data.parquet``. The whole matrix is held in memory.

    Parameters
    ----------
    cancer_folder : iterable of str
        Sub-directory names of ``input_folder`` to scan.
    input_folder : str
        Root directory; all outputs are written inside it.
    batch_size : int
        Samples per intermediate parquet file; must be >= 1.
    n_jobs : int or None
        Worker processes for ``multiprocessing.Pool`` (``None`` means all CPUs).

    Raises
    ------
    ValueError
        If ``batch_size < 1`` or no ``.tsv`` file is found.
    RuntimeError
        If no sample could be loaded.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    # Step 1: scan for input files
    tsv_paths = _collect_tsv_paths(cancer_folder, input_folder)
    print(f"共找到 {len(tsv_paths)} 份 TSV 檔案。")
    if not tsv_paths:
        raise ValueError(f"No .tsv files found under {input_folder!r}")

    # Step 2: gene IDs shared by every file
    common_genes = _find_common_genes(tsv_paths)

    # Step 3: output directory for the intermediate batch files
    parquet_dir = os.path.join(input_folder, "final_data")
    os.makedirs(parquet_dir, exist_ok=True)

    # Step 4: parallel load + batched writes
    tasks = [(path, common_genes) for path in tsv_paths]
    # Every task references the same common_genes list. Sending tasks in chunks
    # lets pickle serialise that list once per chunk instead of once per file.
    workers = n_jobs or os.cpu_count() or 1
    chunksize = max(1, min(16, len(tasks) // (workers * 4)))

    batch, sample_names, written_paths = [], [], []
    n_failed = 0
    with Pool(processes=n_jobs) as pool:
        results = pool.imap(load_sample, tasks, chunksize=chunksize)
        for result in tqdm(results, total=len(tasks), desc="處理並寫出 parquet 子檔案中..."):
            if result is None:
                n_failed += 1
                continue
            sample_name, series = result
            batch.append(series)
            sample_names.append(sample_name)
            if len(batch) >= batch_size:
                written_paths.append(
                    _write_batch(batch, sample_names, common_genes, parquet_dir, len(written_paths)))
                batch, sample_names = [], []

    if batch:
        written_paths.append(
            _write_batch(batch, sample_names, common_genes, parquet_dir, len(written_paths)))

    if n_failed:
        print(f"略過 {n_failed} 份無法讀取的 TSV 檔案。")
    if not written_paths:
        raise RuntimeError("No TSV file could be loaded; nothing to merge.")

    # Step 5: merge only the batch files this run wrote. Globbing the directory
    # would also pick up stale batch files left over from an earlier run.
    print("分批合併 parquet 子檔案中...")
    df_parts = [pd.read_parquet(path) for path in tqdm(written_paths, desc="讀入 parquet 子檔")]
    final_df = pd.concat(df_parts, axis=1)

    final_path = os.path.join(input_folder, "final_data.parquet")
    final_df.to_parquet(final_path, compression="snappy")
    print(f"✅ 最終資料完成：{final_path}")


In [5]:
# Import necessary module
import os

# List the per-cancer sub-directories of the data root
def find_folder(folder):
    """Return the names of the sub-directories directly inside ``folder``.

    - Plain files (e.g. a manifest or a previously written
      ``final_data.parquet``) are skipped.
    - Entries whose name ends in ``.txt`` are still excluded, as before.
    - The result is sorted, so downstream processing order does not depend on
      ``os.listdir`` ordering.
    """
    return sorted(
        name
        for name in os.listdir(folder)
        if not name.endswith(".txt") and os.path.isdir(os.path.join(folder, name))
    )
    

In [6]:
# Root folder holding one sub-directory per cancer. Set the TCGA_CANCER_DIR
# environment variable to point elsewhere instead of editing this path.
input_folder = os.environ.get("TCGA_CANCER_DIR", "/home/terry_0714/tcga_cancer")
cancer_folder = find_folder(input_folder)

preprocess(cancer_folder, input_folder, batch_size=50, n_jobs=8)

共找到 10759 份 TSV 檔案。


計算共通基因中...: 100%|██████████| 10759/10759 [17:40<00:00, 10.15it/s]
處理並寫出 parquet 子檔案中...: 100%|██████████| 10759/10759 [22:22<00:00,  8.02it/s] 


分批合併 parquet 子檔案中...


讀入 parquet 子檔: 100%|██████████| 216/216 [00:44<00:00,  4.87it/s]


✅ 最終資料完成：/home/terry_0714/tcga_cancer/final_data.parquet
