In [75]:
import os
import json

# Define the paths to the folders
folders = ["data_symbolic_regression/test", "data_symbolic_regression/train", "data_symbolic_regression/val"]

# Dictionary to hold the data from each folder
data = {folder: [] for folder in folders}

# Iterate through each folder and read JSON files
for folder in folders:
    if os.path.exists(folder):
        for file_name in os.listdir(folder):
            if file_name.endswith(".json"):
                file_path = os.path.join(folder, file_name)
                try:
                    with open(file_path, "r") as file:
                        content = json.load(file)
                        data[folder].append(content)
                except Exception as e:
                    print(f"Error reading {file_path}: {e}")
    else:
        print(f"Folder {folder} does not exist.")

# Example: Accessing data
for folder, files in data.items():
    print(f"Folder: {folder}, Number of files: {len(files)}")




Folder: data_symbolic_regression/test, Number of files: 161
Folder: data_symbolic_regression/train, Number of files: 747
Folder: data_symbolic_regression/val, Number of files: 160


In [76]:
if data["data_symbolic_regression/test"]:
    # Access the first item in the test folder
    test = data["data_symbolic_regression/test"]

if data["data_symbolic_regression/train"]:
    # Access the first item in the test folder
    train = data["data_symbolic_regression/train"]

if data["data_symbolic_regression/val"]:
    # Access the first item in the test folder
    val = data["data_symbolic_regression/val"]

In [77]:
len(train)
for i in range(len(train)):
    print(i)
    print(train[i]["formula_human_readable"])

