# 画像スクレイピング＆AIを学習させるプログラム

大きくこの4つに分類されます

1. 関数の読み込み
2. データセットの作成
3. AIモデルの構築と学習
4. 性能評価

<!-- Project Name    : 画像スクレイピングおよび画像認識プログラム
群馬大学ICTデータサイエンスコンソーシアム・中村の著作物です -->


## ①関数の読み込み

In [None]:

# ライブラリのインポート
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import models
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from IPython.display import clear_output
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from sklearn.metrics import accuracy_score, precision_score, recall_score

# Colaboratoryにプリインストールされていないパッケージをインストールする
!pip install icrawler
!pip install japanize_matplotlib

from icrawler.builtin import BingImageCrawler
import japanize_matplotlib
clear_output()
print("ライブラリのインポートが完了しました")

# 関数の読み込み(1/2)
def class_making(subset, img_size, ndim=3):
  """
  subsetからデータとラベルを取り出す関数  

  Parameters  
  --------------  
  subset : subset
    データとラベルのデータセット  
  img_size : int
    画像のデータサイズ，正方形を想定している  
  ndim : int
    画像の次元数，デフォルトは3  
  
  Returns
  ----------
  data : data[1:]
    subset内のデータ，初めの要素は空なので1から返している  
  label : torch.tensor(labels)
    subset内のラベル，データに合わせてそのまま使えるようにtorch.tensorでラップしている
  """

  labels = []
  data = subset.dataset.__getitem__(0)[0].view(1, ndim, img_size, img_size)
  for index in subset.indices:
    data = torch.cat((data, subset.dataset.__getitem__(index)[0].view(1,ndim,img_size,img_size)))
    labels.append(subset.dataset.__getitem__(index)[1])
    
  return data[1:], torch.tensor(labels)

In [None]:
# 関数の読み込み(2/2)
class Tensor2Dataset(torch.utils.data.Dataset):
  """
  データとラベルを受け取ってDatasetオブジェクトを定義する  

  Parameters
  --------------
  inputs : torch.tensor
    データ  
  labels : torch.tensor
    ラベル

  Attritubes  
  -------------
  data : torch.tensor
    データ
  labels : torch.tensor
    ラベル
  data_num : int
    データの長さ
  """
  def __init__(self, inputs, labels):
    self.data = inputs
    self.label = labels
    self.data_num = len(self.data)

  def __len__(self):
    return self.data_num

  def __getitem__(self, idx):
    return self.data[idx], self.label[idx]

In [None]:
import imutils
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(js)
  data = eval_js('takePhoto({})'.format(quality))
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return filename

## データセットの作成


In [None]:
#@title ②スクレイピングの設定と実行
#@markdown 半角の , 区切りでスクレイピングするワードを入力して下さい
# 0. 複数スクレイピングの準備
スクレイピングキーワード = "\u767D\u77F3\u9EBB\u8863, \u5CA9\u7530\u525B\u5178"#@param {type:"string"}
keywords = スクレイピングキーワード.split(',')
最大取得枚数 =  20#@param {type:"number"}
max_num = 最大取得枚数
print(f"スクレイピングするキーワードの確認 : {keywords}")

for keyword in keywords:

  # 1. インスタンスの宣言
  crawler = BingImageCrawler(storage={"root_dir": "dataset/" + keyword})
  
  # 2. crawでスクレイピング
  crawler.crawl(keyword=keyword, max_num=max_num)

clear_output()
print("スクレイピングが完了しました")

zipダウンロード = False #@param {type:"boolean"}
# zipファイルにまとめる
!zip -r dataset.zip dataset
clear_output()

if zipダウンロード:
  # zipファイルのダウンロード
  from google.colab import files
  files.download('dataset.zip')

In [None]:
#@title ③データセットの詳細設定
学習データとテストデータの比率 = 0.8 #@param {type:"slider", min:0, max:1, step:0.1}
学習データと検証データの比率 = 0.8 #@param {type:"slider", min:0, max:1, step:0.1}

# リサイズ後の画像の大きさ
img_size = 64
# データセットに含める画像のパス
img_pass = "dataset"
# 学習とテストの比率
test_ratio = 学習データとテストデータの比率
val_ratio = 学習データと検証データの比率

