# In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import numpy as np
import pandas as pd
import exrex
import random
from sklearn.model_selection import KFold

# Prefer CUDA when PyTorch reports it as available; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

def generate_matching_strings(regex, num_samples, seq_length):
    """Sample strings from ``regex`` and cut each one to ``seq_length`` characters.

    Args:
        regex: Pattern handed to ``exrex.getone`` for every sample.
        num_samples: Number of strings to produce.
        seq_length: Maximum number of characters kept from each sample.

    Returns:
        A list of ``num_samples`` strings, each at most ``seq_length`` long.

    NOTE(review): the cut is a plain prefix slice, so a truncated sample is
    not guaranteed to still match ``regex`` -- confirm this is acceptable
    for data that callers label as positive.
    """
    truncated = []
    for _ in range(num_samples):
        sample = exrex.getone(regex)
        truncated.append(sample[:seq_length])
    return truncated

def generate_random_strings(alphabet, num_samples, seq_length):
    """Build random strings over ``alphabet`` with random lengths.

    Args:
        alphabet: Iterable of string symbols; they are joined into one
            string and sampled character by character.
        num_samples: Number of strings to produce.
        seq_length: Inclusive upper bound on the length of each string
            (the lower bound is 0, so empty strings are possible).

    Returns:
        A list of ``num_samples`` strings.
    """
    symbols = ''.join(alphabet)
    samples = []
    for _ in range(num_samples):
        # Draw the length first, then that many symbols with replacement.
        length = random.randint(0, seq_length)
        samples.append(''.join(random.choices(symbols, k=length)))
    return samples

def generate_data(regex, alphabet, num_samples, seq_length, correct_proportion):
    """Create a shuffled, labelled dataset of regex-derived and random strings.

    ``int(num_samples * correct_proportion)`` strings come from
    ``generate_matching_strings`` and are labelled 1; the remainder come from
    ``generate_random_strings`` and are labelled 0.

    Args:
        regex: Pattern used to generate the positively-labelled strings.
        alphabet: Symbols used to generate the negatively-labelled strings.
        num_samples: Total number of strings requested.
        seq_length: Maximum string length passed to both generators.
        correct_proportion: Fraction of samples taken from the regex generator.

    Returns:
        A ``(data, labels)`` pair of equally long lists in matching order.
        Both lists are empty when ``num_samples`` is not positive.

    NOTE(review): random strings are labelled 0 without being checked against
    ``regex``, so some negatives may in fact match -- confirm this is intended.
    """
    # Nothing to generate; unzipping an empty list below would otherwise
    # fail with "not enough values to unpack".
    if num_samples <= 0:
        return [], []

    num_correct = int(num_samples * correct_proportion)
    num_incorrect = num_samples - num_correct

    # Generate the correct and incorrect strings
    correct_data = generate_matching_strings(regex, num_correct, seq_length)
    incorrect_data = generate_random_strings(alphabet, num_incorrect, seq_length)

    # Pair every string with its label so that one shuffle keeps them aligned
    combined = [(s, 1) for s in correct_data] + [(s, 0) for s in incorrect_data]
    random.shuffle(combined)

    data = [s for s, _ in combined]
    labels = [label for _, label in combined]
    return data, labels

def sequences_to_one_hot(sequences, alphabet):
    """One-hot encode strings into a zero-padded float tensor.

    Args:
        sequences: Sized collection of strings whose characters all occur in
            ``alphabet``.
        alphabet: Ordered symbols; a symbol's position is its one-hot index.

    Returns:
        ``(one_hot, lengths)`` where ``one_hot`` has shape
        ``(len(sequences), max_length, len(alphabet))`` and dtype float32,
        and ``lengths`` is an int64 tensor with one entry per sequence.
        ``max_length`` is the longest sequence length but never below 1, and
        empty sequences are reported with length 1 (a single all-zero step).

    Raises:
        KeyError: If a sequence contains a character missing from ``alphabet``.
    """
    alphabet_index = {char: i for i, char in enumerate(alphabet)}

    # Guarantee at least one time step even when there are no sequences or
    # every sequence is empty; otherwise the reported length of 1 for an
    # empty sequence would exceed the tensor's time dimension.
    longest = max((len(seq) for seq in sequences), default=0)
    max_length = max(longest, 1)

    one_hot_tensor = torch.zeros((len(sequences), max_length, len(alphabet)), dtype=torch.float32)
    for row, seq in enumerate(sequences):
        for step, char in enumerate(seq):
            one_hot_tensor[row, step, alphabet_index[char]] = 1

    sequence_lengths = torch.tensor([max(len(seq), 1) for seq in sequences], dtype=torch.int64)
    return one_hot_tensor, sequence_lengths


