In [54]:
EMBED_DIM = 300
NUM_HEADS = 4
MAX_SEQUENCE_LENGTH = 50
BATCH_SIZE = 2
VOCAB_SIZE = 8000
UNITS = 512

In [80]:

from torchvision.models import ResNet50_Weights
from torch import nn
import torch
from torchvision import models
class CNNEncoder(nn.Module):
    def __init__(self, embed_dim):
        super().__init__()

        # Sử dụng ResNet50 pretrained làm backbone
        resnet = models.resnet50(weights=ResNet50_Weights.DEFAULT)

        self.backbone = nn.Sequential(*list(resnet.children())[:-2])  # Loại bỏ avgpool và fc
        for param in self.backbone.parameters():
            param.requires_grad = False

        self.conv_proj = nn.Conv2d(2048, embed_dim, kernel_size=1)  # Dự phóng đặc trưng ResNet sang embed_dim
    def forward(self, image):
        """
        :param image: tensor of shape (B, 3, 224, 224)
        :return: features tensor of shape (B, 49, embed_dim)
        """
        features = self.backbone(image) #(B, 2048, 7, 7)
        features = self.conv_proj(features) #(B, embed_dim , 7, 7)
        features = features.flatten(2).transpose(1, 2)  # (B, 49, embed_dim)
        return features

fake_image = torch.rand(1,3,224,224)
cnn_model = CNNEncoder(512)
features = cnn_model(fake_image)
print(features)


tensor([[[ 0.1023, -0.0380, -0.1043,  ...,  0.0120, -0.0228,  0.0770],
         [-0.5755, -0.4306,  0.4012,  ...,  0.0873,  0.6671, -0.0862],
         [ 0.5306,  0.1266, -0.0822,  ...,  0.3353,  0.6046,  0.0615],
         ...,
         [-0.0717,  0.0283, -0.2065,  ..., -0.1179,  0.1169, -0.2410],
         [ 0.0912,  0.0845,  0.3961,  ..., -0.0771, -0.0927, -0.2596],
         [-0.0027,  0.0200,  0.0037,  ..., -0.0194, -0.0277, -0.0739]]],
       grad_fn=<TransposeBackward0>)


In [81]:
# Định nghĩa Transformer Encoder
class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(TransformerEncoder, self).__init__()

        self.layer_norm_1 = nn.LayerNorm(embed_dim)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU()
        )
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
        self.layer_norm_2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """
        :param x: (B, S, E)
        :return:
        """
        x = self.layer_norm_1(x)
        x = self.fc(x)
        attention_output, _ = self.attention(x, x, x)
        output = self.layer_norm_2(x + attention_output)
        return output

model = TransformerEncoder(embed_dim=EMBED_DIM, num_heads=NUM_HEADS)
dummy_input = torch.randn(BATCH_SIZE, MAX_SEQUENCE_LENGTH, EMBED_DIM)
encoder_output = model(dummy_input)
print(encoder_output)

