# We verify that an MLP can generalize symbolic constraints
- Sum of 25
- Increasing
- Symmetric
- Odd/Even (weird?) or divisible by something

Real is much easier.
For symbolic, it would not be much harder if internally it learns embeddings for each number.

Observations:
3% of training data is too low -> generalization reaches at most 90% accuracy (with the embedding scheme), but 100% with proper normalization.
10% of training data is enough -> generalization reaches 100% accuracy.

Weirdly, real representation is much slower to learn with than onehot representation
-> was a normalization issue, rescaling all reals to [0;1] solves the problem

As expected, the "EVEN" (XOR) function is very hard to learn with the real representation.
Surprisingly easy to learn with one-hot and embedding representations.

An interesting question is whether a form of curriculum could help a lot.

In [235]:
import torch
from torch import nn
import numpy as np
from coins import generate_combinations
from itertools import product
from torch.utils.data import DataLoader

def split(iterable, train_ratio=0.1, shuffle=True):
    """Split a sequence into a leading train part and a trailing test part.

    Args:
        iterable: Sliceable sequence with a length (NumPy array or list).
        train_ratio: Fraction of the items that go to the train part. The
            train size is ``int(len(iterable) * train_ratio)``, i.e. the
            product is truncated towards zero.
        shuffle: When true, the split is taken from a randomly permuted
            *copy* (NumPy global RNG); the caller's data is left untouched.

    Returns:
        Tuple ``(train, test)``. With ``shuffle=True`` both parts are NumPy
        arrays; with ``shuffle=False`` they are plain slices of the input.
    """
    if shuffle:
        # permutation() shuffles a copy along the first axis, whereas
        # np.random.shuffle() would silently reorder the caller's data.
        iterable = np.random.permutation(iterable)
    n_train = int(len(iterable) * train_ratio)
    return iterable[:n_train], iterable[n_train:]

def to_onehot(x, num_classes=10):
    """One-hot encode an integer array along a new trailing axis.

    Args:
        x: Integer array (or nested list of ints) of any shape. Values are
            used directly as column indices, so they must be valid indices
            into an axis of length ``num_classes``.
        num_classes: Length of the one-hot axis; defaults to 10 (digits).

    Returns:
        ``float32`` array of shape ``x.shape + (num_classes,)`` holding 1.0
        at the position given by each value and 0.0 elsewhere.
    """
    x = np.asarray(x)
    flat = x.reshape(-1)
    onehot = np.zeros((flat.size, num_classes), dtype=np.float32)
    # One row per input element; set the column named by that element.
    onehot[np.arange(flat.size), flat] = 1.
    return onehot.reshape(x.shape + (num_classes,))

# Names of the constraints that get_problem() can build datasets for.
CONSTRAINTS = ['sum_25', 'increasing', 'symmetric', 'even']

# Names of the input encodings handled by get_problem() and Model.
ENCODING_MODES = ['onehot', 'embedding', 'real']

# Selection for a single experiment. The code shown in this file loops over
# the two lists above instead of reading these two values.
CONSTRAINT = 'even'      # other choices: 'sum_25', 'increasing', 'symmetric'
ENCODING_MODE = 'real'   # other choices: 'onehot', 'embedding'

# Mini-batch size of every DataLoader created by get_problem().
BATCH = 32

class Problem(object):
    """Plain attribute container describing one experiment.

    Instances are created empty. ``__repr__`` reads ``constraint``,
    ``encoding_mode`` and the four sized collections ``train_positive``,
    ``train_negative``, ``test_positive`` and ``test_negative``, so those
    must be attached before the object is printed.
    """

    def __repr__(self):
        """Return a five-line summary: a header plus the size of each split."""
        header = 'Problem "{}":[{}]'.format(self.constraint, self.encoding_mode)
        train_pos = '  train+: {}'.format(len(self.train_positive))
        train_neg = '  train-: {}'.format(len(self.train_negative))
        test_pos = '  test+: {}'.format(len(self.test_positive))
        test_neg = '  test-: {}'.format(len(self.test_negative))
        return '\n'.join([header, train_pos, train_neg, test_pos, test_neg])
        
