**Task Type**: Image Regression

**Loss Function**: 平均二乗誤差(MSE)

## Imports

import os
import random
import numpy as np
import pandas as pd

In [None]:
import glob
import os
import random
from typing import Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from PIL import Image
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import warnings

# Human-readable label for the best available accelerator: CUDA first,
# then Apple-Silicon MPS, otherwise plain CPU.
if torch.cuda.is_available():
    device_str = f"CUDA: {torch.cuda.get_device_name(0)}"
elif torch.backends.mps.is_available():
    device_str = "MPS (Apple Silicon GPU)"
else:
    device_str = "CPU"

# Hide FutureWarning messages from here on.
warnings.simplefilter(action='ignore', category=FutureWarning)

# Report the environment once, one item per line.
print(f"PyTorch: {torch.__version__}", f"Device: {device_str}", sep="\n")

## Data

In [None]:
# Detect a Kaggle kernel: either the run-type environment variable is set or
# the standard Kaggle input directory exists.
ON_KAGGLE = "KAGGLE_KERNEL_RUN_TYPE" in os.environ or Path("/kaggle/input").exists()

# Root directory of the competition data (Kaggle mount vs. local checkout).
PATH_DATA = "/kaggle/input/csiro-biomass" if ON_KAGGLE else "data"

# Training CSV plus the train / test image directories under the data root.
PATH_TRAIN_CSV, PATH_TRAIN_IMG, PATH_TEST_IMG = (
    os.path.join(PATH_DATA, leaf) for leaf in ('train.csv', 'train', 'test')
)

# Load the training table and show its shape and first rows.
df = pd.read_csv(PATH_TRAIN_CSV)
print(f'Dataset size: {df.shape}')
df.head()

In [None]:
# Names of the DataFrame columns used as regression targets.
TARGET_COLS = ["target"]

# Print the target columns and how many there are, one item per line.
print(
    f"Target columns: {TARGET_COLS}",
    f"Number of targets: {len(TARGET_COLS)}",
    sep="\n",
)

### Dataset/DataLoader

まずは画像を読み込んで、`TARGET_COLS` を回帰ターゲットとして返す Dataset を定義する。

In [None]:
from torchvision import transforms

# Side length in pixels of the square images fed to the network.
# 256 is only a starting value and can be changed later.
IMG_SIZE = 256

# Minimal preprocessing: resize to IMG_SIZE x IMG_SIZE, then convert to a tensor.
# No mean/std normalisation is applied yet; once dataset statistics have been
# computed, a normalisation step belongs at the end of this pipeline.
train_transform = transforms.Compose(
    [transforms.Resize((IMG_SIZE, IMG_SIZE)), transforms.ToTensor()]
)


class BiomassDataset(Dataset):
    """Dataset yielding ``(image, target)`` pairs for biomass regression.

    Each row of ``df`` must provide an ``image_path`` column (a path relative
    to ``PATH_DATA``) and a ``target`` column holding the regression value.

    Args:
        df: Table describing the samples. A copy with a fresh 0..n-1 index is
            stored, so the caller's frame is left untouched.
        transform: Optional callable applied to the loaded RGB PIL image.
        is_train: Stored on the instance; not consulted by this class itself.
    """

    def __init__(self, df: pd.DataFrame, transform=None, is_train: bool = True):
        # Re-index so positional access in __getitem__ matches 0..len-1.
        self.df = df.reset_index(drop=True)
        self.transform = transform
        self.is_train = is_train

    def __len__(self):
        """Return the number of samples."""
        return len(self.df)

    def _load_image(self, image_path: str):
        """Open ``image_path`` under ``PATH_DATA`` as RGB and apply the transform."""
        rgb = Image.open(os.path.join(PATH_DATA, image_path)).convert("RGB")
        return rgb if self.transform is None else self.transform(rgb)

    def __getitem__(self, idx):
        """Return the image and its float32 scalar target for position ``idx``."""
        record = self.df.iloc[idx]
        return (
            self._load_image(record["image_path"]),
            torch.tensor(record["target"], dtype=torch.float32),
        )
    

