In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
from typing import Optional, Tuple

In [2]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    The encoding table is computed once for ``max_seq_length`` positions and
    registered as a non-trainable buffer ``pe`` of shape
    ``(max_seq_length, 1, d_model)``. Even feature columns hold sines and odd
    feature columns hold cosines of the scaled position.

    Args:
        d_model: Size of the feature dimension. Odd values are supported.
        max_seq_length: Number of positions to precompute.
        dropout: Dropout probability applied after the encoding is added.
    """

    def __init__(self, d_model: int, max_seq_length: int = 5000, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        # Table of shape (max_seq_length, d_model), filled column-wise below.
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)

        # One frequency per (sin, cos) column pair: 10000^(-2i / d_model).
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so the last frequency is dropped to keep the shapes aligned.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Insert a singleton batch axis so the table broadcasts over the batch.
        pe = pe.unsqueeze(1)

        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Add positional encoding to input embeddings.

        Args:
            x: Tensor whose first axis is the sequence position and whose last
                axis has size ``d_model``; the middle axis is broadcast over
                (i.e. ``(seq_length, batch_size, d_model)``).

        Returns:
            ``x`` plus the encoding of its first ``x.size(0)`` positions, with
            dropout applied.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_length = x.size(0)
        if seq_length > self.pe.size(0):
            raise ValueError(
                f"Sequence length {seq_length} exceeds max_seq_length {self.pe.size(0)}"
            )
        x = x + self.pe[:seq_length]
        return self.dropout(x)



In [3]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected, split into ``num_heads``
    heads of size ``d_model // num_heads``, attended independently, then
    concatenated and passed through an output projection.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def scaled_dot_product_attention(self, Q: torch.Tensor, K: torch.Tensor,
                                   V: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, torch.Tensor]:
        """Compute scaled dot-product attention.

        Args:
            Q: Queries; the last axis has size ``d_k``.
            K: Keys; the last two axes are (key position, ``d_k``).
            V: Values; the second-to-last axis is the key position.
            mask: Optional tensor broadcastable to the score matrix. Positions
                where ``mask == 0`` are excluded from attention.

        Returns:
            ``(output, attention_weights)`` where ``attention_weights`` are the
            softmax weights after dropout and ``output`` is their product
            with ``V``.
        """
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Use the most negative value representable in the score dtype
            # instead of a literal such as -1e9, which does not fit in float16.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor,
                mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Forward pass of multi-head attention.

        Args:
            query: Tensor of shape ``(batch_size, query_len, d_model)``.
            key: Tensor of shape ``(batch_size, key_len, d_model)``.
            value: Tensor of shape ``(batch_size, key_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch_size, num_heads, query_len, key_len)``; zeros mark
                positions that must not be attended to.

        Returns:
            Tensor of shape ``(batch_size, query_len, d_model)``.
        """
        batch_size = query.size(0)

        # Project, then reshape to (batch_size, num_heads, seq_len, d_k).
        Q = self.w_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.w_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.w_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        attn_output, _ = self.scaled_dot_product_attention(Q, K, V, mask)

        # Merge the heads back into a single d_model-sized feature axis.
        attn_output = attn_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.d_model)

        return self.w_o(attn_output)


In [None]:
class FeedForward(nn.Module):
    """Two-layer feed-forward sub-layer applied to the last (feature) axis.

    Computes ``linear2(dropout(relu(linear1(x))))``, expanding from
    ``d_model`` to ``d_ff`` features and projecting back to ``d_model``.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size.
        dropout: Dropout probability applied to the hidden activation.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the feed-forward transform of ``x`` (same shape as ``x``)."""
        hidden = F.relu(self.linear1(x))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

class TransformerBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each post-normalised.

    Both sub-layers are wrapped as ``LayerNorm(input + dropout(sublayer(input)))``.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability shared by the sub-layers and residuals.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Run the layer on ``x``, passing ``mask`` through to self-attention."""
        # Sub-layer 1: self-attention (query, key and value are all ``x``).
        attended = self.dropout(self.attention(x, x, x, mask))
        normed = self.norm1(x + attended)

        # Sub-layer 2: position-wise feed-forward on the normalised result.
        transformed = self.dropout(self.feed_forward(normed))
        return self.norm2(normed + transformed)

class TimeSeriesTransformer(nn.Module):
    """Encoder-only transformer for time series forecasting.

    Pipeline: linear input projection (scaled by ``sqrt(d_model)``), positional
    encoding, a stack of ``num_layers`` transformer blocks, pooling over the
    sequence axis, and a linear output projection.

    Pooling depends on ``prediction_length``: for 1 the last valid time step is
    used; otherwise the valid time steps are averaged.

    Args:
        input_dim: Number of input features per time step.
        d_model: Internal feature size.
        num_heads: Attention heads per block.
        num_layers: Number of transformer blocks.
        d_ff: Hidden size of each block's feed-forward sub-layer.
        max_seq_length: Maximum sequence length passed to the positional encoding.
        output_dim: Number of output features per predicted step.
        prediction_length: Number of future steps to predict.
        dropout: Dropout probability used throughout.
    """

    def __init__(
        self,
        input_dim: int,
        d_model: int = 512,
        num_heads: int = 8,
        num_layers: int = 6,
        d_ff: int = 2048,
        max_seq_length: int = 5000,
        output_dim: int = 1,
        prediction_length: int = 1,
        dropout: float = 0.1
    ):
        super().__init__()

        self.d_model = d_model
        self.prediction_length = prediction_length

        # Input projection
        self.input_projection = nn.Linear(input_dim, d_model)

        # Positional encoding
        self.pos_encoding = PositionalEncoding(d_model, max_seq_length, dropout)

        # Transformer blocks
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # Output projection: all predicted steps are emitted as one flat vector.
        self.output_projection = nn.Linear(d_model, output_dim * prediction_length)

        # Initialize weights
        self.init_weights()

    def init_weights(self):
        """Apply Xavier-uniform initialisation to every parameter with more than one dimension."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def create_padding_mask(self, x: torch.Tensor, lengths: Optional[torch.Tensor] = None) -> Optional[torch.Tensor]:
        """Create padding mask for variable length sequences.

        Args:
            x: Tensor whose first two axes are ``(batch_size, seq_length)``.
            lengths: Optional tensor of shape ``(batch_size,)`` holding the
                number of valid leading time steps of each sequence.

        Returns:
            ``None`` when ``lengths`` is ``None``; otherwise a boolean tensor of
            shape ``(batch_size, 1, 1, seq_length)`` that is ``True`` at valid
            positions.
        """
        if lengths is None:
            return None

        batch_size, max_len = x.size(0), x.size(1)
        # Compare on x's device so CPU-side lengths work with a GPU batch.
        lengths = lengths.to(x.device)
        mask = torch.arange(max_len, device=x.device).expand(
            batch_size, max_len) < lengths.unsqueeze(1)
        return mask.unsqueeze(1).unsqueeze(2)

    def forward(self, x: torch.Tensor, lengths: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Forward pass of the transformer.

        Args:
            x: Input tensor of shape (batch_size, seq_length, input_dim)
            lengths: Optional tensor of actual sequence lengths for each batch item;
                positions at or beyond a sequence's length are treated as padding

        Returns:
            For ``prediction_length == 1``: tensor of shape
            (batch_size, output_dim).
            For ``prediction_length > 1``: tensor of shape
            (batch_size, prediction_length, output_dim).
        """
        # Input projection and scaling
        x = self.input_projection(x) * math.sqrt(self.d_model)

        # Add positional encoding
        x = x.transpose(0, 1)  # (seq_length, batch_size, d_model)
        x = self.pos_encoding(x)
        x = x.transpose(0, 1)  # (batch_size, seq_length, d_model)

        # Create padding mask
        if lengths is not None:
            lengths = lengths.to(x.device)
        mask = self.create_padding_mask(x, lengths)

        # Apply transformer blocks
        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, mask)

        if self.prediction_length == 1:
            if lengths is None:
                # No padding information: the final position is the last token.
                x = x[:, -1, :]  # (batch_size, d_model)
            else:
                # Pick each sequence's last *valid* position; x[:, -1] would be
                # a padding position for sequences shorter than the batch maximum.
                last_index = (lengths.long() - 1).clamp(min=0, max=x.size(1) - 1)
                batch_index = torch.arange(x.size(0), device=x.device)
                x = x[batch_index, last_index]  # (batch_size, d_model)
        else:
            # For multi-step prediction, use average pooling
            if lengths is not None:
                # Zero out padded positions, then divide by the valid count.
                mask_expanded = mask.squeeze(1).squeeze(1)  # (batch_size, seq_length)
                # clamp(min=1) avoids a 0/0 NaN for an empty sequence.
                valid_count = lengths.clamp(min=1).unsqueeze(-1).to(x.dtype)
                x = (x * mask_expanded.unsqueeze(-1)).sum(dim=1) / valid_count
            else:
                x = x.mean(dim=1)  # (batch_size, d_model)

        # Output projection
        output = self.output_projection(x)  # (batch_size, output_dim * prediction_length)

        # Reshape for multi-step prediction
        if self.prediction_length > 1:
            output = output.view(-1, self.prediction_length, output.size(-1) // self.prediction_length)

        return output

# Example usage and training utilities
class TimeSeriesDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over a single time series.

    Sample ``i`` is the pair ``(data[i : i + seq_length],
    data[i + seq_length : i + seq_length + prediction_length])``, both
    returned as float32 tensors.

    Args:
        data: Array indexed by time along its first axis.
        seq_length: Number of time steps in each input window.
        prediction_length: Number of time steps in each target window.
    """

    def __init__(self, data: np.ndarray, seq_length: int, prediction_length: int = 1):
        self.data = data
        self.seq_length = seq_length
        self.prediction_length = prediction_length

    def __len__(self) -> int:
        """Return the number of complete (input, target) windows, never negative."""
        # A series shorter than one full window yields an empty dataset rather
        # than a negative length (which would make len() raise).
        return max(0, len(self.data) - self.seq_length - self.prediction_length + 1)

    def __getitem__(self, idx):
        """Return the ``idx``-th ``(input, target)`` window pair.

        Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``. This
                also lets plain iteration over the dataset terminate.
        """
        n_windows = len(self)
        if idx < 0:
            idx += n_windows
        if not 0 <= idx < n_windows:
            raise IndexError(f"index out of range for dataset of length {n_windows}")

        split = idx + self.seq_length
        x = self.data[idx:split]
        y = self.data[split:split + self.prediction_length]
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

def create_sample_data(n_samples: int = 1000, n_features: int = 1, seq_length: int = 100,
                       seed: Optional[int] = None) -> np.ndarray:
    """Create sample time series data for testing.

    The first feature is two periods of a sine wave (``t`` spans ``0`` to
    ``4*pi``) plus Gaussian noise with standard deviation 0.1. Any further
    features are pure Gaussian noise with standard deviation 0.1.

    Args:
        n_samples: Number of time steps to generate.
        n_features: Number of feature columns in the result.
        seq_length: Currently unused; kept so existing callers keep working.
        seed: Optional seed for a private random generator, making the output
            reproducible. When ``None`` the global NumPy random state is used,
            exactly as before.

    Returns:
        Array of shape ``(n_samples, n_features)``.
    """
    # RandomState exposes the same ``randn`` API as the ``np.random`` module,
    # so both the seeded and the legacy global path share one code path.
    rng = np.random if seed is None else np.random.RandomState(seed)

    # Generate synthetic time series (sine wave with noise)
    t = np.linspace(0, 4 * np.pi, n_samples)
    data = np.sin(t) + 0.1 * rng.randn(n_samples)

    if n_features > 1:
        # Add additional noise-only features
        additional_features = rng.randn(n_samples, n_features - 1) * 0.1
        data = np.column_stack([data, additional_features])
    else:
        data = data.reshape(-1, 1)

    return data

# Example usage
if __name__ == "__main__":
    # Seed every random source used below so a fresh-kernel re-run builds the
    # same model weights, synthetic data and shuffled batch.
    SEED = 42
    np.random.seed(SEED)
    torch.manual_seed(SEED)

    # Model parameters
    input_dim = 1  # Number of features
    seq_length = 50  # Input sequence length
    prediction_length = 10  # Number of steps to predict
    batch_size = 32

    # Create model
    model = TimeSeriesTransformer(
        input_dim=input_dim,
        d_model=128,
        num_heads=8,
        num_layers=4,
        d_ff=512,
        prediction_length=prediction_length,
        dropout=0.1
    )

    # Generate sample data
    data = create_sample_data(n_samples=1000, n_features=input_dim)
    dataset = TimeSeriesDataset(data, seq_length, prediction_length)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Example forward pass
    sample_batch = next(iter(dataloader))
    x, y = sample_batch

    print(f"Input shape: {x.shape}")
    print(f"Target shape: {y.shape}")

    # Run inference in eval mode so dropout is disabled, then restore the
    # previous mode so any later training code is unaffected.
    was_training = model.training
    model.eval()
    with torch.no_grad():
        predictions = model(x)
        print(f"Prediction shape: {predictions.shape}")
    model.train(was_training)

    print(f"\nModel has {sum(p.numel() for p in model.parameters()):,} parameters")
    print("Model created successfully!")

Input shape: torch.Size([32, 50, 1])
Target shape: torch.Size([32, 10, 1])
Prediction shape: torch.Size([32, 10, 1])

Model has 794,634 parameters
Model created successfully!