# 1. 整形方法の設定  
transform = transforms.Compose([
                                transforms.Resize((img_size, img_size)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                ])

# 2. データセットの読み込み
data = torchvision.datasets.ImageFolder(root=img_pass, transform=transform)

# データの長さ
# 全データの長さ
data_len = len(data)
print(f"データ全体 : {data_len}枚")

# 学習データの長さ
train_len = int(data_len*test_ratio)
print(f"学習データ : {train_len}枚")
# テストデータの長さ
test_len = data_len - train_len
print(f"テストデータ : {test_len}枚")

# 学習データとテストデータへの分割
train_subset, test_subset = torch.utils.data.random_split(data, [train_len, test_len])

# 学習データをDatasetクラスオブジェクトに戻す
inputs, labels = class_making(train_subset, img_size)
train_set = Tensor2Dataset(inputs, labels)

# AIモデルの構築と学習

---

<font color="blue">学習の設定</font>では，どのように学習させるかを設定します
- エポック：データセットを使い回す回数を指定します
- バッチサイズ：一度に学習させる枚数を指定します
- エポックごとの誤差の表示：エポックごとに、学習誤差・検証誤差・テスト誤差を表示させるか指定します
- モデルの保存：モデルを保存するかどうかを指定します
- 保存するモデルの名前：保存するモデルの名称を指定します

<font color="blue">学習</font>では，AIモデルの構築で作成したAIモデルを学習させます。スタンダードなCNNを用います

### ④学習の設定

In [None]:
エポック = 100#@param {type:"number"}
バッチサイズ = 10#@param {type:"number"}
エポックごとの誤差の表示 = False #@param {type:"boolean"}
モデルの保存 = True #@param {type:"boolean"}
保存するモデルの名前 = "model" #@param {type:"string"}

def train_model(model, criterion, optimizer, epochs=エポック, batch_size=バッチサイズ, train_set=train_set, test_subset=test_subset, val_ratio=val_ratio):

  device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

  train_loss_arr = []
  validation_loss_arr = []
  test_loss_arr = []

  # テストデータは一気にテスト
  test_inputs, test_labels = class_making(test_subset, img_size)

  for epoch in range(epochs):

    train_loss = 0.0
    validation_loss = 0.0
    test_loss = 0.0

    # エポックごとに交差検証
    train_size = int(len(train_set) * val_ratio)
    validation_size = len(train_set) - train_size
    train_data, validation_data = torch.utils.data.random_split(train_set, [train_size, validation_size])

    # 学習データはバッチで学習
    train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    # 検証データは一気にテスト
    validation_inputs, validation_labels = class_making(validation_data, img_size)

    # 学習モード
    model.train()
    for data in train_loader:
      # データ入力
      inputs, labels = data
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = model(inputs)

      # 誤差逆伝播とパラメータの更新
      optimizer.zero_grad()
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()

      train_loss += loss.item()

    # 検証モード
    model.eval()
    with torch.no_grad():
      # 検証データ
      validation_loss = criterion(model(validation_inputs), validation_labels)
      # テストデータ
      test_loss = criterion(model(test_inputs), test_labels)

    if エポックごとの誤差の表示:
      print("epoch{}\n 学習誤差: {:.4f} / 検証誤差:{:.4f} / テスト誤差:{:.4f}".format(epoch+1, train_loss/len(train_loader), validation_loss, test_loss))
    train_loss_arr.append(train_loss/len(train_loader))
    validation_loss_arr.append(validation_loss)
    test_loss_arr.append(test_loss)

  if torch.cuda.is_available():
    model.to("cpu")
  torch.save(model.state_dict(), f"{保存するモデルの名前}.pth")
  loss_arr = {"学習誤差": train_loss_arr, "検証誤差": validation_loss_arr, "テスト誤差": test_loss_arr}
  return model, loss_arr

  # ニューラルネットワークのクラスを定義
class CNN(nn.Module):
  def __init__(self):
    super(CNN, self).__init__()
    # 全結合層のニューロン数
    self.fc_neuron = 108
    # 層の定義
    self.conv1 = nn.Conv2d( 3, 16, 3)
    self.conv2 = nn.Conv2d(16, 16, 3)
    self.conv3 = nn.Conv2d(16,  3, 3)
    self.fc = nn.Linear(self.fc_neuron, 3)

  def forward(self, x):
    h = self.conv1(x)
    h = torch.max_pool2d(h, 2)
    h = F.relu(h)

    h = self.conv2(h)
    h = torch.max_pool2d(h, 2)
    h = F.relu(h)

    h = self.conv3(h)
    h = torch.max_pool2d(h, 2)
    h = F.relu(h)

    # 全結合層に接続するために並べ替え
    h = h.view(-1, self.fc_neuron)

    y = self.fc(h)
    return y

# 　インスタンスの宣言
# インスタンスの名称はcnnにしましょう
cnn = CNN()

# 念の為GPUの設定をできるようにする
# デバイスインスタンスの宣言
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# ニューラルネットワーククラスインスタンスをGPUにセット
cnn.to(device)

print("ニューラルネットワークの定義が完了しました")


# 学習
学習方法の選択 = "自作のモデルで学習"
# 誤差関数の定義
criterion = nn.CrossEntropyLoss()
# 最適化器の定義
optimizer = optim.Adam(cnn.parameters())

model = cnn if 学習方法の選択=="自作のモデルで学習" else transfer_model
model, loss = train_model(model, criterion, optimizer)

# 性能評価

---

<font color="blue">学習の評価</font>では，学習中の誤差の経過を表示して，学習後のモデルを使って予測を試してみます．

- 学習経過の可視化  
  縦軸の誤差、横軸にエポック数を取り、学習誤差・検証誤差・テスト誤差の3つを折線グラフで描画します
  - 学習誤差：このエポックで学習に使ったデータの誤差
  - テスト誤差：学習には絶対に使わないデータの誤差
  - 検証誤差：このエポックでは学習に使わなかったデータの誤差


<font color="blue">認識精度の性能評価</font>では，学習データかテストデータを指定して認識精度を確認できます
- 使うデータ：学習データかテストデータを指定します

認識精度として次の3つが表示されます
- 正解率：Accuracy：全体のうち、AIの判断が当たっていた割合
- 的中精度：Precision：AIが乃木坂46と判断したもののうち、本当に乃木坂46だったものの割合
- 取りこぼし率：Recall：乃木坂46全体のうち、AIが乃木坂46と認識したもの

### ⑤学習経過の可視化と判定率の算出


In [None]:
# 折れ線グラフの作成
for key, value in loss.items():
  plt.plot(range(1, len(value) + 1), value, label=key)

# グラフタイトルの設定
plt.title("学習経過")

# 凡例とラベルの追加
plt.legend()
plt.xlabel("エポック数")
plt.ylabel("誤差")

# グラフの表示
plt.show()

使うデータ = "\u5B66\u7FD2\u30C7\u30FC\u30BF"#@param ["学習データ", "テストデータ"]{type:"string"}
data_type = 使うデータ
一度に表示する枚数 = 8#@param {type:"number"}
batch_size = 一度に表示する枚数

モデルの選択 = "自作のモデル" #@param ["自作のモデル", "転移学習モデル"] {allow-input: false}


# 入力画像を表示する関数の作成
def imshow(img):
    img = img / 2 + 0.5
    plt.title("予測に使った画像")
    plt.imshow(img.transpose(0, 1).transpose(1, 2))
    plt.axis("off")
    plt.show()

def prediction(model, inputs):

  imshow(torchvision.utils.make_grid(inputs))
  
  outputs = model(inputs.to(device))
  outputs = outputs.to("cpu").argmax(dim=1)

  # 数値から名称に変換
  ans = []
  sorted_keys = sorted(keywords)
  for label in outputs:
    for index in range(len(keywords)):
      if label==index:
        ans.append(sorted_keys[index])
  print("AIの予測結果", ans)

if data_type=="学習データ":
  prediction_subset = train_subset
else :
  prediction_subset = test_subset

model = cnn if モデルの選択=="自作のモデル" else transfer_model

prediction_loader = torch.utils.data.DataLoader(prediction_subset, batch_size=batch_size, shuffle=True)
inputs, _ = iter(prediction_loader).next()
prediction(model, inputs)

使うデータ = "\u5B66\u7FD2\u30C7\u30FC\u30BF"#@param ["学習データ", "テストデータ"]{type:"string"}
data_type = 使うデータ

if data_type=="学習データ":
  verification_subset = train_subset
else :
  verification_subset = test_subset

def performance_chesk(verification_subset):
  verification_inputs, verification_labels = class_making(verification_subset, img_size)

  outputs = cnn(verification_inputs.to(device))
  outputs = outputs.to("cpu").argmax(dim=1)

  accuracy = accuracy_score(verification_labels, outputs)
  precision = precision_score(verification_labels, outputs, average=None)
  recall = recall_score(verification_labels, outputs, average=None)
  print("正解率 (%)：", accuracy*100,"%")
  print("的中精度(%) :", precision*100,"%")
  print("取りこぼし率(%) :", recall*100,"%")

performance_chesk(verification_subset)

### ⑥画像のアップロードと判別

In [None]:
image_file = take_photo()
image = cv2.imread(image_file)


import matplotlib.pyplot as plt
import matplotlib.image as img
image = img.imread(image_file)
plt.title("アップロードした画像")
plt.imshow(image)
plt.axis("off")
plt.show()



model = cnn if モデルの選択=="自作のモデル" else transfer_model
model_name = "model.pth"
clear_output()
model.load_state_dict(torch.load(model_name))


from PIL import Image
image = Image.open(image_file)
transform = transforms.Compose([
                                transforms.Resize((img_size, img_size)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                                ])
image = transform(image).unsqueeze(0).float()
prediction(model, image)