class BiomassTestDataset(Dataset):
    """Dataset yielding ``(image, sample_id)`` pairs for inference.

    Each row of ``df`` must provide an ``image_path`` column (a path relative
    to ``PATH_DATA``) and a ``sample_id`` column identifying the prediction.

    Args:
        df: Table describing the test samples. A copy with a fresh 0..n-1
            index is stored.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, df: pd.DataFrame, transform=None):
        # Re-index so positional access in __getitem__ matches 0..len-1.
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        """Return the number of test samples."""
        return len(self.df)

    def _load_image(self, image_path):
        """Open ``image_path`` under ``PATH_DATA`` as RGB and apply the transform."""
        full_path = os.path.join(PATH_DATA, image_path)
        rgb = Image.open(full_path).convert("RGB")
        return rgb if self.transform is None else self.transform(rgb)

    def __getitem__(self, idx):
        """Return the image and the sample identifier for position ``idx``."""
        record = self.df.iloc[idx]
        return self._load_image(record["image_path"]), record["sample_id"]

簡単に train/valid に分割して DataLoader を作る

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for validation; the fixed seed makes the shuffled
# split reproducible.
train_df, valid_df = train_test_split(df, test_size=0.2, random_state=42, shuffle=True)

BATCH_SIZE = 16

# Training split: reshuffled every epoch.
train_dataset = BiomassDataset(train_df, transform=train_transform)
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)

# Validation split: same transform as training, fixed order.
valid_dataset = BiomassDataset(valid_df, transform=train_transform)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
)

デバイス

In [None]:
# Pick the compute device in priority order: CUDA, then Apple MPS, then CPU.
device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

device

シンプルな CNN ベースライン(画像 -> 畳み込み -> グローバル平均プーリング -> 全結合で `len(TARGET_COLS)` 個のターゲット)を定義

In [None]:
class SimpleCNNRegressor(nn.Module):
    """Small convolutional baseline for image regression.

    Four ``Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2)`` stages widen the
    channels 3 -> 32 -> 64 -> 128 -> 256 while each pooling step halves the
    spatial resolution (e.g. 256 -> 128 -> 64 -> 32 -> 16). The feature map is
    then averaged to one 256-d vector and passed through a two-layer MLP.

    Args:
        num_targets: Number of regression outputs per image.
    """

    def __init__(self, num_targets: int):
        super().__init__()

        # Build the stages in order so the Sequential indices (0, 3, 6, 9 for
        # the convolutions) and the parameter-initialisation order are fixed.
        widths = (3, 32, 64, 128, 256)
        stages = []
        for in_ch, out_ch in zip(widths[:-1], widths[1:]):
            stages.extend(
                [
                    nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                    nn.ReLU(),
                    nn.MaxPool2d(2),
                ]
            )
        self.features = nn.Sequential(*stages)

        # Global average pooling collapses each channel to a single value.
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Regression head: flatten (N, 256, 1, 1) -> (N, 256), then a small MLP.
        self.regressor = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_targets),
        )

    def forward(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to predictions ``(N, num_targets)``."""
        pooled = self.global_pool(self.features(x))
        return self.regressor(pooled)
    
# Training hyper-parameters: number of epochs and Adam learning rate.
EPOCHS, lr = 10, 1e-3

# One output unit per target column, placed on the selected device.
model = SimpleCNNRegressor(num_targets=len(TARGET_COLS)).to(device)

# Mean squared error loss, optimised with Adam over all model parameters.
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

## 学習ループ(MSE)

最低限の train/valid ループ

