In [1]:
import pyro
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
import pyro.distributions as dist
import pyro.poutine as poutine

from RAVDESS_dataset_util import *
from EmoClassCNN import *

torch.set_default_dtype(torch.float64)

pyro.enable_validation(False)

In [2]:
folder_path = '/home/studenti/ballerini/datasets/RAVDESS_frames'

In [3]:
NUM_CLASSES = len(emocat)
IMG_SIZE = 100
BATCH_SIZE = 8
DEFAULT_Z_DIM = 50

face_dataset = FaceEmotionDataset(root_dir=folder_path,
                                    transform=transforms.Compose([
                                        Rescale(IMG_SIZE), 
                                        CenterCrop(IMG_SIZE), 
                                        ToTensor()
                                    ]))        

dataset_loader = DataLoader(face_dataset, batch_size=BATCH_SIZE,
                        shuffle=True, num_workers=20)

print(len(face_dataset))

7200


In [4]:
def emotion_rating_conversion(cat):
    ratings = torch.zeros(NUM_CLASSES)
    ratings[cat] = 1
    return ratings
    
#torch.argmax(emotion_rating_conversion(3))

In [5]:
# helper functions
class Swish(nn.Module):
    """https://arxiv.org/abs/1710.05941"""
    def forward(self, x):
        return x * torch.sigmoid(x)

def swish(x):
    return x * torch.sigmoid(x)

In [6]:
class ProductOfExperts(nn.Module):
    """
    Return parameters for product of independent experts.
    See https://arxiv.org/pdf/1410.7827.pdf for equations.

    @param loc: M x D for M experts
    @param scale: M x D for M experts
    """
    def forward(self, loc, scale, eps=1e-8, use_cuda=True):
        scale = scale + eps # numerical constant for stability
        # precision of i-th Gaussian expert (T = 1/sigma^2)
        T = 1. / scale
        product_loc = torch.sum(loc * T, dim=0) / torch.sum(T, dim=0)
        product_scale = 1. / torch.sum(T, dim=0)
        return product_loc, product_scale

In [7]:
class ImageEncoder(nn.Module):
    """
    define the PyTorch module that parametrizes q(z|image).
    This goes from images to the latent z
    
    This is the standard DCGAN architecture.

    @param z_dim: integer
                  size of the tensor representing the latent random variable z
    """
    def __init__(self, z_dim, img_size):
        super(ImageEncoder, self).__init__()
        #torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, 
        #                padding=0, dilation=1, groups=1, bias=True)
        # H_out = floor( (H_in + 2*padding - dilation(kernel_size-1) -1) / stride    +1)
        self.img_size = img_size
        
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(16),
            nn.Conv2d(16, 32, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 64, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 128, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(128),
        )
        
        # Here, we define two layers, one to give z_loc and one to give z_scale
        self.z_loc_layer = nn.Sequential(
            nn.Linear(128 * (self.img_size**2), 128),
            Swish(),
            nn.Dropout(p=0.1),
            nn.Linear(128, z_dim))
        
        self.z_scale_layer = nn.Sequential(
            nn.Linear(128 * (self.img_size**2), 128),
            Swish(),
            nn.Dropout(p=0.1),
            nn.Linear(128, z_dim))
        
        self.z_dim = z_dim

    def forward(self, image):
        hidden = self.features(image)
        hidden = hidden.view(-1, 128 * (self.img_size**2))
        z_loc = self.z_loc_layer(hidden)
        z_scale = torch.exp(self.z_scale_layer(hidden)) #add exp so it's always positive
        return z_loc, z_scale
    
class ImageDecoder(nn.Module):
    """
    define the PyTorch module that parametrizes p(image|z).
    This goes from the latent z to the images
    
    This is the standard DCGAN architecture.

    @param z_dim: integer
                  size of the tensor representing the latent random variable z
    """
    def __init__(self, z_dim, img_size):
        super(ImageDecoder, self).__init__()
        self.img_size = img_size
        
        self.upsample = nn.Sequential(
            nn.Linear(z_dim, 128 * (self.img_size**2)),
            Swish())
        
        self.hallucinate = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(64),
            nn.ConvTranspose2d(64, 32, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(32),
            nn.ConvTranspose2d(32, 16, 3, 1, 1, bias=False), Swish(),
            nn.BatchNorm2d(16),
            nn.ConvTranspose2d(16, 3, 3, 1, 1, bias=False))

    def forward(self, z):
        # the input will be a vector of size |z_dim|
        z = self.upsample(z)
        z = z.view(-1, 128, self.img_size, self.img_size)
        image = self.hallucinate(z) # this is the image
        return image  # NOTE: no sigmoid here. See train.py

