In [3]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import xarray as xr
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split

In [None]:
class BrainSpanData:
    """Preprocess brain-development expression data for deep learning.

    Typical usage is ``load_data(...)`` followed by
    ``preprocess_for_deep_learning()``; the loaded data is kept on
    ``self.data_array`` (``None`` until ``load_data`` has been called).
    """

    # Conversion factors used when turning donor-age strings into weeks.
    WEEKS_PER_MONTH = 4.33
    WEEKS_PER_YEAR = 52
    # Added to ages given in months/years so they sit on the same axis as the
    # 'pcw' (post-conception weeks) values.
    # NOTE(review): 40 presumably approximates full-term gestation -- confirm.
    POSTNATAL_OFFSET_WEEKS = 40

    def __init__(self):
        self.data_array = None

    def load_data(self, expression_path: str, rows_path: str, columns_path: str):
        """Load the three CSV files and merge them into one labelled array.

        Args:
            expression_path: Path to the expression CSV (read without a header).
            rows_path: Path to the gene metadata CSV; must contain the columns
                'gene-symbol', 'gene-name' and 'chromosome'.
            columns_path: Path to the sample metadata CSV; must contain the
                columns 'donor_id', 'donor_age', 'structure_name' and
                'structure_id'.

        Returns:
            xarray.DataArray: Expression values with dims ('gene', 'sample')
            and the metadata attached as coordinates. The same object is
            stored on ``self.data_array``.
        """
        expression = pd.read_csv(expression_path, header=None)
        gene_info = pd.read_csv(rows_path)
        sample_info = pd.read_csv(columns_path)

        # The first column of the rows file is used as the 'gene' coordinate.
        # NOTE(review): the original comment said the third column holds the
        # gene ID, while the code reads column 0 -- confirm which is intended.
        gene_ids = gene_info.iloc[:, 0].values
        # The first column of the expression file is skipped; the remaining
        # columns are taken as the per-sample expression values.
        expr_values = expression.iloc[:, 1:].values.astype(np.float32)

        # Build the labelled 2-D array (genes x samples).
        self.data_array = xr.DataArray(
            expr_values,
            dims=['gene', 'sample'],
            coords={
                'gene': gene_ids,
                'sample': range(len(sample_info)),
                'gene_symbol': ('gene', gene_info['gene-symbol'].values),
                'gene_name': ('gene', gene_info['gene-name'].values),
                'chromosome': ('gene', gene_info['chromosome'].values),
                'donor_id': ('sample', sample_info['donor_id'].values),
                'donor_age': ('sample', sample_info['donor_age'].values),
                'structure_name': ('sample', sample_info['structure_name'].values),
                'structure_id': ('sample', sample_info['structure_id'].values),
            },
        )

        return self.data_array

    def preprocess_for_deep_learning(self):
        """Build the (feature, target) pair used for model training.

        Returns:
            tuple: ``(feature, target)`` where ``feature`` is the gene-wise
            normalized expression matrix transposed to (samples, genes) and
            ``target`` is the donor age of each sample in weeks.

        Raises:
            RuntimeError: If ``load_data`` has not been called yet.
            ValueError: If a donor age uses an unrecognised unit.
        """
        if self.data_array is None:
            raise RuntimeError("No data loaded; call load_data() first.")

        # Convert the age labels to numeric values.
        target = self._parse_ages()

        # Brain-region categorical encoding is intentionally not done here;
        # add a LabelEncoder over the 'structure_name' coordinate if needed.

        # Normalize the expression values gene by gene.
        normalized_data = self._gene_wise_normalization()

        # Transpose so that each row is one sample.
        feature = normalized_data.values.T

        return feature, target

    def _parse_ages(self):
        """Convert every 'donor_age' label into a numeric age in weeks.

        Returns:
            np.ndarray: One value per sample, in the same order as the
            'donor_age' coordinate.

        Raises:
            ValueError: If a label contains none of 'pcw', 'mos' or 'yrs'.
                Raising (rather than skipping the entry) keeps the targets
                aligned one-to-one with the samples.
        """
        ages = self.data_array.coords['donor_age'].values
        return np.array([self._age_to_weeks(age) for age in ages])

    def _age_to_weeks(self, age):
        """Convert one age label such as '8 pcw', '4 mos' or '2 yrs' to weeks."""
        text = str(age)
        if 'pcw' in text:  # post-conception weeks: already in weeks
            weeks_per_unit, offset = 1.0, 0.0
        elif 'mos' in text:  # months
            weeks_per_unit, offset = self.WEEKS_PER_MONTH, self.POSTNATAL_OFFSET_WEEKS
        elif 'yrs' in text:  # years
            weeks_per_unit, offset = self.WEEKS_PER_YEAR, self.POSTNATAL_OFFSET_WEEKS
        else:
            raise ValueError(
                f"Unrecognised donor age {age!r}: expected a 'pcw', 'mos' or 'yrs' unit"
            )
        # The numeric part is the first whitespace-separated token.
        return float(text.split()[0]) * weeks_per_unit + offset

    def _gene_wise_normalization(self):
        """Standardize each gene across samples: (x - mean) / std.

        Genes whose standard deviation is 0 are divided by 1 instead, which
        avoids division by zero.

        Returns:
            xarray.DataArray: Normalized data with the same dims as
            ``self.data_array``.
        """
        # axis=1 is the 'sample' dimension; keepdims keeps a (genes, 1) shape
        # so the statistics broadcast against the full array.
        gene_means = self.data_array.mean(axis=1, keepdims=True)
        gene_stds = self.data_array.std(axis=1, keepdims=True)

        # Replace zero standard deviations with 1.
        safe_stds = np.where(gene_stds == 0, 1, gene_stds)

        return (self.data_array - gene_means.values) / safe_stds

    # Backward-compatible alias for the original (misspelled) method name.
    _gene_wise_normalizatoin = _gene_wise_normalization

    def create_data_loader(
        self,
        features: np.ndarray,
        targets: np.ndarray,
        *,
        batch_size: int = 1,
        shuffle: bool = False,
    ) -> DataLoader:
        """Wrap features and targets in a PyTorch DataLoader.

        Args:
            features: Feature matrix, one row per sample.
            targets: Target values, one per sample.
            batch_size: Samples per batch (default 1, the DataLoader default).
            shuffle: Whether to reshuffle the data every epoch (default False).

        Returns:
            DataLoader: Loader over a ``BrainSpanDataset`` built from the inputs.
        """
        dataset = BrainSpanDataset(features, targets)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

