In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import selfies as sf
from tqdm import tqdm
import os

# Path to the dataset CSV file
DATA_PATH = './TransGEM/data/subLINCS.csv'

# 1. Vocab 구축 및 Tokenizer 함수
def build_vocab(smiles_list):
    """
    Build the SELFIES token vocabulary for a collection of SMILES strings.

    Each SMILES is encoded to SELFIES and split into tokens. The distinct
    tokens, in sorted order, are numbered after three reserved special tokens
    (`<pad>` = 0, `<sos>` = 1, `<eos>` = 2).
    [cite: 383] - all unique SELFIES are split into tokens to build a dictionary.

    Args:
        smiles_list: Iterable of SMILES strings.

    Returns:
        Tuple `(token2idx, idx2token, max_len)`:
            token2idx: dict mapping token string -> integer index.
            idx2token: dict with the inverse mapping (index -> token).
            max_len:   largest number of SELFIES tokens found in a single
                       molecule. The count does NOT include `<sos>`/`<eos>`,
                       so a caller that wraps sequences with both special
                       tokens needs `max_len + 2` positions to avoid truncation.

    Entries that cannot be encoded (the encoder raises or returns None) are
    left out of the vocabulary; how many were skipped is printed so that bad
    input data does not go unnoticed.
    """
    vocab = set()
    max_len = 0
    skipped = 0

    print("Building Vocabulary...")
    for smiles in tqdm(smiles_list):
        try:
            # SMILES -> SELFIES
            selfie = sf.encoder(smiles)
            if selfie is None:
                skipped += 1
                continue
            # Tokenize (split into '[C]', '[O]', ...)
            tokens = list(sf.split_selfies(selfie))
        except Exception:
            # Best effort: one malformed entry must not abort the whole pass,
            # but it is counted and reported below.
            skipped += 1
            continue

        vocab.update(tokens)
        max_len = max(max_len, len(tokens))

    if skipped:
        print(f"Skipped {skipped} molecule(s) that could not be encoded to SELFIES")

    # Special tokens occupy indices 0-2; real tokens start at 3.
    token2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
    for index, token in enumerate(sorted(vocab), start=3):
        token2idx[token] = index

    idx2token = {index: token for token, index in token2idx.items()}

    print(f"Vocab Size: {len(token2idx)}")
    print(f"Max Sequence Length: {max_len}")

    return token2idx, idx2token, max_len

def smile_to_indices(smiles, token2idx, max_len):
    """
    Encode one SMILES string as a fixed-length list of vocabulary indices.

    The SMILES is converted to SELFIES, tokenized, wrapped as
    `<sos> + tokens + <eos>`, and then padded with `<pad>` or truncated so the
    result has `max_len` entries.

    Args:
        smiles:    SMILES string to encode.
        token2idx: dict mapping token -> index; must contain `<pad>`, `<sos>`
                   and `<eos>`.
        max_len:   Length of the returned list (this budget includes the
                   `<sos>` and `<eos>` positions).

    Returns:
        List of `max_len` integer indices. Tokens missing from `token2idx` are
        mapped to the `<pad>` index. If the SMILES cannot be encoded or
        tokenized, the list consists of `<pad>` only.

    Raises:
        KeyError: if `token2idx` lacks one of the three special tokens.
    """
    pad_idx = token2idx['<pad>']
    sos_idx = token2idx['<sos>']
    eos_idx = token2idx['<eos>']

    try:
        selfie = sf.encoder(smiles)
        tokens = list(sf.split_selfies(selfie))
    except Exception:
        # Unencodable molecule -> all-padding row. `Exception` (rather than a
        # bare `except`) lets KeyboardInterrupt / SystemExit propagate.
        return [pad_idx] * max_len

    # <sos> + tokens + <eos>
    indices = [sos_idx] + [token2idx.get(t, pad_idx) for t in tokens] + [eos_idx]

    if len(indices) > max_len:
        # Truncate, but keep <eos> as the final token so that every encoded
        # sequence still carries an end marker.
        indices = indices[:max_len]
        if max_len >= 1:
            indices[-1] = eos_idx
    else:
        # Right-pad up to max_len (no-op when the length already matches).
        indices += [pad_idx] * (max_len - len(indices))

    return indices