class Summary(object):
    """Empty attribute container; callers attach the results they record."""

def make_infinite(iterable):
    """Yield the items of ``iterable`` endlessly by iterating it repeatedly.

    A new iteration over ``iterable`` is started each time the previous one
    is exhausted, so an iterable that produces a different order on every
    pass keeps doing so.

    Args:
        iterable: Re-iterable collection with at least one item.

    Yields:
        The items of ``iterable``, pass after pass.

    Raises:
        ValueError: If a pass over ``iterable`` yields nothing (an empty
            collection, or a one-shot iterator that is already exhausted).
            Without this check the generator would spin forever without
            ever producing a value.
    """
    while True:
        produced = False
        for item in iterable:
            produced = True
            yield item
        if not produced:
            raise ValueError('make_infinite() got an iterable that yields no items')
            
def get_problem(CONSTRAINT, ENCODING_MODE):
    """Build the datasets and endless batch iterators for one experiment.

    All length-5 digit sequences are partitioned into positives (sequences
    satisfying the constraint) and negatives (all others), encoded, split
    into train/test parts with ``split()``'s default arguments, and wrapped
    in shuffling ``DataLoader``s of batch size ``BATCH``.

    Args:
        CONSTRAINT: One of ``'sum_25'``, ``'increasing'``, ``'symmetric'``
            or ``'even'``.
        ENCODING_MODE: ``'real'`` scales the digits by 1/10 as float32;
            ``'onehot'`` and ``'embedding'`` both one-hot encode each digit.

    Returns:
        A ``Problem`` carrying ``constraint``, ``encoding_mode``,
        ``positive``, ``negative``, the four ``train_*`` / ``test_*`` arrays
        and the four matching ``*_iter`` endless batch generators.

    Raises:
        ValueError: If either argument is not one of the names above.
            (Previously an unknown constraint failed later with an
            unbound-variable error and an unknown encoding was silently
            one-hot encoded.)
    """
    # Validate first so that a typo fails immediately and explicitly.
    if CONSTRAINT not in ('sum_25', 'increasing', 'symmetric', 'even'):
        raise ValueError('unknown constraint: {!r}'.format(CONSTRAINT))
    if ENCODING_MODE not in ('onehot', 'embedding', 'real'):
        raise ValueError('unknown encoding mode: {!r}'.format(ENCODING_MODE))

    p = Problem()

    # Every length-5 sequence of digits 0-9, one sequence per row.
    uniform = np.asarray(list(product(range(10), repeat=5)))

    if CONSTRAINT == 'sum_25':
        # Positives come from the external helper; negatives are the
        # remaining rows of the full enumeration.
        positive = np.asarray(generate_combinations(25, range(10), 5))
        positive_rows = set(map(tuple, positive))
        negative = np.asarray(
            [row for row in uniform if tuple(row) not in positive_rows])
    else:
        if CONSTRAINT == 'increasing':
            # Non-decreasing: every consecutive difference is >= 0.
            mask = np.all(np.diff(uniform, 1) >= 0, 1)
        elif CONSTRAINT == 'symmetric':
            # [:, :2:-1] picks columns 4 and 3 (in that order), so columns
            # 0/4 and 1/3 are compared; the middle column is unconstrained.
            mask = np.all(uniform[:, :2] == uniform[:, :2:-1], 1)
        else:  # 'even'
            # The digit sum is even.
            mask = (uniform.sum(1) % 2 == 0)
        positive = uniform[mask]
        negative = uniform[~mask]

    if ENCODING_MODE == 'real':
        # Scale digits 0-9 down to 0.0-0.9.
        positive = positive.astype(np.float32) / 10.
        negative = negative.astype(np.float32) / 10.
    else:
        # to_onehot() uses the values as indices, so they must still be
        # integers at this point.
        positive = to_onehot(positive)
        negative = to_onehot(negative)

    p.constraint = CONSTRAINT
    p.encoding_mode = ENCODING_MODE

    p.positive = positive
    p.negative = negative

    p.train_positive, p.test_positive = split(positive)
    p.train_negative, p.test_negative = split(negative)

    def endless_batches(data):
        # Shuffled mini-batches that restart whenever the data runs out.
        return make_infinite(DataLoader(data, batch_size=BATCH, shuffle=True))

    p.train_positive_iter = endless_batches(p.train_positive)
    p.test_positive_iter = endless_batches(p.test_positive)
    p.train_negative_iter = endless_batches(p.train_negative)
    p.test_negative_iter = endless_batches(p.test_negative)

    return p



