In [1]:
import os
import random
import json

def generate_nback_sequences(n, alphabet, seq_length, matches, num_sequences):
    """Generate random letter sequences labelled for the n-back task.

    For every sequence, exactly ``matches`` positions are drawn (without
    replacement) from ``n .. seq_length - 1``; the letter at such a position
    repeats the letter ``n`` steps earlier and is labelled ``'m'``. Every
    other position is labelled ``'-'`` and, whenever it has an n-back
    predecessor, is guaranteed to hold a different letter from it.

    Args:
        n: Number of steps back a letter is compared against (>= 1).
        alphabet: Symbols to draw letters from (a string or sequence of strings).
        seq_length: Number of letters in each sequence.
        matches: Number of match positions in each sequence.
        num_sequences: How many sequences to generate.

    Returns:
        A list of ``(letters, labels)`` tuples of equal-length strings, where
        ``labels[i]`` is ``'m'`` for a match position and ``'-'`` otherwise.

    Raises:
        ValueError: If ``n < 1``, if ``matches`` exceeds the number of
            candidate positions (raised by ``random.sample``), or if a
            non-match position needs a differing letter but ``alphabet`` has
            fewer than two distinct symbols.
    """
    if n < 1:
        raise ValueError("n must be a positive integer")
    # A non-match position at index >= n needs a letter that differs from its
    # n-back predecessor; with a single symbol the retry loop would never end.
    if matches < seq_length - n and len(set(alphabet)) < 2:
        raise ValueError("alphabet needs at least two distinct symbols")

    sequences = []
    for _ in range(num_sequences):
        seq, conditions = [], []
        # A set gives O(1) membership tests; ordering is irrelevant here.
        match_positions = set(random.sample(range(n, seq_length), matches))

        for i in range(seq_length):
            if i in match_positions:
                seq.append(seq[i - n])
                conditions.append('m')
            else:
                random_letter = random.choice(alphabet)
                # `i >= n` (not `i > n`): position n already has an n-back
                # predecessor (index 0) and must not match it by accident.
                while i >= n and seq[i - n] == random_letter:
                    random_letter = random.choice(alphabet)
                seq.append(random_letter)
                conditions.append('-')

        sequences.append((''.join(seq), ''.join(conditions)))

    return sequences

def save_sequences_to_json(sequences, output_file):
    """Write sequence/label pairs to ``output_file`` as an indented JSON list.

    Args:
        sequences: Iterable of ``(seq, conditions)`` pairs.
        output_file: Path of the JSON file to create or overwrite.

    Each pair becomes one object with the keys ``"input"`` and ``"target"``.
    """
    records = [
        {"input": letters, "target": labels}
        for letters, labels in sequences
    ]
    with open(output_file, 'w') as out:
        json.dump(records, out, indent=4)

# Data-generation settings shared by every n-back level.
alphabet = 'bcdfghjklnpqrstvwxyz'
seq_length = 24
matches = 8
num_sequences_train = 800
num_sequences_test = 200

# Seed the generator so that re-running the notebook from a fresh kernel
# regenerates exactly the same train/test files.
random.seed(42)

# One train file and one test file per n-back level.
for n in [1, 2, 3]:
    sequences_train = generate_nback_sequences(n, alphabet, seq_length, matches, num_sequences_train)
    sequences_test = generate_nback_sequences(n, alphabet, seq_length, matches, num_sequences_test)

    save_sequences_to_json(sequences_train, f'nback_{n}_train.json')
    save_sequences_to_json(sequences_test, f'nback_{n}_test.json')


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import json

# Define the dataset
class NBackDataset(Dataset):
    """In-memory dataset of index-encoded n-back sequences read from JSON.

    The file is expected to hold a list of objects with an ``'input'`` and a
    ``'target'`` entry; each is iterated element by element and mapped through
    ``char_to_idx`` / ``label_to_idx`` respectively.
    """

    def __init__(self, json_file, char_to_idx, label_to_idx):
        """Load ``json_file`` and encode every record as a pair of tensors."""
        with open(json_file, 'r') as handle:
            records = json.load(handle)

        # One (input_ids, label_ids) tensor pair per record, in file order.
        self.data = [
            (
                torch.tensor([char_to_idx[symbol] for symbol in record['input']]),
                torch.tensor([label_to_idx[mark] for mark in record['target']]),
            )
            for record in records
        ]

    def __len__(self):
        """Return the number of loaded records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(input_ids, label_ids)`` pair stored at ``idx``."""
        return self.data[idx]

