## データローダーの実装

In [None]:
import zipfile
from logging import Logger, getLogger
from os import mkdir
from os.path import exists
from typing import Callable
from urllib.parse import urlparse
from zipfile import BadZipFile

import numpy as np
import requests
from mpl_toolkits.mplot3d.proj3d import transform
from requests import Response
from torch import no_grad
from torchvision.transforms.v2 import ToTensor
from tqdm import tqdm

KIB = 2 ** 10


class Downloader:
    """
    指定されたURLからデータセットをダウンロードし、ZIPファイルを解凍するクラス。

    :param root: ダウンロード先のルートディレクトリ。
    :param url: ダウンロードするURLまたはURLを返す関数。
    :param overwrite: 既存のファイルを上書きするかどうか（デフォルトはFalse）。
    :param zip_filename: ZIPファイルの名前（指定がない場合はURLから取得）。
    :param logger: ロギング用のLoggerオブジェクト（指定がない場合はデフォルトのLoggerを使用）。
    """

    def __init__(self, root: str, url: str | Callable[[], str] = None, overwrite: bool = False,
                 zip_filename: str = None,
                 logger: Logger = None):
        """
        Downloaderのコンストラクタ。

        :param root: ダウンロード先のルートディレクトリ。
        :param url: ダウンロードするURLまたはURLを返す関数。
        :param overwrite: 既存のファイルを上書きするかどうか（デフォルトはFalse）。
        :param zip_filename: ZIPファイルの名前（指定がない場合はURLから取得）。
        :param logger: ロギング用のLoggerオブジェクト（指定がない場合はデフォルトのLoggerを使用）。
        """
        self._root = os.path.normpath(root)

        if isinstance(url, Callable):
            self.url = url()
        else:
            self.url = url

        self.zip_filename = zip_filename or os.path.basename(urlparse(self.url).path)  # URLからファイル名を取得
        self.zip_path = os.path.join(self._root, self.zip_filename)
        self.extract_path = os.path.splitext(self.zip_path)[0]
        self.overwrite = overwrite
        self.logger = logger or getLogger(__name__)

    def download(self):
        """データセットをダウンロードする。

        既にデータセットがダウンロードされている場合、overwriteがFalseの場合はダウンロードをスキップします。
        """
        if self.is_downloaded() and not self.overwrite:
            print(f"Dataset already exists at {self.zip_path}, skipping download.")
            return

        print(f"Downloading dataset from {self.url}...")

        try:
            response = self.request(self.url)
            self.save_response_content(response, self.zip_path)

            print("\nDownload completed.")

        except requests.exceptions.RequestException as e:
            print(f"Error during download: {e}")

        except zipfile.BadZipFile:
            print("Error: Bad zip file.")

    def request(self, url):
        """指定されたURLにGETリクエストを送り、レスポンスを返す。

        :param url: リクエストするURL。
        :return: リクエストの結果得られたレスポンスオブジェクト。
        """
        response = requests.get(url, stream=True)
        response.raise_for_status()  # HTTPエラーを確認
        return response

    def save_response_content(self, response: Response, destination, chunk_size: int = 100 * KIB):
        """レスポンスのコンテンツを指定されたファイルに保存する。

        :param response: 保存するためのレスポンスオブジェクト。
        :param destination: 保存先ファイルのパス。
        :param chunk_size: 保存時のチャンクサイズ（デフォルトは100KiB）。
        """
        if not exists(self.root):
            mkdir(self.root)

        with open(destination, "wb") as f:
            for chunk in tqdm(response.iter_content(chunk_size=chunk_size)):
                if chunk:
                    f.write(chunk)

    def extract(self):
        """ZIPファイルを解凍し、重複したルートフォルダがある場合はまとめる。

        解凍先のディレクトリが既に存在する場合、その内容は保持されます。
        """
        print(f"Unzipping {self.zip_path}...")
        try:
            with zipfile.ZipFile(self.zip_path, 'r') as zip_ref:
                # ZIPファイルのトップレベルのフォルダを確認
                top_level_dirs = {os.path.normpath(x).split(os.sep)[0] for x in zip_ref.namelist()}

                if len(top_level_dirs) == 1:
                    # トップレベルに1つのディレクトリだけある場合
                    top_level_dir = next(iter(top_level_dirs))
                    self.extract_path = os.path.join(self._root, top_level_dir)
                    print(f"Extracting into {self.extract_path}...")

                total_files = len(zip_ref.namelist())
                with tqdm(total=total_files, unit='file') as progress:
                    for file in zip_ref.namelist():
                        destination = self._root if len(top_level_dirs) == 1 else self.extract_path
                        zip_ref.extract(file, destination)
                        progress.update(1)
        except BadZipFile:
            print("Error: Bad zip file, extraction failed.")

        print(f"Extracted to {self._root}.")

    @property
    def root(self):
        """ダウンロード先のルートディレクトリを取得する。"""
        return self._root

    def is_downloaded(self):
        """ZIPファイルがダウンロードされているかどうかを確認する。

        :return: ZIPファイルが存在する場合はTrue、それ以外はFalse。
        """
        return os.path.exists(self.zip_path)

    def is_extracted(self):
        """データセットが解凍されているかどうかを確認する。

        :return: 解凍先が存在する場合はTrue、それ以外はFalse。
        """
        return os.path.exists(self.extract_path)

    def __call__(self, on_complete: Callable = None):
        """ダウンロードおよび解凍を実行する。

        :param on_complete: 処理完了後に呼び出す関数（オプション）。
        """
        if self.overwrite or not self.is_downloaded():
            self.download()
        else:
            print("Dataset exists and 'overwrite' is False. No download.")

        if self.overwrite or not self.is_extracted():
            self.extract()
        else:
            print("Dataset exists and 'overwrite' is False. No extract.")

        if on_complete is not None:
            on_complete()