In [227]:
class Model(nn.Module):
    """Small MLP that outputs log-probabilities over two classes.

    The input layout depends on ``mode``:

    * ``REAL``: 5 real values per sample.
    * ``ONEHOT``: 5 one-hot digits per sample (50 values), flattened and fed
      to a single linear layer, so each (digit, position) pair has its own
      weight column.
    * ``EMBEDDING``: 5 one-hot digits per sample; every digit goes through
      the same ``embedder`` layer and the 5 embeddings are concatenated.

    Args:
        mode: One of ``Model.REAL``, ``Model.ONEHOT``, ``Model.EMBEDDING``.
        dims: Hidden width (and embedding size in ``EMBEDDING`` mode).

    Raises:
        ValueError: If ``mode`` is not one of the three supported values.
    """
    EMBEDDING = 'embedding'  # share embeddings between same numbers
    ONEHOT = 'onehot'  # one hot without embedding sharing
    REAL = 'real'

    def __init__(self, mode, dims=100):
        super(Model, self).__init__()
        if mode not in (self.ONEHOT, self.REAL, self.EMBEDDING):
            # Without this check an unknown mode produced a model with no
            # layers whose forward() silently returned None.
            raise ValueError('unknown mode: {!r}'.format(mode))
        self.mode = mode
        if mode == self.EMBEDDING:
            # One shared linear map from a one-hot digit to its embedding.
            self.embedder = nn.Linear(10, dims)
            # No non-linearity is applied between the embedder and the
            # first layer below (original note: "ReLU is not needed").
            self.main = nn.Sequential(
                nn.Linear(5 * dims, dims),
                nn.ReLU(True),
                nn.Linear(dims, 2),
                # Explicit class axis; the implicit-dim form is deprecated.
                nn.LogSoftmax(dim=-1)
            )
        else:
            # ONEHOT and REAL share the architecture and differ only in the
            # number of input features per sample.
            in_features = 5 * 10 if mode == self.ONEHOT else 5
            self.main = nn.Sequential(
                nn.Linear(in_features, dims),
                nn.ReLU(True),
                nn.Linear(dims, dims),
                nn.ReLU(True),
                nn.Linear(dims, 2),
                nn.LogSoftmax(dim=-1)
            )

    def forward(self, x):
        """Return log-probabilities for ``x`` (last axis has size 2)."""
        if self.mode == self.REAL:
            return self.main(x)
        if self.mode == self.ONEHOT:
            # Flatten each sample's one-hot digits into a single vector.
            return self.main(x.view(len(x), -1))
        if self.mode == self.EMBEDDING:
            # Embed every digit independently, then concatenate the
            # embeddings belonging to the same sample.
            digits = x.view(-1, 10)
            embedded = self.embedder(digits)
            return self.main(embedded.view(len(x), -1))
        raise ValueError('unknown mode: {!r}'.format(self.mode))

In [240]:
def get_loss(model, positive, negative, criterion=None):
    """Compute the class-balanced loss and accuracy on one batch pair.

    Positives are labelled 1 and negatives 0. Loss and accuracy are each
    computed per class and then averaged with equal weight, so the result
    does not depend on the relative batch sizes.

    Args:
        model: Callable mapping a batch to per-class scores of shape
            ``(batch, 2)``.
        positive: Batch of samples that satisfy the constraint.
        negative: Batch of samples that violate the constraint.
        criterion: Loss taking ``(predictions, targets)``. Defaults to
            ``torch.nn.NLLLoss()``, which expects log-probabilities.
            (This used to be read from an undefined global name.)

    Returns:
        Tuple ``(loss, accuracy)`` of scalar tensors.
    """
    if criterion is None:
        criterion = torch.nn.NLLLoss()

    positive_pred = model(positive)
    negative_pred = model(negative)
    # Build the targets on the same device as the predictions.
    positive_target = torch.ones(
        len(positive), dtype=torch.long, device=positive_pred.device)
    negative_target = torch.zeros(
        len(negative), dtype=torch.long, device=negative_pred.device)

    positive_loss = criterion(positive_pred, positive_target)
    negative_loss = criterion(negative_pred, negative_target)

    positive_accuracy = (positive_pred.argmax(1) == positive_target).float().mean()
    negative_accuracy = (negative_pred.argmax(1) == negative_target).float().mean()

    loss = 0.5 * (positive_loss + negative_loss)
    accuracy = 0.5 * (positive_accuracy + negative_accuracy)

    return loss, accuracy


