In [None]:
# -*- coding: utf-8 -*-
# 구글 코랩에서 실행 가능하며, `test.csv`와 `sample_submission.csv`가 현재 디렉토리에 있어야 합니다.
# ----------------------------------------------------------------------
# 1. 필수 라이브러리 설치 (Colab용)
# ----------------------------------------------------------------------
!pip install transformers datasets accelerate pandas numpy torch tqdm

# ----------------------------------------------------------------------
# 2. 라이브러리 임포트 및 유틸리티
# ----------------------------------------------------------------------
import os
import random
import math
from typing import List, Tuple

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup

# ================================
# 0. 설정값 (하이퍼파라미터 튜닝 영역)
# ================================
# v2-50m 모델 사용
MODEL_NAME = "InstaDeepAI/nucleotide-transformer-v2-50m-multi-species"

TEST_PATH = "test.csv"
SAMPLE_SUB_PATH = "sample_submission.csv"
OUTPUT_PATH = "submission_ntv2_boosted_contrastive.csv"

MAX_SEQ_LEN = 512                                # 토큰 최대 길이
BASE_HIDDEN_DIM = 512                            # v2-50m 모델의 기본 hidden_size
EMBED_DIM = BASE_HIDDEN_DIM * 2                  # Mean + Max Pooling으로 1024차원 사용

DO_TRAIN = True                                  # 파인튜닝 여부 (대회에서는 True 권장)
MAX_TRAIN_SEQS = 20000                           # test 중 학습에 쓸 최대 시퀀스 수 (속도 고려)
EPOCHS = 1                                       # Colab 환경 최적화
BATCH_SIZE = 8
LR = 1e-5                                        # 학습률 조정 (1e-5 ~ 3e-5)
WARMUP_RATIO = 0.05

# Triplet Contrastive Learning 설정
# Mut_A: 작은 변이 (Positive 역할), Mut_B: 큰 변이 (Negative 역할)
MUTATION_LEVEL_A = 0.002                         # 0.2% SNV (작은 변이)
MUTATION_LEVEL_B = 0.01                          # 1.0% SNV (큰 변이)

# Triplet Loss Margin: dist(Anchor, Positive) + MARGIN < dist(Anchor, Negative)
TRIPLET_MARGIN = 0.2
ALPHA_MARGIN_SCALE = 1.0 # 동적 마진 스케일 조정 (PCC 개선)

SEED = 2025

# ================================
# 1. 유틸 함수들
# ================================
def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

def mutate_sequence_snvs(seq: str, mutation_ratio: float) -> Tuple[str, int]:
    """
    주어진 DNA 염기열에 대해 SNV(single nucleotide variants)만 랜덤으로 넣어서
    '조금 다른' variant 시퀀스를 만든다.
    """
    bases = ["A", "C", "G", "T"]
    seq = seq.upper()
    length = len(seq)

    # 최소 1개 변이 보장
    num_mutations = max(1, int(length * mutation_ratio))

    if num_mutations >= length:
        num_mutations = length // 2 if length >= 2 else 1

    positions = random.sample(range(length), num_mutations)
    seq_list = list(seq)

    for pos in positions:
        original = seq_list[pos]
        candidates = [b for b in bases if b != original]
        if not candidates:
            continue
        seq_list[pos] = random.choice(candidates)

    mutated = "".join(seq_list)
    return mutated, num_mutations

