In [None]:
import torch 
import torch.nn as nn  
import numpy as np
from copy import deepcopy
from tqdm import tqdm

In [None]:
# Print the installed PyTorch version, then report whether CUDA is available.
# The second line is left as a bare expression so the notebook displays its
# value as the cell output.
print(torch.__version__)
torch.cuda.is_available()

# Simple model

In [None]:

class NN_multilayer(nn.Module):
    """Fully connected sigmoid network with a single linear output unit.

    Layout: an ``n_inp -> n_hid`` input layer, followed by ``n_layer - 1``
    additional ``n_hid -> n_hid`` layers, each passed through a sigmoid, and
    a final ``n_hid -> 1`` linear read-out without activation.

    Args:
        n_inp: number of input features.
        n_hid: width of every hidden layer.
        n_layer: total number of hidden layers (default 1, i.e. no extra
            hidden-to-hidden layers).
    """

    def __init__(self, n_inp, n_hid, n_layer = 1):
        super().__init__()

        # Input projection.
        self.linear = nn.Linear(n_inp, n_hid)

        # Extra hidden-to-hidden layers; empty when n_layer == 1.
        extra_layers = [nn.Linear(n_hid, n_hid) for _ in range(n_layer - 1)]
        self.linear_h = nn.ModuleList(extra_layers)

        self.Sigmoid = nn.Sigmoid()

        # Author's note: train 10 layers instead of 1 and repeat the experiment.
        self.linear2 = nn.Linear(n_hid, 1)

    def forward(self, input):
        """Return the raw (un-squashed) output for a batch of inputs."""
        hidden = self.Sigmoid(self.linear(input))
        for extra in self.linear_h:
            hidden = self.Sigmoid(extra(hidden))
        return self.linear2(hidden)

In [None]:
class EarlyStop():
    """Signal a stop after ``patience`` consecutive non-improving values.

    Each value is compared with the value from the *previous* call (not with
    the best value seen so far). A value that is greater than or equal to the
    previous one increments the counter; a strictly smaller value resets it.
    """

    def __init__(self, patience):
        self.patience = patience
        self.cnt = 0
        # Large sentinel so the first reported value counts as an improvement.
        self.last = 1e18

    def item(self, item):
        """Record ``item`` and return True once the patience is exhausted."""
        not_improved = item >= self.last
        self.last = item

        if not not_improved:
            self.cnt = 0
            return False

        self.cnt += 1
        return self.cnt >= self.patience
    

In [None]:
def accuracy(scores, y):
    """Return the fraction of thresholded scores that equal their label.

    A score strictly greater than 0.5 is labelled 1, anything else 0. The
    number of matches is divided by ``len(y)``.

    The count starts from the integer 0 and adds each ``label == target``
    comparison, so the result type follows the element type of ``y``
    (a plain float for Python numbers).
    """
    predicted = [1 if score > .5 else 0 for score in scores]

    hits = 0
    for guess, target in zip(predicted, y):
        hits = hits + (guess == target)

    return hits / len(y)


# Hand-made example: thresholding at 0.5 yields [0, 0, 0, 1, 1], which agrees
# with four of the five labels, so "done" is not printed and 0.8 is shown.
scores, y = [0.1, 0.2, 0.5, 0.6, 0.8], [0, 1, 0, 1, 1]

if not (accuracy(scores, y) != 1.0):
    print("done")

print(accuracy(scores, y))

In [None]:
def train_once(nr_inp, nr_hid, Xs, y):
    """Train one single-hidden-layer ``NN_multilayer`` on ``(Xs, y)``.

    The network is optimised full-batch with Adam (lr 0.03) on an MSE loss
    for at most 80000 epochs. Whenever ``epoch % 100 == 0`` (this includes
    epoch 0, where the window holds a single loss value) the function:

    1. evaluates the accuracy of the current network on ``(Xs, y)``;
    2. stops as converged if the loss summed over the window is below 1e-5;
    3. stops as converged if the accuracy equals 1.0;
    4. stops (not converged) if ``EarlyStop`` with patience 125 fires;
    5. otherwise resets the window sum.

    Args:
        nr_inp: number of input features of the network.
        nr_hid: number of hidden neurons.
        Xs: input tensor; the model is created on ``Xs.device``.
        y: target tensor compared against the network output by the MSE loss.

    Returns:
        Tuple ``(loss, epoch, converged, params, accuracy)`` where ``loss`` is
        the NumPy value of the loss summed over the current window, ``epoch``
        the index of the last executed epoch, ``converged`` a bool, ``params``
        the number of trainable parameters (int) and ``accuracy`` the NumPy
        value of the most recent accuracy evaluation.
    """
    epochs = 80000
    plot_every = 100        # length of the reporting / checking window
    patience = 125
    loss_tolerance = 0.00001

    # Build the model on the same device as the data instead of assuming CUDA.
    device = Xs.device
    xor_network = NN_multilayer(nr_inp, nr_hid, 1).to(device)

    mseloss = nn.MSELoss()
    optimizer = torch.optim.Adam(xor_network.parameters(), lr = 0.03)
    early_stop = EarlyStop(patience)

    converged = False
    # Window sum kept as a detached tensor so it never holds autograd history
    # and the final ``.cpu().numpy()`` is always valid.
    current_loss = torch.zeros((), device=device)

    for epoch in tqdm(range(epochs)):
        optimizer.zero_grad()

        # Forward pass through __call__ so module hooks are honoured.
        yhat = xor_network(Xs)
        loss = mseloss(yhat, y)

        loss.backward()
        optimizer.step()

        # Out-of-place add: EarlyStop keeps a reference to earlier sums.
        current_loss = current_loss + loss.detach()

        if epoch % plot_every == 0:
            # Evaluate first, so every exit path below reports the accuracy of
            # the network that is actually returned (no stale/unbound value).
            with torch.no_grad():
                scores = xor_network(Xs)
            accur = accuracy(scores, y)

            if current_loss < loss_tolerance:
                converged = True
                break

            if accur == 1.0:
                converged = True
                print("Perfect accuracy")
                break

            if early_stop.item(current_loss):
                print("...early stopping", current_loss.cpu().numpy())
                break

            current_loss = torch.zeros((), device=device)

    # Number of trainable parameters.
    params = sum(p.numel() for p in xor_network.parameters() if p.requires_grad)

    return current_loss.cpu().numpy(), epoch, converged, params, accur.cpu().detach().numpy()
    
    


