In [None]:
import pandas as pd
from Bio import SeqIO
import re
import os

# --- CONFIGURATION ---
# Input and output file locations (relative paths).
FASTA_PATH = "Homo_sapiens.GRCh38.pep.all.fa"
CSV_PATH = "variant_annotated_official_clean.csv"
OUTPUT_PATH = "variant_protein_sequence_101aa.csv"

# Number of residues kept on each side of the mutated position.
WINDOW = 50
# Full window length: left flank + 1 (the mutation) + right flank = 101.
TARGET_LEN = WINDOW + 1 + WINDOW
# Filler character for window positions that fall outside the protein.
PAD_CHAR = 'X'

In [None]:
# --- UTILITY FUNCTIONS ---

def load_ensembl_protein_fasta(path):
    """
    Build a lookup from unversioned transcript ID (ENST...) to protein sequence.

    Each FASTA record whose description contains ``transcript:ENST<digits>``
    contributes one entry. Only the digits are captured, so a version suffix
    such as ``.2`` is not part of the key. Records without such a tag are
    skipped. Sequences are stored upper-cased; if a transcript ID occurs more
    than once, the last record wins.

    Args:
        path: Location of the FASTA file, handed to ``SeqIO.parse``.

    Returns:
        dict mapping transcript ID (str) to protein sequence (str).
    """
    print(f"üß¨ ƒêang n·∫°p FASTA t·ª´ {path}...")
    # Pair every record with its regex hit so untagged records can be filtered
    # out in a single pass.
    tagged_records = (
        (re.search(r'transcript:(ENST\d+)', fasta_record.description), fasta_record)
        for fasta_record in SeqIO.parse(path, "fasta")
    )
    mapping = {
        hit.group(1): str(fasta_record.seq).upper()
        for hit, fasta_record in tagged_records
        if hit
    }
    print(f"‚úÖ ƒê√£ n·∫°p {len(mapping)} m√£ transcript.")
    return mapping

def normalize_centered_protein(seq, center_idx, target_len, pad_char='X'):
    """
    Cut a window around ``center_idx`` and pad it where it leaves the sequence.

    The window covers ``target_len // 2`` residues on each side of the 0-based
    ``center_idx``. Positions before the start or after the end of ``seq`` are
    filled with ``pad_char``. The result is finally truncated to ``target_len``
    characters, so an even ``target_len`` loses the right-most position.

    Args:
        seq: Full sequence to crop from.
        center_idx: 0-based index placed at the middle of the window.
        target_len: Length of the returned string (101 in this notebook).
        pad_char: Filler for out-of-range positions.

    Returns:
        The cropped and padded window as a string.
    """
    seq_len = len(seq)
    flank = target_len // 2
    window_start = center_idx - flank
    window_end = center_idx + flank + 1

    left_fill = pad_char * max(0, -window_start)
    right_fill = pad_char * max(0, window_end - seq_len)
    core = seq[max(0, window_start):min(seq_len, window_end)]

    # Truncation keeps the length fixed when the padding alone already
    # exceeds target_len (center far outside the sequence).
    return (left_fill + core + right_fill)[:target_len]

In [None]:
# --- MAIN PROCESSING LOGIC ---

# Columns the per-row logic parses as text. Reading them as `str` stops pandas
# from inferring float64 for a chunk that mixes integer positions with missing
# values; that inference turns position 226 into "226.0", which int() rejects,
# so valid rows would be dropped silently.
VARIANT_TEXT_COLUMNS = ('Feature', 'Protein_position', 'Amino_acids')

# Result recorded for rows that cannot be converted; such rows are dropped
# before the chunk is written.
NO_WINDOWS = (None, None)


def _first_position(raw_position):
    """Return the leading 1-based position of a position field, or None.

    The field may be a single number ("226") or a range ("226-228"), in which
    case the first number is used. Values that do not start with an integer
    >= 1 (for example "-", "?", "?-12" or a missing value) yield None.
    """
    first_token = str(raw_position).split('-')[0]
    try:
        position = int(first_token)
    except ValueError:
        return None
    return position if position >= 1 else None