In [5]:
class TransGEMDataset(Dataset):
    """
    Torch dataset that turns rows of a DataFrame into model inputs.

    Each item is a dict with:
        'cell_idx':    scalar long tensor, index of the row's `cell_line`
                       in the sorted list of distinct cell lines.
        'gene_tensor': float32 tensor holding the tenfold-binary embedding of
                       every gene value (10 entries per value).
        'target':      long tensor of `max_len` token indices produced by
                       `smile_to_indices` from the row's `smiles`.

    Args:
        df:        DataFrame with at least `cell_line` and `smiles` columns,
                   plus either a `gene_e` column of '//'-separated values or
                   one column per gene.
        token2idx: token -> index vocabulary passed to `smile_to_indices`.
        max_len:   target sequence length passed to `smile_to_indices`.
    """

    def __init__(self, df, token2idx, max_len):
        self.df = df.reset_index(drop=True)
        self.token2idx = token2idx
        self.max_len = max_len
        
        # 1. Cell line: sorted so the name -> index mapping is deterministic.
        self.cell_lines = sorted(df['cell_line'].unique().tolist())
        self.cell2idx = {name: i for i, name in enumerate(self.cell_lines)}
        
        # 2. Decide where the gene values live.
        if 'gene_e' in df.columns:
            # Single string column; parsed per row in __getitem__.
            self.gene_col_name = 'gene_e'
            print(f"✅ 'gene_e' column detected. Parsing 'sep=//' format.")
        else:
            # Fallback: every column that is not known metadata is treated as
            # a gene column (gene_col_name is then a list of column names).
            exclude = ['cell_line', 'dose', 'drug_id', 'smiles', 'selfies', 'pert_id', 'pert_iname', 'pert_type']
            self.gene_col_name = [c for c in df.columns if c not in exclude]
            
    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.df)
    
    def tenfold_binary_embedding(self, value):
        """
        Tenfold Binary Embedding from the paper [cite: 288-292].

        Encodes one value as 10 bits: a sign bit (1 if value > 0, else 0)
        followed by the 9-bit binary representation of `int(abs(value) * 10)`.

        Values that cannot be converted to a float, and non-finite values
        (NaN, +/-inf), are encoded as 0.0, i.e. ten zeros.

        NOTE(review): when `int(abs(value) * 10)` needs more than 9 bits
        (|value| >= 51.2) only the 9 low-order bits are kept, so such
        magnitudes wrap around instead of saturating.
        """
        try:
            value = float(value)
        except (TypeError, ValueError, OverflowError):
            value = 0.0
        # int() below raises on NaN / inf, so treat them like unparseable input.
        if not np.isfinite(value):
            value = 0.0
        sign_bit = 1 if value > 0 else 0
        tenfold_value = int(abs(value) * 10)
        binary_str = bin(tenfold_value)[2:].zfill(9)
        if len(binary_str) > 9: binary_str = binary_str[-9:] 
        vec = [sign_bit] + [int(b) for b in binary_str]
        return vec

    def parse_gene_string(self, gene_str):
        """
        Parse a string of the form '-0.1//-0.3//...' into a list of floats.

        Lists and numpy arrays are returned unchanged. Any other type, or a
        string containing a non-numeric piece, yields an empty list (a parsing
        failure is also printed).
        """
        if isinstance(gene_str, str):
            try:
                # Split on '//' and drop empty pieces (e.g. from a trailing separator).
                return [float(x) for x in gene_str.split('//') if x.strip()]
            except Exception as e:
                print(f"Parsing Error: {e} in string: {gene_str[:50]}...")
                return []
        elif isinstance(gene_str, (list, np.ndarray)):
            return gene_str
        else:
            return []

    def __getitem__(self, idx):
        """Build the `cell_idx` / `gene_tensor` / `target` dict for row `idx`."""
        row = self.df.iloc[idx]
        
        # 1. Cell line (falls back to index 0 for a name not in the mapping)
        cell_idx = self.cell2idx.get(row['cell_line'], 0)
        
        # 2. Gene expression: single '//'-separated column, or a list of columns
        if isinstance(self.gene_col_name, str):
            raw_gene_data = row[self.gene_col_name]
            gene_values = self.parse_gene_string(raw_gene_data)
        else:
            gene_values = row[self.gene_col_name].values.astype(float)
            
        # 3. Tenfold binary embedding: 10 bits per gene, concatenated flat
        gene_embeds = []
        for val in gene_values:
            gene_embeds.extend(self.tenfold_binary_embedding(val))
            
        gene_tensor = torch.tensor(gene_embeds, dtype=torch.float32)
        
        # 4. Target (molecule)
        target_indices = smile_to_indices(row['smiles'], self.token2idx, self.max_len)
        target_tensor = torch.tensor(target_indices, dtype=torch.long)
        
        return {
            'cell_idx': torch.tensor(cell_idx, dtype=torch.long),
            'gene_tensor': gene_tensor, 
            'target': target_tensor
        }

