In [2]:
from typing import Tuple

In [3]:
import torch
from torch_geometric.data import InMemoryDataset, Data, Batch
from torch.utils.data import DataLoader, Subset
from sklearn.calibration import LabelEncoder
from generate_dataset import generate_dataset
import json 

Constants

In [1]:
# Intended size of the generated dataset (presumably the sample count handed
# to generate_dataset — verify against that helper).
GENERATED_DATASET_SIZE = 130
# Number of expression pairs per DataLoader batch.
BATCH_SIZE = 32
# The first TRAIN_SAMPLES dataset indices form the training split; every
# remaining index goes to the test split.
TRAIN_SAMPLES = 100

In [17]:
def dict_to_geometric_representation(in_graph_dict: dict, encoder) -> Data:
    """Convert a nested expression-tree dict into a PyG ``Data`` object.

    Each node dict must carry an ``"id"`` key and may carry a ``"children"``
    key holding a list of node dicts of the same form. Nodes are visited in
    pre-order; one directed edge ``(parent id, child id)`` is emitted per
    parent/child relation.

    Args:
        in_graph_dict: Root node of the expression tree.
        encoder: Node-attribute encoder. NOTE(review): accepted for interface
            compatibility but not used yet — node features are currently the
            raw node ids, and the node ``"val"`` entries are not encoded.

    Returns:
        ``Data`` whose first field (``x``) is a 1-D long tensor of node ids and
        whose second field (``edge_index``) is a ``(2, num_edges)`` long tensor.
    """
    node_ids = []
    edge_pairs = []

    def traverse_graph(graph):
        node_ids.append(graph["id"])
        # `graph` is a dict, so membership must be tested with `in`;
        # hasattr() looks at attributes and never sees dictionary keys.
        if "children" in graph:
            for child in graph["children"]:
                edge_pairs.append((graph["id"], child["id"]))
                traverse_graph(child)

    traverse_graph(in_graph_dict)

    nodes = torch.tensor(node_ids, dtype=torch.long)
    sources = [src for src, _ in edge_pairs]
    targets = [dst for _, dst in edge_pairs]
    # Two parallel rows keep the (2, num_edges) layout even for a single-node
    # tree, where both rows are empty.
    edges = torch.tensor([sources, targets], dtype=torch.long)
    return Data(nodes, edges)

In [18]:
# Vocabulary of node labels that can appear in an expression tree.
OPERATIONS = ["ADD", "MUL", "FUNC", "POW"]
FUNCTIONS = ["SIN", "COS", "TAN", "EXP", "LOG", "f", "g", "h"]
ATOMICS = ["LITERAL", "VARIABLE"]
# Lowercase letters usable as variable names; "f", "g" and "h" are left out
# because they are already listed in FUNCTIONS.
VARIABLE_ALPHABET = [
    letter
    for letter in map(chr, range(ord("a"), ord("z") + 1))
    if letter not in ("f", "g", "h")
]

In [19]:
def make_node_attribute_encoder(label_encoder: "LabelEncoder", rep=3):
    """Build a function that turns one node attribute into a feature list.

    Args:
        label_encoder: Fitted encoder exposing ``transform(sequence)``.
        rep: Repetition count; every returned list has ``rep + 1`` entries.

    Returns:
        A callable ``node_attr_encoder(attr)``:
          * for a ``str`` attribute, the integer label code repeated
            ``rep + 1`` times;
          * for anything else, ``[0]`` followed by ``attr`` repeated ``rep``
            times.
    """
    def node_attr_encoder(attr):
        if isinstance(attr, str):
            # transform() works on 1-D sequences, so wrap the single label in
            # a list and unwrap the single resulting code.
            code = int(label_encoder.transform([attr])[0])
            return [code] * (rep + 1)
        return [0] + [attr] * rep

    return node_attr_encoder

In [20]:

class MathExpressionDataset(InMemoryDataset):
    """In-memory graph dataset for one side of each expression comparison.

    Every record of the raw JSON file is expected to hold the expression tree
    under the key given by ``expression`` and a ``"score"`` entry, which is
    stored as the graph's ``y``.

    Args:
        root: Dataset root directory used by ``InMemoryDataset``.
        expression: Key of the expression to extract from each record
            (the notebook uses ``"expr_l"`` and ``"expr_r"``).
        transform, pre_transform, pre_filter: Forwarded to ``InMemoryDataset``.
    """

    def __init__(self, root, expression, transform=None, pre_transform=None, pre_filter=None):
        # These attributes must exist before the base constructor runs: it may
        # call process(), which reads both self.expr and self.le, and it
        # resolves processed_file_names, which depends on self.expr.
        self.expr = expression
        self.le = LabelEncoder()
        self.le.fit(OPERATIONS + FUNCTIONS + ATOMICS + VARIABLE_ALPHABET)
        super().__init__(root, transform, pre_transform, pre_filter)
        self.load(self.processed_paths[0])

    @property
    def raw_file_names(self):
        return ['math_datagen.json']

    @property
    def processed_file_names(self):
        # One cache file per expression key; a shared name would make datasets
        # built on the same root load each other's processed graphs.
        return [f'data_{self.expr}.pt']

    def process(self):
        """Parse the raw JSON records into ``Data`` graphs and save them."""
        node_encoder = make_node_attribute_encoder(self.le)
        data_list = []
        # NOTE: raw file names are opened exactly as given, i.e. relative to
        # the current working directory rather than to self.raw_dir.
        for file in self.raw_file_names:
            with open(file) as file_handle:
                object_data = json.load(file_handle)
            for comparison in object_data:
                geometric_expr = dict_to_geometric_representation(comparison[self.expr], node_encoder)
                geometric_expr.y = comparison["score"]
                data_list.append(geometric_expr)

        if self.pre_filter is not None:
            data_list = [data for data in data_list if self.pre_filter(data)]

        if self.pre_transform is not None:
            data_list = [self.pre_transform(data) for data in data_list]

        self.save(data_list, self.processed_paths[0])
    

