# Robust Lookup Table: Coordinates prediction

The goal of this notebook is to predict the coordinates of the backends in the output lookup table.

## TODO

- [ ] Add noise to the input
- [ ] Add dropout
- [ ] Add regularization

In [1]:
import os
import math
import datetime
import time
import uuid
import random
import hashlib

import torch
from torch import nn

import numpy as np
from matplotlib import pyplot as plt

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

Using cpu device


In [2]:
# Hyper parameters
learning_rate = 1e-3
epochs = 200
batch_size = 32

In [3]:
# Experiment parameters
# -- n_max: the maximum number of backends.
n_max = 5
# -- m: the size of the lookup table.
m = 15
# -- o: the number of axis for the coordinates.
o = 2
# scenario variables:
nBeforeBounds = (3, n_max)
nAfterBounds = (1, n_max)
variance = (1.0, 10)

In [4]:
# Config
# - nBeforeBounds(x, y): nBefore ∈ [x, y].
# - nAfterBounds(x, y): nAfter ∈ [x, y].
# - variance(x, y): x < min(nBefore,nAfter)/max(nBefore,nAfter); y < max(nBefore,nAfter) - min(nBefore,nAfter)
# - m: the fixed size of the lookup table.
# - o: the number of axis for coordinates.
class Config:
    n_max: int
    m: int
    o: int
    nBeforeBounds: (int, int)
    nAfterBounds: (int, int)
    variance: (float, int)

    def __init__(
        self,
        n_max: int,
        m: int,
        o: int,
        nBeforeBounds: (int, int),
        nAfterBounds: (int, int),
        variance: (float, int),
        # sizeBounds: (int, int),
    ):
        if nBeforeBounds[1] > m or nAfterBounds[1] > m:
            raise Exception("nBeforeBounds and nAfterBounds cannot exceed size")

        self.n_max = n_max
        self.m = m
        self.o = o
        self.nBeforeBounds = nBeforeBounds
        self.nAfterBounds = nAfterBounds
        self.variance = variance

Config(n_max, m, o, nBeforeBounds, nAfterBounds, variance).__dict__

{'n_max': 5,
 'm': 15,
 'o': 2,
 'nBeforeBounds': (3, 5),
 'nAfterBounds': (1, 5),
 'variance': (1.0, 10)}

In [5]:
# m: the size of the lookup table.
# o: the number of axis for the coordinates.
# the function returns the backend coordinates of size (o,)
def NewBackendCoordinates(m: int, o: int):
    id = uuid.uuid4()
    _h = hashlib.sha256()
    _h.update(id.bytes_le)
    hash_bytes = _h.digest()

    coord = []
    hash_length = 32
    slice = int(hash_length/o)

    for i in range(o):
        start = i*slice
        end = (i+1)*slice
        if start >= hash_length:
            continue
        if end > hash_length:
            end = hash_length
        hmod = int.from_bytes(hash_bytes[start:end], "little") % m
        coord.append(hmod)

    if len(coord) < o:
        raise Exception(f"Cannot create new backend tensor! please decrease `o`: got {o}")

    return coord

NewBackendCoordinates(m, o)

[11, 4]

In [6]:
class Scenario:
    nBefore: int
    nAfter: int
    m: int
    o: int

    def __init__(
        self,
        cfg: Config,
    ):
        self.cfg = cfg
        self.nBefore = random.randint(cfg.nBeforeBounds[0], cfg.nBeforeBounds[1])
        self.nAfter = random.randint(cfg.nAfterBounds[0], cfg.nAfterBounds[1])

    def GenerateBackends(self):
        l_min = []
        l_max = []
        _min = min([self.nBefore, self.nAfter])
        _max = max([self.nBefore, self.nAfter])
        # create the l_max backend array.
        l_max = np.array([ NewBackendCoordinates(self.cfg.m, self.cfg.o) for _ in range(_max) ])
        # create l_min array by randomly choosing _min elements of l_max.
        l_min = np.random.permutation(l_max)[0:_min]
        # pad
        l_max = [l_max[i] if i < _max else [0, 0] for i in range(self.cfg.n_max) ]
        l_min = [l_min[i] if i < _min else [0, 0] for i in range(self.cfg.n_max) ]
        # they used to be sorted but we don't need to anymore.
        if self.nBefore < self.nAfter:
            return (l_min, l_max)
        return (l_max, l_min)

