In [None]:
# These codes come from TIGON  project (https://github.com/yutongo/TIGON/blob/19d6648195a47b4d2a2d5025b440d37cf0ac9a17/AE/models.py)

In [None]:
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
# from torchvision import datasets, transforms
import collections
import sys
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from AE.utility import create_activation,compute_distance_matrix
sys.path.append('../')

class MLP(nn.Module):
    def __init__(self, layers_list, dropout, norm,activation,last_act=False):
        super(MLP, self).__init__()
        layers=nn.ModuleList()
        assert len(layers_list)>=2, 'no enough layers'
        for i in range(len(layers_list)-2):
            layers.append(nn.Linear(layers_list[i],layers_list[i+1]))
            if norm:
                layers.append(nn.BatchNorm1d(layers_list[i+1]))
            if activation is not None:
                layers.append(activation)
            if dropout>0.:
                layers.append(nn.Dropout(dropout))
        layers.append(nn.Linear(layers_list[-2],layers_list[-1]))
        if norm:
            layers.append(nn.BatchNorm1d(layers_list[-1]))
        if last_act:
            if activation is not None:
                layers.append(activation)
        if dropout>0.:
            layers.append(nn.Dropout(dropout))
        # layers.append(nn.Linear(layers_list[-1],out_dim))
        self.network = nn.Sequential(*layers)
        # self.apply(init_weights_xavier_uniform)
    def forward(self,x):
        for layer in self.network:
            x=layer(x)
        return x
class LatentModel(nn.Module):
    def __init__(self,n_hidden,n_latent,
                 kl_weight=1e-6, warmup_step=10000):
        super(LatentModel,self).__init__()
        self.mu = nn.Linear(n_hidden, n_latent)
        self.logvar = nn.Linear(n_hidden, n_latent)
        # self.kl = 0
        self.kl_weight = kl_weight
        self.step_count = 0
        self.warmup_step = warmup_step

    def kl_schedule_step(self):
        self.step_count += 1
        if self.step_count < self.warmup_step:
            self.kl_weight = 0.0
        else:
            self.kl_weight = self.kl_weight + (1e-2 - 1e-6) / self.warmup_step

        # elif self.step_count == self.warmup_step:
        #     pass
            # self.step_count = 0
            # self.kl_weight = 1e-6

    def forward(self, h):
        mu = self.mu(h)
        log_var = self.logvar(h)
        sigma = torch.exp(0.5 * log_var)
        epsilon = torch.randn_like(sigma)
        if self.training:
            z = mu + sigma * epsilon
            # (1 + log_var - mu ** 2 - log_var.exp()).sum()* self.kl_weight
            # print('hhhhhhh')
            self.kl = -0.5 * (1 + log_var - mu ** 2 - log_var.exp()).sum()  * self.kl_weight#/ z.shape[0]
            self.kl_schedule_step()
        else:
            z = mu
        return z





class AutoEncoder(nn.Module):
    def __init__(
            self,
            in_dim: int,
            n_layers: int = 1,
            n_hidden: int = 500,
            n_latent: int = 10,
            activate_type: str='relu',
            dropout: float = 0.2,
            norm: bool = False,
            seed: int=42,
    ):
        '''
        Autoencoder model.
        Encoder and Decoder take identical architectures.

        Parameters:
            in_dim:
                dimension of the input feature
            n_layers:
                number of hidden layers
            n_hidden:
                dimension of hidden layer. All hidden layers take the same dimensions
            n_latent:
                dimension of latent space
            activate_type:
                activation functions.
                Options: 'leakyrelu','relu', 'gelu', 'prelu', 'elu', and None for identity map.
            dropout:
                dropout rate
            norm:
                whether to include batch normalization layer
            seed:
                random seed.
        '''
        super(AutoEncoder,self).__init__()
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        self.in_dim=in_dim
        self.n_layers=n_layers
        self.n_hidden=n_hidden
        self.n_latent=n_latent
        self.dropout=dropout
        self.norm=norm
        self.activation=create_activation(activate_type)
        ## Encoder:
        self.encoder_layer=[in_dim]
        for i in range(n_layers):
            self.encoder_layer.append(n_hidden)
        self.encoder=MLP(self.encoder_layer,dropout,norm,self.activation,last_act=True)
        self.encoder_to_latent=MLP([self.encoder_layer[-1],n_latent],
                                   dropout,norm,self.activation)

        ## Decoder:
        self.decoder_layer=[n_latent]
        for i in range(n_layers):
            self.decoder_layer.append(n_hidden)
        self.decoder=MLP(self.decoder_layer,dropout,norm,self.activation,last_act=True)
        self.decoder_to_output=MLP([self.decoder_layer[-1],self.in_dim],dropout,norm,activation=None)


    def forward(self,x):
        rep=self.get_latent_representation(x,tensor=True)
        h = self.decoder(rep)
        x_recon=self.decoder_to_output(h)
        mse = nn.MSELoss(reduction='sum')
        loss = mse(x_recon, x)/x.shape[1]
        return loss


    def get_latent_representation(self,x,tensor:bool=False):
        '''
        Get latent space representation

        Parameters
        x:
            Input space
        tensor:
            If input x is a tensor, or it is a numpy array
        Return
        rep:
            latent space representation
            If tensor==True:
                return a tensor
            If tensor==Flase:
                return a numpy array
        '''
