In [None]:
#!pip install torch pandas numpy scikit-learn pyarrow "numpy<2"

In [None]:
#!pip install stable-baselines3[extra] gymnasium

In [None]:
# === 필요한 라이브러리 설치 ===
# 아나콘다 프롬프트에서 미리 설치하거나, 노트북 셀에서 실행
# pip install torch pandas numpy scikit-learn pyarrow "numpy<2"
# pip install stable-baselines3[extra] gymnasium

import os
import time
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
from gymnasium import spaces

from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv, DummyVecEnv

# --- 경로 설정 (로컬 환경 기준) ---
# Google Drive 경로가 G: 드라이브에 마운트되었다고 가정
try:
    # '내 드라이브'에 한글이 포함되어 raw string 또는 슬래시 사용 권장
    base_dir = "/content/drive/MyDrive/Antibody_AI_Data_Stage1_AbNumber"
    # 또는 base_dir = 'G:/내 드라이브/Antibody_AI_Data_Stage1_AbNumber'

    if not os.path.exists(base_dir):
        raise FileNotFoundError(f"기본 디렉토리를 찾을 수 없습니다: {base_dir}")

    feature_dir = os.path.join(base_dir, '2_feature_engineered')
    split_dir = os.path.join(base_dir, '3_split_data')
    model_save_dir = os.path.join(base_dir, '4_trained_models')
    os.makedirs(model_save_dir, exist_ok=True)

    print(f"기본 디렉토리: {base_dir}")
    print(f"분할 데이터 로드 경로: {split_dir}")
    print(f"모델 저장 경로: {model_save_dir}")

except FileNotFoundError as e:
    print(e)
    # 필요한 경우 스크립트 중단 또는 기본 경로 설정
    exit()
except Exception as e:
    print(f"경로 설정 중 오류 발생: {e}")
    exit()


# --- 데이터 로딩 ---
print("\n분할된 데이터 로딩 중...")
try:
    # 사용할 모델 임베딩 파일명 확인 (예: IgBert 사용 시)
    # 이전 단계에서 저장한 정확한 파일명 사용 필요
    model_name_safe = "Exscientia_IgBert" # 예시, 실제 사용한 모델명으로 변경
    vh_train_path = os.path.join(split_dir, f'X_train_vh.npy') # 경로 수정
    vl_train_path = os.path.join(split_dir, f'X_train_vl.npy') # 경로 수정

    # 실제 임베딩 파일명 생성 로직 (이전 단계 참고)
    # vh_embeddings_path = os.path.join(feature_dir, f'vh_embeddings_{model_name_safe}.npy')
    # vl_embeddings_path = os.path.join(feature_dir, f'vl_embeddings_{model_name_safe}.npy')
    # 위 경로는 전체 임베딩 파일이고, split_dir 아래에 분할된 npy 파일이 있어야 함.

    X_train_vh = np.load(vh_train_path)
    X_train_vl = np.load(vl_train_path)
    # X_val_vh = np.load(os.path.join(split_dir, 'X_val_vh.npy'))
    # X_val_vl = np.load(os.path.join(split_dir, 'X_val_vl.npy'))
    # X_test_vh = np.load(os.path.join(split_dir, 'X_test_vh.npy'))
    # X_test_vl = np.load(os.path.join(split_dir, 'X_test_vl.npy'))

    metadata_train = pd.read_parquet(os.path.join(split_dir, 'metadata_train.parquet'))
    # metadata_val = pd.read_parquet(os.path.join(split_dir, 'metadata_val.parquet'))
    # metadata_test = pd.read_parquet(os.path.join(split_dir, 'metadata_test.parquet'))

    print("데이터 로딩 완료.")
    print("Train set shape (VH):", X_train_vh.shape)

except FileNotFoundError as e:
    print(f"오류: 분할된 데이터 파일을 찾을 수 없습니다. 경로를 확인하세요: {e}")
    raise
except Exception as e:
    print(f"데이터 로딩 중 오류 발생: {e}")
    raise

# --- 데이터 준비 ---
# VH, VL 임베딩 결합 (예: 단순 연결)
X_train = np.concatenate((X_train_vh, X_train_vl), axis=1)
# X_val = np.concatenate((X_val_vh, X_val_vl), axis=1)
# X_test = np.concatenate((X_test_vh, X_test_vl), axis=1)
print("VH, VL 임베딩 결합 완료. Train shape:", X_train.shape)

# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

기본 디렉토리: /content/drive/MyDrive/Antibody_AI_Data_Stage1_AbNumber
분할 데이터 로드 경로: /content/drive/MyDrive/Antibody_AI_Data_Stage1_AbNumber/3_split_data
모델 저장 경로: /content/drive/MyDrive/Antibody_AI_Data_Stage1_AbNumber/4_trained_models

분할된 데이터 로딩 중...
데이터 로딩 완료.
Train set shape (VH): (6862, 1024)
VH, VL 임베딩 결합 완료. Train shape: (6862, 2048)
Using device: cpu


In [None]:
# --- 속성 예측 모델 로드 또는 정의 ---

# 실제 구현 시에는 이 부분에 미리 학습된 모델을 로드하거나,
# 라벨 데이터가 있다면 여기서 모델을 정의하고 학습시켜야 합니다.

# 예시 1: 미리 학습된 모델 로드 (파일이 존재한다고 가정)
# predictor_model_path = os.path.join(model_save_dir, 'property_predictor_model.pth')
# if os.path.exists(predictor_model_path):
#     input_dim = X_train.shape[1]
#     predictor_model = PropertyPredictor(input_dim=input_dim).to(device) # Phase 2에서 정의한 모델 클래스 필요
#     try:
#         predictor_model.load_state_dict(torch.load(predictor_model_path, map_location=device))
#         predictor_model.eval()
#         print(f"속성 예측 모델 로드 완료: {predictor_model_path}")
#     except Exception as e:
#         print(f"경고: 속성 예측 모델 로드 실패 ({e}). 플레이스홀더 보상을 사용합니다.")
#         predictor_model = None
# else:
#     print("경고: 학습된 속성 예측 모델 파일을 찾을 수 없습니다. 플레이스홀더 보상을 사용합니다.")
#     predictor_model = None

# 예시 2: 플레이스홀더 함수 정의 (실제 모델 없을 경우)
# 이 함수는 실제 최적화 목표에 맞게 반드시 수정되어야 합니다!
def placeholder_property_predictor(embedding_tensor):
    """
    임베딩을 받아 임의의 속성 점수(보상 관련)를 반환하는 플레이스홀더 함수.
    실제 예측 모델로 교체하거나, 다른 보상 계산 로직(예: pLM likelihood)으로 대체 필요.
    """
    # 예시: 특정 타겟 임베딩과의 거리 (거리가 가까울수록 높은 점수)
    # target_embedding = torch.randn_like(embedding_tensor) # 실제로는 목표 임베딩 사용
    # score = -torch.norm(embedding_tensor - target_embedding, dim=1) # 거리의 음수값
    # return score.item()

    # 예시 2: 임베딩 벡터의 특정 부분 합계 (단순 예시)
    # score = torch.sum(embedding_tensor[:, :10], dim=1)
    # return score.item()

    # 가장 간단한 예시: 랜덤 점수 반환
    return np.random.rand() * 10 # 0 ~ 10 사이 랜덤 점수


predictor_model = placeholder_property_predictor # 플레이스홀더 함수 사용
print("알림: 실제 속성 예측 모델 대신 플레이스홀더 보상 함수를 사용합니다.")
print("      => 의미있는 최적화를 위해서는 이 부분을 반드시 수정해야 합니다.")

알림: 실제 속성 예측 모델 대신 플레이스홀더 보상 함수를 사용합니다.
      => 의미있는 최적화를 위해서는 이 부분을 반드시 수정해야 합니다.


