In [1]:
import torch
import torch.nn as nn
device = "cuda"
device = torch.device(device)

In [27]:
noise = 0.05
noise_n = 1

train_data = []
for i in range(1000):
    b = torch.tensor([i%2], dtype=torch.float32)
    b += (torch.FloatTensor(1).uniform_(-noise, +noise))
    b = b.to(device)

    res = torch.tensor([(i+1)%2], dtype=torch.float32).to(device)
    train_data.append((b,res))

In [50]:
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data.sampler import SubsetRandomSampler
import numpy as np

def get_loaders(train_data, device=device):
    test_size = 0.05
    valid_size = 0.05
    batch_size = 50
    num_workers = 0

    #cuda or cpu
    device = torch.device(device)

    num_train = len(train_data)
    indices = list(range(num_train))
    np.random.shuffle(indices)
    split = int(np.floor(test_size * num_train))
    split2 = int(np.floor((valid_size+test_size) * num_train))
    train_idx, valid_idx, test_idx = indices[split2:], indices[split:split2], indices[:split]

    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)
    test_sampler = SubsetRandomSampler(test_idx)

    # prepare data loaders
    train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, sampler=train_sampler, num_workers=num_workers)
    valid_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, sampler=valid_sampler, num_workers=num_workers)
    test_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, sampler=test_sampler, num_workers=num_workers)
    return train_loader, valid_loader, test_loader

train_loader, valid_loader, test_loader = get_loaders(train_data)

In [51]:
import numpy as np
def noise_to_int(bits):
    bits = [round(float(b)) for b in bits]
    bits = "".join([str(b) if b in [0,1] else "0" if b<1/10**5 else "1" for b in bits])
    return int(bits,2)



In [52]:
import torch.nn.functional as F

X, Y = next(iter(train_loader))

for x,y in zip(X,Y):
    a = x
    b = y
    b1 = torch.round(x-1. if x > 0.5 else x+1)
    assert b == b1
print("ok")

ok


In [53]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class Norm(nn.Module):
    def __init__(self, num_hidden, eps = 1e-6):
        super().__init__()
    
        self.size = num_hidden
        
        # create two learnable parameters to calibrate normalisation
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        
        self.eps = eps
    
    def forward(self, x):
        norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
        / (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
        return norm


def attention(q, k, v, d_k, mask=None, dropout=None):
    scores = torch.matmul(q, k.transpose(-2, -1)) /  math.sqrt(d_k)
    # print("scores",scores.shape)
    if mask is not None:
        # print("mask",mask.shape)
        mask = mask.unsqueeze(1)
        scores = scores.masked_fill(mask == 0, -1e9)
    
    scores = F.softmax(scores, dim=-1)
    
    if dropout is not None:
        scores = dropout(scores)
    
    # print("v",v.shape)
    output = torch.matmul(scores, v)
    return output

class MultiHeadAttention(nn.Module):
    def __init__(self, heads, num_hidden, dropout = 0.1):
        super().__init__()
        
        self.num_hidden = num_hidden
        self.d_k = num_hidden // heads
        self.h = heads
        
        self.q_linear = nn.Linear(num_hidden, num_hidden)
        self.v_linear = nn.Linear(num_hidden, num_hidden)
        self.k_linear = nn.Linear(num_hidden, num_hidden)
        
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(num_hidden, num_hidden)
    
    def forward(self, q, k, v, mask=None):
        
        bs = q.size(0)
        
        # perform linear operation and split into N heads
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
        
        # transpose to get dimensions bs * N * sl * num_hidden
        k = k.transpose(1,2)
        q = q.transpose(1,2)
        v = v.transpose(1,2)
        

        # calculate attention using function we will define next
        scores = attention(q, k, v, self.d_k, mask, self.dropout)
        # concatenate heads and put through final linear layer
        concat = scores.transpose(1,2).contiguous()\
        .view(bs, -1, self.num_hidden)
        output = self.out(concat)
    
        return output

class FeedForward(nn.Module):
    def __init__(self, input_num, output_num, d_ff=2048, dropout = 0.1):
        super().__init__() 
    
        # We set d_ff as a default to 2048
        self.linear_1 = nn.Linear(input_num, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, output_num)
    
    def forward(self, x):
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x

In [54]:
import torch.nn.functional as F

class Not(nn.Module):
    def __init__(self):
        super(Not, self).__init__()
        
        self.body = nn.Sequential(
            nn.Linear(1,1),
            nn.Sigmoid(),
        )
        
    def forward(self, x):
        out = self.body(x)
        return out

In [95]:
def train(save_file, model, criterion, train_loader, valid_loader, optimizer=None, n_epochs = 100000, f=noise_to_int, lrate=0.005):
    # number of epochs to train the model

    if optimizer is None:
        # specify optimizer (stochastic gradient descent) and learning rate = 0.001
        optimizer = torch.optim.Adam(model.parameters(), lr=lrate)#, weight_decay=0.00000001)

    # initialize tracker for minimum validation loss
    valid_loss_min = np.Inf # set initial "min" to infinity
    
    for epoch in range(n_epochs):
        # monitor training loss
        train_loss = 0.0
        valid_loss = 0.0
        results = 0
        results_n = 0
        ###################
        # train the model #
        ###################
        model.train() # prep model for training
        i=0
        for X, target in train_loader:
            i+=1
            # clear the gradients of all optimized variables
            optimizer.zero_grad()
            # forward pass: compute predicted outputs by passing inputs to the model
            target = target.to(device)
            output = model(X)
            # calculate the loss
            # print(output)
            # print(target)
            loss = criterion(output, target) #
            # backward pass: compute gradient of the loss with respect to model parameters
            loss.backward()
            # perform a single optimization step (parameter update)
            optimizer.step()
            # update running training loss
            train_loss += loss.item()*X.size(0)
            if epoch%100 == 0:
                for x,y in zip(output,target):
                    # print(x.cpu().detach().numpy(),y)
                    a = f(x.cpu().detach().numpy())
                    # a = int(x[0])
                    b = f(y.cpu().detach().numpy())
                    # b = int(y[0])
                    # a = noise_to_int(x)
                    # b = noise_to_int(y)
                    
                    
                    # print(a,b)
                    # print(float(x[0]),float(y[0]))
                    if np.allclose(a,b, atol=1e-10):
                        
                        results +=1
                    results_n+=1
        ######################    
        # validate the model #
        ######################
        model.eval() # prep model for evaluation
        for X, target in valid_loader:
        
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(X)
            # target = target.to(device)
            # calculate the loss
            loss = criterion(output, target)
            # update running validation loss
            valid_loss += loss.item()*X.size(0)
            

        # print training/validation statistics
        # calculate average loss over an epoch
        train_loss = train_loss/len(train_loader.dataset)
        valid_loss = valid_loss/len(valid_loader.dataset)

        print('Epoch: {} \tTraining Loss: {:.12f} \tValidation Loss: {:.12f}'.format(
            epoch+1,
            train_loss,
            valid_loss
            ))

        # save model if validation loss has decreased
        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.12f} --> {:.12f}).  Saving model ...'.format(
                valid_loss_min,
                valid_loss))
            torch.save(model.state_dict(), save_file)
            valid_loss_min = valid_loss
            if train_loss <= 1e-12:
                print("stop: loss <= 0.00000")
                return
            else:
                print(" loss >= 0.00000")
        
        if results_n != 0 :
            print(f"{results/results_n=}")
            print(f"{results}")
            # if results == results_n and valid_loss <= valid_loss_min:
            #     print("stop: no errors")
            #     return
        

