In [1]:
## preprocess coco dataset

In [2]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from torch import nn
import torch
import timm
from transformers import BertModel, BertTokenizer
from torch import nn
from torchvision import transforms
from PIL import Image

In [3]:
trainfile = "train_words_gt.txt"
valfile = "val_words_gt.txt"

In [4]:

trainfile = "train_words_gt.txt"
valfile = "val_words_gt.txt"
with open(trainfile, 'r') as file:
    lines = [line.strip().split(',', 1) for line in file]
df1 = pd.DataFrame(lines, columns=['image', 'text'])
with open(valfile, 'r') as file:
    lines = [line.strip().split(',', 1) for line in file]
df2 = pd.DataFrame(lines, columns=['image', 'text'])
df = pd.concat([df1, df2], ignore_index=True)
df = df.dropna()
df['text'] = df['text'].apply(lambda x: x.lower())
df = df.reset_index()
print(df)

       index    image        text
0          0  1001724    chiquita
1          1  1001723  06/01/2009
2          2  1228192        brak
3          3  1080793        kirg
4          4  1228189        slow
...      ...      ...         ...
52509  52564  1103359        tony
52510  52565  1103361       small
52511  52566  1103360    elephant
52512  52567  1103357       waddy
52513  52568  1064658        stop

[52514 rows x 3 columns]


In [5]:
# convert text to encodings using charbert finetuned

In [6]:
class CFG:
    debug = False
    batch_size = 32
    num_workers = 2
    head_lr = 1e-3
    image_encoder_lr = 1e-4
    text_encoder_lr = 1e-5
    weight_decay = 1e-3
    patience = 1
    factor = 0.8
    epochs = 1
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model_name = 'resnet50'
    image_embedding = 2048
    text_encoder_model = "imvladikon/charbert-bert-wiki"
    # text_encoder_model = "google/byt5-large"
    text_embedding = 768
    # text_embedding = 1472
    text_tokenizer = "imvladikon/charbert-bert-wiki"
    # text_tokenizer = "google/byt5-large"
    max_length = 200

    pretrained = True # for both image encoder and text encoder
    trainable = True # for both image encoder and text encoder
    temperature = 1.0

    # image size
    size = 224

    # for projection head; used for both image and text encoders
    num_projection_layers = 1
    projection_dim = 256
    dropout = 0.1

class ImageEncoder(nn.Module):
    """
    Encode images to a fixed size vector
    """

    def __init__(
        self, model_name=CFG.model_name, pretrained=CFG.pretrained, trainable=CFG.trainable
    ):
        super().__init__()
        self.model = timm.create_model(
            model_name, pretrained, num_classes=0, global_pool="avg"
        )
        for p in self.model.parameters():
            p.requires_grad = trainable

    def forward(self, x):
        return self.model(x)

class TextEncoder(nn.Module):
    def __init__(self, model_name=CFG.text_encoder_model, pretrained=CFG.pretrained, trainable=CFG.trainable):
        super().__init__()
        if pretrained:
            # self.model = DistilBertModel.from_pretrained(model_name)
            self.model = BertModel.from_pretrained(model_name)
            # self.model = T5EncoderModel.from_pretrained(model_name)
        else:
            # self.model = DistilBertModel(config=DistilBertConfig())
            _ = None

        for p in self.model.parameters():
            p.requires_grad = trainable

        # we are using the CLS token hidden representation as the sentence's embedding
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]

class ProjectionHead(nn.Module):
    def __init__(
        self,
        embedding_dim,
        projection_dim=CFG.projection_dim,
        dropout=CFG.dropout
    ):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)

    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x

class CLIPModel(nn.Module):
    def __init__(
        self,
        temperature=CFG.temperature,
        image_embedding=CFG.image_embedding,
        text_embedding=CFG.text_embedding,
    ):
        super().__init__()
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature

    def forward(self, batch):
        # Getting Image and Text Features
        image_features = self.image_encoder(batch["image"])
        text_features = self.text_encoder(
            input_ids=batch["input_ids"], attention_mask=batch["attention_mask"]
        )
        # Getting Image and Text Embeddings (with same dimension)
        image_embeddings = self.image_projection(image_features)
        text_embeddings = self.text_projection(text_features)

        # Calculating the Loss
        logits = (text_embeddings @ image_embeddings.T) / self.temperature
        images_similarity = image_embeddings @ image_embeddings.T
        texts_similarity = text_embeddings @ text_embeddings.T
        targets = F.softmax(
            (images_similarity + texts_similarity) / 2 * self.temperature, dim=-1
        )
        texts_loss = cross_entropy(logits, targets, reduction='none')
        images_loss = cross_entropy(logits.T, targets.T, reduction='none')
        loss =  (images_loss + texts_loss) / 2.0 # shape: (batch_size)
        return loss.mean()



In [7]:
model = CLIPModel().to(CFG.device)
model.load_state_dict(torch.load("best.pt", map_location=CFG.device))
model.eval()
model = model.text_encoder
tokenizer = BertTokenizer.from_pretrained(CFG.text_tokenizer)

In [9]:
inputs = tokenizer(df['text'].values.tolist(), padding=True, truncation=True, max_length=CFG.max_length)
embs = list()
for i in range(len(inputs['input_ids'])):
    with torch.no_grad():
        text_features = model(
            input_ids=torch.tensor([inputs['input_ids'][i]]).to('cuda'), attention_mask=torch.tensor([inputs['attention_mask'][i]]).to('cuda')
        )
        torch.save(text_features.cpu().squeeze(), "./pipeline/text/"+str(i)+'.pt')
        embs.append(str(i)+'.pt')

In [10]:
df['text_emb'] = embs
df.tail(3)

