# In [None]:
import matplotlib.pyplot as plt
import numpy as np
import requests
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import tqdm
from collections import defaultdict as dd
from copy import deepcopy as dc
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms

# Prefer the GPU, but fall back to the CPU so the script still runs on
# machines without CUDA.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Each image is rotated by -90 degrees, mirrored horizontally, resized to
# 32x32 and converted to a tensor.
# NOTE(review): the rotate + flip presumably undoes EMNIST's stored
# orientation -- confirm against a plotted sample.
transform = transforms.Compose([
    lambda img: transforms.functional.rotate(img, -90),
    lambda img: transforms.functional.hflip(img),
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])
data = datasets.EMNIST(root="./emnist_data/", split="byclass", train=True, transform=transform, download=True)
dataloader = DataLoader(dataset=data, batch_size=1, shuffle=True)

# Number of randomly drawn samples sent to the scoring endpoint.
NUM_QUERIES = 30000
SCORE_URL = "http://inversion.advml.com/score"
# Seconds to wait for the endpoint before treating a request as failed;
# without a timeout a single stalled connection would hang the whole run.
REQUEST_TIMEOUT = 30

images = []     # tensors exactly as sent to the endpoint
outputs = []    # score vector returned for the image at the same index
all_probs = []  # (class label, score vector) pairs, pickled below

for k, d in tqdm.tqdm(enumerate(dataloader), total=NUM_QUERIES):
    if k >= NUM_QUERIES:
        break
    x, y = d
    c = int(y.item())
    # Move axis 1 to the end, i.e. (N, C, H, W) -> (N, H, W, C);
    # presumably the layout the endpoint expects.
    x = x.float().moveaxis(1, 3)
    payload = {"data": x.tolist()}  # built once, reused across retries
    b = 0
    # Retry until the endpoint answers with a usable payload, sleeping a
    # little longer after every failure (2s, 4s, 6s, ...).
    while True:
        try:
            response = requests.post(SCORE_URL, json=payload, timeout=REQUEST_TIMEOUT)
            vals = response.json()["outputs"][0]
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as e:
            b += 1
            # Report the failure instead of swallowing it silently, so a
            # persistent problem (bad URL, rate limiting) is visible.
            tqdm.tqdm.write(f"Request for sample {k} failed ({e!r}); retry {b} in {b * 2}s")
            time.sleep(b * 2)
        else:
            break
    images.append(x)
    outputs.append(vals)
    # Independent copy so the pickled records do not alias `outputs`.
    all_probs.append((c, dc(vals)))

import pickle
with open("inversion.pkl", "wb") as f:
    pickle.dump(all_probs, f)

class CustomDataset2(Dataset):
    """Pair each image with the score vector recorded for it.

    Args:
        images: Indexable collection of image tensors.
        outputs: Indexable collection of score vectors, one per image.
            Entries may be plain Python lists or tensors.

    Indexing returns a dict with the keys ``"data"`` (the image, unchanged)
    and ``"output"`` (the score vector as a float32 tensor).
    """

    def __init__(self, images, outputs):
        self.images = images
        self.outputs = outputs

    def __len__(self):
        # The dataset length follows the images, not the outputs.
        return len(self.images)

    def __getitem__(self, idx):
        # A raw Python list would be transposed by the DataLoader's default
        # collation into a list of per-position float64 tensors, which can
        # neither be moved with `.to(...)` nor fed to a float32 layer.
        # Returning a float32 tensor makes batches collate to one
        # (batch, n_scores) tensor. Tensors that are already float32 pass
        # through without a copy.
        return {
            "data": self.images[idx],
            "output": torch.as_tensor(self.outputs[idx], dtype=torch.float32),
        }

# Wrap the collected images and their score vectors, then serve them in
# shuffled mini-batches of 32 for training.
custom_dataset = CustomDataset2(images=images, outputs=outputs)
custom_dataloader = DataLoader(custom_dataset, shuffle=True, batch_size=32)

class Inverse(nn.Module):
    """Decoder that turns an 8-value vector into a 1x32x32 map.

    Two fully connected layers expand the vector to 1024 values, which are
    laid out as a single-channel 32x32 grid and refined by three 3x3
    convolutions with "same" padding. Every layer, including the last one,
    is followed by a ReLU, so the result is non-negative.
    """

    def __init__(self):
        super().__init__()
        # Dense stem: 8 -> 256 -> 1024 (1024 == 32 * 32).
        self.fc1 = nn.Linear(in_features=8, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=1024)
        # Convolutional head: 1 -> 16 -> 16 -> 1 channels, spatial size kept.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding="same")
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding="same")
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=1, kernel_size=3, padding="same")

    def forward(self, x):
        """Map a (batch, 8) tensor to a (batch, 1, 32, 32) tensor."""
        hidden = x
        for dense in (self.fc1, self.fc2):
            hidden = dense(hidden).relu()
        batch_size = hidden.shape[0]
        grid = hidden.reshape(batch_size, 1, 32, 32)
        for conv in (self.conv1, self.conv2, self.conv3):
            grid = conv(grid).relu()
        return grid

torch.cuda.empty_cache()

# Train the inverse model: score vectors go in, images are the regression
# target, and the mean squared pixel error is minimised with Adam.
modelx = Inverse().to(DEVICE)
crit = nn.MSELoss()
optimizer = optim.Adam(modelx.parameters(), lr=0.001)

for epoch in range(1000):
    rloss = 0.0
    for d in custom_dataloader:
        # Each stored image keeps its leading singleton axis, so a batch is
        # (B, 1, H, W, C): drop that axis and move channels to position 1.
        inputs = d["data"].squeeze(1).moveaxis(3, 1).to(DEVICE)

        labels = d["output"]
        if isinstance(labels, (list, tuple)):
            # When the dataset yields plain Python lists, default collation
            # returns one (B,) tensor per score position; stack them back
            # into a single (B, n_scores) tensor.
            labels = torch.stack(labels, dim=1)
        # The model's weights are float32, while collated Python floats
        # arrive as float64.
        labels = labels.to(DEVICE, dtype=torch.float32)

        optimizer.zero_grad()
        preds = modelx(labels)
        # MSELoss takes (prediction, target); the value is symmetric, the
        # order just follows the documented convention.
        loss = crit(preds, inputs)
        loss.backward()
        optimizer.step()
        rloss += loss.item()
    # Mean per-batch loss over the epoch.
    print(f"Epoch: {epoch + 1}, loss: {rloss / len(custom_dataloader):0.5f}")

torch.save(modelx.state_dict(), "model.pt")

# Probe the trained model with eight score vectors, the i-th holding 0.75
# at index i and 0 everywhere else, and show the eight reconstructions side
# by side.
# Building the batch directly avoids tying its size to how many images were
# collected (a Dataset built from `images[:8]` yields fewer than 8 rows
# when fewer images exist, which breaks the plotting loop).
labels = 0.75 * torch.eye(8)

# Gradients are not needed for plotting.
with torch.no_grad():
    preds = modelx(labels.to(DEVICE))
ps = preds.cpu().numpy()

plt.figure(figsize=(40, 10))
for i in range(8):
    plt.subplot(1, 8, i + 1)
    # ps[i] is (1, 32, 32); index 0 selects the single channel.
    plt.imshow(ps[i][0])
plt.show()