In [1]:
import networkx as nx
import numpy as np
import torch_geometric
import torch

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset 

import matplotlib.pyplot as plt

from torch_geometric.data import Dataset, Data
from torch_geometric.loader.dataloader import DataLoader

from GraphDataset import RandomGraphDataset
from GraphToSequence import graphToSequence, sequenceToGraph

from SimpleTransformer import SimpleTransformer

In [2]:
# Folders handed to RandomGraphDataset for the two splits.
# NOTE(review): presumably where the generated graphs are stored/cached --
# confirm against GraphDataset.py.
data_folder_training = "/eos/user/c/czeh/graphsequencer/fixed_edge_graph_training"
data_folder_test = "/eos/user/c/czeh/graphsequencer/fixed_edge_graph_test"

# Both splits share the same graph dimensions; only the sample count differs.
graph_shape = {"nodes": 100, "edges": 20}

dataset_training = RandomGraphDataset(data_folder_training, data_count=1000, **graph_shape)
dataset_test = RandomGraphDataset(data_folder_test, data_count=50, **graph_shape)

In [3]:
def string_list_to_char_tensor(strings, char_to_index=None, max_len=-1):
    """Encode a list of strings as a zero-padded tensor of character indices.

    Args:
        strings: A list of strings to encode.
        char_to_index: Optional mapping from character to integer index.
            When omitted (or empty), a vocabulary is built from the
            characters found in ``strings``, numbered from 0 in sorted order.
        max_len: Number of columns of the returned tensor. A value <= 0
            means "use the length of the longest string". Strings longer
            than ``max_len`` are truncated to their first ``max_len``
            characters.

    Returns:
        ``None`` if ``strings`` is empty. Otherwise a tuple
        ``(char_to_index, tensor)`` where ``tensor`` is a LongTensor of shape
        ``(len(strings), max_len)``; row ``i`` holds the character indices of
        ``strings[i]`` followed by zeros.

    Raises:
        KeyError: If an encoded character is missing from ``char_to_index``.

    Note:
        The padding value is 0, and a vocabulary built here also assigns
        index 0 to its first (smallest) character, so padding and that
        character cannot be told apart in the result.
    """

    if not strings:
        return None

    if not char_to_index:
        # Build the vocabulary: one index per distinct character, sorted.
        alphabet = sorted(set("".join(strings)))
        char_to_index = {char: index for index, char in enumerate(alphabet)}

    # Default width: the longest string decides the number of columns.
    if max_len <= 0:
        max_len = max(len(s) for s in strings)

    # Start from an all-padding (0) tensor and fill each row's prefix.
    tensor = torch.zeros(len(strings), max_len, dtype=torch.long)

    for i, string in enumerate(strings):
        # Truncate so an over-long string cannot index past the last column.
        codes = [char_to_index[char] for char in string[:max_len]]
        if codes:
            tensor[i, :len(codes)] = torch.tensor(codes, dtype=torch.long)

    return char_to_index, tensor

In [4]:
def generate_data(num_samples, sequence, sequence_length):
    """Build the character vocabulary of ``sequence`` and a training set.

    Random windows of up to ``sequence_length + 1`` characters are cut from
    ``sequence`` and encoded with ``string_list_to_char_tensor`` to obtain the
    character-to-index mapping.

    Args:
        num_samples: Number of windows / training rows to generate.
        sequence: The source string to sample windows from.
        sequence_length: Window length (minus one) and number of columns of
            the generated token tensor.

    Returns:
        A tuple ``(char_to_index, inputs, targets)``. ``inputs`` and
        ``targets`` are the same ``(num_samples, sequence_length)`` token
        tensor without its last / first column respectively, i.e. ``targets``
        is ``inputs`` shifted by one position.

    Note:
        Reads the module-level ``vocab_size``.
    """
    # Window start positions; windows starting near the end come out shorter.
    starts = np.random.randint(0, len(sequence) - 3, (num_samples,))
    # Slice from the sampled positions themselves (not idx[idx[i]]) and from
    # the `sequence` argument rather than a global.
    windows = [sequence[start:start + sequence_length + 1] for start in starts]
    char_to_index, _encoded_windows = string_list_to_char_tensor(windows)
    # NOTE(review): the encoded windows are discarded and the training tokens
    # are uniform random indices in [0, vocab_size). This looks like a
    # leftover from a random-data demo. Returning `_encoded_windows` instead
    # would change the returned shapes and requires vocab_size to cover every
    # index in char_to_index, so it is left for the author to confirm.
    inputs = torch.randint(0, vocab_size, (num_samples, sequence_length))
    return char_to_index, inputs[:, :-1], inputs[:, 1:]

In [5]:
def char_tensor_to_string_list(char_to_index, list):
    """Decode rows of character indices back into strings.

    Args:
        char_to_index: Mapping from character to integer index, in the form
            returned by ``string_list_to_char_tensor``.
        list: Iterable of rows of indices to decode, e.g. a 2-D LongTensor or
            a nested list of ints. (The name shadows the builtin; it is kept
            so existing calls keep working.)

    Returns:
        A list with one decoded string per row. Each string is also printed.
        Index 0 is treated as padding and skipped.

    Raises:
        KeyError: If a non-zero index has no entry in ``char_to_index``.
    """
    # Work on the argument that was passed in, not on a global of that name.
    char_tensor = list
    index_to_char = {index: char for char, index in char_to_index.items()}

    decoded_strings = []
    for row in char_tensor:
        # int() accepts both 0-dim tensors and plain Python ints.
        decoded_string = "".join(index_to_char[int(idx)] for idx in row if idx != 0)  # Ignore padding (0)
        print(decoded_string)
        decoded_strings.append(decoded_string)
    return decoded_strings

