In [None]:
#%matplotlib notebook
import numpy as np
import matplotlib.pyplot as plt

import torch

from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Download the 'mnist_784' dataset from OpenML (cached locally after the first call).
X, y = fetch_openml('mnist_784', version=1, return_X_y=True, cache=True)

# Depending on the scikit-learn version, fetch_openml may return pandas objects
# instead of NumPy arrays; torch.from_numpy() only accepts ndarrays, so coerce
# explicitly before doing anything else.
X = np.asarray(X, dtype=np.float64)
X = X / 255.                      # rescale pixel intensities by 1/255
y = np.asarray(y).astype(int)     # labels are converted to integers

# Hold out 10,000 examples for testing; the remainder is the training set.
X,X_test,y,y_test = train_test_split(X,y,test_size=10000)

# Extract number of data points, and the height and width of the images for later reshaping
m = X.shape[0]
n = X.shape[1]

h = 28
w = 28

N = 10

# Pick the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else "cpu")

# Convert to tensors with the dtypes used downstream (float32 features, int64 labels)
# and place everything on the compute device.
X = torch.from_numpy(X).to(torch.float32).to(device)
X_test = torch.from_numpy(X_test).to(torch.float32).to(device)
y = torch.from_numpy(y).to(torch.long).to(device)
y_test = torch.from_numpy(y_test).to(torch.long).to(device)

In [None]:
from torch.utils.data import TensorDataset

# Pair every image tensor with its label so the loaders yield (image, label) minibatches.
training_data = TensorDataset(X, y)
test_data = TensorDataset(X_test, y_test)

# Both loaders share one minibatch size; only the training split is reshuffled.
batch_size = 256
train_loader = torch.utils.data.DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False)

In [None]:
# Sizes passed to the Encoder/Decoder constructors: latent code size and the two hidden widths.
latent_dim, h_dim_1, h_dim_2 = 2, 512, 256

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    """Fully connected encoder that maps an input batch to the pair (mu, rho)."""

    def __init__(self,n,latent_dim,h_dim_1,h_dim_2):
        """
        Create the layers.

        n          -- number of features in each input row
        latent_dim -- number of columns in each of the two outputs
        h_dim_1    -- width of the first hidden layer
        h_dim_2    -- width of the second hidden layer
        """
        super().__init__()
        self.l1 = nn.Linear(n, h_dim_1)
        self.l2 = nn.Linear(h_dim_1, h_dim_2)
        # Two independent linear heads read the same second hidden layer.
        self.l3_mu = nn.Linear(h_dim_2, latent_dim)
        self.l3_rho = nn.Linear(h_dim_2, latent_dim)

    def forward(self,x,sample=True):
        """
        Apply linear -> ReLU -> linear -> ReLU, then both output heads.

        Returns the tuple (mu, rho). The `sample` argument is accepted but
        has no effect on the result.
        """
        hidden = F.relu(self.l2(F.relu(self.l1(x))))
        return self.l3_mu(hidden), self.l3_rho(hidden)
    
# rho = log sigma^2
# sigma = exp( rho/2 )
    
class Decoder(nn.Module):
    """Fully connected decoder that maps latent codes to outputs in (0, 1)."""

    def __init__(self,n,latent_dim,h_dim_1,h_dim_2):
        """
        Create the layers.

        n          -- number of features in each output row
        latent_dim -- number of features in each latent input row
        h_dim_1    -- width of the first hidden layer
        h_dim_2    -- width of the second hidden layer
        """
        super().__init__()
        self.l1 = nn.Linear(latent_dim, h_dim_1)
        self.l2 = nn.Linear(h_dim_1, h_dim_2)
        self.l3 = nn.Linear(h_dim_2, n)

    def forward(self,z):
        """
        Apply linear -> ReLU -> linear -> ReLU -> linear -> sigmoid to z
        and return the result (one row of n values per latent row).
        """
        hidden = F.relu(self.l1(z))
        hidden = F.relu(self.l2(hidden))
        return torch.sigmoid(self.l3(hidden))