#        if not tensor:
#            x=torch.tensor(x,dtype=torch.float32)
#            self.eval()
        x=self.encoder(x)
        rep=self.encoder_to_latent(x)
        #if not tensor:
        #    rep=rep.detach().numpy()
        return rep
    def get_reconstruction(self, x):
        '''
        Reconstruct/impute gene expression data
        x:
            features. Numpy array
        Return
        x_recon:
            Numpy array
        '''
        self.eval()
        x=torch.tensor(x,dtype=torch.float32)
#        with torch.no_grad():
        x=self.encoder(x)
        x=self.encoder_to_latent(x)
        x = self.decoder(x)
        x_recon = self.decoder_to_output(x)

        #x_recon=x_recon.detach().numpy()
        return x_recon
    def get_generative(self,z):
        '''
        genereate gene expression data from latent space variable
        z:
            latent space representation. Numpy array
        Return
        x_recon:
            Numpy array
        '''
        self.eval()
        #z=torch.tensor(z,dtype=torch.float32)
#        with torch.no_grad():
        x = self.decoder(z)
        x_recon = self.decoder_to_output(x)
        #x_recon=x_recon.detach().numpy()
        return x_recon

这段代码定义了一个自编码器（Autoencoder, AE）和一些相关的模型架构，如 `MLP` 和 `LatentModel`。下面我将逐步解析每个函数的功能和具体实现。

### 1. **`MLP` 类**
- **功能**: 定义了一个多层感知机（MLP）模型。
- **`__init__` 方法**:
  - `layers_list`: 一个列表，定义了每一层的维度。
  - `dropout`: Dropout层的丢弃率，用于防止过拟合。
  - `norm`: 是否添加Batch Normalization层。
  - `activation`: 激活函数（如ReLU, Sigmoid等）。
  - `last_act`: 是否在最后一层添加激活函数。
  - `self.network`: 将这些层和操作堆叠在一起，形成一个完整的神经网络。

- **`forward` 方法**:
  - 依次通过网络中的各层，返回网络的输出。

### 2. **`LatentModel` 类**
- **功能**: 定义了一个变分自编码器（VAE）中的潜在空间模型，生成潜在向量并计算KL散度（用于变分推断）。
- **`__init__` 方法**:
  - `n_hidden`: 隐藏层的维度。
  - `n_latent`: 潜在空间的维度。
  - `kl_weight`: KL散度的权重。
  - `warmup_step`: 训练过程中KL散度的预热步数。

- **`kl_schedule_step` 方法**:
  - 用于逐步增加KL散度的权重，直到预热阶段结束。

- **`forward` 方法**:
  - 根据输入的隐藏层表示 `h`，计算潜在变量的均值 `mu` 和对数方差 `log_var`。
  - 如果模型处于训练模式，生成潜在向量 `z` 并计算KL散度。
  - 在推理模式下，直接使用均值 `mu` 作为潜在变量。

