In [27]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd

from torch.utils.data import Dataset, DataLoader

In [28]:
data = np.array([100, 110, 120, 125, 130, 140, 150, 155, 160, 170, 180, 190])

In [29]:
def create_sequences(data, seq_length):
    sequences = []
    targets = []
    for i in range(len(data) - seq_length):
        sequences.append(data[i:i + seq_length])
        targets.append(data[i + seq_length])
    return sequences, targets

In [30]:
seq_length = 3
sequences, targets = create_sequences(data, seq_length)

In [34]:
print(data)
for x, y in zip(sequences, targets):
    print(f'{x} ==> {y}')

[100 110 120 125 130 140 150 155 160 170 180 190]
[100 110 120] ==> 125
[110 120 125] ==> 130
[120 125 130] ==> 140
[125 130 140] ==> 150
[130 140 150] ==> 155
[140 150 155] ==> 160
[150 155 160] ==> 170
[155 160 170] ==> 180
[160 170 180] ==> 190


In [35]:
# Convert sequences and targets to PyTorch tensors
sequences = torch.tensor(sequences, dtype=torch.float32)
targets = torch.tensor(targets, dtype=torch.float32)

In [36]:
print(data)
for x, y in zip(sequences, targets):
    print(f'{x} ==> {y}')

[100 110 120 125 130 140 150 155 160 170 180 190]
tensor([100., 110., 120.]) ==> 125.0
tensor([110., 120., 125.]) ==> 130.0
tensor([120., 125., 130.]) ==> 140.0
tensor([125., 130., 140.]) ==> 150.0
tensor([130., 140., 150.]) ==> 155.0
tensor([140., 150., 155.]) ==> 160.0
tensor([150., 155., 160.]) ==> 170.0
tensor([155., 160., 170.]) ==> 180.0
tensor([160., 170., 180.]) ==> 190.0


In [37]:
# Split data into training and testing sets
train_size = int(0.8 * len(sequences))
train_sequences, test_sequences = sequences[:train_size], sequences[train_size:]
train_targets, test_targets = targets[:train_size], targets[train_size:]

In [39]:
class LSTMPredictor(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMPredictor, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # Get the last output in the sequence
        return out

input_size = 1  # Sales data is univariate
hidden_size = 8
num_layers = 1

model = LSTMPredictor(input_size, hidden_size, num_layers)

In [176]:
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

In [43]:
train_sequences.shape

torch.Size([7, 3])

In [45]:
train_sequences.unsqueeze(2).shape

torch.Size([7, 3, 1])

In [49]:
outputs = model(train_sequences.unsqueeze(2))

In [54]:
outputs.unsqueeze(2).shape

torch.Size([7, 1, 1])

In [56]:
loss = criterion(outputs.unsqueeze(2), train_targets)

In [184]:
criterion(outputs, train_targets)

tensor(8152.7139, grad_fn=<MseLossBackward0>)

In [185]:
num_epochs = 10
for epoch in range(num_epochs):
    # Forward pass
    outputs = model(train_sequences.unsqueeze(2))
    loss = criterion(outputs, train_targets)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

print('Training complete!')

Epoch [10/10], Loss: 7217.6807
Training complete!


In [186]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    test_outputs = model(test_sequences.unsqueeze(2))
    mse = criterion(test_outputs, test_targets)
    print(f'Mean Squared Error on Test Data: {mse:.4f}')

    # Convert the test_outputs to a NumPy array
    predictions = test_outputs.numpy()
    print('Predicted Sales:', predictions)

Mean Squared Error on Test Data: 14634.6113
Predicted Sales: [[64.129776]
 [64.129776]]


In [187]:
test_targets

tensor([180., 190.])

In [188]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    test_outputs = model(train_sequences.unsqueeze(2))
    mse = criterion(test_outputs, test_targets)
    print(f'Mean Squared Error on Test Data: {mse:.4f}')

    # Convert the test_outputs to a NumPy array
    predictions = test_outputs.numpy()
    print('Predicted Sales:', predictions)

Mean Squared Error on Test Data: 14634.6104
Predicted Sales: [[64.129776]
 [64.129776]
 [64.129776]
 [64.129776]
 [64.129776]
 [64.129776]
 [64.129776]]


***

In [189]:
data = {
    'Price': [100, 105, 110, 120, 115, 112, 108, 109, 105, 102],
    'Label': [0, 0, 1, 1, 1, 0, 0, 1, 1, 0]
}

df = pd.DataFrame(data)

In [190]:
class PriceLabelDataset(Dataset):
    def __init__(self, data, sequence_length=3):
        self.data = data.values  # Convert to numpy array
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.data) - self.sequence_length

    def __getitem__(self, idx):
        prices = self.data[idx:idx + self.sequence_length, 0]
        label = self.data[idx + self.sequence_length, 1]
        return torch.FloatTensor(prices), torch.LongTensor([label])

# Create the dataset
dataset = PriceLabelDataset(df, sequence_length=3)

# Create a data loader
batch_size = 2  # Adjust as needed
loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

In [195]:
for i in loader:
    x, y = i
    break

In [196]:
x

tensor([[100., 105., 110.],
        [105., 110., 120.]])

In [197]:
y

tensor([[1],
        [1]])

In [191]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x.view(x.size(0), -1, 1), (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

# Create the model
input_size = 1
hidden_size = 8
num_layers = 8
output_size = 2
model = LSTMModel(input_size, hidden_size, num_layers, output_size)

In [193]:
import torch.optim as optim

# Define hyperparameters
learning_rate = 0.001
num_epochs = 100

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()  # Cross-entropy loss for binary classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    total_loss = 0.0
    for prices, labels in loader:
        optimizer.zero_grad()
        print(prices)
        # Forward pass
        outputs = model(prices)
        
        # Calculate loss
        loss = criterion(outputs, labels.view(-1))
        
        # Backpropagation and optimization
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(loader):.4f}")

print("Training finished.")


tensor([[100., 105., 110.],
        [105., 110., 120.]])
tensor([[110., 120., 115.],
        [120., 115., 112.]])
tensor([[115., 112., 108.],
        [112., 108., 109.]])
tensor([[108., 109., 105.]])
Epoch [1/100], Loss: 0.6950
tensor([[100., 105., 110.],
        [105., 110., 120.]])
tensor([[110., 120., 115.],
        [120., 115., 112.]])
tensor([[115., 112., 108.],
        [112., 108., 109.]])
tensor([[108., 109., 105.]])
Epoch [2/100], Loss: 0.6936
tensor([[100., 105., 110.],
        [105., 110., 120.]])
tensor([[110., 120., 115.],
        [120., 115., 112.]])
tensor([[115., 112., 108.],
        [112., 108., 109.]])
tensor([[108., 109., 105.]])
Epoch [3/100], Loss: 0.6934
tensor([[100., 105., 110.],
        [105., 110., 120.]])
tensor([[110., 120., 115.],
        [120., 115., 112.]])
tensor([[115., 112., 108.],
        [112., 108., 109.]])
tensor([[108., 109., 105.]])
Epoch [4/100], Loss: 0.6934
tensor([[100., 105., 110.],
        [105., 110., 120.]])
tensor([[110., 120., 115.],
   