In [None]:
import os
from typing import Optional

from torch.utils.data import Dataset, DataLoader

from datasets.downloader import Downloader


class Cat2000(Dataset):
    def __init__(self, categories: Optional[list[str]] = None,
                 image_transform: Optional = None,
                 map_transform: Optional = None,
                 downloader: Optional[Downloader] = None,
                 ):
        if categories is None:
            categories = ["*"]
        self.categories = categories
        self.image_transform = image_transform
        self.map_transform = map_transform
        self.downloader = downloader or Downloader("./data", "http://saliency.mit.edu/trainSet.zip")
        self.dataset_path = os.path.join(self.downloader.root, "trainSet", "Stimuli")

        # 画像とマップのペアを取得
        self.image_map_pair_cache = []
        self.downloader(on_complete=self.cache_image_map_paths)

    def cache_image_map_paths(self):
        self.image_map_pair_cache = []

        # categoriesにワイルドカードが含まれている場合、全カテゴリディレクトリを展開
        if "*" in self.categories:
            expanded_categories = [d for d in glob.glob(os.path.join(self.dataset_path, "*")) if os.path.isdir(d)]
        else:
            expanded_categories = [os.path.join(self.dataset_path, category) for category in self.categories]

        # 展開したカテゴリディレクトリごとに処理を行う
        for category_path in expanded_categories:
            # 画像ファイルのパスを取得
            image_paths = glob.glob(os.path.join(category_path, "*.jpg"))

            for image_path in image_paths:
                base_name = os.path.basename(image_path)
                map_name = base_name.replace(".jpg", "_SaliencyMap.jpg")
                map_path = os.path.join(category_path, "Output", map_name)

                if os.path.exists(map_path):
                    self.image_map_pair_cache.append((image_path, map_path))
                else:
                    print(f"Warning: No corresponding map found for {map_path}")

    def __len__(self):
        return len(self.image_map_pair_cache)

    def __getitem__(self, idx: int):
        image_path, map_path = self.image_map_pair_cache[idx]
        image = Image.open(image_path).convert("RGB")
        map_image = Image.open(map_path).convert("RGB")

        if self.image_transform is not None:
            image = self.image_transform(image)

            if self.map_transform is not None:
                map_image = self.map_transform(map_image)
            else:
                map_image = self.image_transform(map_image)

        return image, map_image

    def __str__(self):
        return "\n".join(
            f"image: {Image.open(pair[0]).size}, map: {Image.open(pair[1]).size}" for pair in self.image_map_pair_cache)