In [None]:
# Build both halves of the VAE and move them onto the compute device.
encoder = Encoder(n,latent_dim,h_dim_1,h_dim_2)
encoder.to(device)

decoder = Decoder(n,latent_dim,h_dim_1,h_dim_2)
decoder.to(device)

# Not referenced by the loop below; the reconstruction term is written out by hand.
criterion = torch.nn.BCELoss()

# One Adam optimizer updates the encoder and decoder parameters together.
optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-3)

epochs = 50
for epoch in range(epochs):
    encoder.train()
    decoder.train()

    # Running sums of the per-minibatch mean losses; averaged and printed once per epoch.
    dl = 0
    kl = 0
    n_batches = 0
    for d,_ in train_loader:
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Encode, then draw a reparameterised latent sample.
        # rho is treated as log(sigma^2), hence sigma = exp(rho/2).
        mu, rho = encoder(d)
        eps = torch.randn_like(rho)
        sigma = torch.exp(rho/2.)
        latent = mu + sigma*eps
        reconstruction = decoder(latent)

        # Reconstruction term: per-example binary cross-entropy summed over features
        # and scaled by 0.5; probabilities are clamped below at 1e-4 before the log.
        log_p = torch.log(torch.clamp(reconstruction,min=1e-4))
        log_not_p = torch.log(torch.clamp(1-reconstruction,min=1e-4))
        data_loss = -0.5*torch.sum(d*log_p + (1-d)*log_not_p,axis=-1)

        # KL term between the encoder's diagonal Gaussian and a standard normal.
        kl_loss = -0.5*torch.sum(1 + rho - mu**2 - torch.exp(rho),axis=-1)

        # Average over the minibatch, backpropagate, and take an optimizer step.
        loss = torch.mean(data_loss + kl_loss)
        loss.backward()
        optimizer.step()

        dl += torch.mean(data_loss).item()
        kl += torch.mean(kl_loss).item()
        n_batches += 1

    print(dl/n_batches,kl/n_batches)

In [None]:

# Encode the held-out images and draw one latent sample per image.
mu,rho = encoder(X_test)
noise = torch.randn_like(rho)
z_intermediate = (mu + noise*torch.exp(rho/2.)).detach().cpu().numpy()
labels = y_test.detach().cpu().numpy()

# Scatter the first two latent coordinates, coloured by label.
fig = plt.figure(figsize=(12,12))
plt.scatter(z_intermediate[:,0], z_intermediate[:,1], c=labels)
plt.colorbar()
plt.xlabel('z_0')
plt.ylabel('z_1')
plt.show()

In [None]:
# Number of decoded images along each latent axis.
# Deliberately NOT called `n`: that name holds the input dimension used to
# construct the Encoder/Decoder, and overwriting it would break a re-run of
# the model cell.
n_grid = 21

# Sample each latent coordinate on an even grid over [-1, 1]
z_0 = np.linspace(-1,1,n_grid)
z_1 = np.linspace(-1,1,n_grid)

img_size = 28

# Mosaic that will hold n_grid x n_grid decoded tiles of img_size x img_size pixels.
figure = np.zeros((img_size * n_grid, img_size * n_grid, 1))

# Layout: columns follow z_0 left-to-right and rows follow z_1 top-to-bottom from
# high to low, so that with origin='upper' and extent=(-1,1,-1,1) each tile sits
# at the (z_0, z_1) position shown on the axes.
with torch.no_grad():  # decoding for display only; no gradients needed
    for i,z1 in enumerate(z_1[::-1]):
        for j,z0 in enumerate(z_0):
            z_sample = np.array([[z0,z1]])
            x_decoded = decoder(torch.from_numpy(z_sample).to(device).to(torch.float))
            img = x_decoded[0].reshape(img_size, img_size,1).cpu().numpy()
            figure[i * img_size: (i + 1) * img_size,j * img_size: (j + 1) * img_size,:] = img