def _variant_windows(feature, raw_position, raw_amino_acids, proteins):
    """Build the centered reference/alternate windows for one variant row.

    Args:
        feature: Transcript ID, optionally versioned ("ENST...<.version>").
        raw_position: Protein position field (single number or range).
        raw_amino_acids: Amino-acid change in "REF/ALT" form.
        proteins: Mapping from unversioned transcript ID to protein sequence.

    Returns:
        (ref_window, alt_window), each TARGET_LEN characters long, or
        (None, None) when the transcript is unknown, the position or the
        amino-acid change cannot be parsed, or the reference residues do not
        match the protein sequence at that position.
    """
    ref_protein = proteins.get(str(feature).split('.')[0])
    if ref_protein is None:
        return NO_WINDOWS

    pos_1based = _first_position(raw_position)
    if pos_1based is None:
        return NO_WINDOWS
    pos_0based = pos_1based - 1

    aa_parts = str(raw_amino_acids).split('/')
    if len(aa_parts) != 2:
        return NO_WINDOWS
    ref_aa, alt_aa = aa_parts

    # Verification: the annotated reference residues must match the protein
    # exactly at the given position; otherwise the row is skipped.
    if ref_protein[pos_0based:pos_0based + len(ref_aa)] != ref_aa:
        return NO_WINDOWS

    # Build the alternate protein by replacing the reference segment.
    # NOTE(review): a literal '-' in the alternate part is spliced in as-is,
    # exactly as before; confirm whether it should be treated as an empty
    # string (deletion) instead.
    alt_protein = (ref_protein[:pos_0based] +
                   alt_aa +
                   ref_protein[pos_0based + len(ref_aa):])

    # Nonsense/stop handling: when the alternate residues contain '*' (or,
    # failing that, 'X'), keep the sequence up to and including that character.
    # Only the alternate part is inspected, so a '*' already present elsewhere
    # in the reference protein no longer truncates the alternate sequence at
    # an unrelated position. Everything after the cut is filled with PAD_CHAR
    # by normalize_centered_protein, so no explicit padding is needed here.
    stop_offset = alt_aa.find('*')
    if stop_offset == -1:
        stop_offset = alt_aa.find('X')
    if stop_offset != -1:
        alt_protein = alt_protein[:pos_0based + stop_offset + 1]

    # Cut the TARGET_LEN window around the variant position.
    ref_window = normalize_centered_protein(ref_protein, pos_0based, TARGET_LEN, PAD_CHAR)
    alt_window = normalize_centered_protein(alt_protein, pos_0based, TARGET_LEN, PAD_CHAR)
    return ref_window, alt_window


# 1. Load FASTA
protein_dict = load_ensembl_protein_fasta(FASTA_PATH)

# 2. Read the CSV in chunks (sized for about 3 million rows)
print(f"üìä ƒêang x·ª≠ l√Ω file CSV: {CSV_PATH}")
chunk_size = 100000
first_chunk = True

for chunk in pd.read_csv(
    CSV_PATH,
    chunksize=chunk_size,
    dtype={column: str for column in VARIANT_TEXT_COLUMNS},
):
    # Fail loudly on a wrong input schema instead of writing an empty file.
    missing_columns = [c for c in VARIANT_TEXT_COLUMNS if c not in chunk.columns]
    if missing_columns:
        raise KeyError(f"{CSV_PATH} is missing required column(s): {missing_columns}")

    # Iterate over the three needed columns directly; this avoids building a
    # Series per row as iterrows() does.
    results = [
        _variant_windows(feature, position, amino_acids, protein_dict)
        for feature, position, amino_acids in zip(
            chunk['Feature'], chunk['Protein_position'], chunk['Amino_acids']
        )
    ]

    # Attach the results to the chunk
    chunk['prot_ref_seq'] = [ref_window for ref_window, _ in results]
    chunk['prot_alt_seq'] = [alt_window for _, alt_window in results]

    # Write to disk: overwrite for the first chunk, append afterwards, so a
    # re-run starts from a clean file. Rows without a window are dropped.
    chunk.dropna(subset=['prot_ref_seq']).to_csv(
        OUTPUT_PATH,
        mode='a' if not first_chunk else 'w',
        index=False,
        header=first_chunk
    )
    first_chunk = False
    print(f" > ƒê√£ l∆∞u xong m·ªôt c·ª•m d·ªØ li·ªáu...")

print(f"üéâ Ho√†n th√†nh! File k·∫øt qu·∫£: {OUTPUT_PATH}")