In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
cd /content/drive/MyDrive/AItom/safety_embedding_model

/content/drive/MyDrive/AItom/safety_embedding_model


In [4]:
import os
import sys
import argparse
from pathlib import Path
from typing import List, Tuple
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import torch
from tqdm import tqdm
from utils.multi_property_embedding import get_combined_embedding, DEFAULT_PROPERTIES





In [5]:
def load_data(
    data_path: str,
    formula_column: str = "formula",
    label_column: str = "Class",
    cas_column: str = "CAS Reg. No.* (or other ID)",
) -> Tuple[List[str], List[int], List[str]]:
    """
    Excel 파일에서 데이터 로드

    Parameters
    ----------
    data_path : str
        데이터 파일 경로
    formula_column : str
        formula 컬럼명 (없으면 CAS에서 변환)
    label_column : str
        라벨 컬럼명
    cas_column : str
        CAS number 컬럼명 (formula가 없을 경우 사용)

    Returns
    -------
    formulas : List[str]
        화학식 리스트
    labels : List[int]
        라벨 리스트 (0 또는 1)
    cas_numbers : List[str]
        원본 CAS number 리스트 (디버깅용)
    """
    print(f"데이터 로딩: {data_path}")
    df = pd.read_excel(data_path)

    print(f"전체 데이터 수: {len(df)}")
    print(f"컬럼: {df.columns.tolist()}")

    # 라벨 추출
    if label_column not in df.columns:
        raise ValueError(f"컬럼 '{label_column}'을 찾을 수 없습니다. 사용 가능한 컬럼: {df.columns.tolist()}")
    labels = df[label_column].astype(int).tolist()

    # CAS number 추출 (디버깅용)
    cas_numbers = []
    if cas_column in df.columns:
        cas_numbers = df[cas_column].astype(str).tolist()
    else:
        cas_numbers = [f"unknown_{i}" for i in range(len(df))]

    # Formula 추출
    if formula_column in df.columns:
        print(f"\n'{formula_column}' 컬럼에서 formula를 읽습니다.")
        formulas = []
        for val in df[formula_column]:
            if pd.isna(val) or str(val).strip() == '' or str(val).lower() == 'nan':
                formulas.append(None)
            else:
                formulas.append(str(val).strip())
    else:
        raise ValueError(
            f"'{formula_column}' 컬럼을 찾을 수 없습니다. "
            f"사용 가능한 컬럼: {df.columns.tolist()}\n"
            f"또는 CAS number 컬럼('{cas_column}')이 있으면 먼저 formula로 변환하세요."
        )

    # 유효한 formula 개수 확인
    valid_count = sum(1 for f in formulas if f is not None)
    print(f"  - 유효한 formula: {valid_count}/{len(formulas)} ({100*valid_count/len(formulas):.1f}%)")

    return formulas, labels, cas_numbers