In [None]:
import glob
from os import path
from typing import Optional, Callable

from PIL import Image
from torch.utils.data import Dataset

from datasets.downloader import Downloader


class SALICONDataset(Dataset):

    def __init__(self,
                 val_mode: bool = False,
                 image_transform: Optional[Callable] = None,
                 map_transform: Optional[Callable] = None,
                 images_downloader: Optional[Downloader] = None,
                 map_downloader: Optional[Downloader] = None,
                 ):

        self.categories = "val" if val_mode else "train"

        self.image_transform = image_transform
        self.map_transform = map_transform

        self.images_downloader = images_downloader or Downloader("./data/salicon", "", zip_filename="images.zip",
                                                                 overwrite=False)
        self.maps_downloader = map_downloader or Downloader("./data/salicon", "", zip_filename="maps.zip",
                                                            overwrite=False)

        self.images_downloader()
        self.maps_downloader()

        # 画像とマップのペアを取得
        self.image_map_pair_cache = []
        self.cache_image_map_paths()

    def cache_image_map_paths(self):
        for category in self.categories:
            images_dir = self.images_downloader.extract_path
            maps_dir = self.maps_downloader.extract_path

            images_path_list = sorted(glob.glob(path.join(images_dir, category, "*.jpg")))
            maps_path_list = sorted(glob.glob(path.join(maps_dir, category, "*.png")))

            # ペアリング
            for img_path, map_path in zip(images_path_list, maps_path_list):
                if path.basename(img_path) == path.basename(map_path).replace(".png", ".jpg"):
                    self.image_map_pair_cache.append((img_path, map_path))

    def __len__(self):
        return len(self.image_map_pair_cache)

    def __getitem__(self, index: int):
        image_path, map_path = self.image_map_pair_cache[index]
        image = Image.open(image_path).convert("RGB")
        map_image = Image.open(map_path).convert("RGB")

        if self.image_transform is not None:
            image = self.image_transform(image)

            if self.map_transform is not None:
                map_image = self.map_transform(map_image)
            else:
                map_image = self.image_transform(map_image)

        return image, map_image

    def __str__(self):
        return "\n".join(
            f"image: {Image.open(pair[0]).size}, map: {Image.open(pair[1]).size}" for pair in self.image_map_pair_cache)


## モデルの実装

In [1]:
from torch.nn import Module, Conv2d, LeakyReLU, Upsample
from torchvision.models import vgg16


class DecoderBlock(Module):
    def __init__(self, in_channels=512, out_channels=512):
        super().__init__()
        self.module = Sequential(
            Conv2d(in_channels, out_channels, 3, padding=1),
            LeakyReLU()
        )


class Generator(Module):
    def __init__(self, pretrained=True):
        super().__init__()
        self.encoder1 = vgg16(pretrained=pretrained).features[:17]
        self.encoder_last = vgg16(pretrained=pretrained).features[17:-1]
        self.decoder = Sequential(
            DecoderBlock(512, 512),
            DecoderBlock(512, 512),
            DecoderBlock(512, 512),
            Upsample(scale_factor=2),

            DecoderBlock(512, 512),
            DecoderBlock(512, 512),
            DecoderBlock(512, 512),

            Upsample(scale_factor=2),

            DecoderBlock(512, 256),
            DecoderBlock(256, 256),
            DecoderBlock(256, 256),

            Upsample(scale_factor=2),

            DecoderBlock(256, 128),
            DecoderBlock(128, 128),

            Upsample(scale_factor=2),
            DecoderBlock(128, 64),
            DecoderBlock(64, 64),
        )
        self.output = Sequential(
            Conv2d(64, 1, 1, padding=0),
            Sigmoid()
        )

    def forward(self, x):
        x = self.encoder1(x)
        x = self.encoder_last(x)
        x = self.decoder(x)
        return self.output(x)

