<a href="https://colab.research.google.com/github/AokiMasataka/LSTM_sample/blob/master/STAIR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt install aptitude
!aptitude install mecab libmecab-dev mecab-ipadic-utf8 git make curl xz-utils file -y
!pip install mecab-python3==0.7

In [1]:
import torch
from torch import nn, optim
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
from torchvision import models
import numpy as np
import json
import MeCab
from PIL import Image
import requests
from io import BytesIO

tagger = MeCab.Tagger("-Owakati")
def inverse_dict(d):
    return {v:int(k) for k,v in d.items()}

json_open = open('drive/My Drive/Colab Notebooks/stair_captions/stair_captions_train.json', 'r', encoding="utf-8")
stairCaptions = json.load(json_open)

json_open = open('drive/My Drive/Colab Notebooks/stair_captions/words.json', 'r', encoding="utf-8")
index2word = json.load(json_open)
word2index = inverse_dict(index2word)

VOCAB_SIZE = len(word2index) # length is 29931
EMBEDDING_DIM = 256
MAXMUM_WORDS = 20
BATCH_SIZE = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
epochs = 100


In [2]:
def sentence2index(sentence, pad=True):
    sentence = tagger.parse(sentence)
    wakati = list(sentence.split(" "))
    wakati.insert(0, '<start>')
    wakati[len(wakati) - 1] = '<end>'
    if not pad:
      return torch.tensor([word2index[w] for w in wakati], dtype=torch.long)
    else:
      index = torch.tensor([word2index[w] for w in wakati])
      if MAXMUM_WORDS < index.shape[0]:
       return index[:MAXMUM_WORDS]
      padding = torch.zeros(MAXMUM_WORDS - index.shape[0])
      return torch.cat((index, padding), 0).to(torch.long)

def index2sentence(ndarray):
  sentence = ''
  for index in ndarray:
    sentence += index2word[str(index)]
  return sentence

In [8]:
def getImg(url, toTensor=True):
  response = requests.get(url)
  img = np.array(Image.open(BytesIO(response.content)).convert('RGB')) / 255
  img = np.transpose(img[np.newaxis], (0, 3, 1, 2))

  if not toTensor:
    return img
  else:
    return torch.tensor(img, dtype=torch.float32)

In [7]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        vgg = models.vgg16(pretrained=True)
        self.features = nn.Sequential(*list(vgg.features)[:31]).eval().to(device)
        for param in self.features.parameters():
            param.requires_grad = False
        self.GAP = (vgg.avgpool).to(device)
        self.classifier = nn.Sequential(*list(vgg.classifier)[:1]).eval().to(device)
        for param in self.classifier.parameters():
            param.requires_grad = False
        self.mean = torch.nn.Parameter(torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1), requires_grad=False).to(device)
        self.std = torch.nn.Parameter(torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1), requires_grad=False).to(device)

    def forward(self, image):
        image = (image - self.mean) / self.std
        x = self.features(image)
        x = self.GAP(x)
        x = x.reshape(x.shape[0], -1)
        x = self.classifier(x) # output size(batchSize, 4096)
        return x

In [1]:
class Decoder(nn.Module):
    def __init__(self, cuda=True, vocabSize=VOCAB_SIZE, embeddingDim=EMBEDDING_DIM, hiddenDim=1024):
        super(Decoder, self).__init__()
        self.hiddenDim = hiddenDim
        self.word_embeddings = nn.Embedding(vocabSize, embeddingDim)
        self.lstm = nn.LSTM(input_size=embeddingDim, hidden_size=hiddenDim, batch_first=True, num_layers=1)
        self.liner = nn.Linear(hiddenDim + 4096, vocabSize)
        self.hidden = self.init_hidden()
        self.cuda = cuda
    
    def init_hidden(self):
      if cuda:
        return (torch.zeros(1, BATCH_SIZE, self.hiddenDim).to(device),
                torch.zeros(1, BATCH_SIZE, self.hiddenDim).to(device))
      else:
        return (torch.zeros(1, BATCH_SIZE, self.hiddenDim),
                torch.zeros(1, BATCH_SIZE, self.hiddenDim))

    def forward(self, encodedImage, sentence):
        embeds = self.word_embeddings(sentence)
        embeds = attention(embeds, embeds, embeds)
        output, self.hidden = self.lstm(embeds, self.hidden)
        repeatedEncodedImage = encodedImage.repeat(1, sentence.shape[1]).view(-1, sentence.shape[1], 4096)
        output = torch.cat((output, repeatedEncodedImage), axis=-1)
        output = self.liner(output)
        return output

def attention(q, k, v):
    scores = torch.matmul(q, k.transpose(-2, -1))
    scores = torch.nn.functional.softmax(scores, dim=-1)
    output = torch.matmul(scores, v)
    return output

NameError: ignored

In [20]:
x = torch.zeros((BATCH_SIZE, MAXMUM_WORDS), dtype=torch.long)
img = torch.rand((BATCH_SIZE, 4096))

model = Decoder()
model.eval()
x = model(img, x)
print(x.shape)

torch.Size([1, 24, 29931])


In [4]:
def generateData(sentence, image):
    sentence = sentence.view(1, -1)
    pad = torch.zeros((1, MAXMUM_WORDS), dtype=torch.long)
    image = image.view(1, -1)
    data_x = torch.cat((sentence[0, :1], pad[0, 1:]), 0).unsqueeze(0)
    data_y = sentence[0, 1].view(1)
    for i in range(2, MAXMUM_WORDS):
        data_x = torch.cat((data_x, torch.cat((sentence[0, :i], pad[0, i:]), 0).unsqueeze(0)), 0)
        data_y = torch.cat((data_y, sentence[0, i].view(1)), 0) 
    image = image.repeat(MAXMUM_WORDS - 1, 1)
    return data_x, data_y, image

