# Sheet 6: Training an RNN to learn sinusoidal oscillations

In [1]:
import torch as tc
import torch.nn as nn
import torch.optim as optim
import numpy as np
from matplotlib import pyplot as plt

In [2]:
# TASK 2: Set the hidden size (dimension of z_t)
# Try different values: start with 2, then 4, 8, 16...
# Find the minimum number that works well
# This value is passed to LatentRNN as `latent_dim` when the model is built.
hidden_size = None  # TODO: Replace with your choice

# Training parameters
# `epochs` is read as a module-level global by the training loop in train().
epochs = 500
learning_rate = None # TODO: Try different values

# TASK 4: Mini-batching parameters (uncomment and modify when needed)
# seq_length = None     # Length of sampled subsequences for training
# batch_size = None     # Number of subsequences per batch

In [None]:
#%% DATA LOADING AND VISUALIZATION

# Read the stored trajectory x_t = [sin(t*π/10), cos(t*π/10)], t = 0,...,40.
data = tc.load('sinus.pt')
# Second axis holds the observed components (expected: 2, i.e. sin and cos).
observation_size = data.shape[1]

# Report the basic dimensions in one go (one line per item).
print(
    f"Data shape: {data.shape}",
    f"Observation size: {observation_size}",
    f"Time steps: {data.shape[0]}",
    sep="\n",
)

# --- Figure 1: both components as functions of the time step ---
plt.figure(figsize=(10, 4))
plt.plot(data[:, 0], lw=2, label='sin(t*π/10)')
plt.plot(data[:, 1], lw=2, label='cos(t*π/10)')
plt.title('Input Sinusoidal Data')
plt.xlabel('Time step')
plt.ylabel('Value')
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()

# --- Figure 2: phase portrait (first component against the second) ---
plt.figure(figsize=(6, 6))
plt.plot(data[:, 0], data[:, 1], 'b-', lw=2, alpha=0.7)
# Mark where the trajectory begins (green) and where it stops (red).
plt.scatter(data[0, 0], data[0, 1], c='green', s=100, label='Start', zorder=5)
plt.scatter(data[-1, 0], data[-1, 1], c='red', s=100, label='End', zorder=5)
plt.title('Phase Space Plot')
plt.xlabel('sin(t*π/10)')
plt.ylabel('cos(t*π/10)')
plt.grid(True, alpha=0.3)
plt.legend()
# Equal scaling so that a circular orbit is drawn as a circle.
plt.axis('equal')
plt.show()

In [None]:
#%% MODEL DEFINITION

class LatentRNN(nn.Module):
    r"""
    Recurrent Neural Network for learning dynamical systems

    Architecture:
    z_t = tanh(C * x_{t-1} + W * z_{t-1} + h)  # Hidden state update
    \hat{x}_t = B * z_t + c                    # Output generation

    Implementation notes:
    - The hidden state update is carried out by a single-layer ``nn.RNN``
      with tanh nonlinearity. It holds the input-to-hidden matrix (C), the
      hidden-to-hidden matrix (W) and two bias vectors whose sum plays the
      role of h.
    - The output generation is an ``nn.Linear`` layer holding B and c.
    """

    def __init__(self, obs_dim, latent_dim):
        """
        Build the recurrent layer and the linear readout.

        Args:
            obs_dim: Dimension of the observations x_t (2 for sin/cos)
            latent_dim: Dimension of the hidden state z_t

        Raises:
            ValueError: If obs_dim or latent_dim is None (e.g. the
                hyperparameter placeholder has not been replaced yet).
        """
        super().__init__()

        # Fail early with a readable message instead of an opaque error
        # from inside the layer constructors.
        if obs_dim is None or latent_dim is None:
            raise ValueError(
                "obs_dim and latent_dim must be set to positive integers "
                f"(got obs_dim={obs_dim!r}, latent_dim={latent_dim!r})"
            )

        self.obs_dim = obs_dim        # Dimension of observations (2 for sin/cos)
        self.latent_dim = latent_dim  # Dimension of hidden state z_t

        # Hidden state update: z_t = tanh(C * x_{t-1} + W * z_{t-1} + h).
        # batch_first is left at its default (False), so inputs are
        # (seq_len, batch_size, obs_dim) as documented in forward().
        self.rnn = nn.RNN(
            input_size=obs_dim,
            hidden_size=latent_dim,
            nonlinearity='tanh',
        )

        # Output generation: \hat{x}_t = B * z_t + c
        self.readout = nn.Linear(latent_dim, obs_dim)

    def forward(self, time_series, h0):
        """
        Forward pass through the RNN

        Args:
            time_series: Input sequence of shape (seq_len, batch_size, obs_dim)
            h0: Initial hidden state of shape (1, batch_size, latent_dim)

        Returns:
            obs_output: Predicted observations of shape (seq_len, batch_size, obs_dim)
            h: Final hidden state of shape (1, batch_size, latent_dim)
        """
        # nn.RNN iterates over the time axis internally and returns the
        # hidden state at every step together with the last hidden state.
        latent_states, h = self.rnn(time_series, h0)

        # The linear readout acts on the last axis, i.e. on every time step
        # and batch element independently.
        obs_output = self.readout(latent_states)

        return obs_output, h
    

