### Try COLAB

In [None]:
try:
  import google.colab
  IN_COLAB = True
except:
  IN_COLAB = False
if IN_COLAB:
    !pip3 install torch matplotlib torchmetrics scikit-image segmentation-models-pytorch

# Import

In [None]:
import torch
from torch import nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence # for padding data

import pandas as pd # for making csv file

from sklearn.metrics import accuracy_score, f1_score

import wandb

import seaborn as sns

### CUDA

In [None]:
# Select the compute device: CUDA when PyTorch can see a GPU, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Using the GPU" if device.type == 'cuda' else "Using the CPU")

# Generate Dataset

In [None]:
import random

# Build N strings of the shape a...ab...bc...c (letters always in
# alphabetical order, at least one of each letter).
N = 10000  # Size of dataset (number of sequences)
dataset = []

for n in range(N):
    if n % 4 != 0:
        # Three out of four entries: random block sizes. Every letter starts
        # with a count of 1 and `length` extra letters are handed out
        # uniformly at random, so the total length is 3 + length (6..20).
        # These strings may still land in the language by chance; the labels
        # are computed separately from the string contents.
        length = random.randint(3, 17)
        counts = [1, 1, 1]
        for i in range(length):
            counts[random.randint(0, 2)] += 1
        sequence = ''.join(letter * count for letter, count in zip('abc', counts))
    else:
        # Every fourth entry is a guaranteed member a^k b^k c^k with
        # k in 1..6 (total length 3..18).
        length = random.randint(1, 6)
        sequence = ''.join(letter * length for letter in 'abc')
    dataset.append(sequence)

In [None]:
# Longest and shortest sequence length found in the generated dataset
max_len = max(map(len, dataset))
min_len = min(map(len, dataset))
print(min_len, max_len)  # expected output: 3 20

### Functions for labels and encoding