0
((cos(var_1)+(var_2*C_0))*((var_0*var_0)*cos(var_2)))
1
(((var_1+C_0)+tan(var_0))+tanh(exp(var_2)))
2
(pow_2(cosh(var_2))+reverse((var_1*var_0)))
3
((gaussian(var_1)+(var_0+C_0))+tan((var_1*var_2)))
4
(sqrt((var_2*var_0))+neg(cosh(var_1)))
5
(((var_2+var_0)*sqrt(var_1))*tanh((var_1+var_2)))
6
log(((var_0+C_0)*(var_1+var_2)))
7
((sin(var_1)+tanh(var_0))*sinh(sqrt(var_2)))
8
(tanh(log(var_0))+tanh((var_2+var_1)))
9
(sinh(tanh(var_0))+cos((var_1*var_2)))
10
(log(sinh(var_0))+neg((var_2*var_1)))
11
((log(var_0)*sqrt(var_1))+sin(pow_2(var_2)))
12
(((var_0*C_0)+sinh(var_1))*reverse(gaussian(var_2)))
13
(tan(reverse(var_2))+exp((var_0+var_1)))
14
(log((var_1+var_0))*sqrt((var_2+C_0)))
15
((gaussian(var_1)+(var_0*C_0))+sqrt(var_2))
16
((var_1*cos(var_2))*tanh(sqrt(var_0)))
17
neg((tan(var_2)*(var_0*var_1)))
18
((tan(var_2)*sin(var_1))+cos(var_0))
19
(((var_2*C_0)+tan(var_0))*sinh((var_0*var_1)))
20
((cos(var_0)+tanh(var_2))*sqrt(sin(var_1)))
21
(sin((var_0+var_2))*pow_2(neg(var_1)))
22
((log

In [78]:
import re

def regex_tokenize_formula_with_tags(formula):
    """
    Tokenizes a formula using regex and replaces `(` and `)` with tags `[OPEN_PAREN]` and `[CLOSE_PAREN]`.
    Args:
        formula (str): A mathematical formula as a string.
    Returns:
        list: List of tokens with parenthesis replaced by tags.
    """
    # Replace parentheses with tags
    formula = formula.replace("(", " [OPEN_PAREN] ").replace(")", " [CLOSE_PAREN] ")

    # Define regex pattern
    pattern = r"""
        (var_\d+)          |  # Match variables like var_0, var_1
        (C_\d+)            |  # Match constants like C_0, C_1
        ([a-zA-Z_]+)       |  # Match functions like sqrt, sin, log
        (\[OPEN_PAREN\])   |  # Match custom open parenthesis tag
        (\[CLOSE_PAREN\])  |  # Match custom close parenthesis tag
        ([\+\-\*/\^])      |  # Match operators
        (\d+\.\d+|\d+)        # Match numbers (integers or decimals)
    """

    # Compile regex
    regex = re.compile(pattern, re.VERBOSE)

    # Tokenize using regex
    tokens = [match.group(0) for match in regex.finditer(formula)]
    return tokens

def tokenize_dataset(dataset):
    """
    Tokenizes the formula_human_readable field of each entry in the dataset.
    Args:
        dataset (list): List of dictionaries with the "formula_human_readable" field.
    Returns:
        list: List of tokenized formulas for the entire dataset.
    """
    tokenized_formulas = []

    for i, entry in enumerate(dataset):
        if "formula_human_readable" in entry:
            formula = entry["formula_human_readable"]
            tokens = regex_tokenize_formula_with_tags(formula)
            tokenized_formulas.append(tokens)
            print(f"Entry {i}: {tokens}")  # Debugging/logging

    return tokenized_formulas

# Tokenize the train dataset
tokenized_train_formulas = tokenize_dataset(train)

# Example output of tokenized formulas
print(f"Tokenized Formulas for Train Dataset: {tokenized_train_formulas}")

Entry 0: ['[OPEN_PAREN]', '[OPEN_PAREN]', 'cos', '[OPEN_PAREN]', 'var_1', '[CLOSE_PAREN]', '+', '[OPEN_PAREN]', 'var_2', '*', 'C_0', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '*', '[OPEN_PAREN]', '[OPEN_PAREN]', 'var_0', '*', 'var_0', '[CLOSE_PAREN]', '*', 'cos', '[OPEN_PAREN]', 'var_2', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '[CLOSE_PAREN]']
Entry 1: ['[OPEN_PAREN]', '[OPEN_PAREN]', '[OPEN_PAREN]', 'var_1', '+', 'C_0', '[CLOSE_PAREN]', '+', 'tan', '[OPEN_PAREN]', 'var_0', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '+', 'tanh', '[OPEN_PAREN]', 'exp', '[OPEN_PAREN]', 'var_2', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '[CLOSE_PAREN]']
Entry 2: ['[OPEN_PAREN]', 'pow_', '2', '[OPEN_PAREN]', 'cosh', '[OPEN_PAREN]', 'var_2', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '+', 'reverse', '[OPEN_PAREN]', '[OPEN_PAREN]', 'var_1', '*', 'var_0', '[CLOSE_PAREN]', '[CLOSE_PAREN]', '[CLOSE_PAREN]']
Entry 3: ['[OPEN_PAREN]', '[OPEN_PAREN]', 'gaussian', '[OPEN_PAREN]', 'var_1', '[CLOSE_PAREN]', '+', '[OPEN_PAREN]', 'var_0', '+', 'C_0', '[CLOSE

In [80]:
import torch
import torch.nn as nn
from collections import defaultdict
import re


class FormulaTokenizer:
    def __init__(self):
        self.token_to_id = defaultdict(lambda: self.token_to_id["[UNK]"])  # Default to [UNK] for unknown tokens
        self.token_to_id["[PAD]"] = 0
        self.token_to_id["[UNK]"] = 1
        self.token_to_id["[OPEN_PAREN]"] = 2
        self.token_to_id["[CLOSE_PAREN]"] = 3
        self.id_to_token = {v: k for k, v in self.token_to_id.items()}

    def build_vocab(self, tokenized_formulas):
        """
        Builds vocabulary from the tokenized formulas.
        Args:
            tokenized_formulas (list of lists): Each inner list contains tokens of a formula.
        """
        for formula in tokenized_formulas:
            for token in formula:
                if token not in self.token_to_id:
                    new_id = len(self.token_to_id)
                    self.token_to_id[token] = new_id
                    self.id_to_token[new_id] = token

    def encode(self, tokens, max_length):
        """
        Encodes a list of tokens into token IDs, with padding/truncation to max_length.
        Args:
            tokens (list): List of tokens.
            max_length (int): Maximum length of the sequence.
        Returns:
            list: List of token IDs padded/truncated to max_length.
        """
        token_ids = [self.token_to_id[token] for token in tokens]
        if len(token_ids) < max_length:
            token_ids += [self.token_to_id["[PAD]"]] * (max_length - len(token_ids))
        else:
            token_ids = token_ids[:max_length]
        return token_ids

    def decode(self, token_ids):
        """
        Decodes a list of token IDs back into tokens.
        Args:
            token_ids (list): List of token IDs.
        Returns:
            list: List of tokens.
        """
        return [self.id_to_token[token_id] for token_id in token_ids]

class FormulaEmbeddingModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        """
        Embedding model for formulas.
        Args:
            vocab_size (int): Size of the vocabulary.
            embedding_dim (int): Dimension of the embedding vectors.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, token_ids):
        """
        Forward pass to get embeddings for the token IDs.
        Args:
            token_ids (Tensor): Tensor of token IDs (batch_size, max_length).
        Returns:
            Tensor: Embedding tensor (batch_size, max_length, embedding_dim).
        """
        return self.embedding(token_ids)

# Tokenize dataset with updated tokenization
tokenized_train_formulas = [
    regex_tokenize_formula_with_tags(entry["formula_human_readable"])
    for entry in train
    if "formula_human_readable" in entry
]

# Prepare tokenized formulas and build vocabulary
tokenizer = FormulaTokenizer()
tokenizer.build_vocab(tokenized_train_formulas)

# Example parameters
embedding_dim = 128
max_length = 50

# Convert tokenized formulas into token IDs
encoded_formulas = [
    tokenizer.encode(tokens, max_length=max_length)
    for tokens in tokenized_train_formulas
]
encoded_formulas_tensor = torch.tensor(encoded_formulas, dtype=torch.long)  # Convert to tensor

# Initialize embedding model
vocab_size = len(tokenizer.token_to_id)
embedding_model = FormulaEmbeddingModel(vocab_size, embedding_dim)

# Generate embeddings for the formulas
with torch.no_grad():
    formula_embeddings = embedding_model(encoded_formulas_tensor)  # (batch_size, max_length, embedding_dim)

print(f"Formula Embeddings Shape: {formula_embeddings.shape}")


Formula Embeddings Shape: torch.Size([747, 50, 128])


In [81]:
# Print each token in the vocabulary with its corresponding embedding
def print_vocab_embeddings(tokenizer, embedding_model):
    vocab_size = len(tokenizer.token_to_id)
    token_to_id = tokenizer.token_to_id

    # Iterate over each token in the vocabulary
    for token, token_id in token_to_id.items():
        # Generate the embedding for the token
        token_tensor = torch.tensor([token_id], dtype=torch.long)  # Create a tensor for the token ID
        with torch.no_grad():  # Disable gradients for inference
            embedding = embedding_model.embedding(token_tensor).squeeze(0)  # Retrieve embedding
        
        # Print token and its embedding
        print(f"Token: {token}, Embedding: {embedding.tolist()}")

# Call the function to print embeddings for all tokens
print_vocab_embeddings(tokenizer, embedding_model)

Token: [PAD], Embedding: [0.6182436347007751, 0.4672608971595764, 0.6158357858657837, -0.09018786996603012, -1.7831112146377563, 0.33897748589515686, 0.44942158460617065, -0.8402643799781799, -1.330326795578003, 0.8680840134620667, -1.354119896888733, 0.7781286835670471, -0.4992981553077698, 1.054263949394226, -0.2371353656053543, -1.7991414070129395, 1.1808527708053589, 0.41620153188705444, 2.392763614654541, 1.8136359453201294, 0.9335570335388184, 0.6367294788360596, -0.8861139416694641, 0.0792979747056961, 0.27534401416778564, 1.144988775253296, 1.1336487531661987, -0.8119630217552185, 1.3932677507400513, 1.5234756469726562, -1.193407654762268, 1.2890238761901855, 1.0727283954620361, 0.38307681679725647, -0.06749553233385086, -2.0140645503997803, 0.06699328124523163, 1.78082275390625, 0.5275349617004395, 1.44270658493042, -1.2807718515396118, -0.621057391166687, -1.2997692823410034, 0.5771022439002991, 0.6576032042503357, -0.2604776620864868, 1.2314543724060059, -0.08778391778469086

In [83]:
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
normalized_train = []
normalized_val = []
for i in train:
    scaler = MinMaxScaler()
    data = pd.DataFrame.from_dict(i["points"])
    normalized_data = scaler.fit_transform(data)
    normalized_train.append(pd.DataFrame(normalized_data, columns=data.columns))
for i in val:
    data = pd.DataFrame.from_dict(i["points"])
    normalized_data = scaler.fit_transform(data)
    normalized_val = pd.DataFrame(normalized_data, columns=data.columns)

In [86]:
len(normalized_val)

100

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd

# Define Formula Embedding Model
class FormulaEmbeddingModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, token_ids):
        return self.embedding(token_ids)

# Define Dataset Class for Precomputed Embeddings
class DiffusionDatasetWithTargets(Dataset):
    def __init__(self, precomputed_data):
        self.data = precomputed_data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

# Precompute embeddings for training and validation

def precompute_embeddings_with_targets(data, normalized_data, tokenizer, vocab_size, embedding_dim, max_length):
    embedding_model = FormulaEmbeddingModel(vocab_size, embedding_dim)
    precomputed = []

    for item, norm_item in zip(data, normalized_data):
        # Tokenize and encode formula
        tokens = tokenizer(item['formula_human_readable'])
        token_ids = torch.tensor(tokens, dtype=torch.long)

        # Pad or truncate to max_length
        if len(token_ids) < max_length:
            token_ids = torch.cat(
                [token_ids, torch.full((max_length - len(token_ids),), tokenizer['[PAD]'], dtype=torch.long)]
            )
        else:
            token_ids = token_ids[:max_length]

        # Generate embedding
        with torch.no_grad():
            embedding = embedding_model(token_ids.unsqueeze(0)).squeeze(0)

        # Use normalized data directly
        normalized_points = norm_item.drop(columns=['target']).to_dict(orient='list')
        normalized_target = norm_item['target'].values

        precomputed.append((normalized_points, embedding, normalized_target))

    return precomputed

# Diffusion Model
class DiffusionModel(nn.Module):
    def __init__(self, embedding_dim, hidden_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(embedding_dim + 1, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embedding_dim)
        )

    def forward(self, x, t):
        t_emb = t.view(-1, 1).expand_as(x)  # Broadcast t to match x shape
        x_t = torch.cat([x, t_emb], dim=-1)
        return self.net(x_t)

# Noise Scheduler
class NoiseScheduler:
    def __init__(self, timesteps, beta_start=1e-4, beta_end=2e-2):
        self.timesteps = timesteps
        self.betas = torch.linspace(beta_start, beta_end, timesteps)
        self.alphas = 1.0 - self.betas
        self.alpha_cumprod = torch.cumprod(self.alphas, dim=0)

    def add_noise(self, x, t):
        noise = torch.randn_like(x)
        sqrt_alpha_cumprod = self.alpha_cumprod[t].sqrt()
        sqrt_one_minus_alpha_cumprod = (1 - self.alpha_cumprod[t]).sqrt()
        x_t = sqrt_alpha_cumprod * x + sqrt_one_minus_alpha_cumprod * noise
        return x_t, noise


Number of parameters in model: 329344
Batch 1/24
Points Shape: [torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100])], Embeddings Shape: torch.Size([32, 50, 128])
Random Timesteps: tensor([724, 850,  79, 441, 213, 545, 632,  21, 609, 304,  10, 976, 762, 613,
         87,   5, 710, 368, 533, 661, 358, 398, 618, 302, 552, 511, 994, 307,
         92, 999, 260, 948])
Batch 2/24
Points Shape: [torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100])], Embeddings Shape: torch.Size([32, 50, 128])
Random Timesteps: tensor([530, 169, 291, 767, 233, 288, 508, 943, 571, 885, 342, 699, 249, 518,
        760, 992, 825, 819, 995, 588,  62, 667, 342, 192, 216, 155, 598,  94,
        660, 285, 226, 538])
Batch 3/24
Points Shape: [torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100]), torch.Size([32, 100])], Embeddings Shape: torch.Size([32, 50, 128])
Random Timesteps: tensor([598, 901, 309, 595, 846, 324, 962, 764, 8

In [None]:


# Dummy Data
train_data = [
        {
            "formula_human_readable": "var_0 + var_1",
            "points": {
                "var_0": [0.1, 0.2, 0.3],
                "var_1": [0.4, 0.5, 0.6],
                "target": [0.5, 0.7, 0.9]
            },
        },
        {
            "formula_human_readable": "var_0 * var_1",
            "points": {
                "var_0": [0.2, 0.4, 0.6],
                "var_1": [0.1, 0.3, 0.5],
                "target": [0.02, 0.12, 0.3]
            },
        },
    ]


vocab_size = len(tokenizer)
embedding_dim = 128
max_length = 10

    # Precompute embeddings
precomputed_train = precompute_embeddings_with_targets(train_data, normalized_train, tokenizer_func, vocab_size, embedding_dim, max_length)

    # Initialize diffusion model and noise scheduler
hidden_dim = 256
diffusion_model = DiffusionModel(embedding_dim, hidden_dim)
timesteps = 1000
scheduler = NoiseScheduler(timesteps)

    # Training setup
optimizer = torch.optim.Adam(diffusion_model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

    # Training loop
for epoch in range(10):  # Number of epochs
        total_loss = 0
        for points, embedding, target in precomputed_train:
            t = torch.randint(0, timesteps, (1,))  # Random timestep

            # Add noise to embeddings and targets
            x_t, noise = scheduler.add_noise(embedding, t)
            target_t, target_noise = scheduler.add_noise(torch.tensor(target, dtype=torch.float32), t)

            # Predict noise
            pred_noise = diffusion_model(x_t, t)

            # Compute loss
            loss = criterion(pred_noise, target_noise)
            total_loss += loss.item()
# Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Print epoch loss
        print(f"Epoch {epoch + 1}, Loss: {total_loss / len(precomputed_train):.4f}")


In [55]:
def generate_formula_from_points(model, diffusion, tokenizer, test_points, timesteps=1000, embedding_dim=128):
    """
    Generate a formula from input points using the trained diffusion model.
    Args:
        - model: Trained diffusion model.
        - diffusion: Diffusion process instance.
        - tokenizer: Tokenizer for decoding formula embeddings.
        - test_points: Tensor of points [batch_size, num_points, num_vars].
        - timesteps: Number of timesteps in the diffusion process.
        - embedding_dim: Dimension of the formula embeddings.
    Returns:
        - Generated formula (decoded from the embedding).
    """
    model.eval()
    with torch.no_grad():
        # Initialize random noisy embedding
        batch_size, num_points, num_vars = test_points.shape
        noisy_embedding = torch.randn(batch_size, embedding_dim)  # Shape: [batch_size, embedding_dim]

        # Reverse diffusion process
        for t in reversed(range(timesteps)):
            # Prepare timestep tensor
            t_tensor = torch.tensor([t] * batch_size, dtype=torch.long)  # Shape: [batch_size]

            # Predict noise and denoise
            pred_noise = model(noisy_embedding.unsqueeze(1), t_tensor)  # Add sequence dimension if needed
            alpha_t = diffusion.alpha_cumprod[t].sqrt()
            beta_t = (1 - diffusion.alpha_cumprod[t]).sqrt()
            noisy_embedding = (noisy_embedding - beta_t * pred_noise.squeeze(1)) / alpha_t

        # Decode the final embedding
        final_embedding = noisy_embedding.squeeze(0)  # Remove batch dimension if needed
        print(f"Final Embedding Shape: {final_embedding.shape}")  # Debugging shape
        print(f"Final Embedding Values: {final_embedding}")

        # Handle possible single-dimensional embedding
        if final_embedding.dim() == 1:
            final_embedding = final_embedding.unsqueeze(0)

        # Generate tokens (ensure this produces a sequence of token IDs)
        generated_tokens = torch.argmax(final_embedding, dim=-1)  # Token indices (sequence)
        print(f"Generated Tokens: {generated_tokens}")

        # Ensure `generated_tokens` is converted to a list
        if isinstance(generated_tokens, torch.Tensor):
            generated_tokens = generated_tokens.tolist()
        if not isinstance(generated_tokens, list):
            raise ValueError("Generated tokens are not converted to a list correctly")

        # Decode tokens into formula
        formula = tokenizer.decode(generated_tokens)

    return formula


In [None]:
# Initialize the diffusion process
timesteps = 1000
betas, alpha_cumprod = noise_schedule(timesteps)
diffusion = Diffusion(
    model=trained_model,
    betas=betas,
    alpha_cumprod=alpha_cumprod,
    timesteps=timesteps
)

# Prepare test points dynamically based on available variables
test_sample = test[0]  # Assume you want to generate a formula for the first test sample

# Extract all variables from the test sample's points
test_points = torch.tensor(
    [test_sample["points"][var_name] for var_name in test_sample["points"].keys()],
    dtype=torch.float32
).permute(1, 0)  # Shape: [num_points, num_vars]


# Add a batch dimension
test_points = test_points.unsqueeze(0)  # Shape: [batch_size=1, num_points, num_vars]
# Generate the formula using the trained model
generated_formula = generate_formula_from_points(
    model=trained_model,
    diffusion=diffusion,
    tokenizer=tokenizer,
    test_points=test_points,
    timesteps=timesteps,
    embedding_dim=128
)

print("Generated Formula:", generated_formula)


tensor([[[-0.7995, -4.5013,  0.3758,  1.4940],
         [ 2.0476,  0.7573, -4.2495, -1.5227],
         [-2.0467,  3.4944,  3.7023,  5.3250],
         [-3.9617,  3.5842,  1.2609,  6.7068],
         [ 1.0098, -4.4722, -2.1025,  0.9404],
         [-2.4049,  2.1883,  2.5770,  4.2106],
         [-1.4915, -4.6379,  2.0217,  4.0208],
         [ 1.8481, -1.7090,  3.6834, -0.5564],
         [ 2.1821, -2.2918,  0.5333, -2.4133],
         [-0.0689, -2.5810, -3.8520,  2.6486],
         [-4.4660,  1.8132, -4.1209,  7.2646],
         [-1.8492, -4.6484,  1.6990,  4.2383],
         [ 4.9174,  3.6242, -0.8887, -5.5131],
         [-0.5731,  4.1466, -3.1705,  3.4813],
         [-1.6938, -0.9856,  4.6556,  3.9295],
         [-2.1024,  4.2126, -3.3647,  5.4208],
         [-4.3650,  1.8480, -2.4453,  8.1548],
         [ 1.0649, -0.9851,  0.1692, -3.1668],
         [ 3.0727,  0.5860, -1.5146, -4.4722],
         [-1.8288,  0.4510, -1.5192,  1.9894],
         [-4.2670,  1.9117, -1.0972,  6.0042],
         [-1.

NameError: name 'final_embedding' is not defined