### 3. **`AutoEncoder` 类**
- **功能**: 定义了一个标准的自编码器（Autoencoder）。在训练过程中，通过最小化输入数据与重构数据之间的均方误差（MSE）来训练模型。
- **`__init__` 方法**:
  - `in_dim`: 输入数据的维度。
  - `n_layers`: 隐藏层的层数。
  - `n_hidden`: 隐藏层的维度。
  - `n_latent`: 潜在空间的维度。
  - `activate_type`: 激活函数类型（如ReLU, LeakyReLU等）。
  - `dropout`: Dropout层的丢弃率。
  - `norm`: 是否添加Batch Normalization层。
  - `seed`: 随机种子，用于确保可重复性。
  - **Encoder 部分**:
    - 通过`MLP`类构建编码器，逐层将输入映射到潜在空间。
  - **Decoder 部分**:
    - 通过另一个`MLP`类构建解码器，将潜在空间映射回输入空间。

- **`forward` 方法**:
  - 将输入 `x` 传递通过编码器得到潜在表示 `rep`，然后通过解码器得到重构数据。
  - 计算输入和重构数据之间的均方误差（MSE），作为损失函数。

- **`get_latent_representation` 方法**:
  - 给定输入 `x`，通过编码器和编码器到潜在空间的映射，返回潜在表示。

- **`get_reconstruction` 方法**:
  - 给定输入 `x`，通过整个自编码器的过程得到重构数据。

- **`get_generative` 方法**:
  - 给定潜在空间的表示 `z`，通过解码器生成相应的重构数据。

### 代码的优缺点

#### 优点
- **模块化和可扩展性**: 代码通过 `MLP` 类封装了神经网络结构，使得可以在不同模型中复用。`AutoEncoder` 类则根据需要定义了编码器和解码器，便于修改和扩展。
- **可配置性**: 在初始化时，用户可以灵活设置激活函数、隐藏层数、潜在空间的维度等参数，这增加了模型的灵活性。

#### 缺点
- **结构的复杂性**: 代码中存在一定的重复，例如编码器和解码器的结构非常相似，如果模型规模进一步增大，维护起来可能会变得困难。
- **潜在空间模型的简化**: `LatentModel` 看起来是为变分自编码器（VAE）设计的，但并没有完全实现VAE的完整结构。具体而言，虽然计算了KL散度，但该类并没有包含生成部分，也没有显式地应用到主模型中，似乎是为了潜在空间的优化而存在，缺少对生成数据的明确操作。

#### 为什么这个模型看起来没有使用VAE
- 这段代码虽然定义了 `LatentModel` 来计算潜在空间的KL散度，但 `AutoEncoder` 类并没有采用该潜在模型来进行生成式建模。`AutoEncoder` 类的重点在于输入和输出之间的重构误差，而不是生成新数据。因此，虽然代码涉及潜在变量并计算KL散度，但没有将其与生成模型结合，缺少像VAE中通过重参数化技巧生成样本的部分。因此，这个模型可以看作是一个普通的自编码器，而不是一个完整的变分自编码器（VAE）。

### 总结
- 这个代码定义了一个标准的自编码器，其中涉及到潜在空间的操作，但没有完全实现VAE的生成过程。如果你打算扩展成VAE，应该进一步集成KL散度和生成过程到主模型的损失函数中。
- 总体来说，代码逻辑清晰，结构可复用，但有进一步优化空间，尤其是在潜在空间模型的实现上，可以更好地与生成任务结合。

In [None]:
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
# from torchvision import datasets, transforms
import collections
import sys
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from AE.utility import create_activation