In [97]:
# model = Not().to(device)
criterion = nn.MSELoss()
train("not.pt", model, criterion, train_loader, valid_loader, f=lambda x:x, lrate=0.00005)

Epoch: 1 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Validation loss decreased (inf --> 0.000000000002).  Saving model ...
 loss >= 0.00000
results/results_n=0.44222222222222224
398
Epoch: 2 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 3 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 4 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Validation loss decreased (0.000000000002 --> 0.000000000002).  Saving model ...
 loss >= 0.00000
Epoch: 5 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 6 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 7 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 8 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Epoch: 9 	Training Loss: 0.000000000028 	Validation Loss: 0.000000000002
Validation loss decreased (0.000000000002 --> 0.000000000002).  Saving model ...
 loss >= 0.00000
Epoch: 10 	Trainin

KeyboardInterrupt: 

In [113]:
model = Not().to(device)
model.load_state_dict(torch.load("not.pt"))

<All keys matched successfully>

In [114]:
model.eval()
X, Y = next(iter(train_loader))

# print(X,Y)
results = 0
results_n = 0
O = model(X)
for x,y in zip(O,Y):
    a = x.cpu().detach().numpy()
    b = y.cpu().detach().numpy()
    # b = int(y[0]) pos_to_int
    # b = noise_to_int(y)
    # print(x,y)
    print(a,b)
    
    if np.allclose(a,b,atol=1e-12):
        
        results +=1
    results_n +=1
print(f"{results/results_n=}")

[0.] [0.]
[1.] [1.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
[1.] [1.]
[1.] [1.]
[0.] [0.]
[0.] [0.]
[0.] [0.]
results/results_n=1.0


In [125]:
model.eval()
X, Y = next(iter(train_loader))

# print(X,Y)
results = 0
results_n = 0
O = X
n = 1000
for i in range(n):
    O = model(O)
for x,y in zip(X,O):
    # print(x,y)
    a = x.cpu().detach().numpy()
    b = y.cpu().detach().numpy()
    # b = int(y[0]) pos_to_int
    # b = noise_to_int(y)
    # print(a,b)
    r = a
    for i in range(n):
        r = np.round(r-1.0 if r > 0.5 else r+1)
    if np.allclose(r,b,atol=1e-12):
        
        results +=1
    results_n +=1
print(f"after {n} mix columns")
print(f"{results/results_n=}")
# print(model.state_dict())

after 1000 mix columns
results/results_n=1.0


In [112]:
import torch
state = torch.load("not.pt")

for w in state["body.0.weight"]:
    for i in range(len(w)):
        if w[i] < 0:
            w[i] = -10000000.0
        else:
            w[i] = 0.0

for i in range(len(state["body.0.bias"])):
    state["body.0.bias"][i] = 5000000.0
torch.save(state,"not.pt")
print(state)

OrderedDict([('body.0.weight', tensor([[-10000000.]], device='cuda:0')), ('body.0.bias', tensor([5000000.], device='cuda:0'))])
