In [None]:
import torch
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import glob
import os


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import cv2 as cv

In [None]:
data_dir = '../input/cryptopunks/txn_history-2021-10-07.jsonl'
image_dir = "../input/cryptopunks/imgs/imgs"
image_root = "../input/cryptopunks/imgs"

In [None]:
image_paths = glob.glob(image_dir + '/*.png')

#images in numpy format
images = [cv.imread(im_path) for im_path in image_paths[:100]]


In [None]:
from torchvision import datasets
#Turns any image into grayscale equivalent
#Images are 24 x 24
def to_gray(rgb):
    return np.dot(rgb[...,:3], [0.2989, 0.5870, 0.1140])
img_transform = transforms.Compose([
    transforms.ToTensor()
])

im2 = [img_transform(cv.imread(im_path)) for im_path in image_paths[:100]]

dataset = datasets.ImageFolder(root=image_root,
                                   transform=img_transform)
batch_size = 10
#create the dataloader
train_dataloader = torch.utils.data.DataLoader(dataset,
                                          batch_size=batch_size,
                                          shuffle=True)

dataiter = iter(train_dataloader) #dataloader is an iterator

img, _ = next(dataiter)

def tensor_imshow(img):
    img = img.to('cpu')
    npimg = img.detach().numpy()
    
    plt.figure(figsize=(3, 3))
    plt.axis('off')
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()
    
tensor_imshow(img[0])


In [None]:
# 2-d latent space, parameter count in same order of magnitude
# as in the original VAE paper (VAE paper has about 3x as many)
# latent_dims = 2
# num_epochs = 100
# batch_size = 128
# capacity = 64
# learning_rate = 1e-3
# variational_beta = 1
# use_gpu = True

# n-d latent space, for comparison with non-variational auto-encoder
latent_dims = 10
num_epochs = 100
batch_size = 50
capacity = 64
learning_rate = 1e-3
variational_beta = 1
use_gpu = True

In [None]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        c = capacity
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=c, kernel_size=4, stride=2, padding=1) # out: c x 12 x 12
        self.conv2 = nn.Conv2d(in_channels=c, out_channels=c*2, kernel_size=4, stride=2, padding=1) # out: 2*c x 6 x 6
        self.fc_mu = nn.Linear(in_features=c*2*6*6, out_features=latent_dims)
        self.fc_logvar = nn.Linear(in_features=c*2*6*6, out_features=latent_dims)
            
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1) # flatten batch of multi-channel feature maps to a batch of feature vectors
        x_mu = self.fc_mu(x)
        x_logvar = self.fc_logvar(x)
        return x_mu, x_logvar

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        c = capacity
        self.fc = nn.Linear(in_features=latent_dims, out_features=c*2*6*6)
        self.conv2 = nn.ConvTranspose2d(in_channels=c*2, out_channels=c, kernel_size=4, stride=2, padding=1)
        self.conv1 = nn.ConvTranspose2d(in_channels=c, out_channels=3, kernel_size=4, stride=2, padding=1)
            
    def forward(self, x):
        x = self.fc(x)
        x = x.view(x.size(0), capacity*2, 6, 6) # unflatten batch of feature vectors to a batch of multi-channel feature maps
        x = F.relu(self.conv2(x))
        x = torch.sigmoid(self.conv1(x)) # last layer before output is sigmoid, since we are using BCE as reconstruction loss
        return x
    
class VariationalAutoencoder(nn.Module):
    def __init__(self):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
    
    def forward(self, x):
        latent_mu, latent_logvar = self.encoder(x)
        latent = self.latent_sample(latent_mu, latent_logvar)
        x_recon = self.decoder(latent)
        return x_recon, latent_mu, latent_logvar
    
    def latent_sample(self, mu, logvar):
        if self.training:
            # the reparameterization trick
            std = logvar.mul(0.5).exp_()
            eps = torch.empty_like(std).normal_()
            return eps.mul(std).add_(mu)
        else:
            return mu

def vae_loss(recon_x, x, mu, logvar):
    # recon_x is the probability of a multivariate Bernoulli distribution p.
    # -log(p(x)) is then the pixel-wise binary cross-entropy.
    # Averaging or not averaging the binary cross-entropy over all pixels here
    # is a subtle detail with big effect on training, since it changes the weight
    # we need to pick for the other loss term by several orders of magnitude.
    # Not averaging is the direct implementation of the negative log likelihood,
    # but averaging makes the weight of the other loss term independent of the image resolution.
    recon_loss = F.binary_cross_entropy(recon_x.view(-1,np.prod(recon_x.shape[1:]) ), x.view(-1, np.prod(x.shape[1:])), reduction='sum')
    
    # KL-divergence between the prior distribution over latent vectors
    # (the one we are going to sample from when generating new images)
    # and the distribution estimated by the generator for the given image.
    kldivergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    return recon_loss + variational_beta * kldivergence
    
    