sys.path.append('../')
def dataloader_split(X,test_size,seed,batch_size):
    X_train, X_test = train_test_split(X, test_size=test_size, random_state=seed)

    train_dataset = TensorDataset(torch.tensor(X_train,dtype=torch.float32))
    test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32))

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader,test_loader
class Trainer(object):
    def __init__(self,
                 model,
                 X,
                 test_size:float=0.1,
                 batch_size:int=32,
                 lr:float=1e-3,
                 weight_decay:float=0.0,
                 seed:int=42,):
        '''
        Trainer for pretrain model.
        Parameters:
        model:
            A pytorch model defined in "models.py"
        X:
            Feature matrix. mxn numpy array.
                a standarized logorithmic data (i.e., zero mean, unit variance)
        test_size:
            fraction of testing/validation data size. default: 0.2
        batch_size:
            batch size.
        lr:
            learning rate.
        weight_decay:
            L2 regularization.
        seed:
            random seed.
        '''
        # self.args = args
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model
        self.model.to(self.device)
        # if self.model.decoder_type=='normal':
        self.train_loader,self.test_loader=\
            dataloader_split(X,test_size,seed,batch_size)
        self.seed=seed
        torch.manual_seed(self.seed)
        torch.cuda.manual_seed(self.seed)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr,weight_decay=weight_decay)
    def train_step(self):
        self.model.train()
        train_loss = 0
        for batch_idx, (data,) in enumerate(self.train_loader):
            data = data.to(self.device)
            # scale_factor = scale_factor.to(self.device)
            self.optimizer.zero_grad()
            loss = self.model(data,)
            loss.backward()

            train_loss += loss.item()#*data.shape[0]
            self.optimizer.step()
        train_loss=train_loss / len(self.train_loader.dataset)

        return train_loss
    def test(self):
        self.model.eval()
        test_loss = 0
        with torch.no_grad():
            for batch_idx, (data,) in enumerate(self.test_loader):
                data = data.to(self.device)
                loss =self.model(data,)
                test_loss += loss.item()#*data.shape[0]
        test_loss /= len(self.test_loader.dataset)
        return test_loss
    def train(self, max_epoch=500,tol=1e-2,  patient=30):
        # self.model.train()
        self.model.history = {'train_loss': [], 'val_loss': [],
                              'train_loss_ae':[],'val_loss_ae':[],
                              'train_loss_topo':[],'val_loss_topo':[],
                              'epoch':[]}
        best_val_error = float('inf')
        num_patient_epochs = 0
        for epoch in range(max_epoch):
            self.epoch=epoch
            # Train for one epoch and get the training loss
            train_loss = self.train_step()
            # Compute the validation error
            val_loss = self.test()
            self.model.history['train_loss'].append(train_loss)
            self.model.history['val_loss'].append(val_loss)
            self.model.history['epoch'].append(epoch)
            # if epoch % 10==0:
            print(f"Epoch {epoch}: train loss = {train_loss:.4f}, val error = {val_loss:.4f}")
            # Check if the validation error has decreased by at least tol
            if best_val_error - val_loss >= tol:
                best_val_error = val_loss
                num_patient_epochs = 0
            else:
                num_patient_epochs += 1
                # Check if we have exceeded the patience threshold
            if num_patient_epochs >= patient:
                print(f"No improvement in validation error for {patient} epochs. Stopping early.")
                break
        print(f"Best validation error = {best_val_error:.4f}")

这段代码定义了一个训练过程的管理类 `Trainer`，用于训练PyTorch模型，并包含了一些与训练相关的功能（如数据集拆分、训练过程、验证过程、早停机制等）。下面将逐行解释每个部分的功能和意义。

### 1. **`dataloader_split` 函数**
- **功能**: 将数据 `X` 划分为训练集和测试集，并创建相应的 `DataLoader`。
- **参数**:
  - `X`: 输入特征矩阵，形状为 `m x n`，即 `m` 个样本，`n` 个特征。
  - `test_size`: 测试集的占比，默认是0.1（即90%的数据用于训练，10%的数据用于测试）。
  - `seed`: 随机种子，用于确保划分的可重复性。
  - `batch_size`: 每个批次的大小。
- **实现**:
  - 使用 `train_test_split` 将数据 `X` 随机分为训练集和测试集，比例由 `test_size` 参数决定。
  - 将训练集和测试集转化为 `TensorDataset`，并用 `DataLoader` 包装，以便于后续批量处理。
  - `DataLoader` 用于生成可迭代的数据批次。
- **返回值**: `train_loader` 和 `test_loader`，分别是训练集和测试集的 `DataLoader`。

### 2. **`Trainer` 类**
- **功能**: 这个类用于管理模型的训练过程。它将模型的训练和验证过程封装在一起，方便训练的执行和管理。