plt.figure(figsize=(10, 10))
plt.imshow(figure.squeeze(),cmap=plt.cm.gray,origin='upper',extent=(-1,1,-1,1))
plt.xlabel('z_0')
plt.ylabel('z_1')
plt.xlim(-1,1)
plt.ylim(-1,1)
plt.show()


In [None]:
from mpl_toolkits.mplot3d import Axes3D
# Encode the test set and draw one latent sample per image.
# Gradients are not needed for plotting.
with torch.no_grad():
    mu,rho = encoder(X_test)
    z_intermediate = (mu + torch.randn_like(rho)*torch.exp(rho/2.)).cpu().numpy()

# A 3-D scatter reads latent columns 0, 1 and 2. When the encoder produces fewer
# than three latent coordinates, indexing column 2 raises IndexError, so report
# that and skip the plot instead of failing the whole notebook run.
if z_intermediate.shape[1] < 3:
    print('Skipping 3-D latent plot: the latent space has only', z_intermediate.shape[1], 'dimension(s).')
else:
    fig = plt.figure()
    fig.set_size_inches(20,20)
    ax = fig.add_subplot(111, projection='3d')
    p = ax.scatter(z_intermediate[:,0],z_intermediate[:,1],z_intermediate[:,2],c=y_test.detach().cpu().numpy(),alpha=0.8)
    fig.colorbar(p)
    plt.show()

In [None]:
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn

# Preprocessing applied to every image: resize to 64x64, then convert to a tensor.
transformations = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])

# Images are read from the ./img_align_celeba directory via ImageFolder.
celeba_data = torchvision.datasets.ImageFolder('./img_align_celeba', transform=transformations)

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device("cpu")

In [None]:
# Shuffled minibatches of 128 images drawn from the CelebA dataset object.
batch_size = 128
train_loader = torch.utils.data.DataLoader(celeba_data, batch_size=batch_size, shuffle=True)

In [None]:
# Size of the latent code produced by the encoder and consumed by the decoder.
latent_dim = 64

class Encoder(nn.Module):
    """
    Convolutional encoder: four padded stride-2 convolution stages followed by
    two linear heads that output (mu, rho), each with latent_dim columns.
    """

    def __init__(self):
        """Create the four conv/batch-norm pairs, the two linear heads and the activation."""
        super().__init__()
        self.conv_1 = nn.Conv2d(3, 32, kernel_size=4, stride=2)
        self.bn_1 = nn.BatchNorm2d(32)

        self.conv_2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.bn_2 = nn.BatchNorm2d(64)

        self.conv_3 = nn.Conv2d(64, 128, kernel_size=4, stride=2)
        self.bn_3 = nn.BatchNorm2d(128)

        self.conv_4 = nn.Conv2d(128, 256, kernel_size=4, stride=2)
        self.bn_4 = nn.BatchNorm2d(256)

        # Both heads read the flattened 256 x 4 x 4 feature map.
        self.fc_mu = nn.Linear(256 * 4 * 4, latent_dim)
        self.fc_rho = nn.Linear(256 * 4 * 4, latent_dim)

        self.act = torch.nn.LeakyReLU()

    def _downsample(self, fmap, conv, bn):
        """Pad by (1, 2) on each of the last two axes, then conv -> batch-norm -> LeakyReLU."""
        return self.act(bn(conv(F.pad(fmap, (1, 2, 1, 2)))))

    def forward(self,x):
        """
        Run x through the four downsampling stages, flatten the result to rows
        of 256*4*4 values, and return the tuple (mu, rho).

        For 64x64 inputs each stage halves the spatial size (64 -> 32 -> 16 -> 8 -> 4).
        """
        stages = (
            (self.conv_1, self.bn_1),
            (self.conv_2, self.bn_2),
            (self.conv_3, self.bn_3),
            (self.conv_4, self.bn_4),
        )
        fmap = x
        for conv, bn in stages:
            fmap = self._downsample(fmap, conv, bn)

        fmap_flat = fmap.view(-1, 256 * 4 * 4)
        return self.fc_mu(fmap_flat), self.fc_rho(fmap_flat)
        