In [6]:
# Optional: plot the per-epoch training loss after training.
plot_training_loss = True

# Check for GPU availability
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = "cpu"

# Seed the NumPy and PyTorch generators so that data sampling, weight
# initialisation and batch shuffling are repeatable across Restart & Run All.
seed = 0
np.random.seed(seed)
torch.manual_seed(seed)

# Hyperparameters
seq = "4.8.;13.11.(12.6.(7.9.10.5.*6.3.1.2.*11.))"
embed_size = 512
sequence_length = 12
# One class per distinct character of `seq` (15: the digits 0-9 and . ; ( ) *).
# The vocabulary built from `seq` numbers its characters 0..14, so a fixed
# value of 14 would leave the largest index without a class.
# NOTE(review): assumes SimpleTransformer sizes its embedding/output layers
# with vocab_size -- confirm in SimpleTransformer.py.
vocab_size = len(set(seq))
num_samples = 10000
batch_size = 32
learning_rate = 0.001
epochs = 100

In [7]:
# Model, loss, and optimizer.
# The model is built from the hyperparameters defined earlier and moved to
# `device`; cross-entropy is the training criterion.
model = SimpleTransformer(embed_size, sequence_length, vocab_size).to(device)
loss_fn = nn.CrossEntropyLoss()

# Optionally introduce weight decay
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Generate the training data: the character vocabulary plus input/target
# token tensors, where the targets are the inputs shifted by one position.
char_to_index, inputs, targets = generate_data(num_samples, seq, sequence_length)

# Create a TensorDataset to hold the inputs and targets, and batch it.
# NOTE: `DataLoader` here is torch_geometric's loader -- the later
# `from torch_geometric.loader.dataloader import DataLoader` import rebinds
# the name first imported from torch.utils.data.
dataset = TensorDataset(inputs, targets)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [8]:
# Inspect the input token indices returned by generate_data.
inputs

tensor([[10, 12,  7,  ..., 10, 13, 10],
        [ 2,  3,  5,  ...,  2,  5, 10],
        [ 3,  3, 13,  ...,  7,  0,  2],
        ...,
        [ 9,  6,  9,  ...,  4,  4,  5],
        [12,  9,  0,  ...,  0,  3,  0],
        [ 2, 11,  3,  ..., 10, 13, 10]])

In [9]:
# Inspect the target token indices (the inputs shifted by one position).
targets

tensor([[12,  7,  9,  ..., 13, 10,  6],
        [ 3,  5,  2,  ...,  5, 10,  3],
        [ 3, 13,  4,  ...,  0,  2,  9],
        ...,
        [ 6,  9,  3,  ...,  4,  5, 12],
        [ 9,  0,  8,  ...,  3,  0, 10],
        [11,  3,  7,  ..., 13, 10,  0]])

In [None]:
# Mean training loss of every epoch, in order; plotted later on.
losses_per_epoch = []

# The number of batches per epoch does not change between epochs.
num_batches = len(dataloader)

# Training loop
for epoch_number in range(1, epochs + 1):
    model.train()
    running_loss = 0
    for tokens, next_tokens in dataloader:
        tokens = tokens.to(device)
        next_tokens = next_tokens.to(device)

        optimizer.zero_grad()
        logits = model(tokens)
        # Flatten all positions of all samples into one batch of predictions.
        flat_logits = logits.view(-1, vocab_size)
        flat_targets = next_tokens.view(-1)
        batch_loss = loss_fn(flat_logits, flat_targets)
        batch_loss.backward()
        # Optionally introduce gradient clipping here:
        # torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        running_loss += batch_loss.item()

    mean_loss = running_loss / num_batches
    print(f"Epoch {epoch_number}, Loss: {mean_loss:.6f}")
    losses_per_epoch.append(mean_loss)

In [None]:
# Example sequence, encoded with the training vocabulary and padded/cut to
# `sequence_length` columns. Only the tensor is needed, so take element [1]
# of the returned tuple instead of assigning the mapping to `_` (which would
# interfere with IPython's "last output" variable).
sample_sequence = string_list_to_char_tensor(
    ['4.8.;13.11.('], char_to_index, sequence_length
)[1]
# Keep the batch dimension (one row) and send an independent copy to device.
sample_tensor = sample_sequence[:1].clone().to(device)

model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient computation for inference
    predictions = model(sample_tensor)
    # Highest-scoring vocabulary index at every position.
    # NOTE(review): assumes predictions is (batch, position, vocab) -- confirm
    # against SimpleTransformer.
    predicted_index = predictions.argmax(-1)

# Prediction at the last position of the only sample, as a Python number.
predicted_number = predicted_index[0, -1].item()
# Translate the vocabulary index back into its character for readability.
index_to_char = {index: char for char, index in char_to_index.items()}
predicted_char = index_to_char.get(predicted_number, "<unknown>")
print(f"Input Sequence: {sample_sequence}")
print(f"Predicted Next Number: {predicted_number}")
print(f"Predicted Next Character: {predicted_char!r}")

if plot_training_loss:
    plt.plot(losses_per_epoch)
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training Loss")
    plt.show()  # Render the figure without echoing the Text repr

In [None]:
# Inspect the single-row batch of token indices that was fed to the model.
sample_tensor