class SimpleRNN(nn.Module):
    """Single-layer tanh RNN classifier with scaled, noisy hidden outputs.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        output_size: Number of sigmoid outputs produced per sequence.
        noise_std: Standard deviation of the Gaussian noise applied to the
            initial hidden state and to the RNN outputs.
    """

    def __init__(self, input_size, hidden_size, output_size, noise_std=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.noise_std = noise_std
        self.rnn = nn.RNN(input_size, hidden_size, nonlinearity='tanh', batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, lengths, scale_factor):
        """Classify each padded sequence from its last valid time step.

        Args:
            x: Batch-first padded input of shape ``(batch, time, input_size)``.
            lengths: Valid length of every sequence, as accepted by
                ``pack_padded_sequence`` (need not be sorted).
            scale_factor: Multiplier applied to the noisy initial hidden
                state and to the RNN outputs before noise is added.

        Returns:
            Tensor of shape ``(batch, output_size)`` with sigmoid activations
            (probabilities, not logits).

        Noise is drawn on every call; it is not switched off in eval mode.
        """
        x_packed = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)

        # Initialize hidden state with noise and scale
        h_0 = (torch.randn(1, x.size(0), self.hidden_size, device=x.device) * self.noise_std) * scale_factor

        packed_output, _ = self.rnn(x_packed, h_0)
        # The returned lengths are in the original batch order.
        output, valid_lengths = pad_packed_sequence(packed_output, batch_first=True)

        # Scale the RNN outputs and add noise
        noisy_output = (output * scale_factor) + (torch.randn_like(output) * self.noise_std)

        # Pick each sequence's own final step. Indexing with -1 would read
        # zero padding for every sequence shorter than the longest in the batch.
        batch_index = torch.arange(noisy_output.size(0), device=noisy_output.device)
        last_index = (valid_lengths - 1).to(noisy_output.device)
        last_step = noisy_output[batch_index, last_index]

        return torch.sigmoid(self.fc(last_step))


def train(model, criterion, optimizer, data, lengths, labels, scale_factor, epochs=10):
    """Train ``model`` one sample at a time and record loss and gradient norms.

    Args:
        model: Module called as ``model(inputs, seq_length, scale_factor)``.
        criterion: Loss applied to the model output and a ``(1, 1)`` float target.
        optimizer: Optimizer stepping the model's parameters.
        data: Indexable collection of per-sample input tensors (a batch
            dimension is added to each sample before the forward pass).
        lengths: Tensor of per-sample sequence lengths, indexed like ``data``.
        labels: Per-sample numeric targets, indexed like ``data``.
        scale_factor: Forwarded unchanged to the model.
        epochs: Number of passes over ``data``.

    Returns:
        ``(loss_vector, gradient_norms)``: ``loss_vector`` holds the mean
        sample loss of every epoch (NaN for an epoch without samples) and
        ``gradient_norms`` holds, per epoch, the list of global gradient
        L2 norms measured after each backward pass.
    """
    model.train()
    # Put inputs wherever the model's parameters live so the two always agree.
    model_device = next(model.parameters()).device
    loss_vector = []
    gradient_norms = []

    for _ in range(epochs):
        epoch_gradients = []
        epoch_loss = 0.0
        for i, sample in enumerate(data):
            inputs = sample.unsqueeze(0).to(model_device)
            seq_length = lengths[i].unsqueeze(0)
            target = torch.tensor([labels[i]], dtype=torch.float).unsqueeze(1).to(model_device)

            optimizer.zero_grad()
            outputs = model(inputs, seq_length, scale_factor)
            loss = criterion(outputs, target)
            loss.backward()

            # Global L2 norm over every parameter that received a gradient.
            squared = [p.grad.detach().pow(2).sum() for p in model.parameters() if p.grad is not None]
            grad_norm = torch.stack(squared).sum().sqrt().item() if squared else 0.0
            epoch_gradients.append(grad_norm)
            optimizer.step()

            epoch_loss += loss.item()

        # Report the epoch average rather than only the final sample's loss,
        # which is a single noisy draw and is undefined for an empty epoch.
        num_seen = len(epoch_gradients)
        loss_vector.append(epoch_loss / num_seen if num_seen else float("nan"))
        gradient_norms.append(epoch_gradients)

    return loss_vector, gradient_norms