In [None]:
# --- 강화학습 환경 정의 (Gymnasium Env 상속) ---
class AntibodyOptimizeEnv(gym.Env):
    """
    항체 임베딩 최적화를 위한 커스텀 Gymnasium 환경.
    State: 현재 항체 임베딩 (VH+VL 결합).
    Action: 임베딩 벡터에 작은 변화를 주는 벡터 (연속 공간).
    Reward: 속성 예측 모델의 출력 또는 다른 지표 (높을수록 좋음).
    """
    metadata = {'render_modes': ['human']} # 필요시 렌더링 모드 정의

    def __init__(self, initial_embeddings_pool, predictor_fn, max_steps=100, action_scale=0.01):
        """
        Args:
            initial_embeddings_pool (np.ndarray): 초기 상태로 사용할 임베딩 풀 (예: X_train).
            predictor_fn (callable): 임베딩 텐서를 입력받아 속성 점수를 반환하는 함수.
            max_steps (int): 한 에피소드의 최대 스텝 수.
            action_scale (float): 액션(변화량)의 크기를 조절하는 스케일 팩터.
        """
        super().__init__()

        self.embeddings_pool = initial_embeddings_pool
        self.predictor_fn = predictor_fn
        self.max_steps = max_steps
        self.action_scale = action_scale
        self.embedding_dim = initial_embeddings_pool.shape[1]

        # Observation Space: 임베딩 벡터 (연속 공간)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf,
                                            shape=(self.embedding_dim,), dtype=np.float32)

        # Action Space: 임베딩 벡터에 더할 변화량 (연속 공간, -1 ~ 1 범위)
        # 실제 적용 시에는 action * action_scale 만큼 변화시킴
        self.action_space = spaces.Box(low=-1, high=1,
                                       shape=(self.embedding_dim,), dtype=np.float32)

        self.current_step = 0
        self.current_embedding = None

    def _get_initial_embedding(self):
        # 풀에서 무작위로 초기 임베딩 선택
        idx = np.random.randint(len(self.embeddings_pool))
        return self.embeddings_pool[idx].copy()

    def reset(self, seed=None, options=None):
        # 환경 초기화
        super().reset(seed=seed) # Gymnasium 표준 시드 설정
        self.current_embedding = self._get_initial_embedding()
        self.current_step = 0
        observation = self.current_embedding
        info = {} # 추가 정보 (필요시)
        # print("Environment reset.") # 디버깅용
        return observation, info

    def step(self, action):
        # 1. Action 적용 (임베딩 수정)
        # action은 -1 ~ 1 범위, action_scale로 실제 변화량 조절
        perturbation = action * self.action_scale
        self.current_embedding += perturbation
        # (선택사항) 임베딩 값 범위 제한 등 후처리 가능

        self.current_step += 1

        # 2. Reward 계산
        # 현재 임베딩을 Tensor로 변환하여 예측 함수에 전달
        embedding_tensor = torch.tensor(self.current_embedding, dtype=torch.float32).unsqueeze(0).to(device)
        # predictor_fn 이 Tensor를 직접 처리 못하면 numpy로 변환 후 전달 필요
        # 예: reward = self.predictor_fn(self.current_embedding)
        reward = self.predictor_fn(embedding_tensor) # predictor_fn의 반환 타입 확인 필요 (스칼라 값이어야 함)

        # 3. 종료 조건 확인 (Done)
        terminated = self.current_step >= self.max_steps
        truncated = False # 시간 제한 외 다른 이유로 잘리는 경우 (여기서는 사용 안함)

        # 4. 추가 정보 (Info) - 디버깅 등에 활용
        info = {}

        # 상태, 보상, 종료 여부 반환
        observation = self.current_embedding
        # print(f"Step: {self.current_step}, Reward: {reward:.4f}") # 디버깅용
        return observation, reward, terminated, truncated, info

    def render(self):
        # 시각화 로직 (필요시 구현)
        pass

    def close(self):
        # 환경 정리 작업 (필요시 구현)
        pass