In [None]:
from torch.nn import Module, Sequential, Conv2d, LeakyReLU, Upsample, Sigmoid, MaxPool2d, Tanh, Linear


class Discriminator(Module):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.main = Sequential(
            Conv2d(4, 3, 1, padding=1),
            LeakyReLU(inplace=True),
            Conv2d(3, 32, 3, padding=1),
            LeakyReLU(inplace=True),
            MaxPool2d(2, stride=2),
            Conv2d(32, 64, 3, padding=1),
            LeakyReLU(inplace=True),
            Conv2d(64, 64, 3, padding=1),
            LeakyReLU(inplace=True),
            MaxPool2d(2, stride=2),
            Conv2d(64, 64, 3, padding=1),
            LeakyReLU(inplace=True),
            Conv2d(64, 64, 3, padding=1),
            LeakyReLU(inplace=True),
            MaxPool2d(2, stride=2))

        self.classifier = Sequential(
            Linear(64 * 32 * 24, 100, bias=True),
            Tanh(),
            Linear(100, 2, bias=True),
            Tanh(),
            Linear(2, 1, bias=True),
            Sigmoid())

    def forward(self, x):
        x = self.main(x)
        x = x.view(x.shape[0], -1)
        x = self.classifier(x)

        return x

### トレーニングの実装

In [None]:
from torch.nn import Module


def train(generator: Module, discriminator: Module, dataloader: DataLoader, criterion: Module, optimizer, device,
          show_stride=100):
    # set train mode
    generator.train()

    criterion.to(device)
    optimizer.to(device)
    generator.to(device)
    discriminator.to(device)

    for batch_index, (data, label) in enumerate(dataloader):
        data, label = data.to(device), label.to(data)

        loss = criterion(label)

        if batch_index % show_stride == 0:
            print(f"[<train> batch: {batch_index}]")


In [None]:
from torch.nn import Module
import torch


def test(generator, discriminator, dataloader: DataLoader, criterion, device, show_stride=100):
    criterion.to(device)
    generator.to(device)
    discriminator.to(device)

    with no_grad():
        for batch_index, (data, label) in enumerate(dataloader):
            data, label = data.to(device), label.to(data)

            loss = criterion(label)

            if batch_index % show_stride == 0:
                print(f"[<train> batch: {batch_index}]")

In [4]:
def run(generator, discriminator, train_dataloader, criterion, optimizer, test_dataloader, device, epochs=120):
    for epoch in range(epochs):
        print("--- train start ---")
        train(generator, discriminator, train_dataloader, criterion, optimizer, device)

        print("--- test start ---")
        test(generator, discriminator, test_dataloader, criterion, device)


In [None]:
### Utils

In [None]:
from os.path import exists
from os import makedirs


def save_log(save_dir="./log"):
    if not exists(save_dir):
        makedirs(save_dir)

In [None]:
from datetime import datetime


def get_timestamp(date_format="{0:%Y%m%d-%H%M%S}"):
    return date_format.format(datetime.now())

In [None]:
def predict(generator: Module, transform, image):
    with torch.no_grad():
        image = transform(image)
        saliency_map = generator(image)
        saliency_map = np.array(saliency_map.cpu())[0, 0]

    saliency_map = saliency_map / saliency_map.sum()
    saliency_map = (())

### セットアップ

In [None]:
from torchvision.transforms import Compose
from torch.nn import BCELoss
from torch.optim import Adagrad

transforms = Compose([ToTensor()])

salicon_train = SALICONDataset(image_transform=transforms)
salicon_test = SALICONDataset(image_transform=transforms, val_mode=True)

train_dataloader = DataLoader(salicon_train, batch_size=32, shuffle=True, pin_memory=True)
test_dataloader = DataLoader(salicon_test, batch_size=8, shuffle=False)

generator = Generator()
discriminator = Discriminator()

criterion = BCELoss()
generator_optimizer = Adagrad([{'params': generator.encoder_last.paramators()},
                               {'params': generator.decoder.paramators()}])
discriminator_optimizer = Adagrad(discriminator.parameters())

### 実行