In [None]:
# Importa bibliotecas necessárias

import json
import torch
import pickle
import warnings
import torch.nn as nn
import torchvision.transforms as transforms

from PIL import Image
from tqdm import tqdm

# Suppress all Python warnings raised after this point (blanket 'ignore' filter).
warnings.filterwarnings('ignore')

In [None]:
def load_base(name_arq):
    """Read a UTF-8 encoded JSON file and return its parsed content.

    Inputs:
        - name_arq : Path of the JSON file to read.
    Returns:
        Whatever `json.load` produces for the file (no filtering is applied).
    """
    # The context manager guarantees the handle is closed, even if parsing fails.
    with open(name_arq, encoding="utf8") as f:
        data = json.load(f)

    # Optional filter (disabled): keep only the "answerable" entries.
    #data = [d for d in data if d["answerable"] == 1]

    return data

In [None]:
# Preprocessing pipelines: resize, then convert the image to a tensor with ToTensor.
# No transforms.Normalize step is applied in either pipeline.

# 224x224 version of the image, later cut into four 112x112 quadrants.
transform_patches = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# 112x112 version of the whole image.
transform_img = transforms.Compose(
    [transforms.Resize((112, 112)), transforms.ToTensor()]
)

In [None]:
class Encoder(nn.Module):
    """Convolutional encoder mapping an image batch to flat latent vectors.

    Three stride-2 convolutions each halve the spatial size. The final linear
    layer expects 2*base_channel_size feature maps of 14x14 (14*14 = 196),
    which is what a 112x112 input produces (112 -> 56 -> 28 -> 14).
    """

    def __init__(self,
                 num_input_channels : int,
                 base_channel_size : int,
                 latent_dim : int,
                 act_fn : object = nn.GELU):
        """
        Inputs:
            - num_input_channels : Number of channels of the input image (3 for RGB)
            - base_channel_size : Number of channels of the first convolutions; deeper layers use twice as many
            - latent_dim : Dimensionality of latent representation z
            - act_fn : Activation class instantiated after every convolution
        """
        super().__init__()
        narrow = base_channel_size
        wide = 2 * base_channel_size
        # The position of each layer inside the Sequential defines its state-dict
        # key (net.0, net.2, ...), so the order below must not change.
        stages = [
            nn.Conv2d(num_input_channels, narrow, kernel_size=3, padding=1, stride=2),  # halves H and W
            act_fn(),
            nn.Conv2d(narrow, narrow, kernel_size=3, padding=1),
            act_fn(),
            nn.Conv2d(narrow, wide, kernel_size=3, padding=1, stride=2),  # halves H and W
            act_fn(),
            nn.Conv2d(wide, wide, kernel_size=3, padding=1),
            act_fn(),
            nn.Conv2d(wide, wide, kernel_size=3, padding=1, stride=2),  # halves H and W
            act_fn(),
            nn.Flatten(),  # feature maps -> one vector per sample
            nn.Linear(196 * wide, latent_dim),  # 196 = 14 * 14 spatial positions
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Return the latent vectors for the image batch `x`."""
        latent = self.net(x)
        return latent

#################################################################################################
class Decoder(nn.Module):
    """Convolutional decoder mapping latent vectors back to image batches.

    The latent vector is projected to 2*base_channel_size feature maps of
    14x14, then three stride-2 transposed convolutions each double the
    spatial size (14 -> 28 -> 56 -> 112).
    """

    def __init__(self,
                 num_input_channels : int,
                 base_channel_size : int,
                 latent_dim : int,
                 act_fn : object = nn.GELU):
        """
        Inputs:
            - num_input_channels : Number of channels of the image to reconstruct (3 for RGB)
            - base_channel_size : Number of channels of the last convolutions; earlier layers use twice as many
            - latent_dim : Dimensionality of latent representation z
            - act_fn : Activation class instantiated after every hidden layer
        """
        super().__init__()
        narrow = base_channel_size
        wide = 2 * base_channel_size
        # Latent vector -> flattened 14x14 feature maps (196 = 14 * 14).
        self.linear = nn.Sequential(
            nn.Linear(latent_dim, 196 * wide),
            act_fn(),
        )
        # Layer positions define the state-dict keys (net.0, net.2, ...); keep the order.
        upsampling = [
            nn.ConvTranspose2d(wide, wide, kernel_size=3, output_padding=1, padding=1, stride=2),  # doubles H and W
            act_fn(),
            nn.Conv2d(wide, wide, kernel_size=3, padding=1),
            act_fn(),
            nn.ConvTranspose2d(wide, narrow, kernel_size=3, output_padding=1, padding=1, stride=2),  # doubles H and W
            act_fn(),
            nn.Conv2d(narrow, narrow, kernel_size=3, padding=1),
            act_fn(),
            nn.ConvTranspose2d(narrow, num_input_channels, kernel_size=3, output_padding=1, padding=1, stride=2),  # doubles H and W
            # Tanh bounds the reconstruction to [-1, 1].
            # NOTE(review): this presumes inputs scaled to that range - confirm against the training pipeline.
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*upsampling)

    def forward(self, x):
        """Return the reconstructed image batch for the latent vectors `x`."""
        hidden = self.linear(x)
        # Unflatten into (batch, channels, 14, 14); the channel count is inferred.
        feature_maps = hidden.reshape(hidden.shape[0], -1, 14, 14)
        return self.net(feature_maps)

##################################################################################################
class Autoencoder(nn.Module):
    """Encoder/decoder pair that reconstructs its input through a latent bottleneck."""

    def __init__(self,
                 base_channel_size: int = 112,
                 latent_dim: int = 768,
                 encoder_class : object = Encoder,
                 decoder_class : object = Decoder,
                 num_input_channels: int = 3,
                 width: int = 112,
                 height: int = 112):
        """
        Inputs:
            - base_channel_size : Channel count forwarded to the encoder and the decoder
            - latent_dim : Dimensionality of the latent representation
            - encoder_class : Class used to build the encoder
            - decoder_class : Class used to build the decoder
            - num_input_channels : Number of channels of the input/reconstructed image
            - width, height : Accepted but not used by this implementation
        """
        super().__init__()
        # Both halves are built from the same three positional arguments.
        shared_args = (num_input_channels, base_channel_size, latent_dim)
        self.encoder = encoder_class(*shared_args)
        self.decoder = decoder_class(*shared_args)

    def forward(self, x):
        """
        Encode the image batch and return its reconstruction.
        """
        return self.decoder(self.encoder(x))

In [None]:
# Build the autoencoder with its default hyper-parameters and restore the
# trained weights on the CPU.
# NOTE: torch.load unpickles the file - only load checkpoints from a trusted source.
model = Autoencoder()
checkpoint = torch.load("autoencoder.ckpt", map_location="cpu")
model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
def get_embedding_autoencoder(model, img_tensor):
    """Run the model's encoder on a batch, in inference mode.

    Inputs:
        - model : Module exposing an `encoder` sub-module; it is switched to eval mode as a side effect.
        - img_tensor : Batch accepted by `model.encoder`.
    Returns:
        The encoder output, computed without autograd tracking.
    """
    model.eval()

    # Without no_grad, each returned embedding keeps its whole autograd graph
    # alive, which wastes memory when embeddings are collected for many images.
    with torch.no_grad():
        x = model.encoder(img_tensor)

    return x

In [None]:
# Vision: sketch of the mapping built below (image file name -> embedding vectors)
"""
{"imagem.jpg": [vetores],
"imagem2.jpg": [vetores],
...
}
"""

In [None]:
def get_train_images(base, num):
    """Stack items 0..num-1 of `base` into one tensor along a new leading axis."""
    selected = []
    for idx in range(num):
        selected.append(base[idx])
    return torch.stack(selected, dim=0)

In [None]:
def get_info_visao(name_arq, model, tam_base, start_index=18000):
    """Compute encoder embeddings for the images referenced by a JSON base.

    Each image yields a batch of five views: the whole image resized to
    112x112, followed by the four 112x112 quadrants (top-left, top-right,
    bottom-left, bottom-right) of its 224x224 resize.

    Inputs:
        - name_arq : Path of the JSON base. Images are read from the directory
          named like the file without its ".json" suffix.
        - model : Model handed to `get_embedding_autoencoder`.
        - tam_base : Not used; kept so existing calls keep working.
        - start_index : Index of the first record to process. Defaults to
          18000, the value that used to be hard-coded. If the base has fewer
          records than this, nothing is processed.
    Returns:
        dict mapping each record's "image" name to the encoder output for its
        five views.
    """

    # Read the base and keep only the records from start_index onwards
    data = load_base(name_arq)[start_index:]

    # Directory where the images are stored
    dir_img = name_arq.split(".json")[0]

    # Visual information collected for each image
    info_visao = {}

    for info in tqdm(data):

        # Image file name
        name_img = info["image"]

        # Read the image; the context manager releases the file handle once
        # the pixel data has been converted.
        with Image.open(dir_img+"/"+name_img) as raw_img:
            # The encoder expects 3 channels; grayscale/RGBA/palette files
            # would otherwise produce tensors with a different channel count.
            img = raw_img.convert("RGB")

        # Standardize the image
        img_patches = transform_patches(img)
        img_tensor = transform_img(img)

        # Views of THIS image only. The list must be rebuilt on every
        # iteration: a list shared across iterations would make the first five
        # entries (the first image's views) the ones embedded for every image.
        base = [
            img_tensor,
            img_patches[:, :112, :112],
            img_patches[:, :112, 112:],
            img_patches[:, 112:, :112],
            img_patches[:, 112:, 112:],
        ]

        images = get_train_images(base, len(base))

        # Embeddings are only stored, never back-propagated through, so skip
        # autograd bookkeeping.
        with torch.no_grad():
            embedding_img = get_embedding_autoencoder(model, images)

        # Store the embeddings under the image name
        info_visao[name_img] = embedding_img

    return info_visao

In [None]:
def save_info_visao(name_arq_in, info_visao):
    """Append `info_visao` as one pickle record to the companion .pkl file.

    Inputs:
        - name_arq_in : Path of the source JSON; the output path is this name
          without its ".json" suffix plus "_info_visao.pkl".
        - info_visao : Object to serialize.
    Returns:
        None.

    The file is opened in append mode, so every call adds a new pickle record
    after the existing ones; read them back with repeated `pickle.load` calls
    on the same handle.
    """

    name_arq_out = name_arq_in.split(".json")[0]+"_info_visao.pkl"

    # The context manager closes the file even if serialization raises.
    with open(name_arq_out, 'ab') as file:
        pickle.dump(info_visao, file, pickle.HIGHEST_PROTOCOL)

    return

In [None]:
%%time

# Embed the images referenced by train.json and append the resulting mapping
# to train_info_visao.pkl.
name_arq = "train.json"
# The third argument is tam_base, which get_info_visao does not use.
info_visao = get_info_visao(name_arq, model, 1)
save_info_visao(name_arq, info_visao)

#### Fontes

https://colab.research.google.com/drive/1jIflL9-gktbXq_2cEE_KM7yYq2PaOKRM?authuser=1#scrollTo=8cRyNhMQaAyh

https://khvmaths.medium.com/vision-transformer-understanding-the-underlying-concept-83d699d71180