vae = VariationalAutoencoder()

device = torch.device("cuda:0" if use_gpu and torch.cuda.is_available() else "cpu")
vae = vae.to(device)

num_params = sum(p.numel() for p in vae.parameters() if p.requires_grad)
print('Number of parameters: %d' % num_params)

In [None]:
optimizer = torch.optim.Adam(params=vae.parameters(), lr=learning_rate, weight_decay=1e-5)

# set to training mode
vae.train()

train_loss_avg = []

print('Training ...')
for epoch in range(num_epochs):
    train_loss_avg.append(0)
    num_batches = 0
    
    for image_batch, _ in train_dataloader:
        
        image_batch = image_batch.to(device)

        # vae reconstruction
        image_batch_recon, latent_mu, latent_logvar = vae(image_batch)
        
        # reconstruction error
        loss = vae_loss(image_batch_recon, image_batch, latent_mu, latent_logvar)
        
        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        
        # one step of the optmizer (using the gradients from backpropagation)
        optimizer.step()
        
        train_loss_avg[-1] += loss.item()
        num_batches += 1
        
    train_loss_avg[-1] /= num_batches
    print('Epoch [%d / %d] average reconstruction error: %f' % (epoch+1, num_epochs, train_loss_avg[-1]))

In [None]:
plt.ion()

fig = plt.figure()
plt.plot(train_loss_avg)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

In [None]:
import torchvision.utils
test_dataloader = train_dataloader
vae.eval()

# This function takes as an input the images to reconstruct
# and the name of the model with which the reconstructions
# are performed
def to_img(x):
    x = x.clamp(0, 1)
    return x

def show_image(img):
    img = to_img(img)
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))

def visualise_output(images, model):

    with torch.no_grad():
    
        images = images.to(device)
        images, _, _ = model(images)
        images = images.cpu()
        images = to_img(images)
        np_imagegrid = torchvision.utils.make_grid(images, 10, 5).numpy()
        plt.imshow(np.transpose(np_imagegrid, (1, 2, 0)))
        plt.show()

images, labels = iter(test_dataloader).next()

# First visualise the original images
print('Original images')
show_image(torchvision.utils.make_grid(images,10,5))
plt.show()

# Reconstruct and visualise the images using the vae
print('VAE reconstruction:')
visualise_output(images, vae)

In [None]:
images = images[:10].to(device)
latent_mus, latent_logvars = vae.encoder(images[:10])
rand_latent_mu = latent_mus.sum(0); rand_latent_logvar = latent_logvars.sum(0)


In [None]:
with torch.no_grad():
    x = rand_latent_mu.view(1, len(rand_latent_mu))
    rand_recon = vae.decoder(x)
    
    rand_recon = rand_recon.cpu()
    rand_recon = to_img(rand_recon).reshape(3,24,24)
    plt.imshow(np.transpose(rand_recon, (1, 2, 0)))


In [None]:
with torch.no_grad():
    b = torch.Tensor(np.random.uniform(-1,1, 10)).reshape(1,10).to(device)
    rand_recon = vae.decoder(b)
    rand_recon = rand_recon.cpu()
    rand_recon = to_img(rand_recon).reshape(3,24,24)
    plt.imshow(np.transpose(rand_recon, (1, 2, 0)))

In [None]:
visualise_output(images[0:2], vae)

In [None]:
class PunkHunter:
    
    def __init__(self):
        self.log_diffs = torch.zeros(1)
        self.sigmas = torch.zeros(1)
    def log_posterior(self, mu):
        lp = dist.Normal(torch.zeros_like(mu), 
                         torch.ones_like(mu)
                        ).log_prob(mu).sum()
        return lp
    def MCMC_step(self, mu_start, threshold = 0.4):
        
        was_accepted = False
        
        proposal = dist.Normal(mu_start, sigma*np.ones_like(mu_start)).sample()

        numerator = self.log_posterior(mu_start)
        denominator = self.log_posterior(proposal)

        if numerator - denominator > torch.log(threshold):
            was_accepted = True
            return proposal, was_accepted
            
        return mu_start, was_accepted

    def calc_sigma(self):
        
        sigma_old = self.sigmas[-1]
        sigma_new = torch.exp(torch.log(sigma_old) - self.log_diffs[-1])
        self.sigmas = torch.cat((self.sigmas, sigma_new))
        return sigma_new
    
    def run(self, mu_start, log_diff):
        self.log_diffs = torch.cat((self.log_diffs, log_diff))
        
        sigma = self.calc_sigma()
        new_mu, was_accepted = self.MCMC_step(mu_start)
        while not was_accepted:
            sigma/=2
            new_mu, was_accepted = self.MCMC_step(mu_start)
        
        return new_mu
        

In [None]:
torch.save(vae.state_dict(), 'vae_10_dim.pth')

In [None]:
def MCMC_step(sigma, mu = 0):
    move = dist.Normal()