In [7]:
import pandas as pd
import selfies as sf
from tqdm import tqdm
import os

# 1. Load the data (defines `df`)
DATA_PATH = './TransGEM/data/subLINCS.csv'
if not os.path.exists(DATA_PATH):
    # `df` is not assigned on this path; only a message is printed.
    print("❌ Data file not found!")
else:
    df = pd.read_csv(DATA_PATH)
    print(f"✅ Data Loaded! Shape: {df.shape}")

# 2. build_vocab 함수 정의 (만약 위쪽 셀에서 실행 안 했다면 필요)
def build_vocab(smiles_list):
    """
    Build the token -> index vocabulary from a list of SMILES strings.

    Each SMILES is encoded to SELFIES and split into tokens. The distinct
    tokens, in sorted order, are numbered from 3; indices 0-2 are reserved
    for `<pad>`, `<sos>` and `<eos>`.

    Args:
        smiles_list: Iterable of SMILES strings.

    Returns:
        dict mapping token string -> integer index.

    Entries that cannot be encoded (the encoder raises or returns None) are
    skipped.

    NOTE(review): a function with the same name defined earlier in this
    notebook returns a 3-tuple; once this cell has run, this dict-returning
    version is the one in effect.
    """
    vocab = set()
    print("Building Vocabulary...")
    for smiles in tqdm(smiles_list):
        try:
            selfie = sf.encoder(smiles)
            if selfie is None:
                continue
            tokens = list(sf.split_selfies(selfie))
            vocab.update(tokens)
        except Exception:
            # Skip malformed entries, but let KeyboardInterrupt / SystemExit
            # through so the loop can still be interrupted from the notebook.
            continue

    # Special tokens occupy indices 0-2; real tokens start at 3.
    token2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
    for index, token in enumerate(sorted(vocab), start=3):
        token2idx[token] = index
    return token2idx

# 3. Build the vocabulary (defines `token2idx`)
# This part must run so that the next cell does not fail on an undefined `token2idx`.
if 'df' in locals():
    token2idx = build_vocab(df['smiles'].tolist())
    print(f"✅ Vocab Size: {len(token2idx)}")

✅ Data Loaded! Shape: (27378, 6)
Building Vocabulary...


100%|██████████| 27378/27378 [00:10<00:00, 2737.43it/s]

✅ Vocab Size: 73





In [8]:
# 2. Create the Dataset & DataLoader
# Only loading the data takes a while; after that it is quick.
dataset = TransGEMDataset(df, token2idx, max_len=100) 
dataloader = DataLoader(dataset, batch_size=4, shuffle=True) 

# 3. Inspect one sample batch
try:
    sample_batch = next(iter(dataloader))
    print("\n✅ Sample Batch Loaded Successfully!")
    print("Cell Index Shape:", sample_batch['cell_idx'].shape)     # [4]
    print("Gene Tensor Shape:", sample_batch['gene_tensor'].shape) # should be [4, 9780]
    print("Target Shape:", sample_batch['target'].shape)           # [4, 100]
except Exception as e:
    print(f"\n❌ Error loading batch: {e}")
    # For debugging, print the column names seen on the first row
    print("First row columns:", df.iloc[0].index.tolist()[:10])

✅ 'gene_e' column detected. Parsing 'sep=//' format.

✅ Sample Batch Loaded Successfully!
Cell Index Shape: torch.Size([4])
Gene Tensor Shape: torch.Size([4, 9780])
Target Shape: torch.Size([4, 100])


