In [57]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
import gc
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
device='cuda' if torch.cuda.is_available() else 'cpu'
gc.collect()
torch.cuda.empty_cache()

In [58]:
batch_size = 64
LEARNING_RATE = 0.0005
z_dim=10
activation_func='leaky_relu'
init='Normal'
EPOCHS =250
num_clusters=7

In [59]:
########################## 학습 시드 고정
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
##########################


In [60]:
##dataset load##
data_type='sim'# #real_data 는 type에 따라 'real_merged','merged' 'real_sensitive' 'real_resistant' 총 세가지 있고, simulation data 이름은 데이터는sim_ALT_c5.tsv, 타겟은 sim_clades_c7.tsv
num_feature=300
alt = pd.read_csv(f'data/{data_type}_ALT_{num_feature}.tsv', sep='\t', header=None)

alt=alt.fillna(-1)
alt_transposed = alt.T
alt_data = alt_transposed.to_numpy()
alt_data[(alt_data != 0) & (alt_data != -1)] = 1


##### data_type이 simulation 데이터일 때만 clade있음 
if 'sim' in data_type:
    clade = pd.read_csv(f'data/{data_type}_clades_c{num_clusters}.tsv', sep='\t', header=None)
    clade_transposed = clade.T
    clade_data= clade_transposed.to_numpy()


In [61]:
clade_data.shape

(1, 5000)

In [62]:
datasets=alt_data
train_loader = DataLoader(datasets, batch_size=batch_size, shuffle=False, pin_memory=True)
test_loader = DataLoader(datasets, batch_size=len(datasets), shuffle=False, pin_memory=True)

In [63]:
input_dim=datasets[0].shape[0]

In [64]:
class AutoEncoder(nn.Module):
    def __init__(self, x_dim, hidden_dim_1, hidden_dim_2,latent_dim, init_type='Normal',
                 activation_type='ELU'):
        super(AutoEncoder, self).__init__()
        if activation_type == 'ELU':
            self.activation_fn = nn.ELU()
        elif activation_type == 'leaky_relu':
            self.activation_fn = nn.LeakyReLU()
        else:
            raise ValueError("Unsupported activation type. Choose 'relu' or 'leaky_relu'.")

        self.encoder = nn.ModuleList([
            nn.Linear(x_dim, hidden_dim_1),
            nn.BatchNorm1d(hidden_dim_1),
            self.activation_fn,
            nn.Dropout(),
            nn.Linear(hidden_dim_1, hidden_dim_2),
            nn.BatchNorm1d(hidden_dim_2),
            self.activation_fn,
            nn.Dropout(),
            nn.Linear(hidden_dim_2, latent_dim)
        ])
        
        # Decoder
        self.decoder = nn.ModuleList([
            nn.Linear(latent_dim, hidden_dim_2),
            nn.BatchNorm1d(hidden_dim_2),
            self.activation_fn,
            nn.Dropout(),
            nn.Linear(hidden_dim_2, hidden_dim_1),
            nn.BatchNorm1d(hidden_dim_1),
            self.activation_fn,
            nn.Dropout(),
            nn.Linear(hidden_dim_1, x_dim),
            self.activation_fn
        ])
        
        initialize_weights(self, init_type=init_type)
        
    def forward(self, x):
        encoder_outputs = []
        # Encoder: Apply layers and save activation function outputs
        for layer in self.encoder:
            x = layer(x)
            if layer==self.activation_fn:  # Activation is applied after each Linear 
                encoder_outputs.append(x)

        latent = x
        
        decoder_outputs = []
        # Decoder: Apply layers and save activation function outputs
        for layer in self.decoder:
            x = layer(x)
            if layer==self.activation_fn:  # Activation is applied after each Linear layer
                decoder_outputs.append(x)

        return latent, x,encoder_outputs, decoder_outputs


In [65]:
import torch.nn as nn

def initialize_weights(net, init_type='Xavier'):
    for m in net.modules():
        if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d) or isinstance(m, nn.Linear):
            if init_type == 'He':
                # He 초기화는 Kaiming 초기화의 특별한 경우로, 일반적으로 'leaky_relu'를 비선형성으로 사용
                nn.init.kaiming_normal_(m.weight, nonlinearity='leaky_relu')
            elif init_type == 'Xavieru':
                nn.init.xavier_uniform_(m.weight)
            elif init_type == 'Xavier':
                nn.init.xavier_normal_(m.weight)
            elif init_type == 'Normal':
                m.weight.data.normal_(0, 0.02)
            
            elif init_type == 'Kaiming_Uniform':
                nn.init.kaiming_uniform_(m.weight, nonlinearity='leaky_relu')
            else:
                raise ValueError(f"Unsupported initialization type: {init_type}")
            # 편향 (bias) 초기화
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

In [66]:
model = AutoEncoder(input_dim, 200,   100,  z_dim,init_type=init,activation_type=activation_func)

model=model.to(device)

In [67]:
criterion_reconst = nn.MSELoss(reduction='none') 
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.8)

In [68]:
loss_list = []
model.train()
for epoch in tqdm(range(EPOCHS)):
    train_loss = 0
    for batch_idx, x in enumerate(train_loader):
        x = x.float()
        x   = x.to(device)
        mask = (x!= -1).float()
        optimizer.zero_grad()
        latent, outputs,encoder_outputs, decoder_outputs = model(x)
        x_hat = outputs
        loss = criterion_reconst(x_hat, x)
        loss = loss *mask
        loss = loss.sum() / mask.sum()
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
    avg_loss_train = train_loss / len(train_loader)
    loss_list.append(avg_loss_train)

  0%|          | 0/250 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [72]:
for batch_idx, x in enumerate(train_loader):
        print(len(x))

64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
64
8


In [53]:
model.eval()

test_loss = 0
for batch_idx, x in enumerate(test_loader):
    x = x.float()
    x = x.to(device)
    mask = (x != -1).float()
    latent, x_hat, encoder_outputs, decoder_outputs = model(x)
    loss = criterion_reconst(x_hat, x)
    ## missing value 는 loss 계산에서 무시
    loss = loss * mask

    loss = loss.sum() / mask.sum()
    ##
    test_loss += loss.item()
    z_v = latent
    z_v = z_v.cpu().detach().numpy()
    latent_feature = np.vstack((z_v))

avg_loss_test = test_loss / len(test_loader)



In [54]:

# Latent feature 및 인덱스 배열 준비
X = latent_feature

decoder_output_0 = decoder_outputs[0].cpu().detach().numpy()
decoder_output_1 = decoder_outputs[1].cpu().detach().numpy()

# Latent feature와 decoder outputs 결합
combined_features = np.concatenate((X, decoder_output_0), axis=1)

In [73]:
combined_features.shape

(5000, 110)

In [55]:
from sklearn.mixture import BayesianGaussianMixture
# Bayesian Gaussian Mixture Clustering
bgmm = BayesianGaussianMixture(n_components=num_clusters, max_iter=500, n_init=10, random_state=RANDOM_SEED)
bgmm.fit(combined_features)
y_pred = bgmm.predict(combined_features)
np.savetxt(f"outputs/merge_zd1.tsv", y_pred, delimiter="\t", fmt="%d")


In [56]:
## simulation data라면 target 이 있어서 ARI score 점수 계산 수행
if 'sim' in data_type:
    from sklearn.metrics import adjusted_rand_score
    target=clade_data
    ari_score = adjusted_rand_score(target[0], y_pred)
    print("ARI Score for simulation dataset:", ari_score)


ARI Score for simulation dataset: 0.9683295997040139
