In [9]:
# Environment setup (IPython shell escapes): install the `arch` package and
# clone the NAB repository into the working directory, both with quiet output.
!pip install arch -q
!git clone https://github.com/numenta/NAB.git -q

In [2]:
import os
import numpy as np
import torch
import torch.nn as nn
import random

class sRLSTMCell(nn.Module):
    """Recurrent cell mixing an ``nn.LSTMCell`` update with a masked second branch.

    On every ``forward`` call the new hidden state is

        new_h = lstm_h * mask_w1 + sigmoid(h @ weight_h_2 + bias_h_2) * mask_w2

    where ``lstm_h`` is the hidden state produced by the wrapped ``nn.LSTMCell``
    and ``mask_w1`` / ``mask_w2`` are binary vectors of length ``hidden_size``
    drawn per step (and broadcast over the batch dimension). Each pair of masks
    is written to ``./weight/<file_name>/<partition>/<component>/<type>/`` as
    ``W1_step_<step>.npy`` / ``W2_step_<step>.npy`` so it can be reloaded later.

    Args:
        input_size: Number of features of the per-step input.
        hidden_size: Size of the hidden and cell state.
        forget_bias: Stored on the instance; not applied anywhere in this class.
        dense: Accepted for interface compatibility; unused.
        file_name, partition, component, type: Path components of the directory
            the masks are saved to / loaded from.
        seed: When given, masks are drawn from a private
            ``np.random.RandomState(seed)``, so two cells built with the same
            seed produce the same mask sequence. When ``None``, masks are drawn
            from NumPy's global generator (controlled by ``np.random.seed``).
    """

    def __init__(self, input_size, hidden_size, forget_bias=1.0, dense=None,
                 file_name='tweet', type='enc', component=1, partition=1, seed=None):
        super(sRLSTMCell, self).__init__()
        self.hidden_size = hidden_size
        self.forget_bias = forget_bias  # kept for reference; never read by this class
        self.file_name = file_name
        self.type = type
        self.component = component
        self.partition = partition
        self.step = 0  # bumped by forward(); part of the mask file names

        # Main LSTM cell producing the first candidate hidden state.
        self.lstm = nn.LSTMCell(input_size, hidden_size)

        # Parameters of the second branch, applied to the previous hidden state.
        self.weight_h_2 = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.bias_h_2 = nn.Parameter(torch.empty(hidden_size))
        # NOTE(review): trunc_normal_ keeps its default absolute bounds here, so
        # with std=0.1 the truncation is probably never active — confirm whether
        # bounds of +/- 2*std were intended.
        nn.init.trunc_normal_(self.weight_h_2, std=0.1)
        nn.init.zeros_(self.bias_h_2)

        # Stored activation; forward() does not call it.
        self.activation = torch.tanh

        # Create the directory the masks are written to.
        self._init_directories()

        # Mask generator. A private generator is only used when the caller asked
        # for one via `seed`; otherwise mask drawing keeps following the global
        # NumPy stream so that a notebook-wide np.random.seed() still applies.
        self._use_private_rng = seed is not None
        if self._use_private_rng:
            self.rng = np.random.RandomState(seed)
        else:
            self.rng = np.random.RandomState()

    def _init_directories(self):
        """Create (if needed) and remember the directory used for mask files."""
        base_dir = f'./weight/{self.file_name}/{self.partition}/{self.component}/{self.type}'
        os.makedirs(base_dir, exist_ok=True)
        self.mask_dir = base_dir

    def _draw_masks(self):
        """Draw a fresh (W1, W2) pair of binary masks of length ``hidden_size``.

        W2 is forced to be non-zero: it is all ones when W1 is all zeros, and
        gets its first entry set when its own draw comes out all zeros.
        """
        rng = self.rng if getattr(self, '_use_private_rng', False) else np.random
        masked_W1 = rng.randint(2, size=self.hidden_size)
        if masked_W1.sum() == 0:
            masked_W2 = np.ones(self.hidden_size)
        else:
            masked_W2 = rng.randint(2, size=self.hidden_size)
            # Ensure at least one unit of the second branch stays active.
            if masked_W2.sum() == 0:
                masked_W2[0] = 1
        return masked_W1, masked_W2

    def masked_weight(self, load=False):
        """Return the (mask_w1, mask_w2) tensors for the current ``self.step``.

        Args:
            load: If True and both mask files for the current step exist, the
                masks are read from disk. In every other case a new pair is
                drawn and written to disk (overwriting existing files).

        Returns:
            Tuple of two float32 tensors of shape ``(hidden_size,)`` located on
            the same device as ``weight_h_2``.
        """
        mask_w1_path = os.path.join(self.mask_dir, f'W1_step_{self.step}.npy')
        mask_w2_path = os.path.join(self.mask_dir, f'W2_step_{self.step}.npy')

        if load and os.path.exists(mask_w1_path) and os.path.exists(mask_w2_path):
            masked_W1 = np.load(mask_w1_path)
            masked_W2 = np.load(mask_w2_path)
        else:
            masked_W1, masked_W2 = self._draw_masks()
            np.save(mask_w1_path, masked_W1)
            np.save(mask_w2_path, masked_W2)

        device = self.weight_h_2.device
        tf_mask_W1 = torch.tensor(masked_W1, dtype=torch.float32, device=device)
        tf_mask_W2 = torch.tensor(masked_W2, dtype=torch.float32, device=device)
        return tf_mask_W1, tf_mask_W2

    def forward(self, input, state, load_mask=False):
        """Advance the cell by one time step.

        Args:
            input: Tensor of shape (batch_size, input_size)
            state: Tuple of (h, c), each of shape (batch_size, hidden_size)
            load_mask: Boolean indicating whether to load masks from files
        Returns:
            h: New hidden state
            new_state: Tuple of (new_h, new_c)
        """
        h, c = state
        # The step counter is advanced first, so masks used by forward() are
        # stored under step numbers starting at 1.
        self.step += 1

        # First branch: regular LSTM update.
        new_h_1, new_c = self.lstm(input, (h, c))

        # Second branch: sigmoid of an affine map of the previous hidden state.
        new_h_2 = torch.sigmoid(torch.matmul(h, self.weight_h_2) + self.bias_h_2)

        mask_w1, mask_w2 = self.masked_weight(load=load_mask)

        # Per-unit masks decide which branch(es) contribute to each hidden unit.
        new_h = new_h_1 * mask_w1 + new_h_2 * mask_w2

        new_state = (new_h, new_c)
        return new_h, new_state

    def reset_step(self):
        """Reset the step counter."""
        self.step = 0

    def save_masks(self):
        """Draw a new mask pair for the current step and write it to disk.

        Saving happens inside ``masked_weight(load=False)``; note that this
        replaces any masks already stored for the current step.
        """
        self.masked_weight(load=False)

    def load_masks(self):
        """Load masks from files for the current step (drawing them if missing)."""
        return self.masked_weight(load=True)

In [3]:
class Encoder(nn.Module):
    """Multi-layer encoder built from stacked ``sRLSTMCell`` layers.

    Layer ``i`` (0-based) is created with ``type='enc'`` and ``component=i+1``;
    the first layer consumes ``input_size`` features, every later layer consumes
    the ``hidden_size`` output of the layer below. Remaining keyword arguments
    are forwarded unchanged to every ``sRLSTMCell``.
    """

    def __init__(self, input_size, hidden_size, num_layers=1, **kwargs):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Keep the cells in a ModuleList so their parameters are registered.
        self.cells = nn.ModuleList([
            sRLSTMCell(
                input_size if i == 0 else hidden_size,
                hidden_size,
                type='enc',
                component=i+1,
                **kwargs
            )
            for i in range(num_layers)
        ])

    def forward(self, inputs):
        """
        Args:
            inputs: Tensor of shape (seq_len, batch_size, input_size)
        Returns:
            outputs: List of final hidden states for each layer
            states: List of final (h, c) tuples for each layer
        """
        seq_len = inputs.size(0)
        batch_size = inputs.size(1)

        # Zero initial hidden/cell state per layer. new_zeros() inherits both
        # device and dtype from `inputs`, so the encoder also works when the
        # model and data are not float32.
        states = [
            (inputs.new_zeros(batch_size, self.hidden_size),
             inputs.new_zeros(batch_size, self.hidden_size))
            for _ in range(self.num_layers)
        ]

        # Walk through the sequence one time step at a time.
        for t in range(seq_len):
            layer_input = inputs[t]
            for i, cell in enumerate(self.cells):
                _, (h_i, c_i) = cell(layer_input, states[i])
                states[i] = (h_i, c_i)
                layer_input = h_i  # output of this layer feeds the next layer

        outputs = [layer_state[0] for layer_state in states]  # final hidden state per layer
        return outputs, states

In [4]:
class Decoder(nn.Module):
    """Multi-layer decoder built from stacked ``sRLSTMCell`` layers plus a linear head.

    Layer ``i`` (0-based) is created with ``type='dec'`` and ``component=i+1``.
    The first layer consumes the per-step target (``output_size`` features);
    every later layer consumes the ``hidden_size`` output of the layer below.
    The top layer's hidden state is projected to ``output_size`` by
    ``output_layer``. Remaining keyword arguments are forwarded unchanged to
    every ``sRLSTMCell``.
    """

    def __init__(self, hidden_size, output_size, num_layers=1, **kwargs):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers

        # Keep the cells in a ModuleList so their parameters are registered.
        # The first cell is fed targets[t], so its input width must be
        # output_size (not hidden_size); otherwise forward() fails whenever
        # output_size != hidden_size.
        self.cells = nn.ModuleList([
            sRLSTMCell(
                output_size if i == 0 else hidden_size,
                hidden_size,
                type='dec',
                component=i+1,
                **kwargs
            )
            for i in range(num_layers)
        ])
        # Linear layer producing the final per-step output.
        self.output_layer = nn.Linear(hidden_size, output_size)

    def forward(self, targets, encoder_states):
        """
        Args:
            targets: Tensor of shape (seq_len, batch_size, output_size)
            encoder_states: List of (h, c) tuples from the encoder
        Returns:
            outputs: Reconstructed outputs of shape (seq_len, batch_size, output_size)
        """
        seq_len = targets.size(0)
        batch_size = targets.size(1)

        # Nothing to decode: return an empty result instead of failing in
        # torch.stack on an empty list.
        if seq_len == 0:
            return targets.new_zeros((0, batch_size, self.output_size))

        # Start from the encoder's final states, detached from its graph.
        # NOTE(review): because the decoder only sees the encoder through these
        # detached states, no gradient from the decoder output reaches the
        # encoder parameters — confirm this is intended.
        states = [(state[0].detach(), state[1].detach()) for state in encoder_states]

        outputs = []
        # Walk through the sequence one time step at a time.
        for t in range(seq_len):
            layer_input = targets[t]
            for i, cell in enumerate(self.cells):
                _, (h_i, c_i) = cell(layer_input, states[i])
                states[i] = (h_i, c_i)
                layer_input = h_i  # output of this layer feeds the next layer
            outputs.append(self.output_layer(layer_input))
        return torch.stack(outputs, dim=0)

In [5]:
class AutoEncoder(nn.Module):
    """Encoder/decoder pair: the encoder's final states seed the decoder.

    ``num_layers`` and any extra keyword arguments are passed to both the
    ``Encoder`` and the ``Decoder``.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, **kwargs):
        super().__init__()
        # Registration order (encoder first, decoder second) is preserved.
        self.encoder = Encoder(input_size, hidden_size, num_layers, **kwargs)
        self.decoder = Decoder(hidden_size, output_size, num_layers, **kwargs)

    def forward(self, inputs, targets):
        """Encode ``inputs`` and decode ``targets`` from the resulting states.

        Args:
            inputs: Tensor of shape (seq_len, batch_size, input_size)
            targets: Tensor of shape (seq_len, batch_size, output_size)
        Returns:
            Reconstructed outputs of shape (seq_len, batch_size, output_size)
        """
        # Only the per-layer (h, c) states are needed; the encoder's list of
        # final hidden states is discarded.
        _, final_states = self.encoder(inputs)
        return self.decoder(targets, final_states)


In [6]:
class EnsembleAutoEncoder(nn.Module):
    """Ensemble of ``N`` independently constructed ``AutoEncoder`` members.

    ``forward`` runs every member on the same inputs/targets and returns the
    element-wise mean of their reconstructions.

    NOTE(review): every member receives identical keyword arguments, so the
    cells of different members resolve to the same mask directory
    (file_name/partition/component/type) and overwrite each other's saved
    mask files — confirm whether per-member paths are required.
    """

    def __init__(self, N, input_size, hidden_size, output_size, num_layers=1, **kwargs):
        """
        Args:
            N: Number of AutoEncoders in the ensemble (must be >= 1)
            input_size: Size of the input features
            hidden_size: Size of the hidden state in LSTM
            output_size: Size of the output features
            num_layers: Number of layers in each AutoEncoder
            **kwargs: Additional keyword arguments for sRLSTMCell
        Raises:
            ValueError: If ``N`` is smaller than 1.
        """
        super(EnsembleAutoEncoder, self).__init__()
        # An empty ensemble cannot produce an average; fail here with a clear
        # message rather than later inside torch.stack() during forward().
        if N < 1:
            raise ValueError(f"N must be at least 1, got {N}")
        self.N = N
        self.autoencoders = nn.ModuleList([
            AutoEncoder(input_size, hidden_size, output_size, num_layers, **kwargs)
            for _ in range(N)
        ])

    def forward(self, inputs, targets):
        """
        Args:
            inputs: Tensor of shape (seq_len, batch_size, input_size)
            targets: Tensor of shape (seq_len, batch_size, output_size)
        Returns:
            outputs: Averaged reconstructed outputs of shape (seq_len, batch_size, output_size)
        """
        member_outputs = [autoencoder(inputs, targets) for autoencoder in self.autoencoders]
        # (N, seq_len, batch_size, output_size) -> mean over the member axis.
        stacked_outputs = torch.stack(member_outputs, dim=0)
        return torch.mean(stacked_outputs, dim=0)

    def reset_steps(self):
        """Reset step counters for all AutoEncoders in the ensemble."""
        for autoencoder in self.autoencoders:
            for cell in autoencoder.encoder.cells:
                cell.reset_step()
            for cell in autoencoder.decoder.cells:
                cell.reset_step()


In [7]:
def set_random_seed(seed):
    """Seed the Python, NumPy and PyTorch generators and pin cuDNN settings.

    Args:
        seed: Seed value passed to every generator; its string form is also
            written to the ``PYTHONHASHSEED`` environment variable.
    """
    # NOTE(review): PYTHONHASHSEED is presumably only honoured at interpreter
    # start-up, so this assignment may affect child processes only — confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # CPU-side generators, seeded in the same order as before.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # GPU generators (current device, then all devices) when CUDA is present.
    if torch.cuda.is_available():
        for cuda_seed_fn in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            cuda_seed_fn(seed)

    # Extra settings for deep-learning reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [8]:
# Set the random seed: seed every generator handled by set_random_seed with a
# fixed value.
random_seed = 777
set_random_seed(random_seed)

In [None]:
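# To-Do List
# 1. Implement skip connections.
# 2. Make the skip-connection length L configurable as a hyperparameter.
# 3. Faithfully reflect the h1, h2, and mask concepts so that the regular connection
#    and the skip connections are guaranteed to be masked at random.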