In [49]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import csv
import os
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from torchtext.data import get_tokenizer
import string
import math
from transformers import BertTokenizer, BertModel
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn

In [2]:

# Choix de device

# device = 'cpu'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [90]:
# Définition de START et END pour éviter des fautes de frappe :

START = "<start>"
END = "<end>"

MAX_VOCAB_SIZE = 1000
MAX_SEQ_LENGTH = 25

BATCH_SIZE = 16

EPOCHS = 20

DATASET_PATH = 'dataset/Flicker8k_Dataset/'

In [4]:
#
url_flicker_train = 'flickr_8k_train_dataset.txt';
def flicker_source(fName):
    """
    Une fonction qui charge un jeu de données, et renvoie un générateur
    permettant de le parcourir.

    Résultat:
        un générateur renvoyant des couples de string "nom d'image", "label de l'image"
    """
    images = []
    descriptions = []
    with open(fName, encoding='utf-8') as f:
        reader = csv.reader(f, delimiter='\t')
        next(reader) # skip the first row (column headers)
        for row in reader:
            image, texte = row
            images.append(image)
            descriptions.append(texte)
        return images, descriptions

def flicker_train():
    return flicker_source(url_flicker_train)

In [5]:
def preprocess_text(captions):
    nltk.download('stopwords')
    STOPWORDS = set(stopwords.words('english'))
    nltk.download('wordnet')
    lemmatizer = WordNetLemmatizer()

    # Tokenization des captions
    tokenizer = get_tokenizer("basic_english")
    tokenized_captions = []
    descriptions = []
    for caption in captions:
        description = caption.lower()
        description = description.replace(START, '')
        description = description.replace(END, '')
        description = description.translate(str.maketrans('', '', string.punctuation))
        description = description.split()
        description = [lemmatizer.lemmatize(word) for word in description if word not in STOPWORDS]
        description = ' '.join(description)
        descriptions.append(description)
        
    corpus = [word for description in descriptions for word in description.split(' ')]
    vocab = get_top_words(descriptions, math.ceil(len(set(corpus))*0.25))
                              
    preprocessed_descriptions = []
    for description in descriptions:
        valid_words = []
        for word in description.split(' '):
            if word in vocab:
                valid_words.append(word)
        preprocessed_descriptions.append(" ".join(valid_words))   
                          
    return preprocessed_descriptions

def get_top_words(descriptions, max_features=100):
    corpus = [word for description in descriptions for word in description.split(' ')]
    
    # Créer un vecteur de TF-IDF
    vectorizer = TfidfVectorizer(max_features=max_features)
    X = vectorizer.fit_transform(corpus)

    # Récupérer les mots les plus importants
    return list(vectorizer.vocabulary_.keys())

In [6]:
training_images, training_descriptions = flicker_train()

In [7]:
descriptions = preprocess_text(training_descriptions)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [83]:
# print(descriptions)

In [9]:
# Charger le tokenizer BERT
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Charger le modèle BERT
model = BertModel.from_pretrained('bert-base-uncased')

model.to(device)
def extract_text_features(description):
    tokens = tokenizer.encode(description, add_special_tokens=True)

    # Conversion des tokens en tenseur PyTorch
    tokens_tensor = torch.tensor([tokens]).to(device)

    # Encodage du texte avec le modèle BERT
    with torch.no_grad():
        outputs = model(tokens_tensor)

    # Récupération de la représentation du texte
    last_hidden_state = outputs[0]
    return torch.mean(last_hidden_state, dim=1).squeeze()

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [10]:
description_features = []
for description in descriptions:
    description_features.append(extract_text_features(description))
    
# print(description_features)

In [92]:
 print(description_features[4].shape)

torch.Size([768])


In [23]:
model = models.resnet18(pretrained=True)
model.to(device)

def get_image_feature(path):
    img = Image.open(path)
    preprocess = transforms.Compose([
       transforms.Resize(256),
       transforms.CenterCrop(224),
       transforms.ToTensor(),
       transforms.Normalize(mean=[0.485, 0.456, 0.406],
                            std=[0.229, 0.224, 0.225])
    ])
    img_tensor = preprocess(img).to('cuda')
    img_tensor.unsqueeze_(0)

    # Utilisez le modèle pour extraire les features
    with torch.no_grad():
        features = model.conv1(img_tensor)
        features = model.bn1(features)
        features = model.relu(features)
        features = model.maxpool(features)

        features = model.layer1(features)
        features = model.layer2(features)
        features = model.layer3(features)
        features = model.layer4(features)
        features = model.avgpool(features)
        
    return features

