In [1]:
from deap import gp
import operator
import math
import numpy as np

## Primitive set

In [2]:
def protectedDiv(left, right):
    try:
        return left / right
    except ZeroDivisionError:
        return 1

pset = gp.PrimitiveSet("MAIN", 1)
pset.addPrimitive(operator.add, 2)
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(protectedDiv, 2)
pset.addPrimitive(math.cos, 1)

pset.renameArguments(ARG0='x')

## Helpers

In [3]:
def generate_random_tree(pset, min_, max_):
    expr = gp.genHalfAndHalf(pset, min_=min_, max_=max_)
    tree = gp.PrimitiveTree(expr)
    return tree

def build_primitives_terminals_dict(pset):
    prims = dict()
    prims_funcs = list(pset.primitives.values())[0]
    prims_names = [p.name for p in prims_funcs]
    prims.update(zip(prims_names, prims_funcs))

    # for arguments, add key = value = name to the dict
    for arg in pset.arguments:
        prims[str(arg)] = arg

    return prims

def tree_to_nodes_matrix(tree: gp.PrimitiveTree, pset: gp.PrimitiveSet, prims_names: list, n_nodes=0):
    n_prims = pset.prims_count + len(pset.arguments)
    if n_nodes == 0:
        n_nodes = len(tree)
    m = np.zeros((n_nodes, n_prims))

    for i, prim in enumerate(tree):
        prim_name = prim.name.replace('ARG0', 'x')
        prim_idx = prims_names.index(prim_name)
        m[i, prim_idx] = 1.
    
    return m

def eval_fitness(tree, pset, points):
    func = gp.compile(tree, pset)

    sqerrors = ((func(x) - (x**2 + math.cos(x)))**2 for x in points)
    return math.fsum(sqerrors) / len(points)

def generate_dataset(n_samples, pset, min_, max_, points, prims_names):
    n_prims = pset.prims_count + len(pset.arguments)
    max_nodes = 2**(max_+1) - 1
    X = np.zeros((n_samples, max_nodes*n_prims))
    y = np.zeros((n_samples, 1))
    
    for i in range(n_samples):
        tree = generate_random_tree(pset, min_, max_)
        m = tree_to_nodes_matrix(tree, pset, prims_names, max_nodes).ravel()
        X[i,:] = m
        fit = eval_fitness(tree, pset, points)
        if math.isnan(fit) or math.isinf(fit):
            fit = 1e2
        y[i,:] = fit

    return X, y
    
    

## Generation of datasets

In [4]:
tree = generate_random_tree(pset, min_=1, max_=3)
print(tree)
prims = build_primitives_terminals_dict(pset)
tree_to_nodes_matrix(tree, pset, list(prims.keys()))

add(x, x)


array([[1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 0., 1.]])

In [5]:
points = np.arange(0.,1.1,0.1)
print(points)
eval_fitness(tree, pset, points)

[0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]


0.2512503477471599

In [6]:
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn
import torch

In [9]:
torch.manual_seed(0)
torch.set_default_dtype(torch.float64)
min_ = 1
max_ = 3
max_nodes = 2**(max_+1) - 1
n_prims = n_prims = pset.prims_count + len(pset.arguments)
n_samples = 1000
X, y = generate_dataset(n_samples, pset, min_, max_, points, list(prims.keys()))
y_normalized = (y - np.mean(y))/np.std(y)
frac = 0.8
X_train, X_valid = random_split(X, [frac, 1-frac])
y_train, y_valid = random_split(y_normalized, [frac, 1-frac])
print(list(y_normalized))

