In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset



In [13]:
class Lstm(nn.Module):
    """Hand-written single-layer LSTM followed by a linear read-out to one scalar.

    The cell keeps a long-term (cell) state and a short-term (hidden) state,
    both of shape ``(1, hidden_size)``; the batch size is fixed to 1, so one
    sequence is processed per ``forward`` call.

    Each gate owns three parameters: a recurrent weight
    ``(hidden_size, hidden_size)`` applied to the short-term state, an input
    weight ``(input_size, hidden_size)`` applied to the current input, and a
    bias ``(hidden_size,)``:

    * ``wlr1 / wlr2 / blr1`` -- fraction of the long-term state to keep
      (forget gate).
    * ``wpr1 / wpr2 / bpr1`` -- fraction of the candidate memory to store
      (input gate).
    * ``wp1 / wp2 / bp1``    -- candidate memory (tanh).
    * ``wo1 / wo2 / bo1``    -- fraction of the long-term state to expose
      as the new short-term state (output gate).

    Args:
        hidden_size: Width of both state vectors.
        input_size: Number of features per time step. Defaults to 1, i.e. a
            sequence of scalars.
    """

    def __init__(self, hidden_size=32, input_size=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.input_size = input_size

        # Parameters are created in the same order for every gate
        # (recurrent weight, input weight, bias) so registration order and
        # state_dict keys stay stable.
        self.wlr1 = self._recurrent_weight()
        self.wlr2 = self._input_weight()
        self.blr1 = self._bias()

        self.wpr1 = self._recurrent_weight()
        self.wpr2 = self._input_weight()
        self.bpr1 = self._bias()

        self.wp1 = self._recurrent_weight()
        self.wp2 = self._input_weight()
        self.bp1 = self._bias()

        self.wo1 = self._recurrent_weight()
        self.wo2 = self._input_weight()
        self.bo1 = self._bias()

        # Maps the final short-term state to a single predicted value.
        self.output_layer = nn.Linear(hidden_size, 1)

    def _recurrent_weight(self):
        """Return a new N(0, 0.1) weight of shape (hidden_size, hidden_size)."""
        shape = (self.hidden_size, self.hidden_size)
        return nn.Parameter(torch.normal(0.0, 0.1, size=shape))

    def _input_weight(self):
        """Return a new N(0, 0.1) weight of shape (input_size, hidden_size)."""
        shape = (self.input_size, self.hidden_size)
        return nn.Parameter(torch.normal(0.0, 0.1, size=shape))

    def _bias(self):
        """Return a new zero bias of shape (hidden_size,)."""
        return nn.Parameter(torch.zeros(self.hidden_size))

    def lstm_unit(self, input_value, long_term_state, short_term_state):
        """Advance the cell by one time step.

        Args:
            input_value: Tensor holding ``input_size`` values for this step
                (a 0-dim tensor is accepted when ``input_size`` is 1).
            long_term_state: Current long-term state, shape ``(1, hidden_size)``.
            short_term_state: Current short-term state, shape ``(1, hidden_size)``.

        Returns:
            Tuple ``(updated_long_term_state, updated_short_term_state)``, each
            of shape ``(1, hidden_size)``.
        """
        # reshape (not view) so non-contiguous slices are accepted as well.
        input_value = input_value.reshape(1, -1)

        # Fraction of the existing long-term memory to keep.
        long_remember_percent = torch.sigmoid(
            torch.mm(short_term_state, self.wlr1)
            + torch.mm(input_value, self.wlr2)
            + self.blr1
        )

        # Fraction of the candidate memory to add.
        potential_remember_percent = torch.sigmoid(
            torch.mm(short_term_state, self.wpr1)
            + torch.mm(input_value, self.wpr2)
            + self.bpr1
        )

        # Candidate memory proposed from the current input and short-term state.
        potential_memory = torch.tanh(
            torch.mm(short_term_state, self.wp1)
            + torch.mm(input_value, self.wp2)
            + self.bp1
        )

        updated_long_term_state = (
            long_remember_percent * long_term_state
            + potential_remember_percent * potential_memory
        )

        # Fraction of the (squashed) long-term memory exposed as output.
        output_percent = torch.sigmoid(
            torch.mm(short_term_state, self.wo1)
            + torch.mm(input_value, self.wo2)
            + self.bo1
        )

        updated_short_term_state = output_percent * torch.tanh(updated_long_term_state)

        return updated_long_term_state, updated_short_term_state

    def forward(self, input_seq):
        """Run one sequence through the cell and predict a single value.

        Args:
            input_seq: Tensor whose first dimension is time: shape
                ``(seq_len,)`` when ``input_size`` is 1, or
                ``(seq_len, input_size)`` in general. It must share the
                dtype and device of the module's parameters.

        Returns:
            0-dim tensor: the linear read-out of the last short-term state.
            For an empty sequence this is the read-out of the zero state.
        """
        # Allocate the initial states like the parameters so the module still
        # works after .to(device) / .double(); a bare torch.zeros(...) would
        # always be CPU float32 and make torch.mm fail on a mismatch.
        state_kwargs = {"dtype": self.wlr1.dtype, "device": self.wlr1.device}
        long_term_state = torch.zeros(1, self.hidden_size, **state_kwargs)
        short_term_state = torch.zeros(1, self.hidden_size, **state_kwargs)

        for i in range(len(input_seq)):
            long_term_state, short_term_state = self.lstm_unit(
                input_seq[i],
                long_term_state,
                short_term_state,
            )

        output = self.output_layer(short_term_state)
        return output.squeeze()

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with lr=0.001."""
        return Adam(self.parameters(), lr=0.001)

    def training_step(self, batch, batch_idx):
        """Compute the MSE loss for one ``(sequence, label)`` pair.

        Args:
            batch: Pair ``(input_i, label_i)``. ``input_i`` is passed to
                ``forward`` unchanged, so it must be a single sequence, not a
                stack of sequences; ``label_i`` is the target for the scalar
                prediction.
            batch_idx: Unused; kept for the Lightning-style signature.

        Returns:
            0-dim loss tensor.
        """
        input_i, label_i = batch
        output_i = self(input_i)
        loss = F.mse_loss(output_i, label_i)  # MSE loss
        return loss

In [14]:
def test_lstm_improved(num_epochs=1000, patience=20, seed=None):
    """Train ``Lstm`` on four toy sequences and print predictions on three more.

    Training uses mini-batches of two sequences, gradient-norm clipping at
    1.0, and early stopping on the average epoch loss. The weights from the
    best epoch are restored before the test predictions are printed.

    Args:
        num_epochs: Maximum number of training epochs.
        patience: Number of consecutive epochs without a new best average
            loss after which training stops early.
        seed: If not None, passed to ``torch.manual_seed`` so weight
            initialisation and batch shuffling are reproducible.

    Returns:
        None. Progress and results are printed.
    """
    if seed is not None:
        torch.manual_seed(seed)

    sequences = torch.tensor([
        [0.1, 0.2, 0.3, 0.4, 0.5],
        [0.2, 0.4, 0.6, 0.8, 1.0],
        [0.1, 0.3, 0.5, 0.7, 0.9],
        [0.05, 0.1, 0.15, 0.2, 0.25]
    ], dtype=torch.float32)

    targets = torch.tensor([0.6, 1.2, 1.1, 0.3], dtype=torch.float32)

    dataset = TensorDataset(sequences, targets)
    train_loader = DataLoader(dataset, batch_size=2, shuffle=True)

    model = Lstm(hidden_size=32)
    optimizer = model.configure_optimizers()

    def snapshot_weights():
        # state_dict() returns tensors that share storage with the live
        # parameters, which the optimizer updates in place. Clone them so the
        # snapshot really is frozen at this point in training.
        return {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

    best_loss = float('inf')
    # Start from the initial weights so there is always something to restore,
    # even if no epoch ever improves (e.g. the loss is NaN or num_epochs is 0).
    best_state = snapshot_weights()
    patience_counter = 0

    print("Starting training...")
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0

        for batch_sequences, batch_targets in train_loader:
            optimizer.zero_grad()

            # The model handles one sequence at a time, so the batch loss is
            # the mean of the per-sequence losses.
            batch_loss = 0
            for seq, target in zip(batch_sequences, batch_targets):
                output = model(seq)
                batch_loss += F.mse_loss(output, target)

            batch_loss /= len(batch_sequences)
            batch_loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            epoch_loss += batch_loss.item()

        avg_epoch_loss = epoch_loss / len(train_loader)

        if avg_epoch_loss < best_loss:
            best_loss = avg_epoch_loss
            best_state = snapshot_weights()
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_epoch_loss:.6f}')

    model.load_state_dict(best_state)

    # Test the model
    print("\nTesting the model...")
    model.eval()
    with torch.no_grad():
        test_sequences = torch.tensor([
            [0.15, 0.3, 0.45, 0.6, 0.75],
            [0.2, 0.25, 0.3, 0.35, 0.4],
            [0.1, 0.2, 0.4, 0.8, 1.6]
        ], dtype=torch.float32)

        expected_values = torch.tensor([0.9, 0.45, 3.2], dtype=torch.float32)

        for i, (test_seq, expected) in enumerate(zip(test_sequences, expected_values)):
            prediction = model(test_seq)
            print(f'\nTest Sequence {i+1}: {test_seq.tolist()}')
            print(f'Predicted value: {prediction.item():.4f}')
            print(f'Expected value: {expected.item():.4f}')
            print(f'Prediction error: {abs(prediction.item() - expected.item()):.4f}')

# Run the training/evaluation demo only when this code is executed directly
# (as a script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    test_lstm_improved()

Starting training...
Epoch [10/1000], Average Loss: 0.434728
Epoch [20/1000], Average Loss: 0.258023
Epoch [30/1000], Average Loss: 0.053162
Epoch [40/1000], Average Loss: 0.048572
Epoch [50/1000], Average Loss: 0.040620
Epoch [60/1000], Average Loss: 0.031184
Epoch [70/1000], Average Loss: 0.024332
Epoch [80/1000], Average Loss: 0.018692
Epoch [90/1000], Average Loss: 0.013903
Epoch [100/1000], Average Loss: 0.010252
Epoch [110/1000], Average Loss: 0.007994
Epoch [120/1000], Average Loss: 0.006605
Epoch [130/1000], Average Loss: 0.005768
Epoch [140/1000], Average Loss: 0.005579
Epoch [150/1000], Average Loss: 0.005263
Epoch [160/1000], Average Loss: 0.005284
Epoch [170/1000], Average Loss: 0.004952
Epoch [180/1000], Average Loss: 0.004818
Epoch [190/1000], Average Loss: 0.004931
Epoch [200/1000], Average Loss: 0.004753
Epoch [210/1000], Average Loss: 0.004517
Epoch [220/1000], Average Loss: 0.004381
Epoch [230/1000], Average Loss: 0.004276
Epoch [240/1000], Average Loss: 0.004311
Epoc