def extract_embeddings(
    formulas: List[str],
    labels: List[int],
    cas_numbers: List[str],
    properties: List[str] = None,
    compute_device: str = None,
    batch_size: int = 1,
) -> Tuple[torch.Tensor, torch.Tensor, List[str], List[Tuple[str, str]]]:
    """
    Formula 리스트를 임베딩 벡터로 변환

    Parameters
    ----------
    formulas : List[str]
        화학식 리스트
    labels : List[int]
        라벨 리스트
    cas_numbers : List[str]
        CAS number 리스트 (디버깅용)
    properties : List[str], optional
        사용할 property 리스트 (기본값: DEFAULT_PROPERTIES)
    compute_device : str, optional
        계산 디바이스 (기본값: 자동 선택)
    batch_size : int
        배치 크기 (현재는 1로만 동작, 향후 개선 가능)

    Returns
    -------
    embeddings : torch.Tensor
        임베딩 벡터 텐서 (N, embedding_dim)
    valid_labels : torch.Tensor
        유효한 샘플의 라벨 (N,)
    valid_formulas : List[str]
        유효한 샘플의 formula 리스트
    failed_samples : List[Tuple[str, str]]
        실패한 샘플 리스트 (formula, error_message)
    """
    if properties is None:
        properties = DEFAULT_PROPERTIES

    embeddings = []
    valid_labels = []
    valid_formulas = []
    failed_samples = []

    print(f"\n임베딩 추출 시작...")
    print(f"  - Properties: {len(properties)}개")
    print(f"  - Device: {compute_device if compute_device else 'auto'}")

    # 디바이스 설정
    if compute_device is None:
        compute_device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # tqdm 진행바 생성
    pbar = tqdm(
        zip(formulas, labels, cas_numbers),
        total=len(formulas),
        desc="임베딩 추출",
        unit="sample"
    )

    for idx, (formula, label, cas) in enumerate(pbar):
        # None인 formula는 스킵
        if formula is None:
            failed_samples.append((cas, "Formula is None"))
            pbar.set_postfix({
                'success': len(embeddings),
                'failed': len(failed_samples),
                'current': 'None'
            })
            continue

        try:
            # 임베딩 추출
            emb = get_combined_embedding(
                formula=formula,
                properties=properties,
                compute_device=compute_device,
                verbose=False,
            )

            # 1D 벡터로 변환
            if emb.dim() == 1:
                embeddings.append(emb.cpu())  # CPU로 이동 (메모리 절약)
            else:
                embeddings.append(emb.flatten().cpu())

            valid_labels.append(label)
            valid_formulas.append(formula)

            # 진행바 업데이트 (성공 시)
            pbar.set_postfix({
                'success': len(embeddings),
                'failed': len(failed_samples),
                'current': formula[:20] if len(formula) > 20 else formula
            })

        except Exception as e:
            failed_samples.append((formula, str(e)))
            # 진행바 업데이트 (실패 시)
            pbar.set_postfix({
                'success': len(embeddings),
                'failed': len(failed_samples),
                'current': f'ERROR: {formula[:15]}' if len(formula) > 15 else f'ERROR: {formula}'
            })
            if (idx + 1) % 100 == 0:  # 100개마다 실패 로그 출력
                print(f"\n[WARNING] {idx+1}번째 샘플 실패 - formula: {formula}, error: {str(e)[:50]}")

    pbar.close()

    # 텐서로 변환
    if len(embeddings) == 0:
        raise RuntimeError("모든 formula의 임베딩 추출 실패!")

    embeddings_tensor = torch.stack(embeddings)  # (N, embedding_dim)
    labels_tensor = torch.LongTensor(valid_labels)  # (N,)

    print(f"\n임베딩 추출 완료:")
    print(f"  - 성공: {len(embeddings)}/{len(formulas)} ({100*len(embeddings)/len(formulas):.1f}%)")
    print(f"  - 실패: {len(failed_samples)}/{len(formulas)} ({100*len(failed_samples)/len(formulas):.1f}%)")
    print(f"  - 임베딩 차원: {embeddings_tensor.shape[1]}")

    if len(failed_samples) > 0:
        print(f"\n실패한 샘플 (처음 10개):")
        for formula, error in failed_samples[:10]:
            print(f"  - {formula}: {error[:80]}")
        if len(failed_samples) > 10:
            print(f"  ... 외 {len(failed_samples)-10}개")

    return embeddings_tensor, labels_tensor, valid_formulas, failed_samples


