# In [39]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
# Debug aid: enables autograd anomaly detection globally, so a failing
# backward pass reports the forward operation that produced the failing
# gradient. It adds overhead to every forward/backward pass, so turn it off
# once debugging is finished.
torch.autograd.set_detect_anomaly(True)

class ResidualBlock(nn.Module):
    """Four Conv1d -> BatchNorm1d -> ReLU stages followed by a skip connection.

    The block input is added to the output of the fourth stage. When the
    input and output shapes agree (``in_channels == out_channels`` and
    ``stride == 1``) the skip path is the identity and carries no parameters.
    Otherwise the skip path is a 1x1 convolution (using the same stride as
    ``conv1``) followed by batch norm, so the two tensors can be added instead
    of failing with a shape mismatch.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by every convolution.
        kernel_size: Kernel size shared by the four convolutions.
        stride: Stride of the first convolution; the other three use stride 1.
        padding: Zero padding applied on both sides by every convolution.

    Note:
        The sequence length is preserved by the stride-1 convolutions only
        when ``2 * padding == kernel_size - 1`` (i.e. an odd kernel size with
        ``padding=(kernel_size - 1) // 2``). The defaults ``kernel_size=8,
        padding=4`` lengthen the sequence by one sample per convolution, so
        the skip addition will not line up; pass an odd kernel size instead.
    """

    def __init__(self, in_channels, out_channels, kernel_size=8, stride=1, padding=4):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.conv3 = nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding)
        self.bn3 = nn.BatchNorm1d(out_channels)
        self.conv4 = nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding)
        self.bn4 = nn.BatchNorm1d(out_channels)

        # Created last so the parameter initialisation order of the main path
        # is unaffected. nn.Identity adds no entries to the state dict, so
        # checkpoints of same-shape blocks keep their original keys.
        if in_channels != out_channels or stride != 1:
            self.shortcut = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm1d(out_channels),
            )
        else:
            self.shortcut = nn.Identity()

    def forward(self, x):
        """Run the four conv stages on ``x`` and add the (projected) input."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        out = x
        for conv, bn in stages:
            out = self.relu(bn(conv(out)))

        # The ReLU is applied before the addition, so the block output can be
        # negative wherever the skip path is negative.
        return out + self.shortcut(x)
    
class Encoder(nn.Module):
    """Stack of residual blocks, each preceded by a stride-1 max pool.

    Every block keeps ``config.filter`` channels. The tensor entering each
    pool/block stage is recorded so a decoder can add it back later.

    Args:
        config: Object exposing a ``filter`` attribute (the channel count).
        kernel_size: Kernel size for the residual blocks and the max pool;
            both use ``(kernel_size - 1) // 2`` padding.
        num_layers: Number of residual blocks.
    """

    def __init__(self, config, kernel_size, num_layers = 3):
        super().__init__()
        channels = config.filter
        same_pad = (kernel_size - 1) // 2

        self.input_size = channels
        self.output_size = channels

        blocks = []
        for _ in range(num_layers):
            blocks.append(
                ResidualBlock(self.input_size, self.output_size, kernel_size, 1, padding=same_pad)
            )
        self.res_blocks = nn.ModuleList(blocks)
        self.pool = nn.MaxPool1d(kernel_size=kernel_size, stride=1, padding=same_pad)

    def forward(self, x):
        """Return ``(pooled_output, skips)``.

        ``skips`` holds the input of every stage followed by the output of
        the last block, i.e. ``len(self.res_blocks) + 1`` tensors in forward
        order.
        """
        skips = []
        current = x
        for res_block in self.res_blocks:
            skips.append(current)
            # Max pooling is applied before each residual block.
            current = res_block(self.pool(current))
        skips.append(current)
        return self.pool(current), skips
    
class Representation(nn.Module):
    """Bottleneck of three Conv1d -> BatchNorm1d -> ReLU -> Dropout stages.

    All stages keep ``config.filter`` channels and use
    ``(kernel_size - 1) // 2`` padding with stride 1.

    Args:
        config: Object exposing a ``filter`` attribute (the channel count).
        kernel_size: Kernel size shared by the three convolutions.
        num_layers: Accepted for signature parity with the encoder and
            decoder; it is not used here (there are always three stages).
    """

    def __init__(self, config, kernel_size, num_layers = 3):
        super().__init__()
        width = config.filter
        self.input_size = width
        self.output_size = width
        self.padding = (kernel_size - 1) // 2

        def same_conv(c_in, c_out):
            # All three convolutions share kernel size, stride and padding.
            return nn.Conv1d(c_in, c_out, kernel_size=kernel_size, stride=1, padding=self.padding)

        self.conv1 = same_conv(self.input_size, self.output_size)
        self.bn1 = nn.BatchNorm1d(self.output_size)
        self.relu = nn.ReLU()
        self.conv2 = same_conv(self.output_size, self.output_size)
        self.bn2 = nn.BatchNorm1d(self.output_size)
        self.conv3 = same_conv(self.output_size, self.output_size)
        self.bn3 = nn.BatchNorm1d(self.output_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Apply the three conv/norm/ReLU/dropout stages in sequence."""
        convs = (self.conv1, self.conv2, self.conv3)
        norms = (self.bn1, self.bn2, self.bn3)
        for conv, norm in zip(convs, norms):
            x = self.dropout(self.relu(norm(conv(x))))
        return x
    