In [None]:
# Get labels
def get_labels(data):
    y = torch.zeros(len(data))
    for i, sequence in enumerate(data):
        if sequence == 'a'*(len(sequence)//3) + 'b'*(len(sequence)//3) + 'c'*(len(sequence)//3):
            y[i] = 1
    return y

In [None]:
# Create a one-hot encoding of the sequences and a labels tensor
def one_hot_encode(sequence):
    encoded = torch.zeros(len(sequence), 3)
    for i, char in enumerate(sequence):
        encoded[i, 'abc'.index(char)] = 1
    return encoded

## One-Hot Encoding

In [None]:
# One one-hot tensor per sequence, in dataset order; show the first one
# next to the string it was built from.
encoded_dataset = list(map(one_hot_encode, dataset))
print(encoded_dataset[0], "\n", dataset[0])

In [None]:
# Pad all encoded sequences to a common length and stack them into one
# tensor with the batch dimension first.
padded_dataset = pad_sequence(encoded_dataset, batch_first=True)
print(padded_dataset.size())

In [None]:
# 80/20 train/test split by position: the first train_size examples are used
# for training, the rest for testing.
train_size = int(0.8*N)
test_size = N - train_size

train_data, test_data = padded_dataset[:train_size], padded_dataset[train_size:]
# Labels are derived from the raw strings, sliced the same way as the inputs
y_train, y_test = get_labels(dataset[:train_size]), get_labels(dataset[train_size:])

print(train_data.shape, test_data.shape)
print(y_train.shape, y_test.shape)

In [None]:
y_train.sum()/len(y_train) # fraction (0-1, not a percentage) of training sequences that are in the language

In [None]:
# Training batches are reshuffled every epoch.
# TODO: batch_size=128 is an open choice ("what should the batch size be?").
train_loader = DataLoader(
    dataset=list(zip(train_data, y_train)),
    batch_size=128,
    shuffle=True,
)
# The whole test set is served as one batch, in its original order.
test_loader = DataLoader(
    dataset=list(zip(test_data, y_test)),
    batch_size=len(test_data),
    shuffle=False,
)

In [None]:
# Sanity check: print the shapes and the first example of a single training
# batch, then stop.
for data, labels in train_loader:
    print(data.size(), labels.size())
    print(data[0], labels[0])
    break

# RNN Model

In [None]:
class RNN(nn.Module):
    """Sequence classifier: an ``nn.RNN`` followed by a linear layer.

    The linear layer is applied to the RNN output at the last time step only.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        num_classes: Number of output values per sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        # batch_first=True: inputs are (batch, seq_length, input_size)
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, input):
        """Map (batch, seq_length, input_size) to (batch, num_classes)."""
        # hidden_states: (batch, seq_length, hidden_size)
        hidden_states, _ = self.rnn(input)
        # NOTE(review): this notebook pads its batches with pad_sequence, so
        # for shorter sequences the final time step is padding -- confirm
        # that reading the last position is intended.
        last_step = hidden_states[:, -1, :]
        return self.fc(last_step)


# LSTM Model

In [None]:
class LSTM(nn.Module):
    """Sequence classifier: an ``nn.LSTM`` followed by a linear layer.

    The linear layer is applied to the LSTM output at the last time step only.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        num_classes: Number of output values per sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        # batch_first=True: inputs are (batch, seq_length, input_size)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, input):
        """Map (batch, seq_length, input_size) to (batch, num_classes)."""
        # hidden_states: (batch, seq_length, hidden_size); the final
        # (h_n, c_n) state tuple is not needed.
        hidden_states, _ = self.lstm(input)
        # NOTE(review): this notebook pads its batches with pad_sequence, so
        # for shorter sequences the final time step is padding -- confirm
        # that reading the last position is intended.
        last_step = hidden_states[:, -1, :]
        return self.fc(last_step)


# Training

In [None]:
# Settings fixed by the task
input_size, num_classes = 3, 1  # three symbols ('a', 'b', 'c'); one logit for binary classification

# Tunable hyperparameters
hidden_size, num_layers = 50, 1
learning_rate = 0.005

# Loss on raw logits. BCEWithLogitsLoss applies the sigmoid itself, so the
# models must not add one; plain BCELoss would need a sigmoid on the outputs.
criterion = nn.BCEWithLogitsLoss()


In [None]:
# Build the vanilla RNN and train it with plain SGD.
rnn = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
optimizer = optim.SGD(rnn.parameters(), lr=learning_rate)

# Training loop RNN
num_epochs = 200
for epoch in range(num_epochs):
    for data, labels in train_loader:
        data, labels = data.to(device), labels.to(device)

        # Forward pass
        outputs = rnn(data)
        # The model emits (batch, 1) logits while labels are (batch,), and
        # BCEWithLogitsLoss needs both to have the same shape. squeeze(-1)
        # drops only the class axis; a bare squeeze() would also drop the
        # batch axis for a batch of size 1 and make the loss raise.
        loss = criterion(outputs.squeeze(-1), labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    if (epoch+1) % 10 == 0:
        # Reports the loss of the last mini-batch of this epoch
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
print("Finished Training")

In [None]:
# Build the LSTM and train it with Adam.
lstm = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
optimizer = optim.Adam(lstm.parameters(), lr=learning_rate)

# Training loop LSTM
num_epochs = 100
for epoch in range(num_epochs):
    for data, labels in train_loader:
        data, labels = data.to(device), labels.to(device)

        # Forward pass
        outputs = lstm(data)
        # The model emits (batch, 1) logits while labels are (batch,), and
        # BCEWithLogitsLoss needs both to have the same shape. squeeze(-1)
        # drops only the class axis; a bare squeeze() would also drop the
        # batch axis for a batch of size 1 and make the loss raise.
        loss = criterion(outputs.squeeze(-1), labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch+1) % 10 == 0:
        # Reports the loss of the last mini-batch of this epoch
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
print("Finished training")

# Artifact

In [None]:
# # Make dataset with labels to csv
# df = pd.DataFrame(dataset)
# df['label'] = get_labels(dataset)
# df.to_csv('formal_language.csv', index=False)

# # Instantiate a WandB run
# wandb.login()
# run = wandb.init(project="formal_language_rnn_lstm")

# # Create an artifact for data
# artifact = wandb.Artifact("formal_language_data", type="dataset") 
# artifact.add_file("formal_language.csv") 
# run.log_artifact(artifact)

# Sweeping

In [None]:
# Sweep config
sweep_config = {
    "method": "random",
    "metric": {"name": "loss", "goal": "minimize"},
    "parameters": {
        "model": { "values": ["RNN", "LSTM"] },
        "epochs": {"values": [200, 500] },    
        "optimizer": { "values": ["SGD", "Adam"] },
        "hidden_size": {
            "values": [2, 20, 50]
        },
        "num_layers": {
            "values": [1, 2]
        },
        "learning_rate": {
            "values": [0.001, 0.01]
        }
    }
}

In [None]:
def train(config=None):
    """Train one model inside a Weights & Biases run.

    Reads ``model``, ``optimizer``, ``hidden_size``, ``num_layers``,
    ``learning_rate`` and ``epochs`` from ``wandb.config``, trains on the
    notebook-level ``train_loader`` using the notebook-level ``device``, and
    logs the last mini-batch loss to W&B every 10 epochs.

    Args:
        config: Optional configuration passed through to ``wandb.init``.
    """
    with wandb.init(project="formal_language_rnn_lstm", config=config):
        config = wandb.config

        # Get hyperparameters
        hidden_size = config.hidden_size
        num_layers = config.num_layers
        learning_rate = config.learning_rate
        num_epochs = config.epochs

        # Input size and number of classes
        num_classes = 1 # binary classification
        input_size = 3 # 'a' 'b' 'c'

        # Set criterion
        criterion = nn.BCEWithLogitsLoss()

        # Get model (anything other than "RNN" selects the LSTM)
        if config.model == "RNN":
            model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
        else:
            model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)

        # Get optimizer (anything other than "SGD" selects Adam)
        if config.optimizer == "SGD":
            optimizer = optim.SGD(model.parameters(), lr=learning_rate)
        else:
            optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        wandb.watch(model, criterion, log="all")

        # Training loop
        for epoch in range(num_epochs):
            for data, labels in train_loader:
                data, labels = data.to(device), labels.to(device)

                # Forward pass
                outputs = model(data)
                # Logits are (batch, 1), labels are (batch,). squeeze(-1)
                # drops only the class axis, so a batch of size 1 keeps its
                # batch dimension (a bare squeeze() would collapse it to a
                # scalar and the loss would reject the shape mismatch).
                loss = criterion(outputs.squeeze(-1), labels)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            if (epoch+1) % 10 == 0:
                # "loss" is the metric name the sweep config optimises
                wandb.log({"epoch": epoch+1, "loss": loss.item()})
            if (epoch+1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        print("Finished training")

In [None]:
# Initialize sweep_id
# sweep_id = wandb.sweep(sweep_config, project="formal_language_rnn_lstm")

In [None]:
# Run the sweep
# wandb.agent(sweep_id, function=train, count=10)

# Evaluation

In [None]:
def report_evaluation(model, loader):
    """Evaluate a binary classifier and return ``(accuracy, f1)``.

    The model is switched to eval mode (and left there) and run without
    gradient tracking over every batch of ``loader``. Logits are passed
    through a sigmoid and rounded to get 0/1 predictions.

    Args:
        model: Module that maps a batch of inputs to logits.
        loader: Iterable yielding ``(data, label)`` batches.

    Returns:
        Tuple ``(accuracy, f1)`` as computed by scikit-learn's
        ``accuracy_score`` and ``f1_score``.
    """
    model.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for data, label in loader:
            data, label = data.to(device), label.to(device)

            # Get the model's predictions
            output = model(data)
            pred = torch.round(torch.sigmoid(output)) # Sigmoid to get probabilities, round to get binary predictions

            # Flatten both so labels and predictions are plain 1-D lists of
            # equal length. The model output has a trailing class axis that
            # would otherwise produce a list of one-element lists.
            y_true.extend(label.reshape(-1).tolist())
            y_pred.extend(pred.reshape(-1).tolist())

    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    return accuracy, f1

## Report F1 Score (and accuracy)

In [None]:
# Evaluate both trained models on the test loader first, then print the
# four metrics, one per line.
rnn_accuracy, rnn_f1 = report_evaluation(rnn, test_loader)
lstm_accuracy, lstm_f1 = report_evaluation(lstm, test_loader)

print(
    f'RNN Test Accuracy: {rnn_accuracy*100:.2f}%',
    f'RNN F1 Score: {rnn_f1:.2f}',
    f'LSTM Test Accuracy: {lstm_accuracy*100:.2f}%',
    f'LSTM F1 Score: {lstm_f1:.2f}',
    sep='\n',
)