In [None]:
import os
import random
from typing import Tuple

import numpy as np
import torch
from torch import Tensor
from torch import nn
from torch import optim
from torch.backends import cudnn
from torch.nn import functional as F
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.dataset import TensorDataset, random_split
from tqdm import tqdm
torch.set_printoptions(precision=4)


def set_seed(seed):
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    cudnn.deterministic = True
    cudnn.benchmark = False

set_seed(1234)

In [None]:
# def create_dataset(
#     batch_size: int = 1,
#     num_samples: int = 30,
#     spread: float = 0.5,
#     split: float = 0.5,
#     seed: int = 1234,
# ) -> Tuple[DataLoader, DataLoader]:
#     """
#     Creates the dataset
#     :param batch_size: the batch size
#     :param num_samples: number of items per class/label/letter
#     :param spread: the std for normal sampling
#     :param split: train-val split (<1). Number given is allotted as the train %
#     :param seed: seed for the "random" dataset generator
#     :return: the dataloaders
#     """
#     np.random.seed(seed)

#     ideal_sensory_values = np.random.randint(-24, 23, (24, 16))
#     dataset = list()
#     classes = 24
#     for letter in range(classes):
#         for _ in range(num_samples):
#             sensors = []
#             for sensor in ideal_sensory_values[letter]:
#                 sensors.append(np.random.normal(loc=sensor, scale=spread))
            
#             sensors= np.array(sensors)
#             # if np.random.choice([True,False],p=[0.7,0.3]):
#             #     indices = np.random.choice(np.arange(sensors.size), replace=False,size=int(sensors.size * 0.3))
#             #     sensors[indices] = np.random.choice([0,1])
                
#             dataset.append([sensors, np.array([letter])])

    
#     x = list()
#     y = list()
#     num_samples = 2
#     for i in range(num_samples * 24): #24 if we train fully instead of classes
#         x.append(dataset[i][0])
#         y.append(dataset[i][1])

#     x = np.array(x)
#     y = np.array(y)
#     x = x[0:2]
#     y = y[0:2]
#     tensor_x = torch.Tensor(x)
#     tensor_y = torch.Tensor(y)

#     # train_split = int(split * len(x))
#     # val_split = len(x) - train_split
#     train_split = int(split * len(x))
#     val_split = len(x) - train_split
#     tensor_dataset = TensorDataset(tensor_x, tensor_y)
#     train_dataset, val_dataset = random_split(tensor_dataset, [train_split, val_split])
#     train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
#     val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

#     return train_loader, val_loader

In [None]:
class LinearAE(nn.Module):
    def __init__(self, in_size=16, latent_size=64):
        super(LinearAE, self).__init__()

        self.encoder = nn.Sequential(
            nn.Linear(in_size, 64),
            nn.ReLU(),
            nn.Linear(64,latent_size),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, 64),
            nn.ReLU(),
            nn.Linear(64, in_size),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)

        return x

In [None]:

def get_min_and_max(dataloader, device) -> Tuple[float, float]:
    """
    Finds the min and max of the dataset.
    This is used for Normalization of the dataset.

    :param dataloader: dataloader to calculate it for
    :param device: device to run computations on
    :return: tuple of mean and std
    """
    min_val, max_val = torch.Tensor([999]).to(device), torch.Tensor([-999]).to(device)
    for data, _ in tqdm(dataloader):
        data = data.to(device)
        min_val = torch.min(min_val, torch.min(data))
        max_val = torch.max(max_val, torch.max(data))

    return min_val.item(), max_val.item()

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# x = torch.rand(1, 1, 16).cuda()
model = LinearAE(16, 256).cuda()
train_loader, val_loader = create_dataset()
min_val, max_val = get_min_and_max(train_loader, device)

100%|██████████| 1/1 [00:00<00:00, 20.38it/s]


In [None]:
len(train_loader), len(val_loader)

(1, 1)

In [None]:
optimizer = optim.Adam(model.parameters())
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, verbose=True)