tensor([[[ 2.3458e-03, -1.1354e+00, -3.4171e-01,  ..., -1.0320e+00,
          -4.3343e-01, -1.6673e-01],
         [ 1.3954e+00, -1.2598e+00, -1.9444e-02,  ..., -1.1260e+00,
           7.8159e-02,  8.3682e-01],
         [-8.3563e-02, -7.0761e-01, -4.0396e-01,  ...,  1.0848e+00,
           1.0095e+00, -2.4489e-01],
         ...,
         [-4.3441e-02, -1.2229e+00, -1.7695e-01,  ..., -4.8878e-02,
           2.2392e-01, -2.1868e-01],
         [ 2.2855e-01, -1.2376e+00, -4.7761e-01,  ..., -1.1419e+00,
          -5.6921e-01,  5.7530e-01],
         [ 1.4122e+00,  1.1692e+00,  2.0426e-01,  ...,  1.0484e+00,
          -5.3385e-01,  1.1882e+00]],

        [[-4.3900e-02,  5.3344e-01,  4.8977e-01,  ..., -1.0202e+00,
           1.4918e+00,  5.8758e-01],
         [ 2.4488e-02, -1.0517e+00,  1.4635e+00,  ..., -9.6917e-01,
          -4.9653e-01, -2.6514e-01],
         [ 1.8355e+00, -1.1179e+00,  8.0710e-01,  ..., -1.0090e+00,
          -5.1617e-01,  1.9195e+00],
         ...,
         [ 3.7846e-02,  1

In [83]:
class Embedding(nn.Module):
    def __init__(self, embed_dim = 300, vocab_size = 10000, max_len_seq = 50):
        super(Embedding, self).__init__()
        self.token_embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_embedding = nn.Embedding(max_len_seq, embed_dim)
    def forward(self, x):
        """

        :param x: tensor of shape (B, MAX_SEQUENCE_LENGTH)
        :return: tensor of shape (B,MAX_SEQUENCE_LENGTH, EMBED_DIM)
        """
        token_embedding = self.token_embedding(x) ## (B, S, E)
        seq_length = x.shape[1]
        positions = torch.arange(0, seq_length, device=x.device).unsqueeze(0)  # (1, S)
        pos_embed = self.pos_embedding(positions)  # (1, S, E)
        return token_embedding + pos_embed

embed_model = Embedding(EMBED_DIM, VOCAB_SIZE, MAX_SEQUENCE_LENGTH)



RuntimeError: Expected tensor for argument #1 'indices' to have one of the following scalar types: Long, Int; but got torch.FloatTensor instead (while checking arguments for embedding)

In [1]:
class TransformerDecoder(nn.Module):
    def __init__(self,units, embed_dim = 300, num_heads= 4, vocab_size=10000, max_len = 50):
        super(TransformerDecoder, self).__init__()

        self.embedding = Embedding(embed_dim, vocab_size=vocab_size, max_len_seq=max_len)

        self.attention_1 = nn.MultiheadAttention(embed_dim, num_heads, dropout=0.1, batch_first=True)
        self.layer_norm_1 = nn.LayerNorm(embed_dim)

        self.attention_2 = nn.MultiheadAttention(embed_dim, num_heads, dropout=0.1,  batch_first=True)
        self.layer_norm_2 = nn.LayerNorm(embed_dim)


        self.ffn_layer_1 = nn.Sequential(
            nn.Linear(embed_dim, units),
            nn.ReLU(),
        )

        self.dropout_1 = nn.Dropout(0.3)

        self.ffn_layer_2 = nn.Sequential(
            nn.Linear(units,embed_dim),
            nn.ReLU(),
        )

        self.layer_norm_3 = nn.LayerNorm(embed_dim)

        self.dropout_2 = nn.Dropout(0.5)

        self.out = nn.Sequential(
            nn.Linear(embed_dim, vocab_size),
        )




    def get_causal_attention_mask(self, inputs):
        batch_size, seq_len, _ = inputs.shape
        mask = torch.tril(torch.ones((seq_len, seq_len), dtype=torch.int32, device=inputs.device)).bool()
        mask = mask.unsqueeze(0).repeat(batch_size, 1, 1)
        return mask


    def forward(self, input_ids, encoder_output, mask = None):
        """
        :param input_ids: tensor of shape (B, MAX_SEQUENCE_LENGTH)
        :param encoder_output:
        :param mask:
        :return:
        """

        embeddings = self.embedding(input_ids) # (batch, seq_len, embed_dim)
        print(embeddings.shape)
        seq_len = embeddings.size(1)


        # Tạo causal mask với kích thước (seq_len, seq_len)
        causal_mask = torch.tril(torch.ones((seq_len, seq_len), device=embeddings.device)).bool()
        key_padding_mask = None
        if mask is not None:
            # Giả sử mask có True ở vị trí hợp lệ, ta cần đảo lại (True cho padding)
            key_padding_mask = ~mask.bool()


        attn_output_1, _ = self.attention_1(
            embeddings,
            embeddings,
            embeddings,
            attn_mask=causal_mask,
            key_padding_mask=key_padding_mask
        )

        out_1 = self.layer_norm_1(embeddings + attn_output_1)

        attn_output_2, _ = self.attention_2(
            query=out_1,
            value=encoder_output,
            key=encoder_output
        )

        out_2 = self.layer_norm_2(out_1 + attn_output_2)

        ffn_output = self.ffn_layer_1(out_2)
        ffn_output = self.dropout_1(ffn_output)
        ffn_output = self.ffn_layer_2(ffn_output)

        ffn_output = self.layer_norm_3(ffn_output + out_2)
        ffn_output = self.dropout_2(ffn_output)
        preds = self.out(ffn_output)
        return preds


decoder = TransformerDecoder(units=UNITS, embed_dim=EMBED_DIM, num_heads=NUM_HEADS, vocab_size=VOCAB_SIZE, max_len=MAX_SEQUENCE_LENGTH)
input_ids = torch.randint(0, VOCAB_SIZE, (BATCH_SIZE, MAX_SEQUENCE_LENGTH))  # (B, S)
encoder_output = torch.randn(BATCH_SIZE, MAX_SEQUENCE_LENGTH, EMBED_DIM)  # (B, S, E)

output = decoder(input_ids, encoder_output)
print("Output shape:", output.shape)


NameError: name 'nn' is not defined

In [84]:
class ImageCaptionModel(nn.Module):
    def __init__(self, embed_dim=300, num_heads = 4, units = 512, vocab_size = 10000, max_len = 50):
        super(ImageCaptionModel, self).__init__()
        self.cnn_model = CNNEncoder(embed_dim)
        self.encoder = TransformerEncoder(embed_dim, num_heads)
        self.decoder = TransformerDecoder(units, embed_dim, num_heads, vocab_size, max_len)
    def forward(self, images, inputs):
        """
        :param images: tensor of shape (B, 3, 224, 224)
        :param images:
        :param input:
        :return:
        """
        features = self.cnn_model(images)
        print(features)
        encoded_features = self.encoder(features)
        print(encoded_features)
        mask = (inputs != 0)

        outputs = self.decoder(inputs, encoded_features, mask=mask)
        print(output)
        return outputs

# Tạo fake data
def create_fake_data():
    # Fake images: (B, 3, 224, 224) -> Giả lập đầu ra ResNet bằng (B, 2048, 7, 7)
    fake_images = torch.randn(BATCH_SIZE, 3, 224, 224)  # Giả lập đầu ra sau ResNet

    # Fake input_ids: (B, MAX_SEQUENCE_LENGTH)
    # Giả sử 0 là padding token, các giá trị khác là token IDs ngẫu nhiên từ 1 đến VOCAB_SIZE-1
    fake_input_ids = torch.randint(0, VOCAB_SIZE, (BATCH_SIZE, MAX_SEQUENCE_LENGTH))
    fake_input_ids[0, 40:] = 0  # Thêm padding cho batch 0
    fake_input_ids[1, 45:] = 0  # Thêm padding cho batch 1

    return fake_images, fake_input_ids

model = ImageCaptionModel(
        embed_dim=EMBED_DIM,
        num_heads=NUM_HEADS,
        units=UNITS,
        vocab_size=VOCAB_SIZE,
        max_len=MAX_SEQUENCE_LENGTH
    )

# Tạo dữ liệu giả
fake_images, fake_input_ids = create_fake_data()

# Chạy mô hình
model.eval()  # Chuyển sang chế độ đánh giá để tắt dropout
with torch.no_grad():
    outputs = model(fake_images, fake_input_ids)

# Kiểm tra kích thước đầu ra
print("Input images shape:", fake_images.shape)
print("Input IDs shape:", fake_input_ids.shape)
print("Output shape:", outputs.shape)
print("Test passed: Output shape is correct!")


tensor([[[ 0.4842, -0.2637,  0.0942,  ...,  0.3385,  0.5352,  0.0480],
         [ 0.0073,  0.0668,  0.1749,  ...,  0.0199, -0.0232,  0.0013],
         [ 0.0352,  0.0659,  0.1235,  ..., -0.2071, -0.1315, -0.0244],
         ...,
         [ 0.4003,  0.1956,  0.3757,  ..., -0.2038, -0.1387,  0.1403],
         [ 0.4169,  0.2573,  0.3274,  ..., -0.2580, -0.0329,  0.2286],
         [ 0.3901,  0.2516,  0.3742,  ..., -0.4066, -0.3367,  0.2889]],

        [[ 0.5987, -0.4070,  0.0554,  ...,  0.5139,  0.7587,  0.0194],
         [-0.1266,  0.0816,  0.2008,  ...,  0.0689, -0.0855, -0.0482],
         [ 0.1053,  0.0011,  0.2195,  ..., -0.1870, -0.1799,  0.0600],
         ...,
         [ 0.1246,  0.1640,  0.1929,  ..., -0.3208, -0.3308,  0.2767],
         [ 0.4593,  0.2201,  0.4284,  ..., -0.2924, -0.3859,  0.3219],
         [ 0.3205,  0.2608,  0.3638,  ..., -0.2398, -0.2610,  0.1867]]])
tensor([[[-1.0807, -1.1966, -0.4577,  ..., -0.6663,  1.2143,  1.7226],
         [-0.4045, -1.1544,  1.6457,  ..., -0

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\NguyenPC\\Desktop\\dataset/Images\\1000268201_693b08cb0e.jpg'

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
criterion = nn.CrossEntropyLoss(ignore_index=w2i["<PAD>"])
optimizer = torch.optim.Adam(model.parameters(), lr= learning_rate)
model.to(device)



def train():
    best_val_loss = float("inf")
    stopping_counter = 0
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        model.train()
        total_train_loss = 0
        train_loader = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{epochs} [Training]")
        for images, inputs, targets in train_loader:
            images, inputs, targets = images.to(device), inputs.to(device), targets.to(device)
            print(images)

            optimizer.zero_grad()
            output = model(images, inputs) #(B, MAX_LEN - 1, VOCAB_SIZE)

            loss = criterion(output.reshape(-1, VOCAB_SIZE), targets.reshape(-1))

            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()
        avg_train_loss = total_train_loss / len(train_dataloader)
        train_losses.append(avg_train_loss)

        model.eval()
        total_test_loss = 0
        test_loader = tqdm(test_dataloader, desc=f"Epoch {epoch + 1}/{epochs} [Validation]")
        with torch.no_grad():
            for images, inputs, targets in test_loader:
                images, inputs, targets = images.to(device), inputs.to(device), targets.to(device)
                output = model(images, inputs)
                loss = criterion(output.reshape(-1, VOCAB_SIZE), targets.reshape(-1))
                total_test_loss += loss.item()

        avg_test_loss = total_test_loss / len(test_dataloader)
        val_losses.append(avg_test_loss)
        print(f"\nEpoch {epoch + 1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f}")

        if avg_test_loss < best_val_loss - min_delta:
            best_val_loss = avg_test_loss
            stopping_counter = 0
            torch.save(model.state_dict(), "best_model_flick.pth")  # Lưu model tốt nhất
        else:
            stopping_counter += 1
            if stopping_counter >= patience:
                print("Early stopping triggered!")
                break




    # Vẽ biểu đồ
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label="Train Loss", marker="o")
    plt.plot(val_losses, label="Validation Loss", marker="o")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.title("Training & Validation Loss")
    plt.grid()
    plt.show()
    plt.savefig("train_loss.png")

if __name__ == "__main__":
    train()