def validate_scenario(cfg: Config, scenario: Scenario) -> bool:
    var = cfg.variance[0]
    delta = cfg.variance[1]

    _min = min([scenario.nBefore, scenario.nAfter])
    _max = max([scenario.nBefore, scenario.nAfter])
    _var = _min/_max
    _delta = _max - _min
    _sz = scenario.cfg.m

    return _var <= var and _delta <= delta and _max <= _sz and _min != _max

# creates a new scenario generator.
def NewScenarioGenerator(cfg):
    while True:
        scenario = Scenario(cfg)
        if validate_scenario(cfg, scenario):
            yield scenario

cfg = Config(n_max, m, o, nBeforeBounds, nAfterBounds, variance)
scgen = next(NewScenarioGenerator(cfg))
print(scgen.__dict__)
bef, aft = scgen.GenerateBackends()
print("before:\n", bef)
print("after:\n", aft)

{'cfg': <__main__.Config object at 0x7886a80eff20>, 'nBefore': 5, 'nAfter': 1}
before:
 [array([ 1, 14]), array([2, 0]), array([ 5, 12]), array([1, 1]), array([ 1, 14])]
after:
 [array([1, 1]), [0, 0], [0, 0], [0, 0], [0, 0]]


In [13]:
# NewBatchGenerator yields a tuple of 2 batch of size batch_size.
def NewBatchGenerator(batch_size: int, scenario_generator):
    while True:
        before = []
        after = []
        for _ in range(batch_size):
            b, a = next(scenario_generator).GenerateBackends()
            before.append(b)
            after.append(a)
        yield (
            torch.tensor(np.array(before), dtype=torch.float32, requires_grad=False),
            torch.tensor(np.array(after), dtype=torch.float32, requires_grad=False),
        )

cfg = Config(n_max, m, o, nBeforeBounds, nAfterBounds, variance)
scenario_generator = NewScenarioGenerator(cfg)
bef, aft = next(NewBatchGenerator(batch_size, scenario_generator))
print(bef[0][0])
print(f"tensor size must be: [{batch_size}, {cfg.n_max}, {cfg.o}]")
bef.size(), aft.size()

tensor([13.,  6.])
tensor size must be: [32, 5, 2]


(torch.Size([32, 5, 2]), torch.Size([32, 5, 2]))

In [19]:
class NN(nn.Module):
    n_max: int
    m: int
    o: int

    flatten: nn.Flatten
    seq: nn.Sequential

    def __init__(
        self,
        n_max: int,
        m: int,
        o: int,
        hl_size=512, # hidden layer size
    ):
        super().__init__()
        self.n_max = n_max
        self.m = m
        self.o = o

        in_size = n_max*o
        out_size = m*o
        
        self.flatten = nn.Flatten(start_dim=1)
        self.seq = nn.Sequential(
            nn.Linear(in_size, hl_size),
            nn.ReLU(),
            nn.Linear(hl_size, hl_size),
            nn.ReLU(),
            nn.Linear(hl_size, out_size),
        )

    # takes an input of size:    [batch_size, n_max, o]
    # returns an output of size: [batch_size, m, o]
    def forward(self, x):
        x = self.flatten(x)
        x = nn.functional.normalize(x, dim=1) # normalize in.
        x = self.seq(x)
        x = nn.functional.normalize(x, dim=1) # normalize out.
        return x.view(-1, self.m, self.o) # make it 2-dimensional array.


cfg = Config(n_max, m, o, nBeforeBounds, nAfterBounds, variance)
scenario_generator = NewScenarioGenerator(cfg)
bef, _ = next(NewBatchGenerator(batch_size, scenario_generator))
model = NN(cfg.n_max, cfg.m, cfg.o, hl_size=32).to(device)
model(bef).size()

torch.Size([32, 15, 2])

## Loss functions

- Stability: measures how well the model predicts a lookup table that matches as much as possible entries from the "before" state.
- Accuracy: measures how precisely the model predicts coordinates. The precision is measured by comparing input and output coordinates.
- Distribution: measures how evenly the model distributes coordinates into the lookup table.