In [40]:
# src/lstm_new/lstm_new.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """
    Улучшенный механизм внимания с маскировкой и масштабированием.
    """
    def __init__(self, hidden_size, attention_size):
        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.attention_size = attention_size
        
        # Слои для вычисления внимания
        self.W_enc = nn.Linear(hidden_size, attention_size, bias=False)
        self.W_dec = nn.Linear(hidden_size, attention_size, bias=False)
        self.V = nn.Linear(attention_size, 1, bias=False)
        
        # Маскировка для предотвращения утечки будущего
        self.dropout = nn.Dropout(0.1)

    def forward(self, encoder_outputs, decoder_hidden, mask=None):
        """
        Вычисляет контекстный вектор с помощью механизма внимания.
        
        Args:
            encoder_outputs: [batch_size, seq_len, hidden_size]
            decoder_hidden: [batch_size, hidden_size]
            mask: [batch_size, seq_len] - маска для игнорирования padding
            
        Returns:
            context_vector: [batch_size, hidden_size]
            attention_weights: [batch_size, seq_len]
        """
        # decoder_hidden: [batch_size, hidden_size] -> [batch_size, 1, hidden_size]
        decoder_hidden_expanded = decoder_hidden.unsqueeze(1)
        
        # Вычисляем оценку внимания
        energy = torch.tanh(
            self.W_enc(encoder_outputs) + self.W_dec(decoder_hidden_expanded)
        )
        attention_scores = self.V(energy).squeeze(2)  # [batch_size, seq_len]
        
        # Применяем маску если есть
        if mask is not None:
            attention_scores.masked_fill_(mask == 0, -1e9)
        
        # Применяем softmax для получения весов внимания
        attention_weights = F.softmax(attention_scores, dim=1)  # [batch_size, seq_len]
        attention_weights = self.dropout(attention_weights)
        
        # Вычисляем контекстный вектор
        context_vector = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        
        return context_vector, attention_weights


class LSTMEncoder(nn.Module):
    """
    Улучшенный LSTM энкодер с двунаправленными слоями и dropout.
    """
    def __init__(self, input_size, hidden_size, num_layers, dropout=0.2):
        super(LSTMEncoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Двунаправленный LSTM для лучшего захвата контекста
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        
        # Проекция для согласования размерностей
        self.projection = nn.Linear(hidden_size * 2, hidden_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        """
        Args:
            x: [batch_size, seq_len, input_size]
            
        Returns:
            outputs: [batch_size, seq_len, hidden_size]
            (h_n, c_n): кортеж скрытых состояний
        """
        outputs, (h_n, c_n) = self.lstm(x)
        # Проекция для уменьшения размерности от двунаправленного LSTM
        outputs = self.projection(outputs)
        outputs = self.dropout(outputs)
        return outputs, (h_n, c_n)


class LSTMDecoder(nn.Module):
    """
    Улучшенный LSTM декодер с механизмом внимания и teacher forcing.
    """
    def __init__(self, input_size, hidden_size, output_size, attention, dropout=0.2):
        super(LSTMDecoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.attention = attention
        
        # LSTM декодер
        self.lstm = nn.LSTM(
            input_size=input_size + hidden_size,  # вход + контекст
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=dropout
        )
        
        # Выходной слой
        self.output_projection = nn.Sequential(
            nn.Linear(hidden_size + hidden_size, hidden_size),  # hidden + context
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, output_size)
        )
        
    def forward(self, encoder_outputs, encoder_hidden, targets=None, teacher_forcing_ratio=0.5):
        """
        Args:
            encoder_outputs: [batch_size, seq_len, hidden_size]
            encoder_hidden: кортеж (h_n, c_n) из энкодера
            targets: [batch_size, target_len, output_size] - целевые значения
            teacher_forcing_ratio: вероятность использования teacher forcing
            
        Returns:
            outputs: [batch_size, target_len, output_size]
        """
        batch_size, seq_len, _ = encoder_outputs.size()
        target_len = 32  # фиксированная длина прогноза
        
        # Инициализация выходов
        outputs = torch.zeros(batch_size, target_len, self.output_size, device=encoder_outputs.device)
        
        # Инициализация скрытого состояния декодера
        # Берем последние слои из энкодера
        h_n, c_n = encoder_hidden
        decoder_hidden = h_n[-2:].contiguous()  # последние 2 слоя
        decoder_cell = c_n[-2:].contiguous()
        
        # Начальное входное значение (последнее значение из истории)
        decoder_input = encoder_outputs[:, -1:, :1]  # берем только цену закрытия
        
        for t in range(target_len):
            # Вычисляем контекст через внимание
            context_vector, _ = self.attention(
                encoder_outputs, 
                decoder_hidden[-1]  # последний слой скрытого состояния
            )
            
            # Подготавливаем вход для LSTM
            lstm_input = torch.cat([decoder_input, context_vector.unsqueeze(1)], dim=2)
            
            # Пропускаем через LSTM
            lstm_output, (decoder_hidden, decoder_cell) = self.lstm(
                lstm_input, 
                (decoder_hidden, decoder_cell)
            )
            
            # Подготавливаем вход для выходного слоя
            output_input = torch.cat([lstm_output.squeeze(1), context_vector], dim=1)
            
            # Вычисляем выход
            output = self.output_projection(output_input)
            outputs[:, t:t+1] = output.unsqueeze(1)
            
            # Подготовка decoder_input для следующего шага
            use_teacher_forcing = torch.rand(1).item() < teacher_forcing_ratio
            if use_teacher_forcing and targets is not None and t < targets.size(1):
                decoder_input = targets[:, t:t+1]  # использовать реальное значение
            else:
                decoder_input = output.unsqueeze(1)  # использовать предсказание
                
        return outputs


class TradingLSTM(nn.Module):
    """
    Мощная LSTM Encoder-Decoder модель с улучшенным вниманием для прогнозирования цен.
    """
    def __init__(
        self,
        feature_size=256,
        encoder_hidden_size=512,
        decoder_hidden_size=512,
        num_layers=2,
        output_size=1,
        target_len=32,
        dropout=0.3,
        teacher_forcing_ratio=0.5
    ):
        super(TradingLSTM, self).__init__()
        self.target_len = target_len
        self.teacher_forcing_ratio = teacher_forcing_ratio
        
        # Механизм внимания
        self.attention = Attention(encoder_hidden_size, encoder_hidden_size // 2)
        
        # Энкодер
        self.encoder = LSTMEncoder(
            input_size=feature_size,
            hidden_size=encoder_hidden_size,
            num_layers=num_layers,
            dropout=dropout
        )
        
        # Декодер
        self.decoder = LSTMDecoder(
            input_size=output_size,  # прогнозируем только цену закрытия
            hidden_size=decoder_hidden_size,
            output_size=output_size,
            attention=self.attention,
            dropout=dropout
        )
        
        # Нормализация входа
        self.layer_norm = nn.LayerNorm(feature_size)
        
    def forward(self, src, tgt=None):
        """
        Прямой проход модели.
        
        Args:
            src: [batch_size, seq_len, feature_size] - исторические данные
            tgt: [batch_size, target_len, output_size] - целевые значения (для обучения)
            
        Returns:
            outputs: [batch_size, target_len, output_size] - прогнозы
        """
        # Нормализация входа
        src = self.layer_norm(src)
        
        # Энкодер
        encoder_outputs, encoder_hidden = self.encoder(src)
        
        # Декодер
        outputs = self.decoder(
            encoder_outputs, 
            encoder_hidden, 
            targets=tgt,
            teacher_forcing_ratio=self.teacher_forcing_ratio if self.training else 0.0
        )
        
        return outputs


# Пример использования
if __name__ == "__main__":
    # Параметры
    batch_size, seq_len, feature_size = 4, 256, 256
    target_len, output_size = 32, 1
    
    # Создание модели
    model = TradingLSTM(
        feature_size=feature_size,
        encoder_hidden_size=512,
        decoder_hidden_size=512,
        num_layers=2,
        output_size=output_size,
        target_len=target_len,
        dropout=0.3,
        teacher_forcing_ratio=0.5
    )
    
    # Тестовые данные
    src = torch.randn(batch_size, seq_len, feature_size)
    tgt = torch.randn(batch_size, target_len, output_size)
    
    # Обучение
    model.train()
    output_train = model(src, tgt)
    print(f"Выход при обучении: {output_train.shape}")  # [4, 32, 1]
    
    # Инференс
    model.eval()
    with torch.no_grad():
        output_infer = model(src)
        print(f"Выход при инференсе: {output_infer.shape}")  # [4, 32, 1]

Выход при обучении: torch.Size([4, 32, 1])
Выход при инференсе: torch.Size([4, 32, 1])