## Generate data for generalized XOR  (inverters)

In [None]:
def inverter_data(level):
    """Build the truth table of the generalized-XOR ("inverter") function.

    The table has ``2 ** level`` rows listed in ascending binary order, most
    significant bit first. The target of a row is 1 when the row contains an
    odd number of zeros and 0 otherwise. This is the closed form of the
    recursive construction "prepend a bit; invert the target when that bit is
    0", starting from the empty pattern with target 0:

    * level 1 is the inverter: ``[0] -> 1``, ``[1] -> 0``
    * level 2 is XOR: ``[0,0] -> 0``, ``[0,1] -> 1``, ``[1,0] -> 1``, ``[1,1] -> 0``

    Args:
        level: number of bits per pattern; must be a non-negative integer.

    Returns:
        ``(patterns, targets)``: ``patterns`` is a list of ``2 ** level`` lists
        of ``level`` bits, ``targets`` a list of one-element lists. Every row
        is a fresh list, so callers may mutate the result freely.
        ``level == 0`` yields ``([[]], [[0]])``.

    Raises:
        ValueError: if ``level`` is negative.
    """
    if level < 0:
        raise ValueError("level must be a non-negative integer")

    patterns = []
    targets = []
    for row in range(2 ** level):
        # Bits of ``row``, most significant first, zero-padded to ``level``.
        bits = [(row >> shift) & 1 for shift in range(level - 1, -1, -1)]
        patterns.append(bits)
        # Each 0 bit contributes one inversion of the base target 0.
        targets.append([bits.count(0) % 2])

    return patterns, targets




In [None]:
# Preview the 5-bit table: bit patterns on the first line, targets on the second.
X, Y = inverter_data(level=5)
print(X, Y, sep="\n")

# Learn generalized XOR

In [None]:
# --- Experiment configuration ------------------------------------------------
inversion_depth = 10 # number of input bits; author's note of other values: 8, 9, 10
neurons_start = 3    # author's note of other values: 9, 10, 10
                     # author's note: 120k, 400k, 800k
n_step = 1           # increment of the hidden-layer width between attempts
max_hidden = 150     # largest hidden-layer width that is tried
n_repetitions = 10   # independent repetitions of the width search

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Data ----------------------------------------------------------------------
# inverter_data returns plain integer lists; request the floating dtype
# explicitly rather than casting a single element to steer dtype inference.
Xs, y = inverter_data(inversion_depth)
Xs = torch.tensor(Xs, dtype=torch.get_default_dtype(), device=device)
y = torch.tensor(y, dtype=torch.get_default_dtype(), device=device)

print("inversion_depth", inversion_depth)

# --- Search for the smallest hidden layer that converges -----------------------
h_nrs = []   # hidden-layer width of the last attempt in each repetition
params = []  # parameter count of that attempt
epch = []    # number of epochs of that attempt

for rep in range(n_repetitions):

    # Widths neurons_start + 1, ..., max_hidden; stop at the first that converges.
    for hid in range(neurons_start + 1, max_hidden + 1, n_step):
        loss, ep, cnv, prcnt, acc = train_once(inversion_depth, hid, Xs, y)

        string = "Converged with" if cnv else "--- "

        print(string, "loss:", loss, "in", ep+1, "epochs;", hid, "hidden neurons and",  prcnt, "parameters; Accuracy:", acc,  "; Repetition:", rep+1)
        torch.cuda.empty_cache()

        if cnv:
            break

    # Recorded even when no width converged (then it is the last width tried).
    h_nrs.append(hid)
    params.append(prcnt)
    epch.append(ep+1)