# --- 병렬 환경 생성 및 에이전트 학습 ---
if __name__ == '__main__': # 멀티프로세싱 사용 시 필요

    # 병렬 환경 수 설정 (CPU 코어 수 고려)
    num_cpu = os.cpu_count()
    n_envs = max(1, num_cpu - 2) if num_cpu else 1 # 사용 가능한 CPU 코어보다 적게 설정 권장
    print(f"\n사용할 병렬 환경 수: {n_envs}")

    # 환경 생성 함수 정의
    # 초기 임베딩 풀(X_train), 예측 함수(predictor_model) 등을 인자로 전달
    env_kwargs = {
        'initial_embeddings_pool': X_train,
        'predictor_fn': predictor_model, # 실제 예측 모델 또는 개선된 보상 함수로 교체!
        'max_steps': 200, # 에피소드 최대 길이 조절 필요
        'action_scale': 0.01 # 임베딩 변화 크기 조절 필요
    }

    try:
        # 병렬 환경 생성 (SubprocVecEnv 사용)
        vec_env = make_vec_env(
            lambda: AntibodyOptimizeEnv(**env_kwargs),
            n_envs=n_envs,
            vec_env_cls=SubprocVecEnv # 멀티프로세스 기반 병렬 환경
            # vec_env_cls=DummyVecEnv # 디버깅 시 사용 (단일 프로세스)
        )
        print("병렬 환경 생성 완료.")

        # PPO 에이전트 정의
        # MlpPolicy: 표준 MLP 기반 정책/가치 네트워크
        # verbose=1: 학습 진행 상황 출력
        # device='auto': 사용 가능한 경우 GPU 자동 사용
        # n_steps, batch_size, learning_rate 등 주요 하이퍼파라미터 튜닝 필요
        rl_model = PPO("MlpPolicy", vec_env, verbose=1, device='auto',
                       n_steps=1024, batch_size=64, n_epochs=10, learning_rate=3e-4)
        print("PPO 에이전트 정의 완료.")

        # 에이전트 학습
        total_timesteps = 200000 # 총 학습 타임스텝 (충분히 길게 설정 필요)
        print(f"\n--- 강화학습 에이전트 학습 시작 (총 {total_timesteps} 타임스텝) ---")
        start_time_rl = time.time()
        rl_model.learn(total_timesteps=total_timesteps, progress_bar=True) # 진행바 표시
        end_time_rl = time.time()
        print(f"--- 강화학습 에이전트 학습 완료 (소요 시간: {end_time_rl - start_time_rl:.2f} 초) ---")

        # 학습된 모델 저장
        model_save_path = os.path.join(model_save_dir, "antibody_ppo_agent")
        rl_model.save(model_save_path)
        print(f"학습된 RL 에이전트 저장 완료: {model_save_path}")

        # 병렬 환경 종료
        vec_env.close()

    except Exception as e:
        print(f"\n강화학습 실행 중 오류 발생: {e}")
        # 오류 발생 시 생성된 환경 닫기 시도
        if 'vec_env' in locals() and vec_env is not None:
            try:
                vec_env.close()
                print("오류 발생 후 병렬 환경 종료 시도.")
            except Exception as close_e:
                print(f"병렬 환경 종료 중 추가 오류: {close_e}")
        raise # 오류 다시 발생시켜 확인

In [None]:
# --- 학습된 RL 에이전트 활용 (예시) ---

# 1. 모델 로드 (필요시)
# rl_model = PPO.load(model_save_path, device=device)

# 2. 특정 시작 임베딩으로 최적화 실행
# initial_embedding = X_test[0].copy() # 예: 테스트셋의 첫 번째 임베딩
# obs, _ = env.reset() # 단일 환경 필요 (또는 VecEnv의 reset 사용)
# obs[:] = initial_embedding # VecEnv 사용 시 상태 설정 방식 다름

# optimized_embeddings = []
# current_obs = initial_embedding.copy() # 관찰 상태 초기화

# # 단일 환경에서 에피소드 실행 (병렬 환경 아님)
# single_env = AntibodyOptimizeEnv(**env_kwargs) # 평가용 단일 환경
# obs, _ = single_env.reset()
# obs = initial_embedding # 시작 상태 설정

# for _ in range(env_kwargs['max_steps']):
#     action, _states = rl_model.predict(obs, deterministic=True) # 결정론적 액션 선택
#     obs, reward, terminated, truncated, info = single_env.step(action)
#     print(f"  Eval Step Reward: {reward:.4f}")
#     if terminated or truncated:
#         break
# optimized_embeddings.append(obs.copy())

# print("\n최적화된 임베딩 (예시):", optimized_embeddings[-1][:10]) # 마지막 임베딩 일부 출력

# 3. 결과 분석
# - 얻어진 optimized_embeddings 들의 속성 예측값 확인
# - 초기 임베딩 대비 얼마나 개선되었는지 평가
# - (고급) 임베딩을 다시 서열로 변환하는 방법 필요 (예: Decoder 모델, 가장 가까운 실제 서열 찾기 등)

print("\n--- 평가 및 활용 단계 (개념적 설명) ---")
print("학습된 에이전트를 사용하여 새로운 임베딩을 생성/최적화하고,")
print("그 결과를 속성 예측 모델 등으로 평가해야 합니다.")
print("임베딩을 서열로 변환하는 디코딩 단계가 필요할 수 있습니다.")