In [1]:

import torch.nn as nn
from torchvision import models
import torch
import torchvision.transforms as transforms
from PIL import Image
from transformers import AutoTokenizer, AutoModel
import pandas as pd

In [2]:
if torch.cuda.is_available():
    device = torch.device("cuda")  # GPUデバイスを取得
else:
    device = torch.device("cpu")  # CPUデバイスを取得

In [3]:
"""
画像処理のモデル
"""

class ImageEncoder(nn.Module):
    def __init__(self, embedding_size):
        super(ImageEncoder, self).__init__()
        self.resnet50 = models.resnet50(pretrained=True)
        self.fc = nn.Linear(self.resnet50.fc.out_features, embedding_size)
    
    def forward(self, x):
        x = self.resnet50(x)
        x = self.fc(x)
        return x

In [4]:
"""
テキスト処理のモデル
"""
class CaptionEncoder(nn.Module):
  def __init__(self):
    super().__init__()
    self.bert = AutoModel.from_pretrained("cl-tohoku/bert-base-japanese-v2")
  def forward(self, x):
    x = self.bert(x)
    x = torch.max(x.last_hidden_state, dim=1)[0]  # max pooling
    return x

In [5]:
from CustomDataset import EmbeddingDataset


dataset = EmbeddingDataset('./data/anotation.csv')

In [6]:
from torch.utils.data import DataLoader

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size]
)

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True, pin_memory=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=True, pin_memory=True)

In [7]:
import torch.nn.functional as F

learning_rate = 1e-3
batch_size = 32
epochs = 1

image_model = ImageEncoder(768).to(device)
caption_model = CaptionEncoder().to(device)
img_optimizer = torch.optim.SGD(image_model.parameters(), lr=learning_rate)
cpt_optimizer = torch.optim.SGD(caption_model.parameters(), lr=learning_rate)

loss_fn = torch.nn.CosineSimilarity(dim=1)
tokenizer = AutoTokenizer.from_pretrained("cl-tohoku/bert-base-japanese-v2")

Some weights of the model checkpoint at cl-tohoku/bert-base-japanese-v2 were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.decoder.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [8]:
def train_loop(dataloader, img_model, cpt_model,  loss_fn, img_opt, cpt_opt):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):        
        # 予測と損失の計算
        X = X.to(device)
        pred = img_model(X)
        ids = tokenizer.encode(y, return_tensors='pt')
        ids = ids.to(device)
        target = cpt_model(ids)
        # print(pred.shape, target.shape, len(X), len(y))
        # ここ不安
        loss = loss_fn(pred, target).mean()

        # バックプロパゲーション
        img_opt.zero_grad()
        cpt_opt.zero_grad()
        loss.backward()
        img_opt.step()
        cpt_opt.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, img_model, cpt_model,  loss_fn):
    size = len(dataloader.dataset)
    test_loss = 0

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            pred = img_model(X)
            ids = tokenizer.encode(y, return_tensors='pt')
            ids = ids.to(device)
            target = cpt_model(ids)
            loss = loss_fn(pred, target)
            test_loss += loss.item()
            
    test_loss /= size
    print(f"Avg loss: {test_loss:>8f} \n")

In [9]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, image_model, caption_model, loss_fn, img_optimizer, cpt_optimizer)
    test_loop(train_dataloader, image_model, caption_model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.013564  [    0/1016381]
loss: -0.184862  [ 6400/1016381]
loss: -0.327048  [12800/1016381]
loss: -0.443929  [19200/1016381]
loss: -0.559477  [25600/1016381]
loss: -0.652961  [32000/1016381]
loss: -0.733801  [38400/1016381]
loss: -0.786328  [44800/1016381]
loss: -0.839357  [51200/1016381]


UnidentifiedImageError: cannot identify image file 'D:/M1/fashion/IQON/IQON3000\\99698\\4060309/42539149_m.jpg'

In [None]:
item = next(iter(train_dataset))

In [None]:
item

(tensor([[[2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          ...,
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489]],
 
         [[2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          ...,
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286]],
 
         [[2.6400, 2.6400, 2.6400,  ..., 2.6400, 2.6400, 2.6400],
          [2.6400, 2.6400, 2.6400,  ..., 2.6400, 2.6400, 2.6400],
          [2.6400, 2.6400, 2.6400,  ...,