# Build the network from the data dimension and the chosen hidden size.
model = LatentRNN(observation_size, hidden_size)
# Summarise the model in a single call: a blank line, a heading, then one
# bullet per quantity (sizes and the number of trainable scalars).
print(
    "\nModel Architecture:",
    f"- Observation dimension: {observation_size}",
    f"- Hidden dimension: {hidden_size}",
    f"- Total parameters: {sum(weights.numel() for weights in model.parameters())}",
    sep="\n",
)

In [None]:
def train(learning_rate, moment=0, optimizer_function='SGD', print_loss=True, batch_size=1, batch_sequence_length=1):
    """
    Training function with configurable optimizers and mini-batching

    Reads the module-level globals `model`, `data` and `epochs`. In every
    epoch one mini-batch of randomly positioned subsequences is drawn from
    `data`, the model predicts the next observation at each step, and one
    optimizer step is taken on the mean squared prediction error.

    Args:
        learning_rate: Step size passed to the optimizer
        moment: Momentum factor (only used by 'SGD')
        optimizer_function: 'SGD' or 'ADAM'
        print_loss: If True, print the loss every 10 epochs
        batch_size: Number of subsequences per mini-batch
        batch_sequence_length: Number of time steps per subsequence

    Returns:
        losses: List with one loss value (float) per epoch

    Raises:
        ValueError: If optimizer_function is unknown, or if
            batch_sequence_length does not fit into the available data.
    """

    # Select the optimizer
    if optimizer_function == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=moment)
    elif optimizer_function == 'ADAM':
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    else:
        raise ValueError(f"Unknown optimizer: {optimizer_function}")

    # Mean Squared Error (MSE) loss for this regression task
    loss_function = nn.MSELoss()

    # Prepare full sequences (input and target); they do not change between
    # epochs, so they are built once.
    x = data[:-1]  # Input: all timesteps except the last
    y = data[1:]   # Target: all timesteps except the first
    obs_dim = data.shape[1]

    # Largest admissible start index: 0 <= ind <= len(x) - batch_sequence_length
    max_start = len(x) - batch_sequence_length
    if batch_sequence_length < 1 or max_start < 0:
        raise ValueError(
            f"batch_sequence_length must be between 1 and {len(x)}, "
            f"got {batch_sequence_length}"
        )

    losses = []

    print(f"\nStarting training for {epochs} epochs...")
    print(f"Optimizer: {optimizer_function}, LR: {learning_rate}, Batch size: {batch_size}, Sequence length: {batch_sequence_length}")

    for epoch in range(epochs):
        # Random initial hidden state of shape (1, batch_size, latent_dim)
        h0 = tc.randn(1, batch_size, model.latent_dim)

        # Batch tensors of shape (batch_sequence_length, batch_size, obs_dim)
        X = tc.zeros(batch_sequence_length, batch_size, obs_dim)
        Y = tc.zeros_like(X)

        # One random start index per batch element (upper bound is exclusive)
        starts = tc.randint(0, max_start + 1, (batch_size,)).tolist()
        for j, ind in enumerate(starts):
            # The batch axis is dimension 1, hence X[:, j] rather than X[j]
            X[:, j] = x[ind:ind + batch_sequence_length]
            Y[:, j] = y[ind:ind + batch_sequence_length]

        # Forward pass
        optimizer.zero_grad()
        output, _ = model(X, h0)
        epoch_loss = loss_function(output, Y)

        # Backward pass and optimization step
        epoch_loss.backward()
        optimizer.step()

        # Store loss for plotting
        losses.append(epoch_loss.item())

        # Print progress
        if epoch % 10 == 0 and print_loss:
            print(f"Epoch: {epoch} loss {epoch_loss.item():.6f}")

    return losses

In [None]:
# Train the model
# train() requires the learning rate as its first argument; calling it with
# no arguments raises a TypeError. The remaining hyperparameters keep their
# defaults until they are tuned.
losses = train(learning_rate)  # TODO: tune optimizer, momentum and mini-batch settings

In [None]:
#%% MODEL EVALUATION AND PREDICTION

# Generate predictions for 5 times the original sequence length
prediction_length = 5 * data.shape[0]

with tc.no_grad():
    # Initialize hidden state and predictions tensor.
    # h has shape (1, 1, latent_dim): one layer, a batch of one sequence.
    h = tc.zeros(1, 1, model.latent_dim)
    predictions = tc.zeros(prediction_length, model.obs_dim)

    # Start with first data point
    input_ = data[0:1].unsqueeze(1).to(predictions.dtype)  # Shape: (1, 1, obs_dim)

    # Generate sequence autoregressively, i.e. freely by providing the output as input
    for i in range(prediction_length):
        # One step: the model maps the current input and hidden state to the
        # next observation and the updated hidden state.
        output, h = model(input_, h)
        # predictions[i] is the model's estimate of the observation that
        # follows the i-th input (so predictions[0] corresponds to data[1]).
        predictions[i] = output[0, 0]
        # Feed the prediction back in; the data is not consulted again.
        input_ = output


In [None]:
#%% VISUALIZATION OF RESULTS