# --- Summary: raw values, mean and standard deviation --------------------------
print()
print("inversion_depth", inversion_depth)
print(h_nrs, np.mean(h_nrs), np.std(h_nrs))
print(params, np.mean(params), np.std(params))
print(epch, np.mean(epch), np.std(epch))

# Generate data for AND and OR functions

In [None]:
def AND_OR(depth = 2, logic = "mix"):
    """Build a truth table by repeatedly extending a 2-input AND/OR table.

    Construction:

    * ``depth == 1`` returns the placeholder pair ``(0, 0)``.
    * ``depth == 2`` returns the 2-input table: AND when ``logic`` is
      ``"mix"`` or ``"AND"``, OR for any other value.
    * Every further level doubles the table by prepending a new leading bit
      (0 for the first copy, 1 for the second). Rows with leading 0 keep the
      target they already had. Rows with leading 1 get a target recomputed
      from the remaining bits: ``any(...)`` when ``logic == "OR"`` or when
      ``logic == "mix"`` on an even extension step (0, 2, ...), otherwise
      ``all(...)``.

    NOTE(review): because rows with leading 0 keep their previous target, the
    table for ``depth > 2`` is not the plain ``depth``-input AND/OR (e.g.
    ``[0, 1, 1] -> 1`` for ``"AND"``); confirm this is the intended function.

    Args:
        depth: number of input bits; must be >= 1.
        logic: ``"mix"``, ``"AND"`` or ``"OR"``; other values use the OR base
            table with ``all(...)`` extension steps.

    Returns:
        ``(inputs, targets)``: ``2 ** depth`` bit lists and the matching
        one-element target lists (or ``(0, 0)`` for ``depth == 1``).

    Raises:
        ValueError: if ``depth`` is smaller than 1.
    """
    if depth < 1:
        raise ValueError("depth must be at least 1")
    if depth == 1:
        return 0, 0

    inputs = [[0, 0], [0, 1], [1, 0], [1, 1]]
    if logic == "mix" or logic == "AND":
        targets = [[0], [0], [0], [1]]
    else:
        targets = [[0], [1], [1], [1]]

    for step in range(depth - 2):
        use_or = logic == "OR" or (logic == "mix" and step % 2 == 0)
        combine = any if use_or else all

        # Leading-1 rows: target recomputed from the bits below the new one.
        recomputed = [[int(combine(row))] for row in inputs]
        # Leading-0 rows: targets carried over unchanged (as fresh lists).
        carried = [list(target) for target in targets]

        inputs = [[0] + row for row in inputs] + [[1] + row for row in inputs]
        targets = carried + recomputed

    return inputs, targets
    


In [None]:
# Preview the depth-4 "AND" table: inputs, a separator line, then targets.
X, Y = AND_OR(depth=4, logic="AND")
print(X, "---------------------------", Y, sep="\n")

# Learn AND and OR functions

In [None]:
# --- Experiment configuration ------------------------------------------------
inversion_depth = 12 # number of input bits; author's note of other values: 8, 9, 10
neurons_start = 2    # author's note of other values: 9, 10, 10
                     # author's note: 120k, 400k, 800k
n_step = 1           # increment of the hidden-layer width between attempts
max_hidden = 150     # largest hidden-layer width that is tried
n_repetitions = 10   # independent repetitions of the width search

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Data ----------------------------------------------------------------------
# AND_OR returns plain integer lists; request the floating dtype explicitly
# rather than casting a single element to steer dtype inference.
Xs, y = AND_OR(inversion_depth, "AND")
Xs = torch.tensor(Xs, dtype=torch.get_default_dtype(), device=device)
y = torch.tensor(y, dtype=torch.get_default_dtype(), device=device)

print("inversion_depth", inversion_depth)

# --- Search for the smallest hidden layer that converges -----------------------
h_nrs = []   # hidden-layer width of the last attempt in each repetition
params = []  # parameter count of that attempt
epch = []    # number of epochs of that attempt

for rep in range(n_repetitions):

    # Widths neurons_start + 1, ..., max_hidden; stop at the first that converges.
    for hid in range(neurons_start + 1, max_hidden + 1, n_step):
        loss, ep, cnv, prcnt, acc = train_once(inversion_depth, hid, Xs, y)

        string = "Converged with" if cnv else "--- "

        print(string, "loss:", loss, "in", ep+1, "epochs;", hid, "hidden neurons and",  prcnt, "parameters; Accuracy:", acc,  "; Repetition:", rep+1)
        torch.cuda.empty_cache()

        if cnv:
            break

    # Recorded even when no width converged (then it is the last width tried).
    h_nrs.append(hid)
    params.append(prcnt)
    epch.append(ep+1)

# --- Summary: raw values, mean and standard deviation --------------------------
print()
print("inversion_depth", inversion_depth)
print(h_nrs, np.mean(h_nrs), np.std(h_nrs))
print(params, np.mean(params), np.std(params))
print(epch, np.mean(epch), np.std(epch))