#### `__init__` 方法
- **功能**: 初始化训练器。
- **参数**:
  - `model`: 要训练的模型（一个PyTorch模型对象）。
  - `X`: 特征矩阵，类型为 `numpy` 数组。
  - `test_size`: 测试集占比，默认为 `0.1`。
  - `batch_size`: 批次大小，默认为 `32`。
  - `lr`: 学习率，默认为 `1e-3`。
  - `weight_decay`: L2正则化系数，默认为 `0.0`。
  - `seed`: 随机种子，默认为 `42`。
- **实现**:
  - 根据是否有GPU，决定使用GPU（`cuda`）或CPU作为设备。
  - 将模型加载到设备上（`model.to(self.device)`）。
  - 使用 `dataloader_split` 函数分割数据 `X` 为训练集和测试集，并创建 `train_loader` 和 `test_loader`。
  - 设置随机种子，保证实验可复现。
  - 使用 `Adam` 优化器对模型的参数进行优化，学习率和L2正则化系数由外部传入。

#### `train_step` 方法
- **功能**: 执行一次训练步骤，即一次梯度更新。
- **实现**:
  - 将模型设置为训练模式 (`self.model.train()`)，这样会启用Dropout和BatchNorm等训练时特有的操作。
  - 初始化 `train_loss` 变量，累积训练损失。
  - 遍历 `train_loader` 中的每一个批次：
    - 获取训练数据并将其移动到指定设备（GPU或CPU）。
    - 清零优化器的梯度缓存。
    - 计算模型的损失（通过调用模型的 `forward` 方法）。
    - 反向传播计算梯度（`loss.backward()`）。
    - 累积损失。
    - 更新优化器的参数。
  - 返回平均训练损失。

#### `test` 方法
- **功能**: 执行一次测试步骤，即计算验证集的损失。
- **实现**:
  - 将模型设置为评估模式 (`self.model.eval()`)，这样会禁用Dropout和BatchNorm等训练时特有的操作。
  - 初始化 `test_loss` 变量，累积测试损失。
  - 不计算梯度（`torch.no_grad()`），以节省计算资源。
  - 遍历 `test_loader` 中的每一个批次：
    - 获取测试数据并将其移动到指定设备（GPU或CPU）。
    - 计算模型的损失。
    - 累积损失。
  - 返回平均测试损失。

#### `train` 方法
- **功能**: 训练模型，包含多个训练周期（Epochs），并实现了早停（Early Stopping）机制。
- **参数**:
  - `max_epoch`: 最大训练轮次，默认为 `500`。
  - `tol`: 如果验证损失降低小于 `tol`，则认为模型没有进步，默认为 `1e-2`。
  - `patient`: 如果验证损失连续若干轮没有改善，提前停止训练，默认为 `30`。
- **实现**:
  - 初始化历史记录字典 `self.model.history`，用来记录每一轮的训练和验证损失。
  - 设置 `best_val_error` 为无穷大，用于记录验证集上的最佳损失。
  - 设置 `num_patient_epochs` 用于记录当前没有进步的训练轮次。
  - 循环 `max_epoch` 轮次进行训练：
    - 调用 `train_step` 获取当前轮次的训练损失。
    - 调用 `test` 获取当前轮次的验证损失。
    - 将当前轮次的损失添加到历史记录中。
    - 打印当前轮次的训练损失和验证损失。
    - 检查验证损失是否比 `best_val_error` 小，如果减小则更新 `best_val_error`，并重置没有进步的计数。
    - 如果连续 `patient` 轮次验证损失没有改善，则提前停止训练。
  - 打印最终的最佳验证损失。

### 总结
这段代码实现了一个训练器类 `Trainer`，将模型训练、验证、早停等过程封装为一个类。它提供了以下功能：
- 数据集的拆分和加载。
- 每轮训练和验证的损失计算。
- 训练过程中的早停机制。
- 训练历史记录的保存。

该类结构清晰，功能分明，适用于一般的机器学习模型训练过程，尤其适合于基于PyTorch框架的深度学习模型。它通过合理的抽象，帮助用户方便地管理和训练模型，同时还能灵活地设置训练的参数（如学习率、批次大小、训练轮数等）。