def get_pooled_embedding(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    """
    Mean Pooling과 Max Pooling을 결합하여 (Concatenative Pooling) 임베딩을 추출합니다.
    [B, L, H] -> [B, 2*H]
    """
    # 1. Mean Pooling (패딩 제외)
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    masked_hidden = last_hidden_state * mask
    summed = masked_hidden.sum(dim=1)
    counts = mask.sum(dim=1).clamp(min=1e-6)
    mean_emb = summed / counts # [B, H]

    # 2. Max Pooling (패딩 제외)
    # 패딩 위치는 -inf로 설정하여 Max Pooling 시 선택되지 않도록 함
    masked_hidden_max = last_hidden_state.masked_fill(~mask.bool(), -1e9)
    max_emb, _ = torch.max(masked_hidden_max, dim=1) # [B, H]

    # 3. Concatenate (결합)
    return torch.cat((mean_emb, max_emb), dim=1) # [B, 2*H]

def cosine_distance(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """
    코사인 거리: 1 - cos_sim
    """
    return 1.0 - F.cosine_similarity(a, b)

# ================================
# 2. Dataset 정의 (Triplet Loss 구조)
# ================================
class MutationContrastiveDataset(Dataset):
    """
    Anchor(Original), Positive(Small Mutation), Negative(Large Mutation) Triplet을 생성
    """
    def __init__(self, seq_list: List[str]):
        self.seqs = seq_list
        # 작은 변이: Positive 역할을 하여 거리를 가깝게 유도
        self.mutation_ratio_P = MUTATION_LEVEL_A
        # 큰 변이: Negative 역할을 하여 거리를 멀게 유도
        self.mutation_ratio_N = MUTATION_LEVEL_B

    def __len__(self):
        return len(self.seqs)

    def __getitem__(self, idx):
        seq = self.seqs[idx]

        # P: Original - Small Mutation (Positive Pair)
        mut_P, n_mut_P = mutate_sequence_snvs(seq, self.mutation_ratio_P)

        # N: Original - Large Mutation (Negative Pair)
        mut_N, n_mut_N = mutate_sequence_snvs(seq, self.mutation_ratio_N)

        return {
            "anchor": seq,
            "positive": mut_P,
            "negative": mut_N,
            "num_mut_P": n_mut_P,
            "num_mut_N": n_mut_N,
        }

def collate_fn(batch, tokenizer, max_len: int):
    anchors = [b["anchor"] for b in batch]
    positives = [b["positive"] for b in batch]
    negatives = [b["negative"] for b in batch]

    num_mut_P = [b["num_mut_P"] for b in batch]
    num_mut_N = [b["num_mut_N"] for b in batch]

    # 모든 시퀀스를 한 번에 토큰화
    all_seqs = anchors + positives + negatives

    enc = tokenizer(
        all_seqs,
        padding=True,
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )

    # 배치 크기
    B = len(anchors)

    # 결과 분리
    enc_A = {k: v[:B] for k, v in enc.items()}
    enc_P = {k: v[B:2*B] for k, v in enc.items()}
    enc_N = {k: v[2*B:] for k, v in enc.items()}

    num_mut_tensor = torch.tensor([num_mut_P, num_mut_N], dtype=torch.float32).T # [B, 2]

    return enc_A, enc_P, enc_N, num_mut_tensor


# ================================
# 3. 모델 아키텍처 및 로드 (Dropout 추가)
# ================================
class VariantSensitiveGLM(nn.Module):
    """
    기존 AutoModel 위에 Dropout 레이어를 추가하여 Fine-Tuning 안정성 및 일반화 개선
    """
    def __init__(self, model_name: str, hidden_dropout_prob: float = 0.1):
        super().__init__()
        # AutoModelForMaskedLM 대신, 임베딩 추출에 더 적합한 AutoModel 사용
        # ⭐⭐⭐ 수정: ignore_mismatched_sizes=True를 추가하여 MLM 헤드 가중치 불일치 무시 ⭐⭐⭐
        self.base_model = AutoModel.from_pretrained(
            model_name,
            trust_remote_code=True,
            ignore_mismatched_sizes=True # MLM 헤드 가중치 불일치 무시
        )
        # 임베딩 추출 시 안정성을 위해 마지막 히든 스테이트에 Dropout 적용
        self.dropout = nn.Dropout(hidden_dropout_prob)

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True,
            return_dict=True
        )

        # 마지막 히든 스테이트에 Dropout 적용
        last_hidden_state = self.dropout(outputs.last_hidden_state)

        # Concatenative Pooling을 통해 최종 임베딩 벡터 반환
        pooled_emb = get_pooled_embedding(last_hidden_state, attention_mask)

        return pooled_emb # [B, 2*H]

def load_glm_model(model_name: str, device: torch.device):
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    # tokenizer는 변경 없음
    model = VariantSensitiveGLM(model_name) # 여기서 ignore_mismatched_sizes가 적용됨
    model.to(device)

    return tokenizer, model

# ================================
# 4. 파인튜닝 루프 (Triplet Loss 적용)
# ================================
def train_variant_sensitive_glm(
    model: VariantSensitiveGLM,
    tokenizer: AutoTokenizer,
    train_seqs: List[str],
    device: torch.device,
):
    dataset = MutationContrastiveDataset(train_seqs)
    dataloader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=lambda batch: collate_fn(batch, tokenizer, MAX_SEQ_LEN),
    )

    num_training_steps = EPOCHS * len(dataloader)
    warmup_steps = int(num_training_steps * WARMUP_RATIO)

    optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=warmup_steps,
        num_training_steps=num_training_steps,
    )

    model.train()

    # 동적 마진 계산: mutation_ratio_N - mutation_ratio_P = 0.01 - 0.002 = 0.008
    dynamic_margin = TRIPLET_MARGIN + ALPHA_MARGIN_SCALE * (MUTATION_LEVEL_B - MUTATION_LEVEL_A)
    dynamic_margin = torch.tensor(dynamic_margin, dtype=torch.float32).to(device)
    print(f"Triplet Loss Margin: {dynamic_margin.item():.4f}")


    for epoch in range(EPOCHS):
        epoch_loss = 0.0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{EPOCHS}")

        for step, (enc_A, enc_P, enc_N, num_mut_tensor) in enumerate(progress_bar):
            optimizer.zero_grad()

            # 1. Anchor, Positive, Negative 임베딩 추출 (Pooling은 모델 내부에서 처리)
            # [B, 2*H]
            emb_A = model(enc_A["input_ids"].to(device), enc_A["attention_mask"].to(device))
            emb_P = model(enc_P["input_ids"].to(device), enc_P["attention_mask"].to(device))
            emb_N = model(enc_N["input_ids"].to(device), enc_N["attention_mask"].to(device))

            # 2. 코사인 거리 계산
            dist_AP = cosine_distance(emb_A, emb_P) # Anchor-Positive 거리 (작아야 함) [B]
            dist_AN = cosine_distance(emb_A, emb_N) # Anchor-Negative 거리 (커야 함) [B]

            # 3. Triplet Margin Loss 적용
            # Triplet Loss: max(0, dist(A, P) - dist(A, N) + margin)

            loss_triplet = F.relu(dist_AP - dist_AN + dynamic_margin)
            loss = loss_triplet.mean()

            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()

            epoch_loss += loss.item()
            progress_bar.set_postfix({"loss": f"{loss.item():.4f}"})

        print(f"[Epoch {epoch+1}] mean loss = {epoch_loss / len(dataloader):.4f}")

    return model