Unnamed: 0,index,image,text,text_emb
52511,52566,1103360,elephant,52511.pt
52512,52567,1103357,waddy,52512.pt
52513,52568,1064658,stop,52513.pt


In [12]:
class Encoder(nn.Module):
    
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(Encoder, self).__init__()

        self.FC_input = nn.Linear(input_dim, hidden_dim)
        self.FC_input2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_input3 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_mean  = nn.Linear(hidden_dim, latent_dim)
        self.FC_var   = nn.Linear (hidden_dim, latent_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
        self.training = True
        
    def forward(self, x):
        h_       = self.LeakyReLU(self.FC_input(x))
        h_       = self.LeakyReLU(self.FC_input2(h_))
        h_       = self.LeakyReLU(self.FC_input3(h_))
        mean     = self.FC_mean(h_)
        log_var  = self.FC_var(h_)                     # encoder produces mean and log of variance 
                                                       #             (i.e., parateters of simple tractable normal distribution "q"
        
        return mean, log_var

class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(Decoder, self).__init__()
        self.FC_hidden = nn.Linear(latent_dim, hidden_dim)
        self.FC_hidden2 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_hidden3 = nn.Linear(hidden_dim, hidden_dim)
        self.FC_output = nn.Linear(hidden_dim, output_dim)
        
        self.LeakyReLU = nn.LeakyReLU(0.2)
        
    def forward(self, x):
        h     = self.LeakyReLU(self.FC_hidden(x))
        h     = self.LeakyReLU(self.FC_hidden2(h))
        h     = self.LeakyReLU(self.FC_hidden3(h))
        
        x_hat = torch.sigmoid(self.FC_output(h))
        return x_hat

class VAE(nn.Module):
    def __init__(self, Encoder, Decoder):
        super(VAE, self).__init__()
        self.Encoder = Encoder
        self.Decoder = Decoder
        
    def reparameterization(self, mean, var):
        epsilon = torch.randn_like(var).to('cuda')        # sampling epsilon        
        z = mean + var*epsilon                          # reparameterization trick
        return z
        
                
    def forward(self, x):
        mean, log_var = self.Encoder(x)
        z = self.reparameterization(mean, torch.exp(0.5 * log_var)) # takes exponential function (log var -> var)
        x_hat            = self.Decoder(z)
        
        return x_hat, mean, log_var

In [13]:
x_dim  = 224*224
hidden_dim = 512
latent_dim = 256

encoder = Encoder(input_dim=x_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)
decoder = Decoder(latent_dim=latent_dim, hidden_dim = hidden_dim, output_dim = x_dim)

vae = VAE(Encoder=encoder, Decoder=decoder).to('cuda')
vae.load_state_dict(torch.load('vae.pt'))
vae.eval()

VAE(
  (Encoder): Encoder(
    (FC_input): Linear(in_features=50176, out_features=512, bias=True)
    (FC_input2): Linear(in_features=512, out_features=512, bias=True)
    (FC_input3): Linear(in_features=512, out_features=512, bias=True)
    (FC_mean): Linear(in_features=512, out_features=256, bias=True)
    (FC_var): Linear(in_features=512, out_features=256, bias=True)
    (LeakyReLU): LeakyReLU(negative_slope=0.2)
  )
  (Decoder): Decoder(
    (FC_hidden): Linear(in_features=256, out_features=512, bias=True)
    (FC_hidden2): Linear(in_features=512, out_features=512, bias=True)
    (FC_hidden3): Linear(in_features=512, out_features=512, bias=True)
    (FC_output): Linear(in_features=512, out_features=50176, bias=True)
    (LeakyReLU): LeakyReLU(negative_slope=0.2)
  )
)

In [14]:
tf = transforms.Compose([transforms.Grayscale(), transforms.Resize((224,224)), transforms.ToTensor()])
img_embs = list()
with torch.no_grad():
    for i in range(len(df)):
        input = Image.open("./cocodata/"+df['image'].iloc[i]+".jpg")
        input = tf(input)
        input = input.view(input.size()[0], x_dim).to('cuda')
        mean, log_var = vae.Encoder(input)
        output = vae.reparameterization(mean, torch.exp(0.5 *log_var)).cpu()
        output = output.squeeze()
        torch.save(output, "./pipeline/img/"+str(i)+'.pt')
        img_embs.append(str(i)+'.pt')

df['img_emb'] = img_embs
df.tail(3)

Unnamed: 0,index,image,text,text_emb,img_emb
52511,52566,1103360,elephant,52511.pt,52511.pt
52512,52567,1103357,waddy,52512.pt,52512.pt
52513,52568,1064658,stop,52513.pt,52513.pt


In [15]:
df.to_csv("./pipeline/data.csv", index=False)

In [21]:
from torch.utils.data import Dataset
class CustomDataset(Dataset):

    def __init__(self, df, transform=None):
        value_counts = df['text'].value_counts()
        values_to_keep = value_counts[(value_counts >= 5) & (value_counts <= 10)].index
        df = df[df['text'].isin(values_to_keep)]
        df['id'] = pd.factorize(df['text'])[0]
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):

        # label = self.df['id'].iloc[idx]
        emb_id = self.df["text_emb"].iloc[idx]
        emb = torch.load("./pipeline/text/"+emb_id).squeeze()
        
        # img = Image.open("./cocodata/"+str(self.df['image'].iloc[idx]))
        # if self.transform:
        #     img = self.transform(img)

        img = torch.load("./pipeline/img/"+emb_id).squeeze()

        return img, emb

In [22]:
dataset = CustomDataset(pd.read_csv("./pipeline/data.csv"))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['id'] = pd.factorize(df['text'])[0]