In [8]:
class EmotionEncoder(nn.Module):
    """
    define the PyTorch module that parametrizes q(z|emotion category).
    This goes from ratings to the latent z

    @param z_dim: integer
                  size of the tensor representing the latent random variable z
    """
    def __init__(self, z_dim, use_cuda=True):
        super(EmotionEncoder, self).__init__()
        self.net = nn.Linear(NUM_CLASSES, 32)
        
        self.z_loc_layer = nn.Sequential(
            nn.Linear(32, 32),
            Swish(),
            nn.Linear(32, z_dim))
        
        self.z_scale_layer = nn.Sequential(
            nn.Linear(32, 32),
            Swish(),
            nn.Linear(32, z_dim))
        self.z_dim = z_dim

    def forward(self, emocat):
        hidden = self.net(emocat)
        z_loc = self.z_loc_layer(hidden)
        z_scale = torch.exp(self.z_scale_layer(hidden))
        return z_loc, z_scale


class EmotionDecoder(nn.Module):
    """
    define the PyTorch module that parametrizes p(emotion category|z).
    This goes from the latent z to the ratings

    @param z_dim: integer
                  size of the tensor representing the latent random variable z
    """
    def __init__(self, z_dim):
        super(EmotionDecoder, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(z_dim, 32),
            Swish())
        
        self.emotion_loc_layer = nn.Sequential(
            nn.Linear(32, 32),
            Swish(),
            nn.Linear(32, NUM_CLASSES))
        
        self.emotion_scale_layer = nn.Sequential(
            nn.Linear(32, 32),
            Swish(),
            nn.Linear(32, NUM_CLASSES))

    def forward(self, z):
        #batch_size = z.size(0)
        hidden = self.net(z)
        emotion_loc = self.emotion_loc_layer(hidden)
        emotion_scale = torch.exp(self.emotion_scale_layer(hidden))
        # rating is going to be a |emotions| * 9 levels
        #rating = h.view(batch_size, EMOTION_VAR_DIM, 9)
        return emotion_loc, emotion_scale  # NOTE: no softmax here. See train.py