In [None]:
class ExpressionPairDataset(torch.utils.data.Dataset):
    """Dataset of aligned (left, right) expression graphs.

    Wraps two ``MathExpressionDataset`` instances built on the same root, one
    for the ``"expr_l"`` key and one for the ``"expr_r"`` key, and serves the
    graphs found at the same index as a pair.

    Args:
        root: Root directory shared by both underlying datasets.
        transform, pre_transform, pre_filter: Forwarded to both underlying
            datasets.

    Raises:
        ValueError: If the two sides end up with different lengths (e.g. a
            ``pre_filter`` dropped different records), since index-based
            pairing would then be misaligned.
    """

    def __init__(self, root, transform=None, pre_transform=None, pre_filter=None):
        super().__init__()
        self.dataset_l = MathExpressionDataset(
            root, "expr_l",
            transform=transform, pre_transform=pre_transform, pre_filter=pre_filter,
        )
        self.dataset_r = MathExpressionDataset(
            root, "expr_r",
            transform=transform, pre_transform=pre_transform, pre_filter=pre_filter,
        )
        if len(self.dataset_l) != len(self.dataset_r):
            raise ValueError(
                "Left and right expression datasets differ in length: "
                f"{len(self.dataset_l)} != {len(self.dataset_r)}"
            )

    def __len__(self):
        # Needed by len(dataset) and by samplers/DataLoader.
        return len(self.dataset_l)

    def __getitem__(self, idx):
        return self.dataset_l[idx], self.dataset_r[idx]

    @property
    def num_node_features(self):
        """Node feature count, taken from the left-hand dataset."""
        return self.dataset_l.num_node_features

    @property
    def num_features(self):
        """Feature count, taken from the left-hand dataset."""
        return self.dataset_l.num_features

In [21]:
# Write the raw comparison file, then build the paired dataset.
# The size comes from the Constants cell instead of a repeated literal
# (assumes the first argument of generate_dataset is the sample count — verify).
generate_dataset(GENERATED_DATASET_SIZE, "math_datagen.json")
dataset = ExpressionPairDataset(root="/dataset")

In [22]:
from torch import nn
from torch.nn import Linear, ReLU
from torch_geometric.nn import GCNConv

In [24]:
class FormulaNet(nn.Module):
    """Graph encoder skeleton for a single expression.

    The input widths of the first dense layer and the first graph convolution
    are read from the module-level ``dataset`` when the network is constructed.

    Args:
        hidden_channels: Output width of both graph convolutions.
        embedding_space: Output width of the final dense layer.
    """

    def __init__(self, hidden_channels, embedding_space):
        super().__init__()
        self.dense_1 = Linear(dataset.num_features, 16)
        self.relu_1 = ReLU()
        self.gconv_1 = GCNConv(dataset.num_node_features, hidden_channels)
        self.gconv_2 = GCNConv(hidden_channels, hidden_channels)
        self.dense_2 = Linear(hidden_channels, embedding_space)

    def forward(self, data):
        """Not implemented yet; currently returns ``None``."""
        return None
    

In [25]:
class SiameseFormulaNet(nn.Module):
    """Two-branch wrapper holding one ``FormulaNet`` per expression side.

    Args:
        hidden_channels: Passed to both ``FormulaNet`` branches.
        embedding_space: Passed to both ``FormulaNet`` branches.
    """

    def __init__(self, hidden_channels, embedding_space):
        super().__init__()
        # Two separately constructed branches, created in this order.
        self.formulanet_1 = FormulaNet(hidden_channels, embedding_space)
        self.formulanet_2 = FormulaNet(hidden_channels, embedding_space)

    def forward_once(self, x):
        """Not implemented yet; currently returns ``None``."""
        return None

    def forward(self, expr_l, expr_r):
        """Not implemented yet; currently returns ``None``."""
        return None

In [None]:
# Sequential split: indices [0, TRAIN_SAMPLES) train, the rest test.
train_indices = list(range(TRAIN_SAMPLES))
test_indices = list(range(TRAIN_SAMPLES, len(dataset)))
train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

In [None]:

def collate(data_list):
    """Collate (left, right) graph pairs into two PyG batches.

    ``DataLoader`` calls ``collate_fn`` with a single argument, the list of
    samples, so this function must not take a ``self`` parameter.

    Args:
        data_list: Sequence of 2-tuples ``(left_graph, right_graph)``.

    Returns:
        Tuple ``(batchA, batchB)``: one ``Batch`` built from all left graphs
        and one built from all right graphs, in sample order.
    """
    batchA = Batch.from_data_list([pair[0] for pair in data_list])
    batchB = Batch.from_data_list([pair[1] for pair in data_list])
    return batchA, batchB
# NOTE: the `type: ignore` markers exist only for the collate_fn type; make
# sure they do not hide genuine typing problems with the dataset argument.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate,  # type: ignore
)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate,  # type: ignore
)