In [4]:
def train(decoder, e=None):
  json_open = open('drive/My Drive/Colab Notebooks/stair_captions/index2image.json', 'r', encoding="utf-8")
  id2index = json.load(json_open)

  encordedArrays = np.load('drive/My Drive/Colab Notebooks/stair_captions/encorded_ndarray.npy')
  encordedTensors = torch.tensor(encordedArrays, dtype=torch.float)
  del encordedArrays

  if e:
    modelPath = 'drive/My Drive/Colab Notebooks/LSTM_models/epoch' + str(e)
    decoder.load_state_dict(torch.load(modelPath))
  else:
    e = 0

  decoder.train().to(device)
  optimizer = torch.optim.Adam(decoder.parameters(), lr=0.05)
  CEL = nn.CrossEntropyLoss()

  
  pad = torch.tensor([1])
  for epoch in range(1, epochs):
    print("epoch :", epoch + e)
    iterate = 0
    batch = 0
    sentence = []
    inputFeature = torch.empty((1, 4096))
    for caption in stairCaptions['annotations'][:20000]:
      try:
        idx = id2index[str(caption['image_id'])]
      except:
        continue

      sentence.append(sentence2index(caption['caption'], False))
      inputFeature = torch.cat((inputFeature, encordedTensors[idx].unsqueeze(0)), dim=0)
      batch += 1

      if batch == BATCH_SIZE:
        data = pad_sequence(sentence, batch_first=True)
        inputs = data[:, :-1].to(device)
        targets = data[:, 1:].to(device)
        inputFeature = inputFeature[1:].to(device)
        decoder.hidden = decoder.init_hidden()  # LSTM初期化
        output = decoder(inputFeature, inputs)
        loss = CEL(output.view(-1, VOCAB_SIZE), targets.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        sentence = []
        inputFeature = torch.empty((1, 4096))
        batch = 0

      iterate += 1
      if (iterate + 1) % 1000 == 0:
        print("loss :", loss.item())

    if epoch % 2:
      model_path = 'drive/My Drive/Colab Notebooks/LSTM_models/epoch' + str(epoch + e)
      torch.save(decoder.state_dict(), model_path)

In [None]:
decoder = Decoder()
train(decoder)

In [9]:
def test(modelPath):
  BATCH_SIZE = 1
  json_open = open('drive/My Drive/Colab Notebooks/stair_captions/stair_captions_train.json', 'r', encoding="utf-8")
  captions = json.load(json_open)

  json_open = open('drive/My Drive/Colab Notebooks/stair_captions/index2image.json', 'r', encoding="utf-8")
  id2index = json.load(json_open)

  encordedArrays = np.load('drive/My Drive/Colab Notebooks/stair_captions/encorded_ndarray.npy')
  encordedTensors = torch.tensor(encordedArrays, dtype=torch.float)
  del encordedArrays

  decoder = Decoder(False)
  decoder.load_state_dict(torch.load(modelPath))
  decoder.cpu()
  encoder = Encoder().to(device)

  rand = np.random.randint(0, 20000)
  for image in captions['images'][rand:rand+10]:
    try:
      tensorImg = getImg(image['flickr_url'])
    except:
      continue
    
    tensorImg = tensorImg.to(device)
    encordedimg = encoder(tensorImg)
    encordedImg = encordedimg.cpu().view(1, -1)

    decoder.init_hidden()
    sentence = torch.zeros((1), dtype=torch.long)
    sentence[0] = 1
    for i in range(1, MAXMUM_WORDS):
      decoder.init_hidden()
      output = decoder(encordedImg, sentence.view(1, -1))
      number = torch.argmax(output[0, -1]).view(1)
      sentence = torch.cat((sentence, number.view(1)), dim=0)
      if number.item() == 2:
        break

    
    print(image['flickr_url'])
    sentence = sentence.detach().numpy().reshape(-1)
    print(index2sentence(sentence))

In [10]:
modelPath = 'drive/My Drive/Colab Notebooks/LSTM_models/epoch3'
test(modelPath)

http://farm6.staticflickr.com/5447/9513770553_f8549974e3_z.jpg
<start>車車車車車車車車車車車車車車車車車車車
http://farm3.staticflickr.com/2564/5825169236_a531bbf559_z.jpg
<start>女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子女の子
http://farm2.staticflickr.com/1332/1437349099_2241c31c30_z.jpg
<start>シルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバー
http://farm4.staticflickr.com/3666/9647891421_42123291f1_z.jpg
<start>濡れを濡れている<end>
http://farm8.staticflickr.com/7356/9651126370_c06b2e9710_z.jpg
<start>機関機関に機関ている<end>
http://farm8.staticflickr.com/7097/7168601503_1c98a9aa45_z.jpg
<start>まわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわりまわり
http://farm3.staticflickr.com/2695/4314565761_a90a0d35fe_z.jpg
<start>濡れに濡れに濡れに濡れに濡れに濡れに濡れに濡れに濡れに濡れ
http://farm4.staticflickr.com/3194/2951542925_6fd8906a34_z.jpg
<start>シルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバーシルバー
http://farm3.staticflickr.com/2667/3761726052_8c97db702b_z.jpg
<start>シルバーシルバーシルバーシルバーシルバー