class Decoder(nn.Module):
    """
    Convolutional decoder: a linear layer reshaped to 256 x 4 x 4, followed by
    four upsample + 3x3 convolution stages ending in a 3-channel 64x64 output
    squashed by a sigmoid.
    """

    def __init__(self):
        """Create the linear layer, the upsample/conv(/batch-norm) stages and the activation."""
        super().__init__()

        self.fc = nn.Linear(latent_dim, 256 * 4 * 4)

        self.upsample_1 = nn.Upsample((8, 8))
        self.conv_1 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
        self.bn_1 = nn.BatchNorm2d(128)

        self.upsample_2 = nn.Upsample((16, 16))
        self.conv_2 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.bn_2 = nn.BatchNorm2d(64)

        self.upsample_3 = nn.Upsample((32, 32))
        self.conv_3 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.bn_3 = nn.BatchNorm2d(32)

        # The last stage has no batch-norm; its output goes straight to the sigmoid.
        self.upsample_4 = nn.Upsample((64, 64))
        self.conv_4 = nn.Conv2d(32, 3, kernel_size=3, padding=1)

        self.act = torch.nn.LeakyReLU()

    def forward(self,z):
        """
        Map latent rows z to a batch of shape (batch, 3, 64, 64).

        The first three stages are upsample -> conv -> batch-norm -> LeakyReLU;
        the fourth is upsample -> conv -> sigmoid.
        """
        fmap = self.fc(z).view(-1, 256, 4, 4)

        hidden_stages = (
            (self.upsample_1, self.conv_1, self.bn_1),
            (self.upsample_2, self.conv_2, self.bn_2),
            (self.upsample_3, self.conv_3, self.bn_3),
        )
        for upsample, conv, bn in hidden_stages:
            fmap = self.act(bn(conv(upsample(fmap))))

        return torch.sigmoid(self.conv_4(self.upsample_4(fmap)))

In [None]:
# Build the convolutional VAE and move both halves onto the compute device.
encoder = Encoder()
decoder = Decoder()

encoder.to(device)
decoder.to(device)

# Not referenced by the loop below; the reconstruction term is written out by hand.
criterion = torch.nn.BCELoss()

# One Adam optimizer updates the encoder and decoder parameters together.
optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-3)

# Not referenced by the active loss below.
sigma_data = 1.0
epochs = 200
# Loop over the data
for epoch in range(epochs):
    encoder.train()
    decoder.train()

    # Running sums of the per-minibatch mean losses; averaged and printed once per epoch.
    dl = 0
    kl = 0
    n_batches = 0
    for d,_ in train_loader:
        d = d.to(device)

        # Zero out the optimizer's gradient buffer
        optimizer.zero_grad()

        # Encode, then draw a reparameterised latent sample.
        # The KL term below treats rho as log(sigma^2), so the standard deviation
        # is exp(rho/2). (The previous 0.5*exp(rho) was half the variance, which
        # made the sampled latents inconsistent with the KL penalty.)
        mu, rho = encoder(d)
        eps = torch.randn_like(rho)  # already created on rho's device
        latent = mu + torch.exp(0.5*rho)*eps
        reconstruction = decoder(latent)

        # Reconstruction term: per-image binary cross-entropy summed over channels
        # and pixels; probabilities are clamped below at 1e-5 before the log.
        # NOTE(review): the 0.5 factor scales the reconstruction term relative to
        # the KL term; left unchanged here -- confirm it is intentional.
        data_loss = -0.5*torch.sum(d*torch.log(torch.clamp(reconstruction,min=1e-5)) + (1-d)*torch.log(torch.clamp(1-reconstruction,min=1e-5)),dim=(1,2,3))

        # KL term between the encoder's diagonal Gaussian and a standard normal.
        kl_loss = -0.5*torch.sum(1 + rho - mu**2 - torch.exp(rho),axis=-1 )

        loss = torch.mean(data_loss + kl_loss)
        # Use backpropagation to compute the derivative of the loss with respect to the parameters
        loss.backward()
        # Use the derivative information to update the parameters
        optimizer.step()

        dl += torch.mean(data_loss).item()
        kl += torch.mean(kl_loss).item()
        n_batches += 1

        # Progress report every 20 minibatches.
        if n_batches%20==0:
           print('minibatch',n_batches,torch.mean(data_loss).item(),torch.mean(kl_loss).item())

    print('epoch:',epoch,dl/n_batches,kl/n_batches)
    # Checkpoint both networks at the end of every epoch (overwrites the previous files).
    torch.save(encoder.state_dict(), 'celeba_encoder_64.h5')
    torch.save(decoder.state_dict(), 'celeba_decoder_64.h5')