# Decoder: mirrors the encoder and consumes its recorded skip tensors in reverse order.
class Decoder(nn.Module):
    """Residual decoder that adds encoder skip tensors back in reverse order.

    Each stage applies the transposed convolution, adds one skip tensor and
    runs a residual block. A final transposed convolution plus skip addition
    follows the last block. The same ``conv_transpose`` module (and therefore
    the same weights) is used at every stage.

    Args:
        config: Object exposing a ``filter`` attribute (the channel count).
        kernel_size: Kernel size for the residual blocks and the transposed
            convolution; both use ``(kernel_size - 1) // 2`` padding.
        num_layers: Number of residual blocks.
    """

    def __init__(self, config, kernel_size, num_layers = 3):
        super().__init__()
        self.input_size = config.filter
        self.output_size = config.filter
        self.padding = (kernel_size - 1) // 2
        self.res_blocks = nn.ModuleList(
            [
                ResidualBlock(self.input_size, self.output_size, kernel_size, 1, padding=self.padding)
                for _ in range(num_layers)
            ]
        )
        self.conv_transpose = nn.ConvTranspose1d(
            self.input_size, self.input_size, kernel_size=kernel_size, stride=1, padding=self.padding
        )

    def forward(self, x, encoder_output):
        """Decode ``x`` using the skip tensors in ``encoder_output``.

        Args:
            x: Tensor to decode.
            encoder_output: Sequence of skip tensors in encoder (forward)
                order. The last ``len(self.res_blocks) + 1`` entries are
                consumed from last to first. The sequence itself is left
                untouched.

        Returns:
            The decoded tensor.

        Raises:
            IndexError: If ``encoder_output`` holds fewer than
                ``len(self.res_blocks) + 1`` tensors.
        """
        # Work on a copy so the caller's list is not emptied as a side effect.
        skips = list(encoder_output)
        for block in self.res_blocks:
            skip = skips.pop()
            # Out-of-place add: avoids modifying a tensor autograd may still
            # need and keeps the skip tensors themselves unchanged.
            x = self.conv_transpose(x) + skip
            x = block(x)
        return self.conv_transpose(x) + skips.pop()
    
class ResNetSeq2Seq(nn.Module):
    """Conv1d input layer -> Encoder -> Representation -> Decoder -> Linear head.

    Args:
        config: Object exposing ``num_layers``, ``input_size``,
            ``output_size``, ``filter`` and ``lag_size``.

    Input to ``forward`` is ``(batch, lag_size, input_size)``: dimensions 1
    and 2 are swapped before the first Conv1d (whose channel count is
    ``input_size``), and the linear head expects ``filter * lag_size``
    flattened features.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.num_layers = config.num_layers
        self.input_size = config.input_size
        self.output_size = config.output_size
        self.filter = config.filter
        self.kernel_size = 3
        self.padding = (self.kernel_size - 1) // 2

        # Input layer: lifts the input features to `filter` channels.
        self.conv1 = nn.Conv1d(self.input_size, self.filter, kernel_size=self.kernel_size, stride=1, padding=self.padding)
        self.bn1 = nn.BatchNorm1d(self.filter)
        self.relu = nn.ReLU()

        # Encoder, representation and decoder.
        self.encoder = Encoder(self.config, self.kernel_size, self.num_layers)
        self.representation = Representation(self.config, self.kernel_size, self.num_layers)
        self.decoder = Decoder(self.config, self.kernel_size, self.num_layers)

        # Output layer: one linear map over the flattened (lag, filter) grid.
        self.linear = nn.Linear(self.filter * self.config.lag_size, self.output_size * self.config.lag_size)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Map ``(batch, lag_size, input_size)`` to ``(batch, lag_size)``.

        The batch dimension is taken from ``x`` rather than from
        ``config.batch_size``, so partial final batches and inference with a
        different batch size work.

        NOTE: the final reshape targets ``(batch, lag_size)``, which matches
        the linear head only when ``output_size == 1``; other values raise a
        shape error here.
        """
        batch = x.size(0)

        x = torch.transpose(x, 1, 2)  # -> (batch, input_size, lag_size) for Conv1d
        x = self.relu(self.bn1(self.conv1(x)))

        encoded, residual_records = self.encoder(x)
        representation = self.representation(encoded)
        decoded = self.decoder(representation, residual_records)

        decoded = torch.transpose(decoded, 1, 2)  # -> (batch, lag_size, filter)
        decoded = self.linear(self.flatten(decoded))
        return torch.reshape(decoded, (batch, self.config.lag_size))