In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class TimeSeriesTransformer(nn.Module):
    def __init__(self, num_features, d_model, nhead, num_encoder_layers, dim_feedforward, dropout, n_classes):
        super(TimeSeriesTransformer, self).__init__()

        # Input projection from num_features to d_model
        self.input_projection = nn.Linear(num_features, d_model)

        # Positional encoding
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1000, d_model))  # Assuming max seq_len of 1000

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        # Output MLP for classification
        self.fc = nn.Sequential(
            nn.Linear(d_model, 128),
            nn.ReLU(),
            nn.Linear(128, n_classes)
        )

    def forward(self, x):
        # Input: (batch_size, seq_len, num_features)

        batch_size, seq_len, _ = x.size()

        # Input projection (embedding)
        x = self.input_projection(x)  # (batch_size, seq_len, d_model)

        # Add positional encoding
        x = x + self.positional_encoding[:, :seq_len, :]

        # Transpose for Transformer (needed by PyTorch's nn.Transformer)
        x = x.transpose(0, 1)  # (seq_len, batch_size, d_model)

        # Transformer Encoder
        x = self.transformer_encoder(x)  # (seq_len, batch_size, d_model)

        # Pooling: Take the mean over the sequence dimension
        x = x.mean(dim=0)  # (batch_size, d_model)

        # Classification MLP
        out = self.fc(x)  # (batch_size, n_classes)

        return out

# Example usage:
# Parameters
num_features = 10    # Number of features in the time series
seq_len = 50         # Length of each time series sequence
d_model = 64         # Transformer model dimension
nhead = 8            # Number of attention heads
num_encoder_layers = 3 # Number of Transformer encoder layers
dim_feedforward = 128 # Feedforward dimension
dropout = 0.1        # Dropout rate
n_classes = 5        # Number of output classes

# Create the model
model = TimeSeriesTransformer(num_features, d_model, nhead, num_encoder_layers, dim_feedforward, dropout, n_classes)

# Input (batch_size, seq_len, num_features)
batch_size = 32
x = torch.randn(batch_size, seq_len, num_features)  # Random input

# Forward pass
output = model(x)
print(output.shape)  # Should output (batch_size, n_classes)




torch.Size([32, 5])


Example training, one pass

In [10]:
# Example training setup
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Example target labels for the batch
target = torch.randint(0, n_classes, (batch_size,))

# Forward pass
output = model(x)

# Compute loss
loss = criterion(output, target)

# Backpropagation and optimization
optimizer.zero_grad()
loss.backward()
optimizer.step()

print(f"Training loss: {loss.item()}")

Training loss: 1.6488206386566162


Now, running through all epochs and batches

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Assuming the TimeSeriesTransformer model from the previous example
class TimeSeriesTransformer(nn.Module):
    def __init__(self, num_features, d_model, nhead, num_encoder_layers, dim_feedforward, dropout, n_classes):
        super(TimeSeriesTransformer, self).__init__()
        self.input_projection = nn.Linear(num_features, d_model)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1000, d_model))
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        self.fc = nn.Sequential(
            nn.Linear(d_model, 128),
            nn.ReLU(),
            nn.Linear(128, n_classes)
        )

    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        x = self.input_projection(x)
        x = x + self.positional_encoding[:, :seq_len, :]
        x = x.transpose(0, 1)  # (seq_len, batch_size, d_model)
        x = self.transformer_encoder(x)
        x = x.mean(dim=0)  # (batch_size, d_model)
        out = self.fc(x)  # (batch_size, n_classes)
        return out

# Mock data for illustration (replace with your real dataset)
# Assuming time series data: (batch_size, seq_len, num_features)
num_samples = 1000
seq_len = 50
num_features = 10
n_classes = 5
batch_size = 32

# Generate random input data and labels for demonstration
X = torch.randn(num_samples, seq_len, num_features)
y = torch.randint(0, n_classes, (num_samples,))

# Create DataLoader
dataset = TensorDataset(X, y)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, loss function, and optimizer
d_model = 64
nhead = 8
num_encoder_layers = 3
dim_feedforward = 128
dropout = 0.1

model = TimeSeriesTransformer(num_features, d_model, nhead, num_encoder_layers, dim_feedforward, dropout, n_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training Loop
n_epochs = 10

for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        # Move inputs and labels to device (e.g., GPU if available)
        # inputs, labels = inputs.to('cuda'), labels.to('cuda')
        # model = model.to('cuda')

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass and optimization step
        loss.backward()
        optimizer.step()

        # Print loss for every 10 batches
        running_loss += loss.item()
        if batch_idx % 10 == 9:  # Print every 10 batches
            print(f'Epoch {epoch+1}, Batch {batch_idx+1}, Loss: {running_loss / 10:.4f}')
            running_loss = 0.0

print("Training completed.")

Epoch 1, Batch 10, Loss: 1.6247
Epoch 1, Batch 20, Loss: 1.6165
Epoch 1, Batch 30, Loss: 1.6200
Epoch 2, Batch 10, Loss: 1.6090
Epoch 2, Batch 20, Loss: 1.6135
Epoch 2, Batch 30, Loss: 1.6131
Epoch 3, Batch 10, Loss: 1.6127
Epoch 3, Batch 20, Loss: 1.6120
Epoch 3, Batch 30, Loss: 1.6104
Epoch 4, Batch 10, Loss: 1.6095
Epoch 4, Batch 20, Loss: 1.6130
Epoch 4, Batch 30, Loss: 1.6116
Epoch 5, Batch 10, Loss: 1.6102
Epoch 5, Batch 20, Loss: 1.6008
Epoch 5, Batch 30, Loss: 1.6201
Epoch 6, Batch 10, Loss: 1.5957
Epoch 6, Batch 20, Loss: 1.6038
Epoch 6, Batch 30, Loss: 1.6036
Epoch 7, Batch 10, Loss: 1.5899
Epoch 7, Batch 20, Loss: 1.6089
Epoch 7, Batch 30, Loss: 1.6024
Epoch 8, Batch 10, Loss: 1.5973
Epoch 8, Batch 20, Loss: 1.6183
Epoch 8, Batch 30, Loss: 1.5948
Epoch 9, Batch 10, Loss: 1.5965
Epoch 9, Batch 20, Loss: 1.5975
Epoch 9, Batch 30, Loss: 1.6031
Epoch 10, Batch 10, Loss: 1.5882
Epoch 10, Batch 20, Loss: 1.6015
Epoch 10, Batch 30, Loss: 1.5876
Training completed.
