In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader, TensorDataset
import matplotlib.pyplot as plt
import random
import torch.optim as optim

# Set random seeds for reproducibility.
# All three generators are seeded because the notebook mixes libraries:
# `random` is used for sampling and teacher-forcing draws further down, and
# torch's generator drives weight initialisation and DataLoader shuffling.
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

# Prefer the GPU when one is visible to PyTorch; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Problem 1 (5 points)

## Problem Description

We will apply **Natural Language Processing (NLP)** methods to perform a **binary classification task**: determining whether a given number `x` is divisible by 3.

The input `x` is provided as a sequence of **digits**, with each digit treated as a separate **input token**. Using this tokenized representation, the model learns to classify the input sequence as:

- `1` (True) if `x` is divisible by 3  
- `0` (False) otherwise


In [None]:
# Hyperparameters for Problem 1 (divisibility-by-3 classifier).
INPUT_SIZE = 10  # One-hot encoding for digits 0-9
HIDDEN_SIZE = 64  # Width of the recurrent hidden state
OUTPUT_SIZE = 2  # Binary classification: divisible by 3 or not
BATCH_SIZE = 128  # Samples per DataLoader batch
EPOCHS = 300  # Full passes over the training set
LEARNING_RATE = 0.001  # Step size passed to the Adam optimizer
SEQ_LENGTH = 8  # Maximum length of input number



The following function, `generate_data`, creates synthetic labeled data for training a model to classify whether a number is divisible by 3. It operates as follows:

### Function: `generate_data(num_samples, max_length=SEQ_LENGTH)`

- **Inputs:**
  - `num_samples`: Number of data samples to generate.
  - `max_length`: Maximum number of digits per sample (default is `SEQ_LENGTH`).

- **Process:**
  1. For each sample, a random sequence of digits is generated with a length between 1 and `max_length`.
  2. The sequence is **zero-padded** at the beginning so that all samples have the same length (`max_length`), which is necessary for batch processing in neural networks.
  3. The label is determined based on whether the **sum of the digits** is divisible by 3 — a standard rule for checking divisibility by 3.

- **Output:**
  - `X`: A NumPy array of shape `(num_samples, max_length)` containing the padded digit sequences.
  - `y`: A NumPy array of shape `(num_samples,)` containing binary labels:  
    - `1` if the number is divisible by 3  
    - `0` otherwise


In [None]:
def generate_data(num_samples, max_length=SEQ_LENGTH):
    """Create random digit sequences labelled by divisibility by 3.

    Each sample is a number with between 1 and ``max_length`` random digits,
    left-padded with zeros so that every row has exactly ``max_length`` entries.

    Args:
        num_samples: Number of samples to generate.
        max_length: Number of digit positions per sample (padding included).

    Returns:
        A tuple ``(X, y)`` where ``X`` is an int64 array of shape
        ``(num_samples, max_length)`` holding the padded digits and ``y`` is an
        int64 array of shape ``(num_samples,)`` with 1 where the number is
        divisible by 3 and 0 otherwise.
    """
    # Pre-allocating keeps the shape (0, max_length) even for num_samples == 0
    # and pins the dtype to int64 regardless of platform.
    X = np.zeros((num_samples, max_length), dtype=np.int64)
    y = np.zeros(num_samples, dtype=np.int64)

    for row in range(num_samples):
        length = random.randint(1, max_length)
        digits = [random.randint(0, 9) for _ in range(length)]

        # Right-align the digits; the untouched leading cells are the zero padding.
        X[row, max_length - length:] = digits

        # A number is divisible by 3 exactly when its digit sum is.
        y[row] = 1 if sum(digits) % 3 == 0 else 0

    return X, y

In [None]:


def one_hot_encode(X):
    """One-hot encode a 2-D array of digits along a new last axis.

    Entries that are not strictly positive (the padding zeros, and therefore
    also any genuine digit 0) are left as an all-zero vector.

    Args:
        X: Integer array of shape ``(num_samples, seq_length)``.

    Returns:
        Float array of shape ``(num_samples, seq_length, INPUT_SIZE)``.
    """
    num_samples, seq_length = X.shape[0], X.shape[1]
    encoded = np.zeros((num_samples, seq_length, INPUT_SIZE))

    # Locate every non-padding cell once, then set its digit channel in one shot.
    sample_idx, position_idx = np.nonzero(X > 0)
    encoded[sample_idx, position_idx, X[sample_idx, position_idx]] = 1
    return encoded


# Build the train/validation/test tensors from a single pool of samples:
# generate and encode once, then carve out the three partitions with
# torch.split instead of repeating every call per partition.
SPLIT_SIZES = [10000, 2000, 1000]  # train / validation / test

all_X, all_y = generate_data(sum(SPLIT_SIZES))

# Convert to PyTorch tensors (float features for the RNN, long labels for
# CrossEntropyLoss).
all_X_tensor = torch.as_tensor(one_hot_encode(all_X), dtype=torch.float32)
all_y_tensor = torch.as_tensor(all_y, dtype=torch.long)

# The samples are drawn independently, so consecutive slices are already a
# random partition.
train_X_tensor, val_X_tensor, test_X_tensor = torch.split(all_X_tensor, SPLIT_SIZES)
train_y_tensor, val_y_tensor, test_y_tensor = torch.split(all_y_tensor, SPLIT_SIZES)

In [None]:
# Pair each feature tensor with its label tensor.
train_dataset, val_dataset, test_dataset = (
    TensorDataset(features, labels)
    for features, labels in (
        (train_X_tensor, train_y_tensor),
        (val_X_tensor, val_y_tensor),
        (test_X_tensor, test_y_tensor),
    )
)

# Only the training loader shuffles; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset, batch_size=BATCH_SIZE)
    for dataset in (val_dataset, test_dataset)
)


In [None]:
class DivisibilityRNN(nn.Module):
    """Single-layer RNN classifier over one-hot digit sequences.

    The sequence is read by an ``nn.RNN``; the hidden output at the final time
    step is projected to ``output_size`` class logits.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """
        Args:
            input_size: Width of each input token vector.
            hidden_size: Width of the recurrent hidden state.
            output_size: Number of class logits produced.
        """
        super(DivisibilityRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return class logits of shape ``(batch, output_size)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.
        """
        batch_size = x.size(0)
        # Initialize hidden state on the same device/dtype as the input
        h0 = torch.zeros(1, batch_size, self.hidden_size, device=x.device, dtype=x.dtype)

        # Forward propagate the RNN
        out, _ = self.rnn(x, h0)

        # Only use the output from the last time step: by then the RNN has
        # seen every digit of the (left-padded) number.
        out = self.fc(out[:, -1, :])
        return out

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs):
    """Train a classifier and track loss/accuracy on both splits per epoch.

    Args:
        model: Module mapping a batch of inputs to class logits.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        epochs: Number of passes over ``train_loader``.

    Returns:
        ``(train_losses, val_losses, train_accuracies, val_accuracies)``: four
        lists with one entry per epoch. Losses are means over batches and
        accuracies are percentages in ``[0, 100]``.
    """
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = train_loss / len(train_loader)
        train_accuracy = 100 * correct / total
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0

        # No gradients are needed for validation.
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Report every 10th epoch to keep the cell output short.
        if epoch % 10 == 9:
            print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

    return train_losses, val_losses, train_accuracies, val_accuracies

In [None]:


# Build the classifier together with its loss and optimizer
model = DivisibilityRNN(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Fit the model and keep the per-epoch curves for plotting
train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model, train_loader, val_loader, criterion, optimizer, EPOCHS
)

# Draw loss (left) and accuracy (right) side by side; both panels share the
# same layout, so they are described as data and rendered in one loop.
plt.figure(figsize=(12, 5))
panels = (
    ('Loss', 'Loss', train_losses, val_losses),
    ('Accuracy', 'Accuracy (%)', train_accuracies, val_accuracies),
)
for position, (metric, y_label, train_curve, val_curve) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(train_curve, label=f'Training {metric}')
    plt.plot(val_curve, label=f'Validation {metric}')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(f'Training and Validation {metric}')