In [9]:
class MVAE(nn.Module):
    """
    This class encapsulates the parameters (neural networks), models & guides needed to train a
    multimodal variational auto-encoder.
    Modified from https://github.com/mhw32/multimodal-vae-public
    Multimodal Variational Autoencoder.

    @param z_dim: integer
                  size of the tensor representing the latent random variable z
                  
    Currently all the neural network dimensions are hard-coded; 
    in a future version will make them be inputs into the constructor
    """
    def __init__(self, z_dim, img_size=128, use_cuda=True):
        super(MVAE, self).__init__()
        self.z_dim = z_dim
        self.img_size = img_size
        self.experts = ProductOfExperts()
        self.image_encoder = ImageEncoder(z_dim, img_size)
        self.image_decoder = ImageDecoder(z_dim, img_size)
        self.emotion_encoder = EmotionEncoder(z_dim)
        self.emotion_decoder = EmotionDecoder(z_dim)
        
        self.use_cuda = use_cuda
        # relative weights of losses in the different modalities
        self.LAMBDA_IMAGES = 1.0
        self.LAMBDA_RATINGS = 50.0
        
        # using GPUs for faster training of the networks
        if self.use_cuda:
            self.cuda()
            
    def model(self, images=None, emotions=None, annealing_beta=1.0):
        # register this pytorch module and all of its sub-modules with pyro
        pyro.module("mvae", self)
        
        batch_size = 0
        if images is not None:
            batch_size = images.size(0)
        elif emotions is not None:
            batch_size = emotions.size(0)
        
        with pyro.plate("data"):      
            # sample the latent z from the (constant) prior, z ~ Normal(0,I)
            z_prior_loc  = torch.zeros(size=[batch_size, self.z_dim])
            z_prior_scale = torch.exp(torch.zeros(size=[batch_size, self.z_dim]))     
            
            if self.use_cuda:
                z_prior_loc, z_prior_scale = z_prior_loc.cuda(), z_prior_scale.cuda()
            
            # sample from prior (value will be sampled by guide when computing the ELBO)
            with poutine.scale(scale=annealing_beta):
                z = pyro.sample("z", dist.Normal(z_prior_loc, z_prior_scale))

            # decode the latent code z (image decoder)
            img_loc = self.image_decoder.forward(z)
            
            # score against actual images
            if images is not None:
                with poutine.scale(scale=self.LAMBDA_IMAGES):
                    #img_loc = (img_loc - torch.min(img_loc)) / (torch.max(img_loc) - torch.min(img_loc))
                    pyro.sample("obs_img", dist.Bernoulli(img_loc), obs=images)
            
            # decode the latent code z (emotion decoder)
            emotion_loc, emotion_scale = self.emotion_decoder.forward(z)
            if emotions is not None:
                with poutine.scale(scale=self.LAMBDA_RATINGS):
                    pyro.sample("obs_emotion", dist.Normal(emotion_loc, emotion_scale), obs=emotions)

            # return the loc so we can visualize it later
            return img_loc, emotion_loc
        
    def guide(self, images=None, emotions=None, annealing_beta=1.0):
        # register this pytorch module and all of its sub-modules with pyro
        pyro.module("mvae", self)
        
        batch_size = 0
        if images is not None:
            batch_size = images.size(0)
        elif emotions is not None:
            batch_size = emotions.size(0)
            
        with pyro.plate("data"):
            # use the encoder to get the parameters used to define q(z|x)
                        
            # initialize the prior expert.
            # we initalize an additional dimension, along which we concatenate all the 
            #   different experts.
            # self.experts() then combines the information from these different modalities
            #   by multiplying the gaussians together
            
            z_loc = torch.zeros(torch.Size((1, batch_size, self.z_dim))) + 0.5
            z_scale = torch.ones(torch.Size((1, batch_size, self.z_dim))) * 0.1
            
            if self.use_cuda:
                z_loc, z_scale = z_loc.cuda(), z_scale.cuda()
                
            if images is not None:
                image_z_loc, image_z_scale = self.image_encoder.forward(images)
                z_loc = torch.cat((z_loc, image_z_loc.unsqueeze(0)), dim=0)
                z_scale = torch.cat((z_scale, image_z_scale.unsqueeze(0)), dim=0)
            
            if emotions is not None:
                emotion_z_loc, emotion_z_scale = self.emotion_encoder.forward(emotions)
                z_loc = torch.cat((z_loc, emotion_z_loc.unsqueeze(0)), dim=0)
                z_scale = torch.cat((z_scale, emotion_z_scale.unsqueeze(0)), dim=0)
            
            z_loc, z_scale = self.experts(z_loc, z_scale)
            # sample the latent z
            with poutine.scale(scale=annealing_beta):
                pyro.sample("z", dist.Normal(z_loc, z_scale))
                
                
    def forward(self, image=None, emotion=None):
        z_loc, z_scale  = self.infer(image, emotion)
        z = pyro.sample("z", dist.Normal(z_loc, z_scale).independent(1))
        # reconstruct inputs based on that gaussian
        image_recon = self.image_decoder(z)
        rating_recon = self.emotion_decoder(z)
        return image_recon, rating_recon, z_loc, z_scale
    
    
    def infer(self, images=None, emotions=None):
        batch_size = 0
        if images is not None:
            batch_size = images.size(0)
        elif emotions is not None:
            batch_size = emotions.size(0)
            
        # initialize the prior expert
        # we initalize an additional dimension, along which we concatenate all the 
        #   different experts.
        # self.experts() then combines the information from these different modalities
        #   by multiplying the gaussians together
        z_loc = torch.zeros(torch.Size((1, BATCH_SIZE, self.z_dim))) + 0.5
        z_scale = torch.ones(torch.Size((1, BATCH_SIXE, self.z_dim))) * 0.1
        
        if self.use_cuda:
            z_loc, z_scale = z_loc.cuda(), z_scale.cuda()

        if images is not None:
            image_z_loc, image_z_scale = self.image_encoder.forward(images)
            z_loc = torch.cat((z_loc, image_z_loc.unsqueeze(0)), dim=0)
            z_scale = torch.cat((z_scale, image_z_scale.unsqueeze(0)), dim=0)

        if emotions is not None:
            emotion_z_loc, emotion_z_scale = self.emotion_encoder.forward(emotions)
            z_loc = torch.cat((z_loc, emotion_z_loc.unsqueeze(0)), dim=0)
            z_scale = torch.cat((z_scale, emotion_z_scale.unsqueeze(0)), dim=0)

        z_loc, z_scale = self.experts(z_loc, z_scale)
        return z_loc, z_scale

    
    # define a helper function for reconstructing images
    def reconstruct_img(self, images):
        # encode image x
        z_loc, z_scale = self.image_encoder(images)
        # sample in latent space
        z = dist.Normal(z_loc, z_scale).sample()
        # decode the image (note we don't sample in image space)
        img_loc = self.image_decoder.forward(z)
        return img_loc

    
    # define a helper function for reconstructing images without sampling
    def reconstruct_img_nosample(self, images):
        # encode image x
        z_loc, z_scale = self.image_encoder(images)
        ## sample in latent space
        #z = dist.Normal(z_loc, z_scale).sample()
        # decode the image (note we don't sample in image space)
        img_loc = self.image_decoder.forward(z_loc)
        return img_loc

