In [20]:
#from models.nalu import NALU
import torch.nn.functional as F
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
plt.style.use('seaborn')

In [21]:
def generate_synthetic_arithmetic_dataset(arithmetic_op, min_value, max_value, sample_size, set_size, boundaries = None):
    """
    generates a dataset of integers for the synthetics arithmetic task

    :param arithmetic_op: the type of operation to perform on the sum of the two sub sections can be either :
    ["add" , "subtract", "multiply", "divide", "root", "square"]
    :param min_value: the minimum possible value of the generated integers
    :param max_value: the maximum possible value of the generated integers
    :param sample_size: the number of integers per sample
    :param set_size: the number of samples in the dataset
    :param boundaries: [Optional] an iterable of 4 integer indices in the following format :
    [start of 1st section, end of 1st section, start of 2nd section, end of 2nd section]
    if None, the boundaries are randomly generated.
    :return: the training dataset input, the training true outputs, the boundaries of the sub sections used
    """
    scaled_input_values = np.random.uniform(min_value, max_value, (set_size, sample_size))

    if boundaries is None:
        boundaries = [np.random.randint(sample_size) for i in range(4)]
        boundaries[1] = np.random.randint(boundaries[0], sample_size)
        boundaries[3] = np.random.randint(boundaries[2], sample_size)
    else:
        if len(boundaries) != 4:
            raise ValueError("boundaries is expected to be a list of 4 elements but found {}".format(len(boundaries)))

    a = np.array([np.sum(sample[boundaries[0]:boundaries[1]]) for sample in scaled_input_values])
    b = np.array([np.sum(sample[boundaries[2]:boundaries[3]]) for sample in scaled_input_values])

    true_outputs = None
    if "add" in str.lower(arithmetic_op):
        true_outputs = a + b
    elif "sub" in str.lower(arithmetic_op):
        true_outputs = a - b
    elif "mult" in str.lower(arithmetic_op):
        true_outputs = a * b
    elif "div" in str.lower(arithmetic_op):
        true_outputs = a / b
    elif "square" == str.lower(arithmetic_op):
        true_outputs = a * a
    elif "root" in str.lower(arithmetic_op):
        true_outputs = np.sqrt(a)
    
    scaled_input_values = torch.tensor(scaled_input_values, dtype=torch.float32)
    true_outputs = torch.tensor(true_outputs, dtype=torch.float32).unsqueeze(1)
        
    return scaled_input_values, true_outputs, boundaries

In [22]:
from models.nalu import NALU


In [23]:
import torch
import torchvision.utils as vutils
import numpy as np
import torchvision.models as models
from torchvision import datasets
from tensorboardX import SummaryWriter
import datetime
import os

In [24]:
FEATURES_NUM = 100
epochs = 100000
sample_rate = 44100
batch_size = 1
operator = "add"

In [25]:
in_dim = FEATURES_NUM
hidden_dim = 2
out_dim = 1
num_layers = 2

dim = in_dim # dimensition for generating data

model = NALU(num_layers, in_dim, hidden_dim, out_dim)
print(model)

NALU(
  (model): Sequential(
    (0): NeuralArithmeticLogicUnitCell(
      in_dim=100, out_dim=2
      (nac): NeuralAccumulatorCell(in_dim=100, out_dim=2)
    )
    (1): NeuralArithmeticLogicUnitCell(
      in_dim=2, out_dim=1
      (nac): NeuralAccumulatorCell(in_dim=2, out_dim=1)
    )
  )
)


In [26]:
X_train, y_train, boundaries = generate_synthetic_arithmetic_dataset(operator, -10, 10, FEATURES_NUM, 1000)
X_test, y_test, _ = generate_synthetic_arithmetic_dataset(operator, -100, 100, FEATURES_NUM, 1000, boundaries)

optimizer = torch.optim.RMSprop(model.parameters(),lr=0.001)


path = 'checkpoints/' + str(datetime.datetime.now().strftime("%m-%d-%H:%M")) + "/"
if not os.path.exists('checkpoints'):
    os.mkdir('checkpoints')
writer = SummaryWriter(path)
iteration = 0
for epoch in range(epochs):

    for batch in range(len(X_train) // batch_size):
        iteration += 1
        
        model.train()
        optimizer.zero_grad()

        X_batch_train = X_train[batch:(batch+batch_size),:]
        y_batch_train = y_train[batch:(batch+batch_size),:]
    
        out = model(X_batch_train)

        loss = F.mse_loss(out, y_batch_train)
        
        loss.backward()
        optimizer.step()

        writer.add_scalar("data/Loss", loss.item(), iteration)
        
    if epoch % 100 == 0:
        print("hey")
        model.eval()
        
        X_batch_test = X_test[batch:(batch+batch_size),:]
        y_batch_test = y_test[batch:(batch+batch_size),:]
        
        output_test = model(X_batch_train)
        
        acc = np.sum(np.isclose(output_test.detach().numpy(), y_batch_test.detach().numpy(), atol=.1, rtol=0)) / len(y_batch_test)
        #print("epoch \t", epoch, "\t loss \t",  np.round(loss.detach().numpy(),4), "\t acc \t", acc)
        
        writer.add_scalar("data/Accuracy", float(acc), iteration)

NameError: name 'ys' is not defined