[array([-0.8128734]), array([1.25222875]), array([-0.81672707]), array([-0.79874873]), array([1.25222875]), array([1.25222875]), array([1.25222875]), array([1.25222875]), array([1.25222875]), array([-0.81330714]), array([-0.80318462]), array([-0.80787818]), array([-0.8128734]), array([-0.80318462]), array([-0.81330714]), array([-0.78839083]), array([1.25222875]), array([1.25222875]), array([-0.81265794]), array([-0.78839083]), array([-0.81330714]), array([1.25222875]), array([-0.8128734]), array([-0.81330714]), array([-0.80318462]), array([1.25222875]), array([-0.81258848]), array([1.25222875]), array([1.25222875]), array([-0.77938746]), array([1.25222875]), array([-0.81417083]), array([-0.81672707]), array([1.25222875]), array([-0.80318462]), array([-0.81417083]), array([-0.81131713]), array([1.25222875]), array([-0.80318462]), array([-0.81330714]), array([-0.81330714]), array([-0.78839083]), array([-0.81672707]), array([-0.80318462]), array([1.25222875]), array([-0.79900341]), array(

  return left / right
  return left / right


In [10]:
class CustomDataset(Dataset):
    def __init__(self, X, y, transform=None, target_transform=None):
        self.X = X
        self.y = y
        
    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return X[idx,:], y[idx, 0]

train_dataset = CustomDataset(X_train, y_train)
valid_dataset = CustomDataset(X_valid, y_valid)

In [18]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(max_nodes*n_prims, max_nodes*n_prims),
            nn.ReLU(),
            nn.Linear(max_nodes*n_prims, max_nodes*n_prims),
            nn.ReLU(),
            nn.Linear(max_nodes*n_prims, max_nodes*n_prims),
            nn.ReLU(),
            nn.Linear(max_nodes*n_prims, 1),
        )

    def forward(self, x):
        logits = self.linear_relu_stack(x)
        return logits

In [15]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

    test_loss /= num_batches
    print(f"Test Error: Avg loss: {test_loss:>8f} \n")

In [None]:
model = NeuralNetwork()
loss_fn = nn.MSELoss()

learning_rate = 1e-2
batch_size = 1
epochs = 1000

train_dataloader = DataLoader(train_dataset, batch_size = batch_size)
valid_dataloader = DataLoader(valid_dataset, batch_size = batch_size)

optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(valid_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.141342  [    1/  801]
loss:     nan  [  101/  801]
loss:     nan  [  201/  801]
loss:     nan  [  301/  801]
loss:     nan  [  401/  801]
loss:     nan  [  501/  801]
loss:     nan  [  601/  801]
loss:     nan  [  701/  801]
loss:     nan  [  801/  801]
Test Error: Avg loss:      nan 

Epoch 2
-------------------------------
loss:     nan  [    1/  801]
loss:     nan  [  101/  801]
loss:     nan  [  201/  801]
loss:     nan  [  301/  801]
loss:     nan  [  401/  801]
loss:     nan  [  501/  801]
loss:     nan  [  601/  801]
loss:     nan  [  701/  801]
loss:     nan  [  801/  801]
Test Error: Avg loss:      nan 

Epoch 3
-------------------------------
loss:     nan  [    1/  801]
loss:     nan  [  101/  801]
loss:     nan  [  201/  801]
loss:     nan  [  301/  801]
loss:     nan  [  401/  801]
loss:     nan  [  501/  801]
loss:     nan  [  601/  801]
loss:     nan  [  701/  801]
loss:     nan  [  801/  801]
Test Error: Avg loss:      nan

In [None]:
test = np.zeros((max_nodes, n_prims))
test[:6,:] = np.array([[1,0,0,0,0,0], [0,0,1,0,0,0], [0,0,0,0,0,1], [0,0,0,0,0,1], [0,0,0,0,1,0], [0,0,0,0,0,1]])
test_tensor = torch.from_numpy(test.flatten())
pred = model(test_tensor)
pred

In [None]:
def softmax(x):
    sm = torch.nn.Softmax(dim=1)
    with torch.no_grad():
        x_reshaped = torch.reshape(x, (max_nodes, n_prims))
    return sm(x_reshaped)

In [None]:
def optimize_tree(x0: np.array, learning_rate, max_iter):
    x0 = torch.tensor(x0, requires_grad = True)
    optimizer_tree = torch.optim.SGD([x0], lr = learning_rate)
    #sm = torch.nn.Softmax(dim=1)
    for i in range(max_iter):
        pred = model(x0)
        pred.backward()
        optimizer_tree.step()
        optimizer_tree.zero_grad()
        #with torch.no_grad():
        #    x0_reshaped = torch.reshape(x0, (max_nodes, n_prims))
        #    x0 = sm(x0_reshaped).flatten().requires_grad_()
        print(pred)
        print(softmax(x0))
    return x0

In [None]:
tree = generate_random_tree(pset, min_=1, max_=3)
print(tree)
x0 = tree_to_nodes_matrix(tree, pset, list(prims.keys()), n_nodes = max_nodes)
x = optimize_tree(x0.ravel(), 1e-3, 100)

In [None]:
sm = torch.nn.Softmax(dim=1)
with torch.no_grad():
            x_reshaped = torch.reshape(x, (max_nodes, n_prims))
            x = sm(x_reshaped)
x

In [None]:
print(list(prims.keys()))