In [3]:
import gc
import multiprocessing
from typing import Tuple, Any
!pip install pytorch-lightning



In [4]:
import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

In [5]:
from src.tools import *

In [6]:
torch.set_printoptions(precision=3, edgeitems=20, linewidth=250)
torch.set_float32_matmul_precision('high')
np.set_printoptions(precision=3, suppress=True, edgeitems=20, linewidth=250)
INT_BITS = 32
INT_MAX = (1 << (INT_BITS - 1)) - 1
INT_MIN = -(1 << (INT_BITS - 1))

In [7]:
class AND(nn.Module):
    def __init__(self):
        super().__init__()

        self.l1 = nn.Sequential(nn.Linear(2, 1), nn.ReLU())
        
        with torch.no_grad():
            self.l1[0].weight.fill_(1.0)
            self.l1[0].bias.fill_(-1.0)
        
        for layer in self.l1:
            if isinstance(layer, nn.Linear):
                layer.weight = nn.Parameter(layer.weight.double())
                layer.bias = nn.Parameter(layer.bias.double())

    def forward(self, x):
        return self.l1(x.double())

In [29]:
class OR(nn.Module):
    def __init__(self):
        super().__init__()

        self.l1 = nn.Sequential(nn.Linear(2, 1), nn.Hardtanh(max_val=1.0, min_val=-1.0))
        
        with torch.no_grad():
            self.l1[0].weight.fill_(1.0)
            self.l1[0].bias.fill_(0)
        
        for layer in self.l1:
            if isinstance(layer, nn.Linear):
                layer.weight = nn.Parameter(layer.weight.double())
                layer.bias = nn.Parameter(layer.bias.double())

    def forward(self, x):
        return self.l1(x.double())

In [53]:
class XOR(nn.Module):
    def __init__(self):
        super().__init__()

        self.vals = [AND(), OR()]
        
        for layer in self.vals:
            if isinstance(layer, nn.Linear):
                layer.weight = nn.Parameter(layer.weight.double())
                layer.bias = nn.Parameter(layer.bias.double())

    def forward(self, x):
        pred1 = self.vals[0](x)
        pred2 = self.vals[1](x)
        reshaped = torch.concatenate([torch.ones_like(pred1) - pred1, pred2], axis=1).reshape(len(pred1), 2)
        return self.vals[0](reshaped)

In [33]:
and1 = AND()
and1.forward(torch.tensor([[0, 0], [1, 0], [0, 1], [1, 1]], dtype=torch.float32))

AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)


tensor([[0.],
        [0.],
        [0.],
        [1.]], dtype=torch.float64, grad_fn=<ReluBackward0>)

In [54]:
xor = XOR()
list(xor.parameters())
xor.forward(torch.tensor([[0, 0], [1, 0], [0, 1], [1, 1]], dtype=torch.float32))

AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)


tensor([[0.],
        [1.],
        [1.],
        [0.]], dtype=torch.float64, grad_fn=<ReluBackward0>)

In [91]:
class SumBinModule(nn.Module):
    def __init__(self, n_bits = 32):
        super().__init__()
        self.XOR = XOR()
        self.AND = AND()
        self.OR = OR()
        
    def forward(self, x:  torch.Tensor):
    
        reshaped = x.reshape(len(x) // 2, 2)
        after_and = self.AND(reshaped)
        after_xor = self.XOR(reshaped)
        after_roll = torch.roll(after_and, -1)
        after_roll[-1] = 0.0
        concat = torch.concatenate((after_roll, after_xor), axis=1)
        return torch.flatten(self.OR(concat))
        

In [92]:
summer = SumBinModule()
summer.forward(torch.tensor([0, 0, 1, 1, 1, 1], dtype=torch.float32))

AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)
AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)


tensor([1., 1., 0.], dtype=torch.float64, grad_fn=<ReshapeAliasBackward0>)

In [93]:

class SumNaturalBinAutoEncoder(pl.LightningModule):
    def __init__(self, encoder: SumBinModule, loss_function=F.mse_loss, lr=3 * 1e-3):
        super().__init__()
        self.encoder = encoder
        self.loss_function = loss_function
        self.lr = lr

    def predict(self, x):
        return self.encoder(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.encoder(x)
        loss = self.loss_function(y_hat, y)
        return loss

    @staticmethod
    def _get_accuracy(y_hat, y) -> tuple[float, float]:
        eq = (y == torch.round(y_hat))
        return eq.all(axis=1).double().mean(), eq.double().mean()

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.encoder(x)
        loss = self.loss_function(y_hat, y)
        acc, bin_acc = self._get_accuracy(y_hat, y)
        metrics = {"test_acc": acc, "test_loss": loss, "test_bin_acc": bin_acc}
        self.log_dict(metrics)
        return metrics

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=2.5 * 1e-3)
        return optimizer

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.encoder(x)
        val_loss = self.loss_function(y_hat, y)
        acc, bin_acc = self._get_accuracy(y_hat, y)
        metrics = {"test_acc": acc, "test_loss": val_loss, "test_bin_acc": bin_acc}
        self.log_dict(metrics)
        return metrics

        # Return the validation loss and any other metrics you computed
        # return {'val_loss': val_loss, 'val_accuracy': accuracy}