In [None]:
def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and return epoch metrics.

    Args:
        model: Network to train; its output is squeezed on the last axis, so
            a ``(batch, 1)`` prediction is compared against ``(batch,)`` targets.
        loader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss function called as ``criterion(outputs, targets)``.
            The epoch loss assumes it returns the batch mean (the default
            behaviour of ``nn.MSELoss()``), since it is re-weighted by batch size.
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(epoch_loss, epoch_mae, epoch_rmse)``: the sample-weighted mean
        loss (MSE when ``criterion`` is MSE), the mean absolute error, and the
        square root of ``epoch_loss``.

    Raises:
        ValueError: If ``loader`` yields no samples, so no average can be formed.
    """
    model.train()
    running_loss = 0.0
    running_abs_error = 0.0
    n_samples = 0

    for images, targets in tqdm(loader):
        images = images.to(device)
        targets = targets.to(device)    # shape: (batch, )
        batch_size = images.size(0)

        optimizer.zero_grad()
        outputs = model(images).squeeze(-1)  # (batch, 1) -> (batch, )
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Weight the batch-mean loss by batch size so a smaller final batch
        # does not skew the epoch average.
        running_loss += loss.item() * batch_size
        # Metric bookkeeping only: detach so no autograd graph is built for it.
        running_abs_error += torch.abs(outputs.detach() - targets).sum().item()
        n_samples += batch_size

    if n_samples == 0:
        raise ValueError("train_one_epoch: loader yielded no samples")

    epoch_loss = running_loss / n_samples   # MSE
    epoch_mae = running_abs_error / n_samples   # MAE
    epoch_rmse = np.sqrt(epoch_loss)   # RMSE

    return epoch_loss, epoch_mae, epoch_rmse


def eval_one_epoch(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Args:
        model: Network to evaluate; it is switched to eval mode and its output
            is squeezed on the last axis before being compared with the targets.
        loader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss function called as ``criterion(outputs, targets)``.
            The epoch loss assumes it returns the batch mean (the default
            behaviour of ``nn.MSELoss()``), since it is re-weighted by batch size.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(epoch_loss, epoch_mae, epoch_rmse)``: the sample-weighted mean
        loss (MSE when ``criterion`` is MSE), the mean absolute error, and the
        square root of ``epoch_loss``.

    Raises:
        ValueError: If ``loader`` yields no samples, so no average can be formed.
    """
    model.eval()
    running_loss = 0.0
    running_abs_error = 0.0
    n_samples = 0

    with torch.no_grad():
        for images, targets in tqdm(loader):
            images = images.to(device)
            targets = targets.to(device)
            batch_size = images.size(0)

            outputs = model(images).squeeze(-1)  # (batch, 1) -> (batch, )
            loss = criterion(outputs, targets)

            # Weight the batch-mean loss by batch size so a smaller final
            # batch does not skew the epoch average.
            running_loss += loss.item() * batch_size
            running_abs_error += torch.abs(outputs - targets).sum().item()
            n_samples += batch_size

    if n_samples == 0:
        raise ValueError("eval_one_epoch: loader yielded no samples")

    epoch_loss = running_loss / n_samples   # MSE
    epoch_mae = running_abs_error / n_samples  # MAE
    epoch_rmse = np.sqrt(epoch_loss)   # RMSE

    return epoch_loss, epoch_mae, epoch_rmse

In [None]:
# Train for EPOCHS epochs, validating after each one and logging MSE / RMSE / MAE.
for epoch in range(EPOCHS):
    train_loss, train_mae, train_rmse = train_one_epoch(model, train_loader, loss_function, optimizer, device)
    valid_loss, valid_mae, valid_rmse = eval_one_epoch(model, valid_loader, loss_function, device)

    # `epoch` is zero-based; print a one-based counter so the progress runs
    # from 1/EPOCHS to EPOCHS/EPOCHS instead of stopping at (EPOCHS - 1)/EPOCHS.
    print(
        f"Epoch [{epoch + 1}/{EPOCHS}] "
        f"Train [MSE: {train_loss:.4f}, RMSE: {train_rmse:.4f}, MAE: {train_mae:.4f}] "
        f"Valid [MSE: {valid_loss:.4f}, RMSE: {valid_rmse:.4f}, MAE: {valid_mae:.4f}]"
    )

## テスト実行

In [None]:
# Load the test table that lists the images to predict.
PATH_TEST_CSV = os.path.join(PATH_DATA, "test.csv")
test_df = pd.read_csv(PATH_TEST_CSV)
test_df.head()

# Same preprocessing as training; fixed order so predictions line up with ids.
test_dataset = BiomassTestDataset(test_df, transform=train_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
)

all_sample_ids = []
all_preds = []

# Inference only: eval mode and no gradient tracking.
model.eval()
with torch.no_grad():
    for images, sample_ids in tqdm(test_loader):
        images = images.to(device)

        outputs = model(images).squeeze(-1)
        preds = outputs.cpu().numpy()

        # Use .extend, not +=: `list += ndarray` dispatches to NumPy addition.
        all_sample_ids.extend(sample_ids)
        all_preds.extend(preds)

# One row per test sample: identifier and predicted target.
submission = pd.DataFrame(dict(sample_id=all_sample_ids, target=all_preds))

print(submission.head(), submission.shape, sep="\n")

submission.to_csv("submission.csv", index=False)