# ================================
# 5. test.csv 전체 임베딩 추출 (추론)
# ================================
class TestSeqDataset(Dataset):
    def __init__(self, df: pd.DataFrame):
        self.ids = df["ID"].tolist()
        self.seqs = df["seq"].astype(str).tolist()

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, idx):
        return self.ids[idx], self.seqs[idx]


def collate_test(batch, tokenizer, max_len: int):
    ids = [b[0] for b in batch]
    seqs = [b[1] for b in batch]
    enc = tokenizer(
        seqs,
        padding=True,
        truncation=True,
        max_length=max_len,
        return_tensors="pt",
    )
    return ids, enc


def extract_embeddings(
    model: VariantSensitiveGLM,
    tokenizer: AutoTokenizer,
    test_df: pd.DataFrame,
    device: torch.device,
) -> pd.DataFrame:
    model.eval()
    dataset = TestSeqDataset(test_df)
    dataloader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE * 4, # 추론 시 배치 사이즈 증대
        shuffle=False,
        collate_fn=lambda batch: collate_test(batch, tokenizer, MAX_SEQ_LEN),
    )

    all_ids = []
    all_embs = []

    with torch.no_grad():
        for ids, enc in tqdm(dataloader, desc="Extracting embeddings"):
            input_ids = enc["input_ids"].to(device)
            attn_mask = enc["attention_mask"].to(device)

            # 모델의 forward 함수는 이미 Concatenative Pooling된 [B, 2*H] 임베딩을 반환
            emb = model(input_ids, attn_mask)

            emb = emb.detach().cpu().numpy()
            all_ids.extend(ids)
            all_embs.append(emb)

    all_embs = np.vstack(all_embs)  # [N, EMBED_DIM]

    # submission 포맷으로 변환
    sub = pd.read_csv(SAMPLE_SUB_PATH)

    # ID 순서에 맞춰서 정렬
    id_to_index = {id_: i for i, id_ in enumerate(all_ids)}
    # test_df의 크기로 배열 생성
    ordered_embs = np.zeros((len(test_df), EMBED_DIM), dtype=np.float32)

    # test_df 순서로 정렬
    for i, id_ in enumerate(test_df["ID"].tolist()):
        idx = id_to_index[id_]
        ordered_embs[i] = all_embs[idx]

    emb_cols = [f"emb_{i:04d}" for i in range(EMBED_DIM)]
    emb_df = pd.DataFrame(ordered_embs, columns=emb_cols)
    out_df = pd.concat([test_df[["ID"]], emb_df], axis=1) # ID와 임베딩 결합

    return out_df

# ================================
# 6. 메인 실행부
# ================================
def main():
    set_seed(SEED)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")
    print(f"Final Embedding Dimension: {EMBED_DIM}")

    # 1. 데이터 로드
    test_df = pd.read_csv(TEST_PATH)
    print("test shape:", test_df.shape)

    # 2. gLM 로드
    tokenizer, model = load_glm_model(MODEL_NAME, device)

    # 3. ---------- 파인튜닝 (Self-Supervised Contrastive) ----------
    if DO_TRAIN:
        # 학습에 사용할 시퀀스 준비
        uniq_seqs = test_df["seq"].astype(str).unique().tolist()
        random.shuffle(uniq_seqs)
        train_seqs = uniq_seqs[:MAX_TRAIN_SEQS] if len(uniq_seqs) > MAX_TRAIN_SEQS else uniq_seqs

        print(f"Train sequences (randomly selected from test.csv): {len(train_seqs)}")
        model = train_variant_sensitive_glm(
            model=model,
            tokenizer=tokenizer,
            train_seqs=train_seqs,
            device=device,
        )

    # 4. ---------- 임베딩 추출 ----------
    submission_df = extract_embeddings(
        model=model,
        tokenizer=tokenizer,
        test_df=test_df,
        device=device,
    )

    # 5. 저장
    submission_df.to_csv(OUTPUT_PATH, index=False)
    print("Saved submission to:", OUTPUT_PATH)
    print("\n--- Final Submission Head ---")
    print(submission_df.head())


if __name__ == "__main__":
    main()