In [94]:
from torch.utils.data import Dataset
import torch
import os
from tqdm import tqdm
import torch.random
import numpy as np
import pandas as pd

In [95]:
import warnings


class SimpleRandomNaturalBinSumDataset(Dataset):
    def __init__(self, size, transform=None, target_transform=None):
        self.size = size
        self.transform = transform
        self.target_transform = target_transform
        ints1 = np.random.randint(INT_MIN, INT_MAX, size=self.size, dtype=np.int32)
        ints2 = np.random.randint(INT_MIN, INT_MAX, size=self.size, dtype=np.int32)

        array1 = np.array([SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(i) for i in tqdm(ints1)])
        array2 = np.array([SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(i) for i in tqdm(ints2)])
        self.X = np.concatenate((array1.reshape((self.size, INT_BITS)),
                                 array2.reshape((self.size, INT_BITS))), axis=1)

        self.labels = np.array([SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(i) for i in
                                tqdm((ints1 + ints2) % (1 << INT_BITS) + INT_MIN)])

    @staticmethod
    def compose_from_bool_array(bool_array):
        if not isinstance(bool_array, np.ndarray):
            raise ValueError("bool_array should be a numpy ndarray.")

        if bool_array.shape != (INT_BITS,):
            raise ValueError("bool_array should be a 1-dimensional array of size 32.")

        binary_str = "".join(str(int(bit)) for bit in bool_array)
        first_bit = binary_str[0]
        number_str = binary_str[1:]

        if first_bit == '0':
            return int(number_str, 2)
        else:
            return int(number_str, 2) + INT_MIN

    @staticmethod
    def decompose_to_bool_array(number):
        if not isinstance(number, np.int32) and not isinstance(number, np.int64) and not isinstance(number, int):
            raise ValueError(f"Number should be an integer. Received {type(number)}")

        if number < 0:
            return SimpleRandomNaturalBinSumDataset._decompose_positive_add_first_bit(number - INT_MIN, str(1))
        else:
            return SimpleRandomNaturalBinSumDataset._decompose_positive_add_first_bit(number, str(0))

    @staticmethod
    def _decompose_positive_add_first_bit(number, first_bit: str):
        binary_str = first_bit + bin(number)[2:].zfill(31)
        bool_array = np.array([int(bit) for bit in binary_str], dtype=np.float64)
        return bool_array

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        values, labels = self.X[idx], self.labels[idx]

        if self.transform:
            values = self.transform(values)
        if self.target_transform:
            labels = self.target_transform(labels)
        return values, labels


In [96]:
class PyplineOfSunModule:
    def __init__(self, model: SumNaturalBinAutoEncoder):
        self.model = model

    def predict(self, x: int, y: int):
        expected = x + y
        x_arr = SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(x)
        y_arr = SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(y)
        model_input = torch.tensor(np.concatenate((x_arr, y_arr)))
        predictions = self.model.predict(model_input).detach().numpy().round()
        print(f"Prediction:\t{predictions}")
        print(f"Expected:\t{SimpleRandomNaturalBinSumDataset.decompose_to_bool_array(expected)}")
        return SimpleRandomNaturalBinSumDataset.compose_from_bool_array(predictions)


In [97]:
# pypline_of_sum = PyplineOfSunModule(torch.load(f"autoencoder-V03.pt"))
# pypline_of_sum.predict(np.int32(-3), np.int32(5))

In [98]:
# Parameters:
activation_fun = nn.Sigmoid()
# layer_sizes = [INT_BITS, 8, 8, 8, INT_BITS]
layer_sizes = [INT_BITS * 2, 512, 128, INT_BITS]
# layer_sizes = [INT_BITS * 2, 128, INT_BITS]
# layer_sizes = [1, 32, 32, 32, 1]

In [100]:
dataset = SimpleRandomNaturalBinSumDataset(1_000_000)
train_loader = DataLoader(dataset, batch_size=10_000,
                          num_workers=multiprocessing.cpu_count(),
                          )