plt.tight_layout()
plt.show()

# Score the held-out test set
model.eval()
test_loss, correct, total = 0, 0, 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        test_loss += criterion(outputs, labels).item()

        predicted = torch.max(outputs, 1)[1]
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

test_loss = test_loss / len(test_loader)
test_accuracy = 100 * correct / total
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')

# Test with specific examples
def test_number(model, number):
    """Classify one number with ``model`` and print a comparison with the truth.

    The number is encoded the same way as the training data: digits are
    right-aligned in a zero-padded window of ``SEQ_LENGTH`` positions and
    one-hot encoded with ``one_hot_encode``.

    Args:
        model: Trained classifier mapping ``(1, SEQ_LENGTH, INPUT_SIZE)`` to logits.
        number: Non-negative integer with at most ``SEQ_LENGTH`` digits.

    Raises:
        ValueError: If ``number`` is not a non-negative integer or has more
            than ``SEQ_LENGTH`` digits.
    """
    # Convert number to digits (int() rejects a sign or decimal point)
    digits = [int(d) for d in str(number)]
    if len(digits) > SEQ_LENGTH:
        raise ValueError(
            f"number has {len(digits)} digits; at most {SEQ_LENGTH} are supported"
        )

    # Pad with zeros on the left
    padded_digits = np.zeros(SEQ_LENGTH, dtype=int)
    padded_digits[-len(digits):] = digits

    # Reuse the training-time encoder so both paths stay in sync
    one_hot = one_hot_encode(padded_digits[np.newaxis, :])

    # Convert to tensor
    input_tensor = torch.FloatTensor(one_hot)

    # Predict
    model.eval()
    with torch.no_grad():
        output = model(input_tensor)
        predicted = output.argmax(dim=1)

    is_divisible_by_3 = number % 3 == 0
    prediction = bool(predicted.item())

    print(f"Number: {number}")
    print(f"Actually divisible by 3: {is_divisible_by_3}")
    print(f"Model prediction: {prediction}")
    print(f"Prediction {'correct' if is_divisible_by_3 == prediction else 'incorrect'}\n")

# Spot-check a handful of numbers; each note records the expected answer.
spot_checks = [
    (123, "divisible by 3 (1+2+3=6, 6%3=0)"),
    (456, "divisible by 3 (4+5+6=15, 15%3=0)"),
    (7890, "divisible by 3 (7+8+9+0=24, 24%3=0)"),
    (12, "divisible by 3 (1+2=3, 3%3=0)"),
    (5, "not divisible by 3 (5%3=2)"),
    (1234, "not divisible by 3 (1+2+3+4=10, 10%3=1)"),
]
for number, _note in spot_checks:
    test_number(model, number)

# YOUR Baseline is 95% Validation accuracy




# Problem 2 (5 Points)

## Problem Description

We will apply **Natural Language Processing (NLP)** methods to learn a sequence-to-sequence mapping task: predicting the output sequence corresponding to an input sequence of digits.

Specifically, the model will be trained to predict `3x` given `x`, where `x` is a sequence of digits. Each digit is treated as an individual **input token**, and the model learns to generate the sequence of digits representing `3x` as the output.

This setup treats digit sequences as a language and leverages NLP architectures such as **LSTM** to model the transformation.

**LSTM** is a special type of Recurrent Neural Network (RNN) designed to capture long-term dependencies and mitigate the vanishing gradient problem found in traditional RNNs.

> 🧠 **Note:** In our problem, we will use an **LSTM** instead of a vanilla **RNN** to better handle long-range dependencies in the sequence data.

## Key Features

- Designed to remember information for long periods.
- Uses gates to control the flow of information.
- Effective for tasks like language modeling, time-series forecasting, and machine translation.