class BrainSpanDataset(Dataset):
    """PyTorch Dataset pairing each feature row with its target value.

    Both inputs are converted to float32 tensors on construction.
    """

    def __init__(self, features: np.ndarray, targets: np.ndarray):
        """Store features and targets as float32 tensors.

        Args:
            features: Feature matrix, one row per sample.
            targets: Target values, one per sample.

        Raises:
            ValueError: If ``features`` and ``targets`` do not contain the
                same number of samples.
        """
        self.features = torch.as_tensor(features, dtype=torch.float32)
        self.targets = torch.as_tensor(targets, dtype=torch.float32)

        # Fail early: a length mismatch would otherwise only show up later as
        # an IndexError (or silently ignored trailing targets) during loading.
        if len(self.features) != len(self.targets):
            raise ValueError(
                f"features and targets must have the same number of samples, "
                f"got {len(self.features)} and {len(self.targets)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, target)`` pair at position ``idx``."""
        return self.features[idx], self.targets[idx]


In [37]:
# Configuration for the train/test split

test_size: float = 0.2  # value passed as train_test_split(test_size=...)
random_state: int = 42  # value passed as train_test_split(random_state=...)


In [38]:
# Create the preprocessing helper
processor = BrainSpanData()

# Load the expression matrix together with its gene (row) and sample (column) metadata
processor.load_data(
    expression_path='Data/Expression.csv',
    rows_path='Data/Rows.csv',
    columns_path='Data/Columns.csv',
)

# Preprocess into a feature matrix and an age target vector
feature, target = processor.preprocess_for_deep_learning()

# Split the data. If results turn out poor, consider splitting with
# stratification by age group instead.
X_train, X_test, y_train, y_test = train_test_split(
    feature,
    target,
    test_size=test_size,
    random_state=random_state,
)