In [24]:
features = []
for training_image in training_images:
    features.append(get_image_feature(DATASET_PATH + training_image))

In [88]:
print(len(features))

30000


In [34]:
torch.cat((description_features[0], features[0].flatten())).shape
concat_features = []
for i in range(len(features)):
    concat_features.append(torch.cat((description_features[i], features[i].flatten())))

In [38]:
print(len(concat_features))

30000


In [47]:
print(concat_features[0].shape)
print(torch.ones((len(concat_features))))

torch.Size([1280])
tensor([1., 1., 1.,  ..., 1., 1., 1.])


In [78]:
# Définir une classe Dataset pour vos données d'entraînement
class ImageDescDataset(Dataset):
    def __init__(self, features, similarities):
        self.features = features
        self.similarities = similarities
        
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        feature = self.features[idx]
        similarity = self.similarities[idx]
        return feature, similarity

# Définir votre modèle
class SimilarityModel(nn.Module):
    def __init__(self, input_size):
        super(SimilarityModel, self).__init__()
        hidden_size = input_size // 2
        self.fc1 = nn.Linear(input_size, hidden_size)
        
        input_size = hidden_size
        hidden_size = input_size // 2
        self.fc2 = nn.Linear(input_size, hidden_size)
        
        input_size = hidden_size
        hidden_size = input_size // 2
        self.fc3 = nn.Linear(input_size, hidden_size)
        
        input_size = hidden_size
        self.fc4 = nn.Linear(input_size, 1)
        
    def forward(self, features):
        x = torch.cat((image_features, desc_features), dim=1)
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        x = torch.relu(x)
        x = self.fc3(x)
        x = torch.relu(x)
        x = self.fc4(x)
        x = torch.sigmoid(x)
        return x

In [91]:
# Définir votre fonction de perte et l'optimiseur
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

features = concat_features
similarities = torch.ones((len(concat_features))).to(device)
train_dataset = ImageDescDataset(features[:24000], similarities[:24000])
val_dataset = ImageDescDataset(features[24000:27000], similarities[24000:27000])
test_dataset = ImageDescDataset(features[27000:], similarities[27000:])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

model = SimilarityModel(input_size=1280).to(device)

for epoch in range(EPOCHS):
     # Boucle d'entraînement
    for i, (feature, similarity) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(feature)
        loss = criterion(outputs, similarity)
        loss.backward()
        optimizer.step()
        # Afficher la perte toutes les 50 itérations
        if (i+1) % 50 == 0:
            print(f'Epoch {epoch+1}, iteration {i+1}, loss = {loss.item()}')
    
    # Boucle de validation
    with torch.no_grad():
        total_loss = 0
        for feature, similarity in val_loader:
            outputs = model(feature)
            loss = criterion(outputs, similarity)
            total_loss += loss.item() * image_features.size(0)
        val_loss = total_loss / len(val_dataset)
        print(f'Epoch {epoch+1}, validation loss = {val_loss}')
        
# Tester les performances de votre modèle sur l'ensemble de test
with torch.no_grad():
    total_loss = 0
    for feature, similarity in test_loader:
        outputs = model(feature)
        loss = criterion(outputs, similarity)
        total_loss += loss.item() * image_features.size(0)
    test_loss = total_loss / len(test_dataset)
    print(f'Test loss = {test_loss}')

Epoch 1, iteration 50, loss = 0.24931737780570984
Epoch 1, iteration 100, loss = 0.24931737780570984
Epoch 1, iteration 150, loss = 0.24931737780570984
Epoch 1, iteration 200, loss = 0.24931737780570984
Epoch 1, iteration 250, loss = 0.24931737780570984
Epoch 1, iteration 300, loss = 0.24931737780570984
Epoch 1, iteration 350, loss = 0.24931737780570984
Epoch 1, iteration 400, loss = 0.24931737780570984
Epoch 1, iteration 450, loss = 0.24931737780570984
Epoch 1, iteration 500, loss = 0.24931737780570984
Epoch 1, iteration 550, loss = 0.24931737780570984
Epoch 1, iteration 600, loss = 0.24931737780570984
Epoch 1, iteration 650, loss = 0.24931737780570984
Epoch 1, iteration 700, loss = 0.24931737780570984
Epoch 1, iteration 750, loss = 0.24931737780570984
Epoch 1, iteration 800, loss = 0.24931737780570984
Epoch 1, iteration 850, loss = 0.24931737780570984
Epoch 1, iteration 900, loss = 0.24931737780570984
Epoch 1, iteration 950, loss = 0.24931737780570984
Epoch 1, iteration 1000, loss = 