In [9]:
# Load the data (if it is already loaded, reuse the `df` variable)
# df = pd.read_csv('./TransGEM/data/subLINCS.csv') 

print("--- gene_e 컬럼 데이터 샘플 ---")
sample_gene = df['gene_e'].iloc[0]
print(f"값: {sample_gene[:100]} ... (생략)") # print only the beginning
print(f"타입: {type(sample_gene)}")

--- gene_e 컬럼 데이터 샘플 ---
값: -0.1//-0.3//-0.2//0.1//0.1//-0.1//-0.2//0.0//0.0//0.1//-0.0//-0.2//-0.0//-0.1//0.4//-0.1//0.0//0.2// ... (생략)
타입: <class 'str'>


In [10]:
import pandas as pd
import torch

# 1. Load the data (only the first row, for inspection)
# Make sure the path is correct!
DATA_PATH = './TransGEM/data/subLINCS.csv'
df_sample = pd.read_csv(DATA_PATH, nrows=1)

# 2. Inspect the raw data (unusual format with a '//' separator)
raw_gene_string = df_sample['gene_e'].iloc[0]
print("=== [Step 1] Raw Data in CSV (String Format) ===")
print(f"Format Check: {raw_gene_string[:60]} ...") 
print(f" (Separator: '//' detected)\n")

# 3. Parsing (string -> list of floats); empty pieces are dropped
parsed_values = [float(x) for x in raw_gene_string.split('//') if x.strip()]
print("=== [Step 2] Parsed Float Values (978 Genes) ===")
print(f"First 5 Genes: {parsed_values[:5]}")
print(f"Total Gene Count: {len(parsed_values)}\n")

# 4. Tenfold-Binary Embedding (핵심 로직 시각화)
def tenfold_binary_embedding_demo(value):
    """
    Encode a number as a 10-element list of 0/1 ints.

    The first element is the sign bit (1 when `value > 0`, otherwise 0); the
    remaining nine are the binary digits of `int(abs(value) * 10)`,
    zero-padded on the left. If that integer needs more than nine bits, only
    the nine low-order bits are kept.
    """
    magnitude = int(abs(value) * 10)
    # '09b' left-pads with zeros to at least 9 digits; the slice drops any
    # higher-order digits beyond the last 9.
    bits = format(magnitude, '09b')[-9:]
    return [int(value > 0)] + [int(digit) for digit in bits]

# Show the conversion on example values
sample_val_1 = parsed_values[0]   # first value from the real data
sample_val_2 = 7.3                # example value from the paper

embed_1 = tenfold_binary_embedding_demo(sample_val_1)
embed_2 = tenfold_binary_embedding_demo(sample_val_2)

print("=== [Step 3] Tenfold-Binary Embedding Application ===")
print(f"Ex 1 (Real Data): {sample_val_1} \t---> {embed_1}")
print(f"Ex 2 (Paper Ex):  {sample_val_2} \t---> {embed_2}")
print("                   [Sign] + [9-bit Magnitude]\n")

# 5. Check the final dimension expansion (10 binary entries per gene value)
print("=== [Step 4] Final Input Dimension Expansion ===")
print(f"Original: {len(parsed_values)} genes (Floats)")
print(f"Expanded: {len(parsed_values)} * 10 = {len(parsed_values) * 10} dimensions (Binary Vector)")

=== [Step 1] Raw Data in CSV (String Format) ===
Format Check: -0.1//-0.3//-0.2//0.1//0.1//-0.1//-0.2//0.0//0.0//0.1//-0.0/ ...
 (Separator: '//' detected)

=== [Step 2] Parsed Float Values (978 Genes) ===
First 5 Genes: [-0.1, -0.3, -0.2, 0.1, 0.1]
Total Gene Count: 978

=== [Step 3] Tenfold-Binary Embedding Application ===
Ex 1 (Real Data): -0.1 	---> [0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
Ex 2 (Paper Ex):  7.3 	---> [1, 0, 0, 1, 0, 0, 1, 0, 0, 1]
                   [Sign] + [9-bit Magnitude]

=== [Step 4] Final Input Dimension Expansion ===
Original: 978 genes (Floats)
Expanded: 978 * 10 = 9780 dimensions (Binary Vector)
