In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure, plot, savefig
import time

In [None]:
class Encoder(nn.Module):
    def __init__(self, config):
        super(Encoder, self).__init__()

        self.config = config
        num_hidden_units = self.config['num_hidden_units']
        n_channel = self.config['n_channel']
        code_size = self.config['code_size']
        self.l_win = config['l_win']

        self.conv1 = nn.Conv2d(in_channels=n_channel, out_channels=num_hidden_units // 16, kernel_size=(3, n_channel), stride=(2, 1), padding='same')
        self.conv2 = nn.Conv2d(num_hidden_units // 16, out_channels=num_hidden_units // 8, kernel_size=(3, n_channel), stride=(2, 1), padding='same')
        self.conv3 = nn.Conv2d(num_hidden_units // 8, out_channels=num_hidden_units // 4, kernel_size=(3, n_channel), stride=(2, 1), padding='same')
        self.conv4 = nn.Conv2d(num_hidden_units // 4, out_channels=num_hidden_units, kernel_size=(4, n_channel), stride=1, padding='valid')
        self.conv5 = nn.Conv2d(in_channels=num_hidden_units // 4, out_channels=num_hidden_units, kernel_size=(6, n_channel), stride=1, padding='valid')

        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(self._get_conv_output_shape(), code_size * 4)
        self.code_mean = nn.Linear(code_size * 4, code_size)
        self.code_std_dev = nn.Linear(code_size * 4, code_size)

    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension

        if self.l_win == 24:
            x = F.pad(x, (0, 0, 4, 4), mode='reflect')
            x = F.leaky_relu(self.conv1(x))
            print("conv_1:", x.size())
            x = F.leaky_relu(self.conv2(x))
            print("conv_2:", x.size())
            x = F.leaky_relu(self.conv3(x))
            print("conv_3:", x.size())
            x = F.leaky_relu(self.conv4(x))
            print("conv_4:", x.size())

        if self.l_win == 48:
            x = F.leaky_relu(self.conv)
            print("conv_1:", x.size())
            x = F.leaky_relu(self.conv2(x))
            print("conv_2:", x.size())
            x = F.leaky_relu(self.conv3(x))
            print("conv_3:", x.size())
            x = F.leaky_relu(self.conv5(x))
            print("conv_5:", x.size())

        x = self.flatten(x)
        x = F.leaky_relu(self.fc1(x))

        code_mean = self.code_mean(x)
        code_std_dev = F.relu(self.code_std_dev(x)) + 1e-2  # Ensure std_dev is positive

        # Sampling from the distribution
        normal_dist = dist.Normal(loc=code_mean, scale=code_std_dev)
        code_sample = normal_dist.rsample()  # Reparameterization trick for backpropagation

        print("finish encoder: \n{}".format(self.code_sample))
        print("\n")

        return code_sample, code_mean, code_std_dev


In [None]:
import math

class Encoder(nn.Module):
    def __init__(self, config):
        super(Encoder, self).__init__()

        self.config = config
        num_hidden_units = self.config['num_hidden_units']
        n_channel = self.config['n_channel']
        code_size = self.config['code_size']
        self.l_win = config['l_win']

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=num_hidden_units // 16, kernel_size=(3, n_channel), stride=(2, 1))
        self.conv2 = nn.Conv2d(num_hidden_units // 16, out_channels=num_hidden_units // 8, kernel_size=(3, n_channel), stride=(2, 1))
        self.conv3 = nn.Conv2d(num_hidden_units // 8, out_channels=num_hidden_units // 4, kernel_size=(3, n_channel), stride=(2, 1))
        self.conv4 = nn.Conv2d(num_hidden_units // 4, out_channels=num_hidden_units, kernel_size=(4, n_channel), stride=1, padding='valid')
        self.conv5 = nn.Conv2d(num_hidden_units // 4, out_channels=num_hidden_units, kernel_size=(6, n_channel), stride=1, padding='valid')

        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(num_hidden_units, code_size * 4)
        self.code_mean = nn.Linear(code_size * 4, code_size)
        self.code_std_dev = nn.Linear(code_size * 4, code_size)


    def same_padding(self,x, kernel_size, stride):
        if isinstance(kernel_size, tuple):
            kernel_height, kernel_width = kernel_size
        else:
            kernel_height = kernel_width = kernel_size

        if isinstance(stride, tuple):
            stride_height, stride_width = stride
        else:
            stride_height = stride_width = stride

        input_size = x.size()

        out_height = math.ceil(float(input_size[0]) / float(stride_height))
        out_width  = math.ceil(float(input_size[1]) / float(stride_width))

        pad_along_height = max((out_height - 1) * stride_height + kernel_height - input_size[0], 0)
        pad_along_width = max((out_width - 1) * stride_width + kernel_width - input_size[1], 0)

        pad_top = pad_along_height // 2
        pad_bottom = pad_along_height - pad_top
        pad_left = pad_along_width // 2
        pad_right = pad_along_width - pad_left

        pad = (pad_left, pad_right, pad_top, pad_bottom)

        return F.pad(x, pad, mode='replicate')




    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension
        print("unsqueeze:", x.size())

        if self.l_win == 24:

            x = F.pad(x, (0, 0, 4, 4), mode='replicate')
            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv1(x))
            print("conv_1:", x.size())

            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv2(x))
            print("conv_2:", x.size())

            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv3(x))
            print("conv_3:", x.size())

            x = F.leaky_relu(self.conv4(x))
            print("conv_4:", x.size())

        elif self.l_win == 48:

            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv1(x))
            print("conv_1:", x.size())

            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv2(x))
            print("conv_2:", x.size())

            x = self.same_padding(x, (3, self.config['n_channel']), (2, 1))
            x = F.leaky_relu(self.conv3(x))
            print("conv_3:", x.size())

            x = F.leaky_relu(self.conv5(x))
            print("conv_5:", x.size())

        x = self.flatten(x)
        print("flatten:", x.size())
        x = F.leaky_relu(self.fc1(x))

        code_mean = self.code_mean(x)
        code_std_dev = F.relu(self.code_std_dev(x)) + 1e-2  # Ensure std_dev is positive

        # Sampling from the distribution
        normal_dist = dist.Normal(loc=code_mean, scale=code_std_dev)
        code_sample = normal_dist.rsample()  # Reparameterization trick for backpropagation

        print("finish encoder: \n{}".format(code_sample))
        print("\n")

        return code_sample, code_mean, code_std_dev

In [None]:
import torch
import torch.nn as nn
import torch.distributions as dist
import torch.nn.functional as F

# Paste the provided Encoder class code here

def test_encoder(config):
    """
    Test the correctness of the Encoder model.

    Args:
        config (dict): Configuration dictionary containing model hyperparameters.

    Returns:
        None
    """
    # 1. Summarize the provided code
    print("Encoder Model Summary:")
    print("- The Encoder model takes an input tensor and encodes it into a latent code representation.")
    print("- It consists of convolutional layers followed by fully connected layers.")
    print("- The model outputs the latent code sample, mean, and standard deviation.")

    # 2. Load the provided model and prepare the testing dataset
    encoder = Encoder(config)
    print("\nEncoder Model Architecture:")
    print(encoder)

    # Prepare sample input data with different shapes
    input_shapes = [(48, 10)]

    # 3. Implement a testing procedure to evaluate the model's calculation on input matrices with different shapes
    print("\nTesting Encoder Model:")
    for shape in input_shapes:
        print(f"\nTesting with input shape: {shape}")
        input_data = torch.randn(1, *shape)  # Create random input data with the specified shape

        # Forward pass through the encoder
        code_sample, code_mean, code_std_dev = encoder(input_data)

        # 4. Display or print the results of the model evaluation
        print("Encoder Output:")
        print("- Code Sample Shape:", code_sample.shape)
        print("- Code Mean Shape:", code_mean.shape)
        print("- Code Std Dev Shape:", code_std_dev.shape)

    print("\nEncoder Model Testing Completed.")

# Example usage
config = {
    'num_hidden_units': 128,
    'n_channel': 10,
    'code_size': 64,
    'l_win': 48
}

test_encoder(config)

Encoder Model Summary:
- The Encoder model takes an input tensor and encodes it into a latent code representation.
- It consists of convolutional layers followed by fully connected layers.
- The model outputs the latent code sample, mean, and standard deviation.

Encoder Model Architecture:
Encoder(
  (conv1): Conv2d(1, 8, kernel_size=(3, 10), stride=(2, 1))
  (conv2): Conv2d(8, 16, kernel_size=(3, 10), stride=(2, 1))
  (conv3): Conv2d(16, 32, kernel_size=(3, 10), stride=(2, 1))
  (conv4): Conv2d(32, 128, kernel_size=(4, 10), stride=(1, 1), padding=valid)
  (conv5): Conv2d(32, 128, kernel_size=(6, 10), stride=(1, 1), padding=valid)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=128, out_features=256, bias=True)
  (code_mean): Linear(in_features=256, out_features=64, bias=True)
  (code_std_dev): Linear(in_features=256, out_features=64, bias=True)
)

Testing Encoder Model:

Testing with input shape: (48, 10)
unsqueeze: torch.Size([1, 1, 48, 10])
conv_1: torch.S

In [2]:
class Decoder(nn.Module):

    def __init__(self, config):
        super(Decoder, self).__init__()
        self.config = config
        self.is_code_input = config['is_code_input']  # Assuming this is determined elsewhere in your code
        self.num_hidden_units = config['num_hidden_units']
        self.n_channel = config['n_channel']
        self.l_win = config['l_win']
        self.code_size = config['code_size']
        self.TRAIN_sigma = config['TRAIN_sigma']
        self.sigma = config['sigma']
        self.sigma2_offset = config['sigma2_offset']

        # Layers
        self.fc1 = nn.Linear(self.code_size, self.num_hidden_units)
        self.conv1 = nn.Conv2d(self.num_hidden_units, self.num_hidden_units, kernel_size=1, padding='same')
        self.conv2 = nn.Conv2d(self.num_hidden_units // 4, self.num_hidden_units // 4, kernel_size=(3, 1), stride=1, padding='same')
        self.conv3 = nn.Conv2d(self.num_hidden_units // 8, self.num_hidden_units // 8, kernel_size=(3, 1), stride=1, padding='same')
        self.conv4 = nn.Conv2d(self.num_hidden_units // 16, self.num_hidden_units // 16, kernel_size=(3, 1), stride=1, padding='same')
        self.conv5 = nn.Conv2d(self.num_hidden_units // 32, self.n_channel, kernel_size=(9, 1), stride=1, padding='valid') ## 16 -> 32 , valid -> same

        self.conv_1 = nn.Conv2d(self.num_hidden_units, 256 * 3, kernel_size=1, padding='same')
        self.conv_2 = nn.Conv2d(256, 256, kernel_size=(3, 1), stride=1, padding='same')
        self.conv_3 = nn.Conv2d(128, 128, kernel_size=(3, 1), stride=1, padding='same')
        self.conv_4 = nn.Conv2d(32, 32, kernel_size=(3, 1), stride=1, padding='same')
        self.conv_5 = nn.Conv2d(16, self.n_channel, kernel_size=(5, self.n_channel), stride=1, padding='same')

        # Assuming sigma2 and sigma2_offset are defined in config

    def forward(self, code_input=None, code_sample=None):

        encoded = code_input if self.is_code_input else code_sample

        decoded_1 = F.leaky_relu(self.fc1(encoded))
        decoded_1 = decoded_1.view(-1, self.num_hidden_units, 1, 1)
        print("decoded_1 is: {}".format(decoded_1.size()))

        if self.l_win == 24:
            decoded_2 = F.leaky_relu(self.conv1(decoded_1))
            decoded_2 = decoded_2.view(-1, self.num_hidden_units // 4, 4, 1)
            print("decoded_2 is: {}".format(decoded_2.size()))

            decoded_3 = F.leaky_relu(self.conv2(decoded_2))
            decoded_3 = F.pixel_shuffle(decoded_3, upscale_factor=2)
            decoded_3 = decoded_3.view(-1, self.num_hidden_units // 8, 8, 1)
            print("decoded_3 is: {}".format(decoded_3.size()))

            decoded_4 = F.leaky_relu(self.conv3(decoded_3))
            decoded_4 = F.pixel_shuffle(decoded_4, upscale_factor=2)
            decoded_4 = decoded_4.view(-1, self.num_hidden_units // 16, 16, 1)
            print("decoded_4 is: {}".format(decoded_4.size()))

            decoded_5 = F.leaky_relu(self.conv4(decoded_4))
            decoded_5 = F.pixel_shuffle(decoded_5, upscale_factor=2)
            decoded_5 = decoded_5.view(-1, 16, self.num_hidden_units // 16, 1) ## test
            print("decoded_5 is: {}".format(decoded_5.size()))

            decoded = self.conv5(decoded_5)
            print("decoded_6 is: {}".format(decoded.size()))

            self.decoded = decoded.view(-1, self.l_win, self.n_channel)

        if self.l_win == 48:
            decoded_2 = F.leaky_relu(self.conv_1(decoded_1))
            decoded_2 = decoded_2.view(-1, 256, 3, 1)
            print("decoded_2 is: {}".format(decoded_2.size()))

            decoded_3 = F.leaky_relu(self.conv_2(decoded_2))
            decoded_3 = F.pixel_shuffle(decoded_3, upscale_factor=2)
            decoded_3 = decoded_3.view(-1, 128, 6, 1)
            print("decoded_3 is: {}".format(decoded_3.size()))

            decoded_4 = F.leaky_relu(self.conv_3(decoded_3))
            decoded_4 = F.pixel_shuffle(decoded_4, upscale_factor=2)
            decoded_4 = decoded_4.view(-1, 32, 24, 1)
            print("decoded_4 is: {}".format(decoded_4.size()))

            decoded_5 = F.leaky_relu(self.conv_4(decoded_4))
            decoded_5 = F.pixel_shuffle(decoded_5, upscale_factor=2)
            decoded_5 = decoded_5.view(-1, 16, 48, 1)
            print("decoded_5 is: {}".format(decoded_5.size()))

            decoded = self.conv_5(decoded_5)
            print("decoded_6 is: {}".format(decoded.size()))

            self.decoded = decoded.view(-1, self.l_win, self.n_channel)

        print("finish decoder: \n{}".format(self.decoded.size()))
        print('\n')

        return self.decoded

In [3]:
def test_decoder():
    # Sample configuration matching the expected structure
    config = {
        'is_code_input': True,
        'num_hidden_units': 512,
        'n_channel': 3,
        'l_win': 48,  # or 48 to test the other branch
        'code_size': 100,
        'TRAIN_sigma': 1,
        'sigma': 0.5,
        'sigma2_offset': 0.1  # Assuming sigma2_offset is defined somewhere
    }

    # Initialize the Decoder with the given config
    decoder = Decoder(config)

    print(decoder)


    # Create a sample input tensor
    if config['is_code_input']:
        code_input = torch.randn(1, config['code_size'])  # Batch size of 1
        output = decoder(code_input=code_input)
    else:
        code_sample = torch.randn(1, 24, config['code_size'])  # Batch size of 1
        output, sigma2 = decoder(code_sample=code_sample)

    # Check the output shape
    expected_output_shape = (1, config['l_win'], config['n_channel'])
    assert output.shape == expected_output_shape, f"Expected output shape {expected_output_shape}, got {output.shape}"

    print("Test passed successfully!")

# Run the test
test_decoder()

Decoder(
  (fc1): Linear(in_features=100, out_features=512, bias=True)
  (conv1): Conv2d(512, 512, kernel_size=(1, 1), stride=(1, 1), padding=same)
  (conv2): Conv2d(128, 128, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv3): Conv2d(64, 64, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv4): Conv2d(32, 32, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv5): Conv2d(16, 3, kernel_size=(9, 1), stride=(1, 1), padding=valid)
  (conv_1): Conv2d(512, 768, kernel_size=(1, 1), stride=(1, 1), padding=same)
  (conv_2): Conv2d(256, 256, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv_3): Conv2d(128, 128, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv_4): Conv2d(32, 32, kernel_size=(3, 1), stride=(1, 1), padding=same)
  (conv_5): Conv2d(16, 3, kernel_size=(5, 3), stride=(1, 1), padding=same)
)
decoded_1 is: torch.Size([1, 512, 1, 1])
decoded_2 is: torch.Size([1, 256, 3, 1])
decoded_3 is: torch.Size([1, 128, 6, 1])
decoded_4 is: torch.Size([1, 32, 24, 1])

In [None]:
import torch
import torch.nn as nn
import numpy as np

class lstmPyTorchModel(nn.Module):
    def __init__(self, config):
        super(lstmPyTorchModel, self).__init__()

        self.config = config
        self.l_seq = config['l_seq']
        self.l_win = config['l_win']
        self.num_hidden_units_lstm = config['num_hidden_units_lstm']
        self.code_size = config['code_size']

        self.lstm1 = nn.LSTM(self.code_size, self.num_hidden_units_lstm, batch_first=True)
        self.lstm2 = nn.LSTM(self.num_hidden_units_lstm, self.num_hidden_units_lstm, batch_first=True)
        self.lstm3 = nn.LSTM(self.num_hidden_units_lstm, self.code_size, batch_first=True)

    def forward(self, x):
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        return x

    def produce_embeddings(self, model_vae, data, device):
        self.embedding_lstm_train = torch.zeros((data.n_train_lstm, self.l_seq, self.code_size), device=device)
        for i in range(data.n_train_lstm):
            with torch.no_grad():
                input_signal = data.train_set_lstm['data'][i].unsqueeze(0).to(device)
                code_input = torch.zeros((1, self.code_size), device=device)
                self.embedding_lstm_train[i] = model_vae(input_signal, False, code_input)[1]

        print("Finish processing the embeddings of the entire dataset.")
        print("The first a few embeddings are\n{}".format(self.embedding_lstm_train[0, 0:5]))

        self.x_train = self.embedding_lstm_train[:, : self.l_seq - 1]
        self.y_train = self.embedding_lstm_train[:, 1:]

        self.embedding_lstm_test = torch.zeros((data.n_val_lstm, self.l_seq, self.code_size), device=device)
        for i in range(data.n_val_lstm):
            with torch.no_grad():
                input_signal = data.val_set_lstm['data'][i].unsqueeze(0).to(device)
                code_input = torch.zeros((1, self.code_size), device=device)
                self.embedding_lstm_test[i] = model_vae(input_signal, False, code_input)[1]

        self.x_test = self.embedding_lstm_test[:, :self.l_seq - 1]
        self.y_test = self.embedding_lstm_test[:, 1:]

    def train(self, cp_callback):
        optimizer = optim.Adam(self.parameters(), lr=self.config['learning_rate_lstm'])
        criterion = nn.MSELoss()

        for epoch in range(self.config['num_epochs_lstm']):
            self.train()
            running_loss = 0.0
            for i in range(0, self.x_train.size(0), self.config['batch_size_lstm']):
                batch_x = self.x_train[i:i+self.config['batch_size_lstm']]
                batch_y = self.y_train[i:i+self.config['batch_size_lstm']]

                optimizer.zero_grad()
                outputs = self(batch_x)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

            epoch_loss = running_loss / (self.x_train.size(0) // config['batch_size_lstm'])
            print(f'Epoch {epoch+1}/{config["num_epochs_lstm"]}, Loss: {epoch_loss:.4f}')

            if cp_callback is not None:
                cp_callback(self, epoch_loss)

            self.eval()
            with torch.no_grad():
                test_outputs = self(self.x_test)
                test_loss = criterion(test_outputs, self.y_test)
                print(f'Test Loss: {test_loss:.4f}')

        print('Training finished.')


    def plot_reconstructed_lt_seq(self, idx_test, model_vae, data, device):
        with torch.no_grad():
            # VAE reconstruction
            input_signal = torch.zeros((self.l_seq, self.l_win, self.config['n_channel']), device=device)
            code_input = self.embedding_lstm_test[idx_test].unsqueeze(0).to(device)
            decoded_seq_vae = model_vae(input_signal, True, code_input)[0].squeeze().cpu().numpy()
            print(f"Decoded seq from VAE: {decoded_seq_vae.shape}")

            # LSTM reconstruction
            input_signal = torch.zeros((self.l_seq - 1, self.l_win, self.config['n_channel']), device=device)
            lstm_embedding_test = self.x_test[idx_test].unsqueeze(0).to(device)
            decoded_seq_lstm = model_vae(input_signal, True, lstm_embedding_test)[0].squeeze().cpu().numpy()
            print(f"Decoded seq from lstm: {decoded_seq_lstm.shape}")

        fig, axs = plt.subplots(self.config['n_channel'], 2, figsize=(15, 4.5 * self.config['n_channel']), edgecolor='k')
        fig.subplots_adjust(hspace=.4, wspace=.4)
        axs = axs.ravel()
        for j in range(self.config['n_channel']):
            for i in range(2):
                axs[i + j * 2].plot(np.arange(0, self.l_seq * self.l_win),
                                    np.reshape(data.val_set_lstm['data'][idx_test, :, :, j],
                                               (self.l_seq * self.l_win)))
                axs[i + j * 2].grid(True)
                axs[i + j * 2].set_xlim(0, self.l_seq * self.l_win)
                axs[i + j * 2].set_xlabel('samples')
            if self.config['n_channel'] == 1:
                axs[0 + j * 2].plot(np.arange(0, self.l_seq * self.l_win),
                                    np.reshape(decoded_seq_vae, (self.l_seq * self.l_win)), 'r--')
                axs[1 + j * 2].plot(np.arange(self.l_win, self.l_seq * self.l_win),
                                    np.reshape(decoded_seq_lstm, ((self.l_seq - 1) * self.l_win)), 'g--')
            else:
                axs[0 + j * 2].plot(np.arange(0, self.l_seq * self.l_win),
                                    np.reshape(decoded_seq_vae[:, :, j], (self.l_seq * self.l_win)), 'r--')
                axs[1 + j * 2].plot(np.arange(self.l_win, self.l_seq * self.l_win),
                                    np.reshape(decoded_seq_lstm[:, :, j], ((self.l_seq - 1) * self.l_win)), 'g--')
            axs[0 + j * 2].set_title(f'VAE reconstruction - channel {j}')
            axs[1 + j * 2].set_title(f'LSTM reconstruction - channel {j}')
            for i in range(2):
                axs[i + j * 2].legend(('ground truth', 'reconstruction'))
        plt.savefig(f"{self.config['result_dir']}lstm_long_seq_recons_{idx_test}.pdf")
        plt.close()


    def plot_lstm_embedding_prediction(self, idx_test, config, model_vae, data, device):
        self.plot_reconstructed_lt_seq(idx_test, config, model_vae, data, device)

        fig, axs = plt.subplots(2, self.code_size // 2, figsize=(15, 5.5), edgecolor='k')
        fig.subplots_adjust(hspace=.4, wspace=.4)
        axs = axs.ravel()
        for i in range(self.code_size):
            axs[i].plot(torch.arange(1, self.l_seq), self.embedding_lstm_test[idx_test, 1:, i].squeeze().cpu().numpy())
            axs[i].plot(torch.arange(1, self.l_seq), self.x_test[idx_test, :, i].squeeze().cpu().numpy())
            axs[i].set_xlim(1, self.l_seq - 1)
            axs[i].set_ylim(-2.5, 2.5)
            axs[i].grid(True)
            axs[i].set_title(f'Embedding dim {i}')
            axs[i].set_xlabel('windows')
            if i == self.code_size - 1:
                axs[i].legend(('VAE\nembedding', 'LSTM\nembedding'))
        plt.savefig(f"{config['result_dir']}lstm_seq_embedding_{idx_test}.pdf")
        plt.close()

In [None]:
import unittest
import torch

class TestLstmPyTorchModel(unittest.TestCase):
    def setUp(self):
        # Set up the test environment and initialize required objects
        self.config = {
            'l_seq': 10,
            'l_win': 5,
            'num_hidden_units_lstm': 64,
            'code_size': 32,
            'learning_rate_lstm': 0.001,
            'num_epochs_lstm': 10,
            'batch_size_lstm': 32,
            'n_channel': 1,
            'result_dir': './results/'
        }
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = lstmPyTorchModel(self.config).to(self.device)
        self.model_vae = vaeModel(self.config).to(self.device)
        self.data = Data(self.config)

    def test_produce_embeddings(self):
        # Test the produce_embeddings method
        self.model.produce_embeddings(self.model_vae, self.data, self.device)

        # Assert the shape of the generated embeddings
        self.assertEqual(self.model.embedding_lstm_train.shape, (self.data.n_train_lstm, self.config['l_seq'], self.config['code_size']))
        self.assertEqual(self.model.embedding_lstm_test.shape, (self.data.n_val_lstm, self.config['l_seq'], self.config['code_size']))

        # Assert the shape of the input and target sequences
        self.assertEqual(self.model.x_train.shape, (self.data.n_train_lstm, self.config['l_seq'] - 1, self.config['code_size']))
        self.assertEqual(self.model.y_train.shape, (self.data.n_train_lstm, self.config['l_seq'] - 1, self.config['code_size']))
        self.assertEqual(self.model.x_test.shape, (self.data.n_val_lstm, self.config['l_seq'] - 1, self.config['code_size']))
        self.assertEqual(self.model.y_test.shape, (self.data.n_val_lstm, self.config['l_seq'] - 1, self.config['code_size']))

    def test_forward(self):
        # Test the forward method
        batch_size = 16
        input_seq = torch.randn(batch_size, self.config['l_seq'] - 1, self.config['code_size']).to(self.device)
        output_seq = self.model(input_seq)

        # Assert the shape of the output sequence
        self.assertEqual(output_seq.shape, (batch_size, self.config['l_seq'] - 1, self.config['code_size']))

    def test_train(self):
        # Test the train method
        def checkpoint_callback(model, loss):
            # Callback function for checkpointing during training
            pass

        self.model.train(checkpoint_callback)

        # Assert that the model is in evaluation mode after training
        self.assertFalse(self.model.training)

    def test_plot_reconstructed_lt_seq(self):
        # Test the plot_reconstructed_lt_seq method
        idx_test = 0
        self.model.plot_reconstructed_lt_seq(idx_test, self.model_vae, self.data, self.device)

        # Assert that the plot file is generated
        plot_file = f"{self.config['result_dir']}lstm_long_seq_recons_{idx_test}.pdf"
        self.assertTrue(os.path.exists(plot_file))

    def test_plot_lstm_embedding_prediction(self):
        # Test the plot_lstm_embedding_prediction method
        idx_test = 0
        self.model.plot_lstm_embedding_prediction(idx_test, self.config, self.model_vae, self.data, self.device)

        # Assert that the plot file is generated
        plot_file = f"{self.config['result_dir']}lstm_seq_embedding_{idx_test}.pdf"
        self.assertTrue(os.path.exists(plot_file))

if __name__ == '__main__':
    unittest.main()

E
ERROR: /root/ (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/root/'

----------------------------------------------------------------------
Ran 1 test in 0.013s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
import torch
import torch.nn as nn
import numpy as np

class BaseModel(nn.Module):
    def __init__(self, config):
        super(BaseModel, self).__init__()
        self.config = config
        self.two_pi = torch.tensor(2 * np.pi)
        self.global_step = 0
        self.cur_epoch = 0

    def save(self, checkpoint_path):
        print("Saving model...")
        torch.save(self.state_dict(), checkpoint_path)
        print("Model saved.")

    def load(self, checkpoint_path):
        print("checkpoint_dir at loading: {}".format(checkpoint_path))
        if os.path.exists(checkpoint_path):
            print("Loading model checkpoint {} ...\n".format(checkpoint_path))
            self.load_state_dict(torch.load(checkpoint_path))
            print("Model loaded.")
        else:
            print("No model loaded.")

    def define_loss(self):
        # KL divergence loss - analytical result
        KL_loss = 0.5 * (torch.sum(self.code_mean ** 2, dim=1)
                         + torch.sum(self.code_std_dev ** 2, dim=1)
                         - torch.sum(torch.log(self.code_std_dev ** 2), dim=1)
                         - self.config['code_size'])
        self.KL_loss = torch.mean(KL_loss)

        # norm 1 of standard deviation of the sample-wise encoder prediction
        self.std_dev_norm = torch.mean(self.code_std_dev, dim=0)

        weighted_reconstruction_error_dataset = torch.sum(
            (self.original_signal - self.decoded) ** 2, dim=[1, 2])
        weighted_reconstruction_error_dataset = torch.mean(weighted_reconstruction_error_dataset)
        self.weighted_reconstruction_error_dataset = weighted_reconstruction_error_dataset / (2 * self.sigma2)

        # least squared reconstruction error
        ls_reconstruction_error = torch.sum(
            (self.original_signal - self.decoded) ** 2, dim=[1, 2])
        self.ls_reconstruction_error = torch.mean(ls_reconstruction_error)

        # sigma regularisor - input elbo
        self.sigma_regularisor_dataset = self.input_dims / 2 * torch.log(self.sigma2)
        two_pi = self.input_dims / 2 * self.two_pi

        self.elbo_loss = two_pi + self.sigma_regularisor_dataset + \
                         0.5 * self.weighted_reconstruction_error_dataset + self.KL_loss

    def training_variables(self):
        encoder_vars = list(self.encoder.parameters())
        decoder_vars = list(self.decoder.parameters())
        sigma_vars = [self.sigma2]
        self.train_vars_VAE = encoder_vars + decoder_vars + sigma_vars

        num_encoder = sum(p.numel() for p in self.encoder.parameters() if p.requires_grad)
        num_decoder = sum(p.numel() for p in self.decoder.parameters() if p.requires_grad)
        num_sigma2 = self.sigma2.numel()
        self.num_vars_total = num_decoder + num_encoder + num_sigma2
        print("Total number of trainable parameters in the VAE network is: {}".format(self.num_vars_total))

    def compute_gradients(self):
        self.optimizer = torch.optim.Adam(self.train_vars_VAE, lr=self.lr, betas=(0.9, 0.95))
        self.train_step_gradient = self.optimizer.step()
        print("Reach the definition of loss for VAE")

    def clip_grad(self, grad):
        if grad is None:
            return grad
        return torch.clamp(grad, -1, 1)



class BaseTrain:
    def __init__(self, model, data, config):
        """
        Initializes the training session with the necessary components.

        :param model: The model to be trained.
        :param data: The dataset for training and validation.
        :param config: Configuration dictionary with training parameters and paths.

        Initializes PyTorch variables, prepares for training, and initializes
        various lists to keep track of training and validation losses, as well as
        other metrics like reconstruction and KL divergence losses.
        """
        self.model = model
        self.config = config
        self.data = data

        # Initialize lists to keep track of various training and validation metrics
        self.train_loss = []
        self.val_loss = []
        self.train_loss_ave_epoch = []
        self.val_loss_ave_epoch = []
        self.recons_loss_train = []
        self.recons_loss_val = []
        self.KL_loss_train = []
        self.KL_loss_val = []
        self.sample_std_dev_train = []
        self.sample_std_dev_val = []
        self.iter_epochs_list = []
        self.test_sigma2 = []

    def train(self):
        """
        Starts the training process over a specified number of epochs.

        Tracks and prints the elapsed time and estimated remaining time for training.
        Utilizes the `train_epoch` method (not defined in this snippet) for actual
        training logic specific to each epoch.
        """
        self.start_time = time.time()
        for cur_epoch in range(self.config['num_epochs_vae']):
            self.train_epoch()  # Train for one epoch, method to be defined elsewhere

            # Calculate and print time elapsed and estimated remaining training time
            self.current_time = time.time()
            elapsed_time = (self.current_time - self.start_time) / 60
            est_remaining_time = (self.current_time - self.start_time) / (cur_epoch + 1) * (self.config['num_epochs_vae'] - cur_epoch - 1) / 60
            print("Already trained for {} min; Remaining {} min.".format(elapsed_time, est_remaining_time))

            # Increment the current epoch counter in the model
            self.model.cur_epoch += 1

    def save_variables_VAE(self):
        """
        Saves important variables and metrics for inspection after training.

        Saves metrics such as training and validation losses, reconstruction and KL
        divergence losses, and model parameters to a file.
        """
        # Construct the filename from configuration parameters
        file_name = "{}{}-batch-{}-epoch-{}-code-{}-lr-{}.npz".format(self.config['result_dir'],
                                                                      self.config['exp_name'],
                                                                      self.config['batch_size'],
                                                                      self.config['num_epochs_vae'],
                                                                      self.config['code_size'],
                                                                      self.config['learning_rate_vae'])
        # Save metrics to the specified file
        np.savez(file_name,
                 iter_list_val=self.iter_epochs_list,
                 train_loss=self.train_loss,
                 val_loss=self.val_loss,
                 n_train_iter=self.n_train_iter,
                 n_val_iter=self.n_val_iter,
                 recons_loss_train=self.recons_loss_train,
                 recons_loss_val=self.recons_loss_val,
                 KL_loss_train=self.KL_loss_train,
                 KL_loss_val=self.KL_loss_val,
                 num_para_all=sum(p.numel() for p in self.model.parameters()),
                 sigma2=self.test_sigma2)

    def plot_train_and_val_loss(self):
        """
        Plots training and validation loss over time, as well as a breakdown of the
        validation loss into its components and the evolution of sigma^2.

        Generates three plots: overall training and validation losses, validation
        loss components (reconstruction and KL divergence losses), and sigma^2 values
        over the course of training.
        """
        # Plot overall training and validation losses
        plt.clf()
        figure(num=1, figsize=(8, 6))
        plot(self.train_loss, 'b-')
        plot(self.iter_epochs_list, self.val_loss_ave_epoch, 'r-')
        plt.legend(('Training Loss (Total)', 'Validation Loss'))
        plt.title('Training Loss Over Iterations (Validation @ Epochs)')
        plt.ylabel('Total Loss')
        plt.xlabel('Iterations')
        plt.grid(True)
        savefig(self.config['result_dir'] + '/loss.png')

        # Plot breakdown of validation loss into reconstruction and KL divergence losses
        plt.clf()
        figure(num=1, figsize=(8, 6))
        plot(self.recons_loss_val, 'b-')
        plot(self.KL_loss_val, 'r-')
        plt.legend(('Reconstruction Loss', 'KL Divergence Loss'))
        plt.title('Validation Loss Breakdown')
        plt.ylabel('Loss')
        plt.xlabel('Number of Batches')
        plt.grid(True)
        savefig(self.config['result_dir'] + '/val-loss.png')

        # Plot the evolution of sigma^2 over training
        plt.clf()
        figure(num=1, figsize=(8, 6))
        plot(self.test_sigma2, 'b-')
        plt.title('Sigma^2 Over Training')
        plt.ylabel('Sigma^2')
        plt.xlabel('Iteration')
        plt.grid(True)
        savefig(self.config['result_dir'] + '/sigma2.png')

In [None]:
class VAEmodel(BaseModel):
    def __init__(self, config):
        super(VAEmodel, self).__init__()
        self.encoder = Encoder(config)
        self.decoder = Decoder(config)
        self.config = config

        # Define sigma parameter
        self.sigma_trainable = self.config['TRAIN_sigma'] == 1
        sigma_value = self.config['sigma']
        self.sigma = nn.Parameter(torch.tensor(sigma_value, dtype=torch.float32), requires_grad=self.sigma_trainable)
        self.sigma2_offset = torch.tensor(config.get('sigma2_offset', 0), dtype=torch.float32)

    def reparameterize(self, mean, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mean + eps * std

    def forward(self, x):
        mean, log_var = self.encoder(x)
        z = self.reparameterize(mean, log_var)
        z = z.unsqueeze(-1).unsqueeze(-1)  # Reshape for the decoder
        return self.decoder(z), mean, log_var

    @property
    def sigma2(self):
        sigma_squared = self.sigma ** 2
        if self.sigma_trainable:
            sigma_squared += self.sigma2_offset
        return sigma_squared


In [None]:
# Example configuration
config = {
    'l_win': 24,
    'n_channel': 1,
    'num_hidden_units': 64,
    'code_size': 10,
    'TRAIN_sigma': 1,
    'sigma': 0.1,
    'sigma2_offset': 0.01
}

model = VAEmodel(config)
print(model)