## LSTM Cell Structure

An LSTM cell contains the following components:

1. **Forget Gate** (`f_t`): Decides what information to discard from the cell state.
   $$
   f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)
   $$

2. **Input Gate** (`i_t`) and **Candidate Cell State** ($\tilde{C}_t$): Decide what new information to store.
   $$
   i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)
   $$
   $$
   \tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)
   $$

3. **Cell State Update** (`C_t`):
   $$
   C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t
   $$

4. **Output Gate** (`o_t`) and Hidden State (`h_t`): Decide the output.
   $$
   o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)
   $$
   $$
   h_t = o_t \odot \tanh(C_t)
   $$

## LSTM Cell Diagram

![LSTM Cell](https://drive.google.com/uc?export=view&id=1ddc2g4NFCy4Tt-K0Qu6-2htu5B2VHJEM)

## Advantages over Vanilla RNNs

- Better at preserving long-term dependencies.
- Reduces vanishing gradient issues.
- Widely used in NLP and sequence modeling.

In [None]:
# Module-level settings for Problem 2.
# `input_size`, `output_size` and `num_layers` are read directly as globals by
# the dataset/encoder/decoder/test helper code below; `main` re-reads
# `batch_size`, `learning_rate` and `num_epochs` from its `params` dict instead.
input_size = 10  # 0-9 digits
hidden_size = 64
num_layers = 1
output_size = 10  # 0-9 digits
batch_size = 128
learning_rate = 0.1
num_epochs = 10


In [None]:
def generate_dataset(size=10000):
    """Build ``(x, 3x)`` digit-sequence pairs for distinct ``x`` in 1..9999.

    Inputs are formatted as 4 digits and outputs as 5 digits (both zero-padded)
    and then reversed, so index 0 holds the least-significant digit. This lets
    the model see digits in the order in which carries propagate.

    Args:
        size: Number of pairs requested. Only 9999 distinct inputs exist, so
            larger requests (including the default) return all 9999 pairs.

    Returns:
        A tuple ``(X, y)`` of equal-length lists; ``X[i]`` is a list of 4 digits
        and ``y[i]`` the list of 5 digits of three times that number.

    Raises:
        ValueError: If ``size`` is negative.
    """
    X = []
    y = []

    # Sample distinct numbers from 1 to 9999. random.sample raises if asked for
    # more items than the population holds, so cap the request first.
    population = range(1, 10000)
    numbers = random.sample(population, min(size, len(population)))

    for num in numbers:
        # Zero-padded decimal strings: 4 digits in, 5 digits out (3 * 9999 = 29997)
        input_digits = f"{num:04d}"
        output_digits = f"{num * 3:05d}"

        # Convert to digit lists, least-significant digit first
        X.append([int(d) for d in reversed(input_digits)])
        y.append([int(d) for d in reversed(output_digits)])

    return X, y

# Custom Dataset class
class MultiplicationDataset(Dataset):
    """Dataset of digit-sequence pairs that one-hot encodes items on access.

    ``X`` and ``y`` are parallel sequences of digit lists. Encoding widths come
    from the module-level ``input_size`` and ``output_size``.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        return len(self.X)

    @staticmethod
    def _one_hot(digits, width):
        """Return a ``(len(digits), width)`` float tensor with one 1 per row."""
        encoded = torch.zeros(len(digits), width)
        rows = torch.arange(len(digits))
        encoded[rows, torch.tensor(digits, dtype=torch.long)] = 1
        return encoded

    def __getitem__(self, idx):
        # One-hot encode the input and output
        source = self._one_hot(self.X[idx], input_size)
        target = self._one_hot(self.y[idx], output_size)
        return source, target

# Encoder model
class Encoder(nn.Module):
    """Bidirectional LSTM that reads the source digit sequence.

    The layer count is taken from the module-level ``num_layers``.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size

        lstm_options = dict(num_layers=num_layers, bidirectional=True, batch_first=True)
        self.rnn = nn.LSTM(input_size, hidden_size, **lstm_options)

    def forward(self, x):
        """Run the LSTM over ``x`` of shape ``[batch_size, seq_len, input_size]``.

        Returns the LSTM's ``(output, hidden)`` pair unchanged.
        """
        encoded = self.rnn(x)
        return encoded

# Decoder model
class Decoder(nn.Module):
    """Bidirectional LSTM decoder that emits one digit's logits per call.

    The decoder is built with the same direction and layer settings as the
    encoder so that the encoder's final ``(h, c)`` state can be passed in
    directly as the initial hidden state. The layer count is taken from the
    module-level ``num_layers``.
    """

    def __init__(self, hidden_size, output_size, rnn_type='LSTM'):
        """
        Args:
            hidden_size: Width of the LSTM hidden state (per direction).
            output_size: Number of output classes; also the width of the
                one-hot tokens fed back in as decoder input.
            rnn_type: Accepted for interface compatibility; only LSTM is built.
        """
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        # The decoder consumes previously emitted (output-sized) tokens, so its
        # input width is output_size rather than the encoder's input size.
        self.rnn = nn.LSTM(output_size, hidden_size, num_layers=num_layers, bidirectional=True,
                          batch_first=True)

        self.out = nn.Linear(hidden_size*2, output_size)

    def forward(self, x, hidden):
        """Advance the decoder over ``x`` starting from ``hidden``.

        Args:
            x: Tensor of shape ``[batch_size, steps, output_size]``.
            hidden: ``(h, c)`` LSTM state tuple.

        Returns:
            ``(logits, hidden)`` where ``logits`` has shape
            ``[batch_size, steps, output_size]`` and ``hidden`` is the updated state.
        """
        output, hidden = self.rnn(x, hidden)
        # Both directions are concatenated on the last axis (hidden_size * 2).
        output = self.out(output)
        return output, hidden






### Teacher Forcing

**Teacher forcing** is a technique used to train RNNs where the true output from the training data is fed as the next input, instead of the model's own prediction.

- It helps the model learn faster and more accurately.
- During training: use the correct previous output.
- During testing: the model must use its own predictions.

**Pros**: Faster learning, better early training.  
**Cons**: May cause errors during inference due to reliance on correct inputs.

In [None]:
# Seq2Seq model
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a fixed-length (5 digit) sequence."""

    def __init__(self, encoder, decoder, device):
        """
        Args:
            encoder: Module returning ``(output, hidden)`` for a source batch.
            decoder: Module called as ``decoder(step_input, hidden)`` that
                returns ``(logits, hidden)`` and exposes ``out.out_features``.
            device: Device on which the output buffer and start token are created.
        """
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, teacher_forcing_ratio=0.5, target=None):
        """Decode ``src`` into per-position digit logits.

        Args:
            src: Tensor of shape ``[batch_size, src_seq_len, input_size]``.
            teacher_forcing_ratio: Per-step probability of feeding the ground
                truth token (instead of the model's own prediction) as the
                next decoder input.
            target: Optional one-hot ground truth of shape
                ``[batch_size, 5, target_size]``. Teacher forcing is only
                possible when it is supplied; without it the decoder always
                consumes its own predictions.

        Returns:
            Logits of shape ``[batch_size, 5, target_size]``.
        """
        batch_size = src.size(0)
        target_len = 5  # Output is always 5 digits
        target_size = self.decoder.out.out_features

        # Tensor to store decoder outputs
        outputs = torch.zeros(batch_size, target_len, target_size).to(self.device)

        # Encode the source sequence
        _, hidden = self.encoder(src)

        # First input to the decoder is the <SOS> token (represented as all zeros)
        decoder_input = torch.zeros(batch_size, 1, target_size).to(self.device)

        # Decode one step at a time
        for t in range(target_len):
            output, hidden = self.decoder(decoder_input, hidden)
            outputs[:, t:t+1, :] = output

            # Teacher forcing: drawn every step so the RNG stream does not
            # depend on whether a target was supplied.
            use_teacher_forcing = random.random() < teacher_forcing_ratio

            # Get the highest predicted token
            top1 = output.argmax(2)

            if use_teacher_forcing and target is not None and t < target_len - 1:
                # Use the true token for this position as the next input
                decoder_input = target[:, t:t+1, :].to(output.dtype)
            else:
                # Use the prediction, re-encoded as a one-hot token, as next input
                decoder_input = nn.functional.one_hot(
                    top1, num_classes=target_size
                ).to(output.dtype)

        return outputs

# Training function
def train(model, train_loader, optimizer, criterion, device, teacher_forcing_ratio=0.5):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: Seq2seq module called as ``model(src, teacher_forcing_ratio, target)``.
        train_loader: Iterable of ``(src, target)`` batches of one-hot tensors.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss called on flattened ``(logits, one_hot_target)``.
        device: Device the batches are moved to.
        teacher_forcing_ratio: Forwarded to the model.

    Returns:
        Sum of batch losses divided by the number of batches.
    """
    model.train()
    epoch_loss = 0

    for batch_idx, (src, target) in enumerate(train_loader):
        src, target = src.to(device), target.to(device)

        optimizer.zero_grad()
        # The target is passed along so the model can actually teacher-force.
        output = model(src, teacher_forcing_ratio, target)

        # Reshape output and target for loss calculation
        output_dim = output.shape[-1]
        output = output.view(-1, output_dim)
        target = target.view(-1, output_dim)

        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(train_loader)

In [None]:
def evaluate(model, val_loader, criterion, device):
    """Score ``model`` on ``val_loader`` without teacher forcing.

    Args:
        model: Module called as ``model(src, 0)`` that returns per-position logits.
        val_loader: Iterable of ``(src, target)`` batches with one-hot targets.
        criterion: Loss called on flattened ``(logits, one_hot_target)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)``: the batch-averaged loss and the fraction of
        individual positions whose arg-max matches the target's.
    """
    model.eval()
    loss_sum = 0
    hits = 0
    seen = 0

    with torch.no_grad():
        for src, target in val_loader:
            src = src.to(device)
            target = target.to(device)

            # Ratio 0 means the decoder only ever sees its own predictions
            logits = model(src, 0)

            # Flatten (batch, position) into one axis for the loss
            width = logits.shape[-1]
            flat_logits = logits.view(-1, width)
            flat_target = target.view(-1, width)

            loss_sum += criterion(flat_logits, flat_target).item()

            # Per-position accuracy: compare predicted and true class indices
            matches = flat_logits.argmax(dim=1) == flat_target.argmax(dim=1)
            hits += matches.sum().item()
            seen += matches.size(0)

    accuracy = hits / seen
    mean_loss = loss_sum / len(val_loader)
    return mean_loss, accuracy

def test_model(model, num_examples=5):
    """Run ``model`` on random numbers in 1..9999 and report each prediction.

    Args:
        model: Module called as ``model(input_tensor, 0)`` returning logits of
            shape ``(1, positions, classes)``.
        num_examples: How many random numbers to try.

    Returns:
        A list of dicts with keys ``'input'``, ``'expected'``, ``'predicted'``
        (digit strings, most-significant digit first) and ``'correct'``.
    """
    model.eval()
    results = []

    with torch.no_grad():
        for _ in range(num_examples):
            # Draw a random operand and its expected product
            num = random.randint(1, 9999)
            expected = num * 3

            # The model works least-significant digit first, so reverse both
            reversed_input = str(num).zfill(4)[::-1]
            reversed_expected = str(expected).zfill(5)[::-1]

            # One-hot encode the reversed input as a batch of one
            input_tensor = torch.zeros(1, 4, input_size)
            positions = list(range(len(reversed_input)))
            input_tensor[0, positions, [int(ch) for ch in reversed_input]] = 1
            input_tensor = input_tensor.to(device)

            # Decode without teacher forcing
            output = model(input_tensor, 0)

            # Arg-max class per position, joined into a digit string
            predicted_indices = output.argmax(dim=2).squeeze().cpu().tolist()
            reversed_predicted = ''.join(map(str, predicted_indices))

            results.append({
                'input': reversed_input[::-1],
                'expected': reversed_expected[::-1],
                'predicted': reversed_predicted[::-1],
                'correct': reversed_expected == reversed_predicted
            })

    return results

In [None]:
def main(params):
    """Train, evaluate and save the 3x sequence-to-sequence model.

    Generates 8000 ``(x, 3x)`` pairs, trains on the first 80% and validates on
    the rest, saves loss/accuracy plots as PNG files, prints predictions for
    20 random numbers, and writes the trained weights to
    ``multiplication_model.pth``.

    Args:
        params: Dict with keys ``'input_size'``, ``'hidden_size'``,
            ``'output_size'``, ``'batch_size'``, ``'learning_rate'`` and
            ``'num_epochs'``. A ``'num_layers'`` entry is not read here because
            the encoder and decoder take their layer count from the
            module-level ``num_layers``.
    """
    input_size = params['input_size']
    hidden_size = params['hidden_size']
    output_size = params['output_size']
    batch_size = params['batch_size']
    learning_rate = params['learning_rate']
    num_epochs = params['num_epochs']

    # Generate dataset
    X_data, y_data = generate_dataset(size=8000)

    # Split into train and validation sets. generate_dataset returns the
    # samples in random order, so a positional cut is already a random split.
    split = int(0.8 * len(X_data))
    X_train, X_val = X_data[:split], X_data[split:]
    y_train, y_val = y_data[:split], y_data[split:]

    # Create datasets
    train_dataset = MultiplicationDataset(X_train, y_train)
    val_dataset = MultiplicationDataset(X_val, y_val)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model and place its weights on the same device as the batches
    encoder = Encoder(input_size, hidden_size)
    decoder = Decoder(hidden_size, output_size)
    model = Seq2Seq(encoder, decoder, device).to(device)

    # Initialize optimizer and criterion
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCEWithLogitsLoss()

    # Training loop
    train_losses = []
    val_losses = []
    val_accuracies = []

    for epoch in range(num_epochs):
        train_loss = train(model, train_loader, optimizer, criterion, device)
        val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f'Epoch: {epoch+1}, \tTrain Loss: {train_loss:.4f}, \tVal Loss: {val_loss:.4f}, \tVal Accuracy: {val_accuracy:.4f}')

    # Plot training and validation loss
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss')
    plt.savefig('loss_plot.png')

    # Plot validation accuracy
    plt.figure(figsize=(10, 5))
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Validation Accuracy')
    plt.savefig('accuracy_plot.png')

    # Test the model on some examples
    test_results = test_model(model, 20)

    print("\nTest Results:")
    for result in test_results:
        status = "✓" if result['correct'] else "✗"
        print(f"{result['input']} * 3 = {result['expected']} | Predicted: {result['predicted']} {status}")

    # Calculate overall accuracy (whole-number matches, not per digit)
    correct = sum(1 for result in test_results if result['correct'])
    print(f"\nTest Accuracy: {correct / len(test_results):.2f}")

    # Save the model
    torch.save(model.state_dict(), 'multiplication_model.pth')
    print("Model saved as 'multiplication_model.pth'")

In [None]:
# Run configuration handed to main().
# NOTE(review): Encoder/Decoder read the module-level `num_layers`, so the
# 'num_layers' entry here does not by itself change the model depth.
# NOTE(review): a learning rate of 0.1 is far above Adam's default of 1e-3 —
# confirm this is intended, or lower it if training is unstable.
params = {
  'input_size': 10,  # 0-9 digits
  'hidden_size': 64,
  'num_layers': 1,
  'output_size': 10,  # 0-9 digits
  'batch_size': 128,
  'learning_rate': 0.1,
  'num_epochs': 20
}

main(params)

# YOUR Baseline is 95% Validation accuracy