Format lại code

In [1]:
import numpy as np
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
import string
import os
from PIL import Image
import glob
from pickle import dump, load
import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset
import torch.nn.functional as F
import torchvision.models as models
from torchvision import transforms
import math

from transformers import BertTokenizerFast

tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
BATCH_SIZE = 64
device = 'cuda'
SEQ_LENGTH = 16


In [2]:
image_transforms = transforms.Compose([
    transforms.Resize((324, 324)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Dùng cho ImageNet
])

In [3]:
def get_data(): # Do dataloader bị lỗi đa luồng ...
    import util
    data_folder_path = "Flickr8k/Flicker8k_Dataset"
    train_dataset = util.Flickr8kDataset(
        data_folder_path,
        "Data_bert/train_set_bert.pkl",
        image_transforms,
        device='cpu',
        seq_length=SEQ_LENGTH
    )
    trainloader = train_dataset.get_dataloader(batch_size=BATCH_SIZE, num_workers=0, shuffle=False)
    test_dataset = util.Flickr8kDataset(
        data_folder_path,
        "Data_bert/test_set_bert.pkl",
        image_transforms,
        device='cpu',
        seq_length=SEQ_LENGTH
    )
    testloader = test_dataset.get_dataloader(batch_size=BATCH_SIZE, num_workers=0, shuffle=False)
    print(len(train_dataset))
    print(len(test_dataset)) 
    sample_image, sample_caption, sample_target = test_dataset[0]
    print(sample_image.shape, sample_caption.shape, sample_target.shape)
    print(sample_caption)
    print(sample_target)
    final_train_data = []
    progress = 0
    last_log = 0
    for images, captions, targets in trainloader:
        for i in range(len(images)):
            final_train_data.append((images[i], captions[i], targets[i]))
        progress += 1
        current = progress / len(trainloader) * 100
        if ( current - last_log > 5):
            last_log = current
            print(f"Train {current} %")
    final_test_data = []
    for images, captions, targets in testloader:
        for i in range(len(images)):
            final_test_data.append((images[i], captions[i], targets[i]))
    trainloader = DataLoader(final_train_data, batch_size = BATCH_SIZE, num_workers=2, shuffle=False)
    testloader = DataLoader(final_test_data, batch_size = BATCH_SIZE, num_workers=2, shuffle=False)
    return trainloader, testloader
trainloader, testloader = get_data()

TypeError: Flickr8kDataset.__init__() got an unexpected keyword argument 'seq_length'

In [None]:
detokenize = tokenizer.convert_ids_to_tokens

for images, captions, targets in trainloader:
    for i in range(captions.shape[0]):
        print(detokenize(captions[i]))
        print("-", detokenize(targets[i]))
    break

['[CLS]', 'a', 'child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an']
- ['a', 'child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry']
['a', 'child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry']
- ['child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way']
['child', 'in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way']
- ['in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way', '.']
['in', 'a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way', '.']
- ['a', 'pink', 'dress', 'is', 'climbing', 'up', 'a', 'set', 'of', 'stairs', 'in', 'an', 'entry', 'way', '.', '[SEP]']
['[CLS]', 'a', 'black', 'dog', 'and', 'a', 'tri', '-', 'colored

In [None]:
class EncoderCNN(nn.Module):
    def __init__(self, output_size: int):
        super(EncoderCNN, self).__init__()
        self.inception_model = models.inception_v3(pretrained=True)
        #self.inception_model.fc = torch.nn.Identity()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(1000, output_size)
        for name, param in self.inception_model.named_parameters():
            if "fc.weight" in name or "fc.bias" in name:
                param.requires_grad = True
            else:
                param.requires_grad = False
    def forward(self, images: torch.Tensor):
        features = self.inception_model(images) #[1, 2048]
        if isinstance(features, tuple):  # Nếu là tuple
            features = features[0] 
        features = self.relu(features)
        features = self.dropout(features)
        features = self.fc(features)
        return features
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, vocab_size, hidden_size, input_size, num_layers):
        super(DecoderRNN, self).__init__()
        self.vocab_size = vocab_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size + input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.num_layers = num_layers
        self.hidden_size = hidden_size
    def forward(self, features: torch.Tensor, captions: torch.Tensor, hidden_state: tuple[torch.Tensor, torch.Tensor] = None): 
        # seq = 1
        # features : image_features : [bsz, embed]
        # captions : [bsz, seq]
        # hidden : [1, bsz, embed]
        embeddings = self.embed(captions) # [bsz, seq, embed]
        features = features.unsqueeze(1).expand(-1, embeddings.shape[1], -1) # [bsz, seq, embed]
        combined = torch.cat((features, embeddings), dim=2) # [bsz, seq, embed*2]
        if hidden_state == None:
            hidden_state = (
                torch.zeros(self.num_layers, captions.shape[0], self.hidden_size).to(captions.device),
                torch.zeros(self.num_layers, captions.shape[0], self.hidden_size).to(captions.device)
            )
        output, hidden = self.lstm(combined, hidden_state) # [bsz, seq, hid]
        output = self.relu(output)
        output = self.dropout(output)
        output = self.linear(output) #[batch_size, seq_len, vocab_size]
        return output, hidden
class ImageToTextModel(nn.Module):
    def __init__(self, encoder: nn.Module, decoder: nn.Module):
        self.encoder: EncoderCNN = encoder
        self.decoder: DecoderRNN = decoder
    def forward(self, images: torch.Tensor, captions: torch.Tensor):
        bsz = images.shape[0]
        hidden_state: tuple[torch.Tensor, torch.Tensor] = None
        features = self.encoder(images)
        seq_predicted = []
        seq_predicted.append(torch.zeros((bsz, self.decoder.vocab_size), dtype=torch.float32).to(device))
        decoder_input = captions[:, 0]
        seq_length = captions.shape[1]
        for di in range(1, seq_length):
            output_decoder, hidden_state = self.decoder(features, decoder_input, hidden_state)
            decoder_input = captions[:, di]
            seq_predicted.append(output_decoder)
        return torch.tensor(seq_predicted)
    def predict(self, images: torch.Tensor, captions: torch.Tensor, predict_length: int):
        hidden_state: tuple[torch.Tensor, torch.Tensor] = None
        features = self.encoder(images)
        seq_predicted = []
        decoder_input = captions[:, 0]
        seq_length = captions.shape[1]
        for di in range(1, seq_length):
            output_decoder, hidden_state = self.decoder(features, decoder_input, hidden_state)
            decoder_input = captions[:, di]
        for di in range(1, predict_length):
            output_decoder, hidden_state = self.decoder(features, decoder_input, hidden_state)
            seq_predicted.append(output_decoder)
            decoder_input = output_decoder.argmax(1)
        return torch.tensor(seq_predicted)

In [None]:
image_size = 128
encoder = EncoderCNN(
    output_size=image_size
)
decoder = DecoderRNN(
    embed_size=16,
    vocab_size=tokenizer.vocab_size,
    hidden_size=128,
    input_size=image_size,
    num_layers=1
)
image_to_text_model = ImageToTextModel(
    encoder=encoder,
    decoder=decoder
)
optimizer = torch.optim.Adam(image_to_text_model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
num_epochs = 10
image_to_text_model.to(device)
image_to_text_model.train()
detokenize = tokenizer.convert_ids_to_tokens

for epoch in tqdm.trange(num_epochs):
    i = 0
    total_loss = 0
    count = 0
    for images, captions in trainloader:
        optimizer.zero_grad()
        images = images.to(device)
        captions = captions.to(device)
        targets = targets.to(device)
        encoder_output = encoder(images)
        # captions = torch.clamp(captions, 1, 1e5).long()
        decoder_output, output_hidden = decoder(encoder_output, captions)
        decoder_output: torch.Tensor
        # decoder_output = torch.clamp(decoder_output, 1e-2, 1e2)

        loss: torch.Tensor = criterion(decoder_output.view(-1, decoder_output.shape[2]), captions)
        loss.backward()
        optimizer.step()

        total_loss += loss
        count += 1 
        i+=1
        # print("Finish batch")
        # print(f"{loss:.5f} | {i}/{len(trainloader)}")
    print(f"Epoch {epoch+1} | Test loss : {total_loss/count}")
    # random : log(1/30k) ~ 10.31


 10%|█         | 1/10 [00:27<04:05, 27.25s/it]

Epoch 1 | Test loss : 6.559357643127441


 20%|██        | 2/10 [00:55<03:40, 27.56s/it]

Epoch 2 | Test loss : 4.8456621170043945


 30%|███       | 3/10 [01:22<03:13, 27.59s/it]

Epoch 3 | Test loss : 4.175093650817871


 40%|████      | 4/10 [01:50<02:46, 27.67s/it]

Epoch 4 | Test loss : 3.698627471923828


In [None]:
from PIL import Image
def interactive_test(
        encoder: nn.Module,
        decoder: nn.Module,
        image_path: str,
        text: str
    ):
    tokens: list[int] = tokenizer.encode(text)
    tokens.pop(-1)
    while(len(tokens) < SEQ_LENGTH):
        tokens = [0] + tokens
    tokens: torch.Tensor = torch.tensor(tokens)
    images = torch.Tensor(image_transforms(Image.open(image_path).convert("RGB")))
    images = images.unsqueeze(0)
    tokens = tokens.unsqueeze(0)
    encoder.eval()
    decoder.eval()

    images = images.to(device)
    tokens = tokens.to(device)
    encoder_output = encoder(images)
        # captions = torch.clamp(captions, 1, 1e5).long()
    decoder_output, output_hidden = decoder(encoder_output, tokens)
    decoder_output: torch.Tensor
    # print(images.shape)
    # print(tokens.shape)
    # print(encoder_output.shape)
    # decoder_output, output_hidden = decoder(encoder_output, tokens)
    # decoder_output: torch.Tensor
        # print(decoder_output.shape)
        # # decoder_output = decoder_output.squeeze(1)
        # print(decoder_output.shape)
        # print(targets.shape)
    target_logits = decoder_output[:, -1, :]
    predicts = decoder_output.argmax(2)
    predicted_token = target_logits.argmax()
    return predicted_token, tokens, predicts

image_path = "Flickr8k/Flicker8k_Dataset/44856031_0d82c2c7d1.jpg"
# image_path = "Flickr8k/Flicker8k_Dataset/110595925_f3395c8bd6.jpg"
text = ""
predict_token, captions, predicts = interactive_test(encoder, decoder, image_path, text)
# print(predicts, captions)
predict_token = predict_token.item()
predicts = predicts[0]
# print(predicts)
# print(captions)
detokenize = tokenizer.convert_ids_to_tokens
print(detokenize(predict_token))
print(detokenize(predicts))

# for i in range(min(len(predicts), captions.shape[0])):
#     print(detokenize(predicts[i]), detokenize(captions[i].item()))

trees
['vest', 'in', 'on', '##ing', '##ing', 'as', 'while', 'on', 'on', 'on', 'on', 'on', 'on', 'on', 'on', 'trees']