In [None]:
for epoch in range(500):
  train_loss = 0
  for batch_idx, data in enumerate(train_loader):
      vector, _ = data
      vector = vector.to(device)
      vector = (vector - min_val) / (max_val - min_val)

      optimizer.zero_grad()
      y = model(vector)
      # recon_batch, mu, logvar = model(x)
      loss = F.mse_loss(y, vector)
      # loss = loss_function(recon_batch, x, mu, logvar)
      loss.backward()
      optimizer.step()
      train_loss += loss.item()
  scheduler.step(train_loss/len(train_loader))
  print(epoch, train_loss / len(train_loader))

In [None]:
model_dir = '/content/drive/MyDrive/Latent Transfer/DomainA/models'
model_filename = 'domainA-encoder.pt'
model_filepath = os.path.join(model_dir, model_filename)
torch.save(model.state_dict(), model_filepath)

In [None]:
test = train_loader.dataset[0][0].cuda()
inp = (test - min_val) / (max_val - min_val)

In [None]:
test, inp

(tensor([ -5.2295,  13.7286, -11.5653,   0.0316,  -8.5336,  -1.1968,  16.7258,
           2.5654,   5.7764,  19.5220,   5.3834,  20.3019,   2.3145,   3.6445,
         -19.2770,  -8.6715], device='cuda:0'),
 tensor([0.3549, 0.8339, 0.1948, 0.4879, 0.2714, 0.4568, 0.9096, 0.5519, 0.6330,
         0.9803, 0.6231, 1.0000, 0.5455, 0.5791, 0.0000, 0.2680],
        device='cuda:0'))

In [None]:
enc = model.encoder(inp)
recon = model.decoder(enc)

In [None]:
inp, recon

(tensor([0.3549, 0.8339, 0.1948, 0.4879, 0.2714, 0.4568, 0.9096, 0.5519, 0.6330,
         0.9803, 0.6231, 1.0000, 0.5455, 0.5791, 0.0000, 0.2680],
        device='cuda:0'),
 tensor([0.3549, 0.8340, 0.1948, 0.4878, 0.2714, 0.4568, 0.9097, 0.5519, 0.6330,
         0.9822, 0.6231, 0.9950, 0.5455, 0.5791, 0.0046, 0.2679],
        device='cuda:0', grad_fn=<SigmoidBackward0>))

In [None]:
reconstructed = (recon  *  (max_val - min_val)) + min_val
test, reconstructed

(tensor([ -5.2295,  13.7286, -11.5653,   0.0316,  -8.5336,  -1.1968,  16.7258,
           2.5654,   5.7764,  19.5220,   5.3834,  20.3019,   2.3145,   3.6445,
         -19.2770,  -8.6715], device='cuda:0'),
 tensor([ -5.2292,  13.7308, -11.5657,   0.0315,  -8.5343,  -1.1972,  16.7293,
           2.5650,   5.7765,  19.5959,   5.3836,  20.1044,   2.3149,   3.6438,
         -19.0941,  -8.6725], device='cuda:0', grad_fn=<AddBackward0>))

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        
        self.fc1 = nn.Linear(16 , 120) 
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 24)

    def forward(self, x):
        
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def DNN():
  net = Net()
  return net

In [None]:
model = DNN()
model.load_state_dict(torch.load('/content/drive/MyDrive/Latent Transfer/Validation model/models/dnn.pt'))
model.eval()

Net(
  (fc1): Linear(in_features=16, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=24, bias=True)
)

In [None]:
x_check = [[-5.2292,  13.7308, -11.5657,   0.0315,  -8.5343,  -1.1972,  16.7293,
             2.5650,   5.7765,  19.5959,   5.3836,  20.1044,   2.3149,   3.6438,
            -19.0941,  -8.6725], 
           [-5.2295,  13.7286, -11.5653,   0.0316,  -8.5336,  -1.1968,  16.7258,
             2.5654,   5.7764,  19.5220,   5.3834,  20.3019,   2.3145,   3.6445,
            -19.2770,  -8.6715]]

x_check = torch.FloatTensor(x_check)

In [None]:
out = model(x_check)
_, preds = torch.max(out, 1)

In [None]:
preds

tensor([0, 0])