In [10]:
import gc

gc.collect()
torch.cuda.empty_cache()

pyro.clear_param_store()

class Args:
    learning_rate = 1e-5
    weight_decay = 1e-6
    num_epochs = 50 #500
    z_dim = DEFAULT_Z_DIM
    img_size = IMG_SIZE
    seed = 30
    cuda = True
    
args = Args()

# setup the optimizer
adam_args = {"lr": args.learning_rate}#, "weight_decay":args.weight_decay}
optimizer = Adam(adam_args)

# setup the VAE
mvae = MVAE(z_dim=args.z_dim, img_size=args.img_size, use_cuda=args.cuda)

# setup the inference algorithm
svi = SVI(mvae.model, mvae.guide, optimizer, loss=Trace_ELBO())

In [11]:
from tqdm import tqdm

train_elbo = []
# training loop
for epoch in tqdm(range(args.num_epochs)):
    # initialize loss accumulator
    epoch_loss = 0.
    # do a training epoch over each mini-batch returned
    # by the data loader
    for batch_num, sample in enumerate(dataset_loader):
                                  
        faces, emotions = sample['image'], sample['cat']
        emotions = torch.stack([emotion_rating_conversion(emo) for emo in emotions])
        
        # if on GPU put mini-batch into CUDA memory
        if args.cuda:
            faces, emotions = faces.cuda(), emotions.cuda()
        
        # do ELBO gradient and accumulate loss
        #print("Batch: ", batch_num, "out of", len(train_loader))
        epoch_loss += svi.step(images=faces, emotions=emotions)
        epoch_loss += svi.step(images=faces, emotions=None)
        epoch_loss += svi.step(images=None, emotions=emotions)

    # report training diagnostics
    normalizer_train = len(dataset_loader)
    total_epoch_loss_train = epoch_loss / normalizer_train
    train_elbo.append(total_epoch_loss_train)
    
    # report training diagnostics
    print("average training loss: %.4f" % (total_epoch_loss_train))

  2%|▌                         | 1/50 [07:53<6:26:34, 473.37s/it]

average training loss: 7349660.6373


  4%|█                         | 2/50 [16:36<6:41:57, 502.46s/it]

average training loss: 7321895.4475


  4%|█                         | 2/50 [21:34<8:37:45, 647.21s/it]


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

epochs = range(0, args.num_epochs)
plt.plot(epochs, train_elbo, 'g', label='Training loss')
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
NUM_SAMPLES = 10
input_array = np.zeros(shape=(IMG_SIZE, 1, 3), dtype="uint8")
reconstructed_array = np.zeros(shape=(IMG_SIZE, 1, 3), dtype="uint8")
    
# pick NUM_SAMPLES random test images from the first mini-batch and
# visualize how well we're reconstructing them

faces = next(iter(dataset_loader))['image']
if args.cuda:
    faces = faces.cuda()

reco_indices = np.random.randint(0, faces.size(0), NUM_SAMPLES)
for index in reco_indices:
    input_img = faces[index, :]
    # storing the input image
    input_img_display = np.array(input_img.cpu()*255., dtype='uint8')
    input_img_display = input_img_display.transpose((1, 2, 0))
    input_array = np.concatenate((input_array, input_img_display), axis=1)

    # generating the reconstructed image and adding to array
    input_img = input_img.view(1, 3, IMG_SIZE, IMG_SIZE)
    reconstructed_img = mvae.reconstruct_img_nosample(input_img)
    reconstructed_img = reconstructed_img.cpu().view(3, IMG_SIZE, IMG_SIZE).detach().numpy()
    reconstructed_img = np.array(reconstructed_img*255., dtype='uint8')
    reconstructed_img = reconstructed_img.transpose((1, 2, 0))
    reconstructed_array = np.concatenate((reconstructed_array, reconstructed_img), axis=1)

from PIL import Image

# remove first, blank column, and concatenate
input_array = input_array[:,1:,:]
reconstructed_array = reconstructed_array[:,1:,:]
display_array = np.concatenate((input_array, reconstructed_array), axis=0)
Image.fromarray(display_array)

In [None]:
import os

# save model if you decide to modify the above code to train your own model
savemodel = True
if savemodel:
    if not os.path.exists('./trained_models'):
      os.mkdir('./trained_models')
    pyro.get_param_store().save('trained_models/mvae_pretrained_02.save')

min average training loss: 1,576,161.5907 model 01