# Define the model
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder transformer mapping token ids to per-position logits.

    ``nn.Transformer`` is built with its default ``batch_first=False``, so
    ``forward`` expects sequence-first integer tensors of shape
    ``(seq_len, batch)``.

    Args:
        input_dim: Size of the embedding table (number of distinct token ids).
        model_dim: Embedding / transformer width (``d_model``).
        num_heads: Number of attention heads.
        num_layers: Number of encoder layers and of decoder layers.
        output_dim: Number of output classes per position.
        max_len: Largest supported sequence length (size of the learned
            positional-embedding table).
    """

    def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim, max_len=5000):
        super().__init__()
        # NOTE: source and target tokens share this single embedding table,
        # so ids in both `src` and `tgt` must be smaller than `input_dim`.
        self.embedding = nn.Embedding(input_dim, model_dim)
        self.pos_encoder = nn.Embedding(max_len, model_dim)  # learned positions
        self.transformer = nn.Transformer(d_model=model_dim, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers)
        self.fc_out = nn.Linear(model_dim, output_dim)

    def forward(self, src, tgt):
        """Return logits of shape ``(tgt_len, batch, output_dim)``.

        Args:
            src: ``(src_len, batch)`` tensor of source token ids.
            tgt: ``(tgt_len, batch)`` tensor of decoder-input token ids.
        """
        src_seq_len, tgt_seq_len = src.size(0), tgt.size(0)
        # Build position indices directly on the input's device.
        src_pos = torch.arange(src_seq_len, device=src.device).unsqueeze(1).expand(src_seq_len, src.size(1))
        tgt_pos = torch.arange(tgt_seq_len, device=tgt.device).unsqueeze(1).expand(tgt_seq_len, tgt.size(1))

        src = self.embedding(src) + self.pos_encoder(src_pos)
        tgt = self.embedding(tgt) + self.pos_encoder(tgt_pos)

        # Causal mask: -inf above the diagonal stops decoder position t from
        # attending to later target positions. Without it, teacher-forced
        # training can read the very label it is asked to predict.
        tgt_mask = torch.triu(
            torch.full((tgt_seq_len, tgt_seq_len), float('-inf'), device=tgt.device),
            diagonal=1,
        )

        transformer_out = self.transformer(src, tgt, tgt_mask=tgt_mask)
        output = self.fc_out(transformer_out)
        return output


In [3]:

# Hyperparameters

# Vocabularies: input symbols and per-position labels, each mapped to an index.
alphabet = 'bcdfghjklnpqrstvwxyz'
conditions = ['-', 'm']
char_to_idx = dict(zip(alphabet, range(len(alphabet))))
label_to_idx = dict(zip(conditions, range(len(conditions))))

# Model size.
input_dim, output_dim = len(alphabet), len(conditions)
model_dim, num_heads, num_layers = 512, 8, 6

# Optimisation settings.
learning_rate = 1e-4
batch_size = 32
num_epochs = 10


In [4]:

# Load the data
train_dataset = NBackDataset('nback_1_train.json', char_to_idx, label_to_idx)  # Change the file name as needed
test_dataset = NBackDataset('nback_1_test.json', char_to_idx, label_to_idx)    # Change the file name as needed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Model, Loss, Optimizer
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
model = Seq2SeqTransformer(input_dim, model_dim, num_heads, num_layers, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training Loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    for src, tgt in train_loader:
        # The default collate stacks the 1-D samples into (batch, seq_len),
        # while the model and the time-shift slicing below work on
        # (seq_len, batch). Transpose so dimension 0 really is time.
        src = src.transpose(0, 1).contiguous().to(device)
        tgt = tgt.transpose(0, 1).contiguous().to(device)

        # Teacher forcing: feed labels 0..T-2, predict labels 1..T-1.
        outputs = model(src, tgt[:-1, :])
        loss = criterion(outputs.reshape(-1, output_dim), tgt[1:, :].reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    # Report the mean loss over the epoch rather than only the last batch.
    mean_loss = epoch_loss / max(len(train_loader), 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}')


: 

In [None]:

# Evaluation
model.eval()
with torch.no_grad():
    total_loss = 0
    for src, tgt in test_loader:
        # The default collate yields (batch, seq_len); the model and the
        # `[:, 0]` / `[:-1, :]` indexing below assume (seq_len, batch).
        src = src.transpose(0, 1).contiguous()
        tgt = tgt.transpose(0, 1).contiguous()

        # print the input and target sequences of the first sample in the batch
        print('Input:', ''.join([alphabet[char] for char in src[:, 0].tolist()]))
        print('Target:', ''.join([conditions[label] for label in tgt[:, 0].tolist()]))
        src, tgt = src.to(device), tgt.to(device)

        # The decoder is fed the true labels 0..T-2 (teacher forcing), so the
        # predictions cover positions 1..T-1 and are one shorter than Target.
        outputs = model(src, tgt[:-1, :])
        # print the predicted sequence
        predicted = torch.argmax(outputs, dim=-1)
        print('Predicted:', ''.join([conditions[label] for label in predicted[:, 0].tolist()]))
        loss = criterion(outputs.reshape(-1, output_dim), tgt[1:, :].reshape(-1))
        total_loss += loss.item()

    avg_loss = total_loss / len(test_loader)
    print(f'Test Loss: {avg_loss:.4f}')