In [None]:
# Restore the weights written by the training loop.
# map_location remaps the stored tensors onto the current `device`, so a
# checkpoint saved on a GPU can also be loaded on a CPU-only machine.
encoder.load_state_dict(torch.load('celeba_encoder_64.h5', map_location=device))
decoder.load_state_dict(torch.load('celeba_decoder_64.h5', map_location=device))

In [None]:
# IPython help query (trailing `?`): shows the documentation for load_state_dict.
# This is IPython syntax, not plain Python, so it only works inside IPython/Jupyter.
encoder.load_state_dict?

In [None]:
# Display whether PyTorch reports an available CUDA device (the cell shows True or False).
torch.cuda.is_available()

In [None]:
import numpy as np
# Pick a random position in `d` (the last minibatch left behind by the training loop)
# and show that image; the channel axis is moved last for imshow.
idx = np.random.randint(d.shape[0])
plt.imshow(d[idx].cpu().numpy().transpose(1, 2, 0))

In [None]:
# Show the entry of `reconstruction` at the same position `idx`, with the channel axis moved last.
plt.imshow(reconstruction[idx].detach().cpu().numpy().transpose(1, 2, 0))

In [None]:
import matplotlib.pyplot as plt
# Number of decoded images along each axis of the mosaic.
# Named n_grid so the notebook-level `n` is not overwritten.
n_grid = 5
input_shape=[64,64]
z_0 = np.linspace(-2,2,n_grid)
z_1 = np.linspace(-2,2,n_grid)

# Choose two DIFFERENT latent coordinates to sweep. If both indices were equal,
# the second assignment would overwrite the first and the grid would vary along
# one axis only. Indices are drawn from the actual latent size rather than a
# hard-coded 64.
p = np.random.randint(latent_dim)
q = (p + 1 + np.random.randint(latent_dim - 1)) % latent_dim

figure = np.zeros((input_shape[0] * n_grid, input_shape[1] * n_grid, 3))
# Every latent coordinate other than p and q is held at zero.
z_t = np.zeros((1, latent_dim))

print(p,q)

# The decoder contains BatchNorm layers and is left in training mode by the
# training loop. Decoding single samples in that mode normalises with
# single-sample batch statistics and also updates the running statistics, so
# switch to eval mode and disable gradient tracking for visualisation.
decoder.eval()
with torch.no_grad():
    for i,z0 in enumerate(z_0):
        for j,z1 in enumerate(z_1):
            z_sample = z_t.copy()
            z_sample[0,p] = z0
            z_sample[0,q] = z1
            decoded = decoder(torch.tensor(z_sample).to(device).to(torch.float))[0]
            # Move the channel axis last so the tile fits the mosaic layout.
            img = np.moveaxis(decoded.cpu().numpy(),0,2)

            figure[i * input_shape[0]: (i + 1) * input_shape[0],j * input_shape[1]: (j + 1) * input_shape[1],:] = img

plt.figure(figsize=(12, 12))
plt.imshow(figure.squeeze())
plt.show()

In [None]:
# Leftover inspection: displays the shape of `x_decoded`, the decoder output
# last assigned in the MNIST latent-grid cell earlier in the notebook.
x_decoded.shape