def evaluate_model(model,
                   problem,
                   iterations,
                   log_every=500):
    """Train ``model`` on ``problem`` and track test metrics every step.

    Each iteration performs one Adam update on a train batch pair and then
    scores one test batch pair without tracking gradients.

    Args:
        model: Module to train in place.
        problem: Object providing the endless batch iterators
            ``train_positive_iter``, ``train_negative_iter``,
            ``test_positive_iter`` and ``test_negative_iter``.
        iterations: Number of training steps.
        log_every: Print running means every this many iterations; a falsy
            value disables logging.

    Returns:
        A ``Summary`` with ``problem`` and the per-iteration lists
        ``losses``, ``test_losses`` and ``test_accuracies``.
    """
    p = problem
    s = Summary()
    s.problem = problem
    s.losses = []
    s.test_losses = []
    s.test_accuracies = []

    criterion = torch.nn.NLLLoss()
    optimizer = torch.optim.Adam(model.parameters())

    # range() / next() / single-argument print() behave the same on
    # Python 2 and Python 3.
    for iteration in range(iterations):
        # train
        positive = next(p.train_positive_iter).float()
        negative = next(p.train_negative_iter).float()
        loss, _ = get_loss(model, positive, negative, criterion)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # test (no gradients needed, so skip building the autograd graph)
        with torch.no_grad():
            test_positive = next(p.test_positive_iter).float()
            test_negative = next(p.test_negative_iter).float()
            test_loss, test_accuracy = get_loss(
                model, test_positive, test_negative, criterion)

        s.losses.append(loss.item())
        s.test_losses.append(test_loss.item())
        s.test_accuracies.append(test_accuracy.item())
        if log_every and iteration % log_every == 0:
            # Running means over (at most) the last 50 recorded values.
            print('Iteration {}'.format(iteration))
            print('Train {}'.format(np.mean(s.losses[-50:])))
            print('Test  {}'.format(np.mean(s.test_losses[-50:])))
            print('Test Accuracy  {}'.format(np.mean(s.test_accuracies[-50:])))

    return s

In [None]:
# Train a fresh model for every (constraint, encoding) pair and keep the mean
# test accuracy over (at most) the last 50 iterations of each run.
test_accuracies = {}
for constraint in CONSTRAINTS:
    for encoding_mode in ENCODING_MODES:
        problem = get_problem(constraint, encoding_mode)
        model = Model(mode=encoding_mode)
        # Single-argument print() gives the same output on Python 2 and 3.
        print('\nEvaluating problem {}'.format(problem))
        s = evaluate_model(model, problem, iterations=4000, log_every=0)
        # Only the per-constraint dict needs a default; the inner entry is
        # assigned directly.
        test_accuracies.setdefault(constraint, {})[encoding_mode] = np.mean(
            s.test_accuracies[-50:])

In [238]:
# Bare expression: echoes the results dictionary as the cell's output.
test_accuracies

# Recorded cell output, keyed as constraint -> encoding mode -> accuracy.
# NOTE(review): presumably saved from an earlier run of the experiment loop;
# confirm against a fresh run before relying on these numbers.
{'even': {'embedding': 0.48875, 'onehot': 0.6546875, 'real': 0.504375},
 'increasing': {'embedding': 0.9509375, 'onehot': 0.92625, 'real': 0.97625},
 'sum_25': {'embedding': 0.9984375, 'onehot': 0.998125, 'real': 0.958125},
 'symmetric': {'embedding': 0.9728125, 'onehot': 0.924375, 'real': 0.99625}}