valid_dataset = SimpleRandomNaturalBinSumDataset(100_000)
valid_loader = DataLoader(valid_dataset, batch_size=5_000,
                          num_workers=multiprocessing.cpu_count(),
                          )


  0%|          | 0/100000 [00:00<?, ?it/s][A
  8%|▊         | 8329/100000 [00:00<00:01, 83273.40it/s][A
 19%|█▉        | 18795/100000 [00:00<00:00, 95849.01it/s][A
 31%|███       | 30537/100000 [00:00<00:00, 105695.19it/s][A
 42%|████▏     | 42128/100000 [00:00<00:00, 109724.39it/s][A
 54%|█████▍    | 53767/100000 [00:00<00:00, 112124.59it/s][A
 66%|██████▌   | 65526/100000 [00:00<00:00, 113978.98it/s][A
 78%|███████▊  | 77515/100000 [00:00<00:00, 115904.67it/s][A
100%|██████████| 100000/100000 [00:00<00:00, 112858.32it/s][A

  0%|          | 0/100000 [00:00<?, ?it/s][A
 10%|█         | 10425/100000 [00:00<00:00, 104236.18it/s][A
 22%|██▏       | 22089/100000 [00:00<00:00, 111525.02it/s][A
 34%|███▍      | 34256/100000 [00:00<00:00, 116150.99it/s][A
 47%|████▋     | 46511/100000 [00:00<00:00, 118671.97it/s][A
 58%|█████▊    | 58406/100000 [00:00<00:00, 118768.38it/s][A
 71%|███████   | 70722/100000 [00:00<00:00, 120257.18it/s][A
 83%|████████▎ | 82748/100000 [00:00<00:0

In [101]:
# model
autoencoder = SumNaturalBinAutoEncoder(SumBinModule(32))

AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)
AND(
  (l1): Sequential(
    (0): Linear(in_features=2, out_features=1, bias=True)
    (1): ReLU()
  )
)


In [102]:
test_dataset = SimpleRandomNaturalBinSumDataset(500_000)
test_loader = DataLoader(test_dataset, batch_size=1000, num_workers=multiprocessing.cpu_count())


  0%|          | 0/500000 [00:00<?, ?it/s][A
  1%|▏         | 6747/500000 [00:00<00:07, 67463.95it/s][A
  4%|▍         | 18826/500000 [00:00<00:04, 98827.51it/s][A
  6%|▌         | 31129/500000 [00:00<00:04, 109867.60it/s][A
  9%|▊         | 43066/500000 [00:00<00:04, 113615.03it/s][A
 11%|█         | 55641/500000 [00:00<00:03, 117976.50it/s][A
 14%|█▎        | 67855/500000 [00:00<00:03, 119387.50it/s][A
 16%|█▌        | 80430/500000 [00:00<00:03, 121463.67it/s][A
 19%|█▊        | 92947/500000 [00:00<00:03, 122637.93it/s][A
 21%|██        | 105264/500000 [00:00<00:03, 122801.08it/s][A
 24%|██▎       | 117698/500000 [00:01<00:03, 123268.73it/s][A
 26%|██▌       | 130025/500000 [00:01<00:03, 122048.93it/s][A
 28%|██▊       | 142233/500000 [00:01<00:02, 121153.04it/s][A
 31%|███       | 154512/500000 [00:01<00:02, 121639.38it/s][A
 33%|███▎      | 166698/500000 [00:01<00:02, 121700.71it/s][A
 36%|███▌      | 179276/500000 [00:01<00:02, 122921.10it/s][A
 38%|███▊      | 19

In [103]:
# train model
trainer = pl.Trainer(max_epochs=30)

# trainer.fit(autoencoder, train_dataloaders=train_loader)
trainer.test(autoencoder, test_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

 36%|███▌      | 359390/1000000 [01:47<03:12, 3327.71it/s]  


RuntimeError: shape '[500, 2]' is invalid for input of size 64000

In [None]:
autoencoder.lr = 0.0025
# for i in range(5):
# print("#" * 40 + " " * 10 + f"Iteration № {i:5}" + " " * 10 + "#" * 40)
trainer = pl.Trainer(max_epochs=50)
trainer.fit(autoencoder, train_dataloaders=train_loader)
trainer.test(autoencoder, test_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name    | Type                | Params
------------------------------------------------
0 | encoder | SumNaturalBinModule | 103 K 
------------------------------------------------
103 K     Trainable params
0         Non-trainable params
103 K     Total params
0.412     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

In [None]:
torch.save(autoencoder, f"./models/autoencoder-[128-64]-V05.pt")

In [None]:
# x_rand = torch.tensor(np.random.randint(low=0, high=2, size=(1, INT_BITS)).astype(np.float64))
# y_hat = autoencoder.predict(x_rand).round()
# print(x_rand == y_hat)

In [None]:
# import plotly.express as px

In [None]:
# print(x[0].astype(int))
# print(y[0])
# print(y[0].round().astype(int))

In [None]:
# print(x_to_plot)
# print(y_to_plot)

In [None]:
# fig = px.line(x=x_to_plot, y=y_to_plot)
# fig.show()
# 
# fig = px.line(x=x_to_plot, y=differ.sum(axis=1))
# fig.show()