def test(model, data, lengths, labels, scale_factor):
    """Measure classification accuracy of ``model`` one sample at a time.

    Args:
        model: Module called as ``model(inputs, seq_length, scale_factor)``;
            its single output per sample is rounded to obtain the prediction.
        data: Indexable collection of per-sample input tensors.
        lengths: Tensor of per-sample sequence lengths, indexed like ``data``.
        labels: Per-sample expected values compared with the rounded output.
        scale_factor: Forwarded unchanged to the model.

    Returns:
        The fraction of samples whose rounded output equals its label.

    Raises:
        ZeroDivisionError: If ``data`` is empty.
    """
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for idx, sample in enumerate(data):
            batch = sample.unsqueeze(0).to(device)
            batch_length = lengths[idx].unsqueeze(0)
            prediction = model(batch, batch_length, scale_factor).round()
            seen += 1
            hits += (prediction.item() == labels[idx])

    return hits / seen

# Regular languages under test, paired position-by-position with the
# alphabet used to one-hot encode them and to draw random negatives.
regexes = ['(11|00)*', '(0|1)*0{2,4}(0|1)*1{2,4}', '(abc|xyz){3}', '(abc|xyz){6}']
alphabets = [['0', '1'], ['0', '1'], ['a', 'b', 'c', 'x', 'y', 'z'], ['a', 'b', 'c', 'x', 'y', 'z']]

use_logarithmic_scaling = True  # Set to False for linear scaling
scaling_factors = [1, 3, 5, 9, 10, 30, 50, 90, 100, 300, 500, 900] if use_logarithmic_scaling else [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
seq_lengths = [5, 10, 20, 40, 80, 160]  # Example sequence lengths
hidden_sizes = [2, 4, 8, 16, 32]  # Example hidden state dimensions

PROPORTION = 0.5
NUM_SAMPLES = 1000
TRAIN_RATIO = 0.8
# One output unit: the model emits a single match probability per sequence.
# (This constant was referenced below without ever being defined.)
OUTPUT_SIZE = 1

# Sweep every (scale factor, sequence length, hidden size) combination per
# regex and write one CSV of results per regex.
for regex_index, regex in enumerate(regexes):
    print(f"\nTesting Regex: {regex}")
    results = []
    alphabet = alphabets[regex_index]
    INPUT_SIZE = len(alphabet)  # Adjusted for one-hot encoding

    for scale_factor in scaling_factors:
        for seq_length in seq_lengths:
            for hidden_size in hidden_sizes:
                print(f"\nScale Factor: {scale_factor}, Sequence Length: {seq_length}, Hidden Size: {hidden_size}")
                print("=======================================")

                # Fresh data for every configuration
                data, labels = generate_data(regex, alphabet, NUM_SAMPLES, seq_length, PROPORTION)
                data, sequence_lengths = sequences_to_one_hot(data, alphabet)

                # Split data into training and testing
                split_index = int(len(data) * TRAIN_RATIO)
                train_data, test_data = data[:split_index], data[split_index:]
                train_lengths, test_lengths = sequence_lengths[:split_index], sequence_lengths[split_index:]
                train_labels, test_labels = labels[:split_index], labels[split_index:]

                model = SimpleRNN(INPUT_SIZE, hidden_size, OUTPUT_SIZE).to(device)
                # SimpleRNN.forward already applies a sigmoid, so the loss must
                # take probabilities; BCEWithLogitsLoss would apply a second one.
                criterion = nn.BCELoss()
                optimizer = optim.Adam(model.parameters(), lr=0.001)

                loss_vector, gradient_norms = train(model, criterion, optimizer, train_data, train_lengths, train_labels, scale_factor)
                accuracy = test(model, test_data, test_lengths, test_labels, scale_factor)

                results.append({'Scale Factor': scale_factor,
                                'Sequence Length': seq_length,
                                'Hidden Size': hidden_size,
                                'Loss Vector': loss_vector,
                                'Accuracy': accuracy
                                # ,'Average Gradient Norms': [np.mean(epoch) for epoch in zip(*gradient_norms)]
                })
                print(f"Accuracy: {accuracy}")

    # Save results to CSV
    df = pd.DataFrame(results)
    df.to_csv(f'asymptotic_analysis_{regex}.csv', index=False)