def save_embeddings(
    embeddings: torch.Tensor,
    labels: torch.Tensor,
    formulas: List[str],
    failed_samples: List[Tuple[str, str]],
    output_path: str,
    properties: List[str] = None,
    metadata: dict = None,
):
    """
    임베딩 벡터를 PyTorch 텐서로 저장

    Parameters
    ----------
    embeddings : torch.Tensor
        임베딩 벡터 텐서 (N, embedding_dim)
    labels : torch.Tensor
        라벨 텐서 (N,)
    formulas : List[str]
        유효한 샘플의 formula 리스트
    failed_samples : List[Tuple[str, str]]
        실패한 샘플 리스트
    output_path : str
        저장 경로
    properties : List[str], optional
        사용한 property 리스트
    metadata : dict, optional
        추가 메타데이터
    """
    if properties is None:
        properties = DEFAULT_PROPERTIES

    # 저장할 데이터 구성
    data = {
        'embeddings': embeddings,  # (N, embedding_dim)
        'labels': labels,  # (N,)
        'formulas': formulas,  # List[str]
        'properties': properties,
        'embedding_dim': embeddings.shape[1],
        'num_samples': len(embeddings),
        'failed_samples': failed_samples,
    }

    # 메타데이터 추가
    if metadata:
        data['metadata'] = metadata

    # 저장
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    torch.save(data, output_path)
    print(f"\n임베딩 데이터 저장 완료: {output_path}")
    print(f"  - 파일 크기: {output_path.stat().st_size / 1024 / 1024:.2f} MB")

    # 실패한 샘플도 별도로 저장 (선택사항)
    if len(failed_samples) > 0:
        failed_path = output_path.parent / f"{output_path.stem}_failed.csv"
        failed_df = pd.DataFrame(failed_samples, columns=['formula', 'error'])
        failed_df.to_csv(failed_path, index=False, encoding='utf-8-sig')
        print(f"  - 실패한 샘플 저장: {failed_path}")

In [9]:
# 디바이스 설정
compute_device = 'cuda' if torch.cuda.is_available() else 'cpu'


# 1. 데이터 로드
data_path = '/content/drive/MyDrive/AItom/safety_embedding_model/training_data/final_dataset.xlsx'
cas_column = "CAS Reg. No.* (or other ID)"
label_column = "Class"
formula_column = "formula"
formulas, labels, cas_numbers = load_data(
        data_path=data_path,
        formula_column=formula_column,
        label_column=label_column,
        cas_column=cas_column,
)

# 2. 임베딩 추출
embeddings, valid_labels, valid_formulas, failed_samples = extract_embeddings(
        formulas=formulas[2500:],
        labels=labels[2500:],
        cas_numbers=cas_numbers[2500:],
        properties=DEFAULT_PROPERTIES,
        compute_device=compute_device,
)

output_path = "training_data/embeddings9.pt"
# 3. 저장
save_embeddings(
        embeddings=embeddings,
        labels=valid_labels,
        formulas=valid_formulas,
        failed_samples=failed_samples,
        output_path=output_path,
        properties=DEFAULT_PROPERTIES,
        metadata={
            'data_path': data_path,
            'formula_column': formula_column,
            'label_column': label_column,
            'device': compute_device,
        }
)

print("\n" + "="*60)
print("전처리 완료!")
print("="*60)

데이터 로딩: /content/drive/MyDrive/AItom/safety_embedding_model/training_data/final_dataset.xlsx
전체 데이터 수: 3058
컬럼: ['CAS Reg. No.* (or other ID)', 'Class', 'formula']

'formula' 컬럼에서 formula를 읽습니다.
  - 유효한 formula: 3058/3058 (100.0%)

임베딩 추출 시작...
  - Properties: 12개
  - Device: cuda


임베딩 추출: 100%|██████████| 558/558 [20:40<00:00,  2.22s/sample, success=546, failed=12, current=C2HNa3O6]


임베딩 추출 완료:
  - 성공: 546/558 (97.8%)
  - 실패: 12/558 (2.2%)
  - 임베딩 차원: 36

실패한 샘플 (처음 10개):
  - SbSc+3: +3 is an invalid formula!
  - H5O30P5V6-30: could not convert string to float: '6-30'
  - AlCaNaO4Si+2: +2 is an invalid formula!
  - C13H12NO+: + is an invalid formula!
  - AlNaO4P+: + is an invalid formula!
  - C12H17N4OS+: + is an invalid formula!
  - CoLiMnNiO+: + is an invalid formula!
  - C36H31Cl2KLiN17NaO15S4-: could not convert string to float: '4-'
  - C11H9N3NaO2+: + is an invalid formula!
  - C16H10AlN2O8S2+3: +3 is an invalid formula!
  ... 외 2개

임베딩 데이터 저장 완료: training_data/embeddings9.pt
  - 파일 크기: 0.09 MB
  - 실패한 샘플 저장: training_data/embeddings9_failed.csv

전처리 완료!



