<a href="https://colab.research.google.com/github/YUJI-AKAMATSU/jm/blob/main/Resnet152/Convnext/Swin%20Transformer%20UNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**UNet with ResNet152**

**重要 (Important) — key settings:**

    num_classes: int = 40
    epochs: int = 200
    learning_rate: float = 1e-4
    weight_decay: float = 1e-4
    image_size: tuple = (512, 256)

In [None]:
# ================================
#     Omnicampus向け：全コード + Scheduler & EarlyStopping + Fix CE Error
# ================================

# 必要ライブラリ
import os
import time
import numpy as np
from tqdm import tqdm
from PIL import Image
from dataclasses import dataclass
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet152
from torch.amp import autocast, GradScaler
import zipfile
import shutil
import albumentations as A
from albumentations.pytorch import ToTensorV2

# ------------------
#    Extract the dataset
# ------------------
zip_path = "/content/data.zip"
extract_dir = "/content/unzipped"
target_data_dir = "/content/data"

# Unpack the whole archive into a scratch directory first.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

# Locate the first directory that contains both `train` and `test`, then move
# each split under target_data_dir.
found = False
for root, dirs, files in os.walk(extract_dir):
    if 'train' in dirs and 'test' in dirs:
        train_path = os.path.join(root, 'train')
        test_path = os.path.join(root, 'test')
        os.makedirs(target_data_dir, exist_ok=True)

        skipped = False
        for src, split in ((train_path, 'train'), (test_path, 'test')):
            dest = os.path.join(target_data_dir, split)
            # shutil.move() into an *existing* directory nests the source
            # inside it (e.g. data/train/train) instead of raising, so the
            # "already there" case has to be detected explicitly.
            if os.path.exists(dest):
                skipped = True
                continue
            try:
                shutil.move(src, dest)
            except shutil.Error:
                # Keep the original best-effort behaviour for any other
                # move conflict: report it and carry on.
                skipped = True

        if skipped:
            print(f"⚠️ 既に {target_data_dir} に train/test が存在しています。移動をスキップします。")
        else:
            print(f"✔️ train/test を {target_data_dir} に移動しました")
        found = True
        break

if not found:
    print("⚠️ train/test フォルダが見つかりませんでした。zip の中身を再確認してください。")

# The scratch directory is no longer needed once the splits have been moved.
shutil.rmtree(extract_dir)

# ------------------
#  Albumentations Transform
# ------------------
class AlbumentationsTransform:
    """Joint Albumentations pipeline for an RGB image, a depth map and an optional label.

    The depth map is registered as an extra ``image`` target and the label as
    an extra ``mask`` target of a single ``A.Compose`` pipeline.

    NOTE(review): because depth is declared as an ``image`` target, the
    pixel-level steps (brightness/contrast, ``A.Normalize``) presumably run on
    it as well -- confirm that this is intended for the depth data.

    Args:
        height: Output height passed to ``A.Resize``.
        width: Output width passed to ``A.Resize``.
        is_train: When true, flip / brightness-contrast / rotate augmentations
            are inserted between the resize and the normalisation.
    """

    def __init__(self, height, width, is_train=True):
        steps = [A.Resize(height, width)]
        if is_train:
            # Random augmentations are used for the training pipeline only.
            steps += [
                A.HorizontalFlip(p=0.5),
                A.RandomBrightnessContrast(p=0.2),
                A.Rotate(limit=10, p=0.5),
            ]
        steps += [A.Normalize(), ToTensorV2()]
        self.transform = A.Compose(
            steps,
            additional_targets={'depth': 'image', 'label': 'mask'},
        )

    def __call__(self, image, depth, label=None):
        """Transform one sample.

        Returns ``(image, depth)`` when ``label`` is None, otherwise
        ``(image, depth, label)`` with the label converted via ``.long()``.
        """
        sample = {"image": np.array(image), "depth": np.array(depth)}
        if label is None:
            out = self.transform(**sample)
            return out["image"], out["depth"]

        sample["label"] = np.array(label)
        out = self.transform(**sample)
        # The pipeline already returns the label as a tensor; only the dtype changes here.
        label_t = out["label"].long()
        return out["image"], out["depth"], label_t

# ------------------
#    Dataset Class
# ------------------
class NYUv2(Dataset):
    """Dataset reading ``<root>/<split>/{image,depth,label}/<name>`` files.

    File names are taken from the ``image`` directory (sorted); the depth and
    label files are looked up under the same name. Labels are only read when
    ``split == 'train'``.

    Args:
        root: Dataset root directory.
        split: Sub-directory name; ``'train'`` samples include a label.
        transform: Callable invoked as ``transform(image, depth[, label])``.
            It is called unconditionally in ``__getitem__``.
    """

    def __init__(self, root, split='train', transform=None):
        self.root = root
        self.split = split
        self.transform = transform
        split_dir = os.path.join(root, split)
        self.img_dir, self.depth_dir, self.label_dir = (
            os.path.join(split_dir, sub) for sub in ('image', 'depth', 'label')
        )
        self.img_names = sorted(os.listdir(self.img_dir))

    def __getitem__(self, idx):
        """Return ``(image, depth, label)`` for train, ``(image, depth)`` otherwise."""
        name = self.img_names[idx]
        rgb = Image.open(os.path.join(self.img_dir, name)).convert("RGB")
        depth_map = Image.open(os.path.join(self.depth_dir, name))

        if self.split != 'train':
            rgb_t, depth_t = self.transform(rgb, depth_map)
            return rgb_t, depth_t

        mask = Image.open(os.path.join(self.label_dir, name))
        rgb_t, depth_t, mask_t = self.transform(rgb, depth_map, mask)
        return rgb_t, depth_t, mask_t

    def __len__(self):
        """Number of files found in the image directory."""
        return len(self.img_names)


# ------------------
#    UNet with ResNet152
# ------------------
from torchvision.models import resnet152, ResNet152_Weights
import torchvision.transforms.functional as TF

class ResNet152EncoderUNet(nn.Module):
    """U-Net style segmentation model using torchvision's ResNet-152 as encoder.

    The stem convolution is rebuilt to accept ``in_channels`` inputs (RGB +
    depth by default); the remaining encoder stages are taken from the
    pretrained backbone as they are. The decoder upsamples stage by stage and
    concatenates the matching encoder feature map before each step.

    NOTE(review): the logits appear to come out smaller than the input (only
    four x2 upsampling steps follow the stem + four encoder stages); the
    training and inference code in this file resizes them with
    ``F.interpolate`` -- confirm when using the model elsewhere.

    Args:
        num_classes: Number of output channels of the final 1x1 convolution.
        in_channels: Number of input channels (default 4).
    """

    def __init__(self, num_classes, in_channels=4):
        super().__init__()
        backbone = resnet152(weights=ResNet152_Weights.DEFAULT)

        # Stem: a fresh 7x7 conv so that more than three channels can be fed in.
        self.input_conv = nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        with torch.no_grad():
            rgb_kernels = backbone.conv1.weight
            # The first three channels reuse the pretrained RGB filters ...
            self.input_conv.weight[:, :3] = rgb_kernels
            # ... every extra channel starts from the mean of those filters.
            self.input_conv.weight[:, 3:] = rgb_kernels.mean(dim=1, keepdim=True)

        self.input_bn = backbone.bn1
        self.input_relu = backbone.relu
        self.maxpool = backbone.maxpool

        # Encoder stages reused from the backbone (output channels in comments).
        self.layer1 = backbone.layer1  # 256
        self.layer2 = backbone.layer2  # 512
        self.layer3 = backbone.layer3  # 1024
        self.layer4 = backbone.layer4  # 2048

        # Decoder: input widths include the concatenated skip features.
        decoder_spec = (
            ("up4", 2048, 1024),
            ("up3", 1024 + 1024, 512),
            ("up2", 512 + 512, 256),
            ("up1", 256 + 256, 128),
        )
        for attr, c_in, c_out in decoder_spec:
            setattr(self, attr, self._up_block(c_in, c_out))

        # Per-pixel classifier.
        self.final = nn.Conv2d(128, num_classes, kernel_size=1)

    @staticmethod
    def _up_block(in_channels, out_channels):
        """x2 bilinear upsample followed by 3x3 conv, batch norm and ReLU."""
        layers = [
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch ``x``."""
        # Encoder path.
        stem = self.input_conv(x)
        stem = self.input_bn(stem)
        stem = self.input_relu(stem)
        stem = self.maxpool(stem)

        e1 = self.layer1(stem)
        e2 = self.layer2(e1)
        e3 = self.layer3(e2)
        e4 = self.layer4(e3)

        # Decoder path: each upsampled map is centre-cropped to the spatial
        # size of the skip feature it is concatenated with next.
        d4 = TF.center_crop(self.up4(e4), e3.shape[-2:])
        merged3 = torch.cat([d4, e3], dim=1)
        d3 = TF.center_crop(self.up3(merged3), e2.shape[-2:])
        merged2 = torch.cat([d3, e2], dim=1)
        d2 = TF.center_crop(self.up2(merged2), e1.shape[-2:])
        merged1 = torch.cat([d2, e1], dim=1)
        d1 = self.up1(merged1)

        return self.final(d1)


# ------------------
#    Config + Loader
# ------------------
@dataclass
class TrainingConfig:
    """Paths and hyper-parameters shared by the data loaders, model and training loop."""

    # --- data ---
    dataset_root: str = "/content/data"
    batch_size: int = 8
    num_workers: int = 0

    # --- model ---
    in_channels: int = 4
    num_classes: int = 40

    # --- optimisation ---
    epochs: int = 200
    learning_rate: float = 1e-4
    weight_decay: float = 1e-4

    # Stored as (width, height): the transforms below are built with
    # image_size[1] as height and image_size[0] as width.
    image_size: tuple = (512, 256)

    # Evaluated once, when the class body is executed.
    device: str = (
        "cuda" if torch.cuda.is_available() else "cpu"
    )

def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU + all CUDA devices) RNGs.

    Also sets cuDNN to deterministic mode and disables its benchmark mode.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs before anything random (augmentation, weight init) is created.
set_seed()
config = TrainingConfig()

# image_size is (width, height) while the transform takes (height, width).
train_transform = AlbumentationsTransform(config.image_size[1], config.image_size[0], is_train=True)
test_transform = AlbumentationsTransform(config.image_size[1], config.image_size[0], is_train=False)

train_dataset = NYUv2(config.dataset_root, 'train', transform=train_transform)
test_dataset = NYUv2(config.dataset_root, 'test', transform=test_transform)
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers)
# Test samples are predicted one at a time, in dataset order (no shuffle).
test_loader = DataLoader(test_dataset, batch_size=1)

# ------------------
#    Model, Loss, Train
# ------------------
model = ResNet152EncoderUNet(config.num_classes, config.in_channels).to(config.device)
optimizer = optim.Adam(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
# Cosine schedule spanning the whole run; it is stepped once per epoch in the training loop.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=config.epochs)
# Gradient scaling is only enabled when CUDA is available.
scaler = GradScaler(enabled=torch.cuda.is_available())

class DiceLoss(nn.Module):
    """Soft Dice loss on softmax probabilities that skips pixels labelled 255.

    Args:
        smooth: Constant added to numerator and denominator of the Dice ratio.
    """

    def __init__(self, smooth=1.):
        super().__init__()
        self.smooth = smooth

    def forward(self, preds, targets):
        """Return ``1 - mean Dice`` over every (sample, class) pair.

        ``preds`` are logits with the class axis at dim 1; ``targets`` hold
        integer class ids, with 255 marking pixels to ignore. ``targets`` is
        not modified.
        """
        n_classes = preds.size(1)
        probs = preds.softmax(dim=1)

        valid = targets.ne(255)
        # Ignored pixels get a placeholder class 0 so that one_hot() accepts
        # them; their contribution is removed by the mask below.
        safe_targets = targets.masked_fill(~valid, 0)
        onehot = F.one_hot(safe_targets, n_classes).permute(0, 3, 1, 2).float()

        valid = valid.unsqueeze(1).float()
        probs = probs * valid
        onehot = onehot * valid

        spatial = (2, 3)
        overlap = (probs * onehot).sum(dim=spatial)
        total = probs.sum(dim=spatial) + onehot.sum(dim=spatial)
        dice = (2 * overlap + self.smooth) / (total + self.smooth)
        return 1 - dice.mean()

def combined_loss(pred, target):
    """Equal-weight sum of cross-entropy and Dice loss; label 255 is ignored by both."""
    dice_criterion = DiceLoss()
    ce_term = F.cross_entropy(pred, target, ignore_index=255)
    dice_term = dice_criterion(pred, target)
    return 0.5 * ce_term + 0.5 * dice_term

# Training loop: one pass over train_loader per epoch with mixed precision
# (autocast + GradScaler, both active only when CUDA is available).
model.train()

for epoch in range(config.epochs):
    total_loss = 0
    for img, depth, label in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        img, depth, label = img.to(config.device), depth.to(config.device), label.to(config.device)
        optimizer.zero_grad()
        with autocast(device_type=config.device, enabled=torch.cuda.is_available()):
            # Image and depth are stacked along the channel axis to form the model input.
            x = torch.cat((img, depth), dim=1)
            pred = model(x)
            # Resize the logits to the label resolution if the model output differs.
            if pred.shape[-2:] != label.shape[-2:]:
                pred = F.interpolate(pred, size=label.shape[-2:], mode='bilinear', align_corners=True)
            loss = combined_loss(pred, label)
        # Scaled backward pass, optimizer step, then scale-factor update.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    # The learning-rate schedule advances once per epoch.
    scheduler.step()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{config.epochs} - Loss: {avg_loss:.4f}")

# ------------------
#    Save + Predict
# ------------------
# The checkpoint file name carries the current local time, so every run
# writes a new file.
save_path = f"model_{time.strftime('%Y%m%d_%H%M%S')}.pt"
torch.save(model.state_dict(), save_path)
print("Saved:", save_path)

# Switch to evaluation mode before the inference code below runs.
model.eval()
def predict_tta(model, img, depth):
    """Predict class ids using horizontal-flip test-time augmentation.

    The model is run on the channel-wise concatenation of ``img`` and
    ``depth`` and on its horizontally flipped copy; the flipped output is
    flipped back, both outputs are averaged, resized to the spatial size of
    ``img`` and reduced with ``argmax`` over the class axis.
    """
    out_size = img.shape[-2:]
    with torch.no_grad():
        stacked = torch.cat((img, depth), dim=1)
        plain = model(stacked)
        # dim 3 is the width axis: flip the input, then undo the flip on the output.
        mirrored = model(stacked.flip(dims=[3])).flip(dims=[3])
        fused = (plain + mirrored) / 2
        fused = F.interpolate(fused, size=out_size, mode='bilinear', align_corners=True)
        return fused.argmax(dim=1)

# Predict every test sample with TTA and collect the results on the CPU.
preds = []
with torch.no_grad():
    for img, depth in tqdm(test_loader, desc="TTA Prediction"):
        img, depth = img.to(config.device), depth.to(config.device)
        pred = predict_tta(model, img, depth)
        preds.append(pred.cpu())

# Concatenate the per-sample predictions along the batch axis and save them as one array.
np.save("submission.npy", torch.cat(preds).numpy())
print("submission.npy saved")

In [None]:
# ------------------
#    Evaluation with TTA
# ------------------
import os
import time
import torch
import torch.nn.functional as F
import numpy as np
from zipfile import ZipFile, ZIP_DEFLATED
from tqdm import tqdm

# ✅ Keep the output file names consistent within this cell
timestamp = time.strftime('%Y%m%d_%H%M%S')
model_path = f"model_{timestamp}.pt"
submission_npy_path = "submission.npy"
submission_zip_path = os.path.join("/content", "submission.zip")
notebook_path = "/content/DL_Basic_2025_Competition_NYUv2_baseline.ipynb"  # change as needed

# ✅ Save the model weights (a new timestamped file, independent of earlier saves)
torch.save(model.state_dict(), model_path)
print(f"✅ Saved: {model_path}")

# ✅ Reload the weights that were just written and switch to eval mode
device = config.device
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# ✅ TTA予測関数
def predict_tta(model, image, depth, device):
    """Predict class ids using horizontal-flip test-time augmentation.

    Puts ``model`` in eval mode, runs it on the concatenated ``image`` and
    ``depth`` and on the horizontally flipped copy, averages the two outputs
    (after flipping the second one back), resizes to the spatial size of
    ``image`` and takes the ``argmax`` over the class axis.

    ``device`` is accepted for call compatibility but is not used here; the
    inputs are expected to be on the right device already.
    """
    model.eval()
    out_size = image.shape[-2:]
    with torch.no_grad():
        stacked = torch.cat((image, depth), dim=1)
        plain = model(stacked)
        # dim 3 is the width axis.
        mirrored = model(stacked.flip(dims=[3])).flip(dims=[3])
        fused = (plain + mirrored) / 2.0
        fused = F.interpolate(fused, size=out_size, mode='bilinear', align_corners=True)
        labels = fused.argmax(dim=1)
    return labels

# ✅ Predict with TTA
predictions = []
with torch.no_grad():
    print("Generating predictions with TTA...")
    for image, depth in tqdm(test_loader):
        image, depth = image.to(device), depth.to(device)
        pred = predict_tta(model, image, depth, device)
        # Move each prediction to the CPU before storing it.
        predictions.append(pred.cpu())

# ✅ Convert to NumPy and save
predictions = torch.cat(predictions, dim=0)
np.save(submission_npy_path, predictions.numpy())
print(f"✅ Predictions saved to: {submission_npy_path}")

# ✅ Build the submission ZIP (predictions + model weights + optional notebook)
with ZipFile(submission_zip_path, mode="w", compression=ZIP_DEFLATED, compresslevel=9) as zf:
    zf.write(submission_npy_path, arcname="submission.npy")
    zf.write(model_path, arcname=os.path.basename(model_path))
    # notebook_path is a hard-coded default that may not exist in this
    # session; skip it instead of failing with FileNotFoundError after the
    # archive has already been partially written.
    if os.path.exists(notebook_path):
        zf.write(notebook_path, arcname=os.path.basename(notebook_path))

print(f"✅ submission.zip saved to: {submission_zip_path}")


In [None]:
# ------------------
#    ZIP 提出ファイル作成（ローカル保存用）
# ------------------
from zipfile import ZipFile, ZIP_DEFLATED
import shutil
import os

output_dir = "/content"
os.makedirs(output_dir, exist_ok=True)

submission_zip_path = os.path.join(output_dir, "submission.zip")
notebook_path = "/content/DL_Basic_2025_Competition_NYUv2_baseline.ipynb"  # change as needed
submission_npy_path = "submission.npy"  # expected in the current working directory

# model_path is not defined in this cell; it comes from the evaluation cell.
with ZipFile(submission_zip_path, mode="w", compression=ZIP_DEFLATED, compresslevel=9) as zf:
    zf.write(submission_npy_path, arcname="submission.npy")
    zf.write(model_path, arcname=os.path.basename(model_path))
    # notebook_path is a hard-coded default that may not exist in this
    # session; skip it instead of failing with FileNotFoundError after the
    # archive has already been partially written.
    if os.path.exists(notebook_path):
        zf.write(notebook_path, arcname=os.path.basename(notebook_path))

print(f"✅ submission.zip saved to: {submission_zip_path}")

In [None]:
# Download the submission archive through the Colab files helper.
from google.colab import files
files.download("/content/submission.zip")

**ConvNeXtEncoderUNet / Swin Transformer UNet**

In [None]:
# ================================
#     Omnicampus向け：全コード + Scheduler & Fix CE Error（EarlyStopping 除去）
# ================================

# 必要ライブラリ
import os
import time
import numpy as np
from tqdm import tqdm
from PIL import Image
from dataclasses import dataclass
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet152
from torch.amp import autocast, GradScaler
import zipfile
import shutil
import albumentations as A
from albumentations.pytorch import ToTensorV2
import timm

# ------------------
#    Extract the dataset
# ------------------
zip_path = "/content/data.zip"
extract_dir = "/content/unzipped"
target_data_dir = "/content/data"

# Unpack the whole archive into a scratch directory first.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

# Locate the first directory that contains both `train` and `test`, then move
# each split under target_data_dir.
found = False
for root, dirs, files in os.walk(extract_dir):
    if 'train' in dirs and 'test' in dirs:
        train_path = os.path.join(root, 'train')
        test_path = os.path.join(root, 'test')
        os.makedirs(target_data_dir, exist_ok=True)

        skipped = False
        for src, split in ((train_path, 'train'), (test_path, 'test')):
            dest = os.path.join(target_data_dir, split)
            # shutil.move() into an *existing* directory nests the source
            # inside it (e.g. data/train/train) instead of raising, so the
            # "already there" case has to be detected explicitly.
            if os.path.exists(dest):
                skipped = True
                continue
            try:
                shutil.move(src, dest)
            except shutil.Error:
                # Keep the original best-effort behaviour for any other
                # move conflict: report it and carry on.
                skipped = True

        if skipped:
            print(f"⚠️ 既に {target_data_dir} に train/test が存在しています。移動をスキップします。")
        else:
            print(f"✔️ train/test を {target_data_dir} に移動しました")
        found = True
        break

if not found:
    print("⚠️ train/test フォルダが見つかりませんでした。zip の中身を再確認してください。")

# The scratch directory is no longer needed once the splits have been moved.
shutil.rmtree(extract_dir)

# ------------------
#  Albumentations Transform
# ------------------
class AlbumentationsTransform:
    """Joint Albumentations pipeline for an RGB image, a depth map and an optional label.

    The depth map is registered as an extra ``image`` target and the label as
    an extra ``mask`` target of a single ``A.Compose`` pipeline.

    NOTE(review): because depth is declared as an ``image`` target, the
    pixel-level steps (brightness/contrast, ``A.Normalize``) presumably run on
    it as well -- confirm that this is intended for the depth data.

    Args:
        height: Output height passed to ``A.Resize``.
        width: Output width passed to ``A.Resize``.
        is_train: When true, flip / brightness-contrast / rotate augmentations
            are inserted between the resize and the normalisation.
    """

    def __init__(self, height, width, is_train=True):
        steps = [A.Resize(height, width)]
        if is_train:
            # Random augmentations are used for the training pipeline only.
            steps += [
                A.HorizontalFlip(p=0.5),
                A.RandomBrightnessContrast(p=0.2),
                A.Rotate(limit=10, p=0.5),
            ]
        steps += [A.Normalize(), ToTensorV2()]
        self.transform = A.Compose(
            steps,
            additional_targets={'depth': 'image', 'label': 'mask'},
        )

    def __call__(self, image, depth, label=None):
        """Transform one sample.

        Returns ``(image, depth)`` when ``label`` is None, otherwise
        ``(image, depth, label)`` with the label converted via ``.long()``.
        """
        sample = {"image": np.array(image), "depth": np.array(depth)}
        if label is None:
            out = self.transform(**sample)
            return out["image"], out["depth"]

        sample["label"] = np.array(label)
        out = self.transform(**sample)
        label_t = out["label"].long()
        return out["image"], out["depth"], label_t

# ------------------
#    Dataset Class
# ------------------
class NYUv2(Dataset):
    """Dataset reading ``<root>/<split>/{image,depth,label}/<name>`` files.

    File names are taken from the ``image`` directory (sorted); the depth and
    label files are looked up under the same name. Labels are only read when
    ``split == 'train'``.

    Args:
        root: Dataset root directory.
        split: Sub-directory name; ``'train'`` samples include a label.
        transform: Callable invoked as ``transform(image, depth[, label])``.
            It is called unconditionally in ``__getitem__``.
    """

    def __init__(self, root, split='train', transform=None):
        self.root = root
        self.split = split
        self.transform = transform
        split_dir = os.path.join(root, split)
        self.img_dir, self.depth_dir, self.label_dir = (
            os.path.join(split_dir, sub) for sub in ('image', 'depth', 'label')
        )
        self.img_names = sorted(os.listdir(self.img_dir))

    def __getitem__(self, idx):
        """Return ``(image, depth, label)`` for train, ``(image, depth)`` otherwise."""
        name = self.img_names[idx]
        rgb = Image.open(os.path.join(self.img_dir, name)).convert("RGB")
        depth_map = Image.open(os.path.join(self.depth_dir, name))

        if self.split != 'train':
            rgb_t, depth_t = self.transform(rgb, depth_map)
            return rgb_t, depth_t

        mask = Image.open(os.path.join(self.label_dir, name))
        rgb_t, depth_t, mask_t = self.transform(rgb, depth_map, mask)
        return rgb_t, depth_t, mask_t

    def __len__(self):
        """Number of files found in the image directory."""
        return len(self.img_names)

# --- Input Adapter（共通）---
class InputAdapter(nn.Module):
    """3x3 conv + batch norm + ReLU that maps ``in_channels`` to ``out_channels``.

    Used by the encoder wrappers in this file to turn the 4-channel input
    into the 3 channels that is then passed to the encoder.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the adapter; padding=1 keeps the spatial size unchanged."""
        adapted = self.conv(x)
        return adapted

# --- ConvNeXtBaseEncoderUNet ---
class ConvNeXtBaseEncoderUNet(nn.Module):
    """Segmentation head on top of a pretrained timm ``convnext_base`` encoder.

    The input is first reduced to 3 channels by ``InputAdapter``. Only the
    last feature map returned by the encoder is used; it goes through a
    3x3 conv + ReLU and a 1x1 classifier. No upsampling happens inside the
    model, so callers resize the logits themselves (the training loop in this
    file does this with ``F.interpolate``).
    """

    def __init__(self, num_classes, in_channels):
        super().__init__()
        self.input_adapter = InputAdapter(in_channels, 3)
        self.encoder = timm.create_model('convnext_base', features_only=True, pretrained=True)
        # Channel count of the last feature map reported by timm.
        last_channels = self.encoder.feature_info[-1]['num_chs']
        head_layers = [
            nn.Conv2d(last_channels, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, num_classes, kernel_size=1),
        ]
        self.decoder = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits computed from the encoder's last feature map."""
        rgb_like = self.input_adapter(x)  # 4 channels -> 3 channels
        feature_maps = self.encoder(rgb_like)
        last_map = feature_maps[-1]
        return self.decoder(last_map)


# --- ConvNeXt UNet ---
class ConvNeXtEncoderUNet(nn.Module):
    """Segmentation head on top of a pretrained timm ``convnext_tiny`` encoder.

    Same layout as the ``convnext_base`` variant in this file but with a
    64-channel head: ``InputAdapter`` -> encoder -> 3x3 conv + ReLU -> 1x1
    classifier, applied to the last encoder feature map only.
    """

    def __init__(self, num_classes, in_channels):
        super().__init__()
        self.input_adapter = InputAdapter(in_channels, 3)
        self.encoder = timm.create_model('convnext_tiny', features_only=True, pretrained=True)
        # Channel count of the last feature map reported by timm.
        last_channels = self.encoder.feature_info[-1]['num_chs']
        head_layers = [
            nn.Conv2d(last_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, num_classes, kernel_size=1),
        ]
        self.decoder = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits computed from the encoder's last feature map."""
        rgb_like = self.input_adapter(x)
        last_map = self.encoder(rgb_like)[-1]
        return self.decoder(last_map)

# --- Swin Transformer UNet ---
class SwinEncoderUNet(nn.Module):
    """Segmentation head on top of a pretrained timm Swin-Tiny encoder.

    ``InputAdapter`` -> ``swin_tiny_patch4_window7_224`` (features only) ->
    3x3 conv + ReLU -> 1x1 classifier, applied to the last feature map only.

    NOTE(review): the model name suggests a 224x224 input resolution while
    the config in this file uses (512, 256) -- confirm that the encoder
    accepts other input sizes before enabling this model.
    """

    def __init__(self, num_classes, in_channels):
        super().__init__()
        self.input_adapter = InputAdapter(in_channels, 3)
        self.encoder = timm.create_model('swin_tiny_patch4_window7_224', features_only=True, pretrained=True)
        last_channels = self.encoder.feature_info[-1]['num_chs']
        head_layers = [
            nn.Conv2d(last_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, num_classes, kernel_size=1),
        ]
        self.decoder = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits computed from the encoder's last feature map."""
        feature_maps = self.encoder(self.input_adapter(x))  # list of feature maps
        last_map = feature_maps[-1]
        expected_channels = self.encoder.feature_info[-1]['num_chs']
        # If dim 1 does not carry the expected channel count, the map is
        # treated as channels-last [B, H, W, C] and moved to [B, C, H, W].
        channels_first = last_map.shape[1] == expected_channels if last_map.ndim == 4 else True
        if not channels_first:
            last_map = last_map.permute(0, 3, 1, 2)
        return self.decoder(last_map)




In [None]:
# ------------------
#    Config + Loader
# ------------------
@dataclass
class TrainingConfig:
    """Paths and hyper-parameters shared by the data loaders, model and training loop."""

    # --- data ---
    dataset_root: str = "/content/data"
    batch_size: int = 8
    num_workers: int = 0

    # --- model ---
    in_channels: int = 4
    num_classes: int = 13

    # --- optimisation ---
    epochs: int = 100
    learning_rate: float = 1e-4
    weight_decay: float = 1e-4

    # Stored as (width, height): the transforms below are built with
    # image_size[1] as height and image_size[0] as width.
    # Alternative noted by the author: (224, 224)
    image_size: tuple = (512, 256)

    # Evaluated once, when the class body is executed.
    device: str = (
        "cuda" if torch.cuda.is_available() else "cpu"
    )

def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU + all CUDA devices) RNGs.

    Also sets cuDNN to deterministic mode and disables its benchmark mode.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs before anything random (augmentation, weight init) is created.
set_seed()
config = TrainingConfig()

# image_size is (width, height) while the transform takes (height, width).
train_transform = AlbumentationsTransform(config.image_size[1], config.image_size[0], is_train=True)
test_transform = AlbumentationsTransform(config.image_size[1], config.image_size[0], is_train=False)

train_dataset = NYUv2(config.dataset_root, 'train', transform=train_transform)
test_dataset = NYUv2(config.dataset_root, 'test', transform=test_transform)
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers)
# Test samples are predicted one at a time, in dataset order (no shuffle).
test_loader = DataLoader(test_dataset, batch_size=1)

# ------------------
#    Model, Loss, Train
# ------------------

# Select the encoder model to train (comment / uncomment as needed)
# model = ResNet152EncoderUNet(config.num_classes, config.in_channels).to(config.device)
model = ConvNeXtBaseEncoderUNet(config.num_classes, config.in_channels).to(config.device)
# model = ConvNeXtEncoderUNet(config.num_classes, config.in_channels).to(config.device)
# model = SwinEncoderUNet(config.num_classes, config.in_channels).to(config.device)

optimizer = optim.Adam(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
# Cosine schedule spanning the whole run; it is stepped once per epoch in the training loop.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=config.epochs)
# Gradient scaling is only enabled when CUDA is available.
scaler = GradScaler(enabled=torch.cuda.is_available())

class DiceLoss(nn.Module):
    """Soft Dice loss on softmax probabilities that skips pixels labelled 255.

    Args:
        smooth: Constant added to numerator and denominator of the Dice ratio.
    """

    def __init__(self, smooth=1.):
        super().__init__()
        self.smooth = smooth

    def forward(self, preds, targets):
        """Return ``1 - mean Dice`` over every (sample, class) pair.

        ``preds`` are logits with the class axis at dim 1; ``targets`` hold
        integer class ids, with 255 marking pixels to ignore. ``targets`` is
        not modified.
        """
        n_classes = preds.size(1)
        probs = preds.softmax(dim=1)

        valid = targets.ne(255)
        # Ignored pixels get a placeholder class 0 so that one_hot() accepts
        # them; their contribution is removed by the mask below.
        safe_targets = targets.masked_fill(~valid, 0)
        onehot = F.one_hot(safe_targets, n_classes).permute(0, 3, 1, 2).float()

        valid = valid.unsqueeze(1).float()
        probs = probs * valid
        onehot = onehot * valid

        spatial = (2, 3)
        overlap = (probs * onehot).sum(dim=spatial)
        total = probs.sum(dim=spatial) + onehot.sum(dim=spatial)
        dice = (2 * overlap + self.smooth) / (total + self.smooth)
        return 1 - dice.mean()


def combined_loss(pred, target):
    """Equal-weight sum of cross-entropy and Dice loss; label 255 is ignored by both."""
    dice_criterion = DiceLoss()
    ce_term = F.cross_entropy(pred, target, ignore_index=255)
    dice_term = dice_criterion(pred, target)
    return 0.5 * ce_term + 0.5 * dice_term

# Training loop: one pass over train_loader per epoch with mixed precision
# (autocast + GradScaler, both active only when CUDA is available).
model.train()
for epoch in range(config.epochs):
    total_loss = 0
    for img, depth, label in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        img, depth, label = img.to(config.device), depth.to(config.device), label.to(config.device)
        optimizer.zero_grad()
        with autocast(device_type=config.device, enabled=torch.cuda.is_available()):
            # Image and depth are stacked along the channel axis to form the model input.
            x = torch.cat((img, depth), dim=1)
            pred = model(x)
            # Resize the logits to the label resolution if the model output differs.
            if pred.shape[-2:] != label.shape[-2:]:
                pred = F.interpolate(pred, size=label.shape[-2:], mode='bilinear', align_corners=True)
            loss = combined_loss(pred, label)
        # Scaled backward pass, optimizer step, then scale-factor update.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
    # The learning-rate schedule advances once per epoch.
    scheduler.step()
    avg_loss = total_loss / len(train_loader)
    print(f"Loss: {avg_loss:.4f}")

# ------------------
#    Save + Predict
# ------------------
# The checkpoint file name carries the current local time, so every run
# writes a new file.
save_path = f"model_{time.strftime('%Y%m%d_%H%M%S')}.pt"
torch.save(model.state_dict(), save_path)
print("Saved:", save_path)

# Switch to evaluation mode before the inference code below runs.
model.eval()
def predict_tta(model, img, depth):
    """Predict class ids using horizontal-flip test-time augmentation.

    The model is run on the channel-wise concatenation of ``img`` and
    ``depth`` and on its horizontally flipped copy; the flipped output is
    flipped back, both outputs are averaged, resized to the spatial size of
    ``img`` and reduced with ``argmax`` over the class axis.
    """
    out_size = img.shape[-2:]
    with torch.no_grad():
        stacked = torch.cat((img, depth), dim=1)
        plain = model(stacked)
        # dim 3 is the width axis: flip the input, then undo the flip on the output.
        mirrored = model(stacked.flip(dims=[3])).flip(dims=[3])
        fused = (plain + mirrored) / 2
        fused = F.interpolate(fused, size=out_size, mode='bilinear', align_corners=True)
        return fused.argmax(dim=1)

# Predict every test sample with TTA and collect the results on the CPU.
preds = []
with torch.no_grad():
    for img, depth in tqdm(test_loader, desc="TTA Prediction"):
        img, depth = img.to(config.device), depth.to(config.device)
        pred = predict_tta(model, img, depth)
        preds.append(pred.cpu())

# Concatenate the per-sample predictions along the batch axis and save them as one array.
np.save("submission.npy", torch.cat(preds).numpy())
print("submission.npy saved")


In [None]:
# ------------------
#    Evaluation with TTA + ZIP submission
# ------------------
import os
import time
import torch
import torch.nn.functional as F
import numpy as np
from zipfile import ZipFile, ZIP_DEFLATED
from tqdm import tqdm

# ✅ Output file names and paths
timestamp = time.strftime('%Y%m%d_%H%M%S')
model_path = f"model_{timestamp}.pt"
submission_npy_path = "submission.npy"
submission_zip_path = os.path.join("/content", "submission.zip")
notebook_path = "/content/DL_Basic_2025_Competition_NYUv2_baseline.ipynb"  # change freely

# ✅ Save the model weights (a new timestamped file, independent of earlier saves)
torch.save(model.state_dict(), model_path)
print(f"✅ Saved model to: {model_path}")

# ✅ Reload the weights that were just written and switch to eval mode
device = config.device
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# ✅ TTA予測関数（HFlip, VFlip, 90度回転）
def predict_tta(model, img, depth):
    """Predict class ids with four-view test-time augmentation.

    The concatenated ``img``/``depth`` input is evaluated as-is, horizontally
    flipped, vertically flipped and rotated by 90 degrees. Every output is
    mapped back with the inverse operation, the four outputs are averaged,
    resized to the spatial size of ``img`` and reduced with ``argmax`` over
    the class axis.
    """
    # (forward view, inverse applied to the model output), in evaluation order.
    views = (
        (lambda t: t, lambda t: t),                                      # original
        (lambda t: torch.flip(t, dims=[3]), lambda t: torch.flip(t, dims=[3])),  # horizontal flip
        (lambda t: torch.flip(t, dims=[2]), lambda t: torch.flip(t, dims=[2])),  # vertical flip
        (lambda t: torch.rot90(t, 1, dims=[2, 3]), lambda t: torch.rot90(t, -1, dims=[2, 3])),  # 90-degree rotation
    )
    out_size = img.shape[-2:]
    with torch.no_grad():
        stacked = torch.cat((img, depth), dim=1)

        running = None
        for to_view, from_view in views:
            restored = from_view(model(to_view(stacked)))
            running = restored if running is None else running + restored

        # Average of the four restored outputs.
        fused = running / 4.0
        fused = F.interpolate(fused, size=out_size, mode='bilinear', align_corners=True)

        return fused.argmax(dim=1)

# ✅ Inference loop (TTA)
all_preds = []
with torch.no_grad():
    for img, depth in tqdm(test_loader, desc="TTA Prediction"):
        img, depth = img.to(device), depth.to(device)
        pred = predict_tta(model, img, depth)
        # Move each prediction to the CPU before storing it.
        all_preds.append(pred.cpu())

# ✅ Save to submission.npy
all_preds_tensor = torch.cat(all_preds, dim=0)
np.save(submission_npy_path, all_preds_tensor.numpy())
print(f"✅ Predictions saved to: {submission_npy_path}")

# ✅ Create the ZIP file
with ZipFile(submission_zip_path, mode="w", compression=ZIP_DEFLATED, compresslevel=9) as zf:
    zf.write(submission_npy_path, arcname="submission.npy")
    zf.write(model_path, arcname=os.path.basename(model_path))
    # The notebook is only added when the file actually exists.
    if os.path.exists(notebook_path):
        zf.write(notebook_path, arcname=os.path.basename(notebook_path))

print(f"✅ submission.zip saved to: {submission_zip_path}")


In [None]:
# ------------------
#    ZIP 提出ファイル作成（ローカル保存用）
# ------------------
from zipfile import ZipFile, ZIP_DEFLATED
import shutil
import os

# --- Paths ---
output_dir = "/content"
submission_zip_path = os.path.join(output_dir, "submission.zip")
submission_npy_path = os.path.join(output_dir, "submission.npy")  # spelled out explicitly
notebook_path = os.path.join(output_dir, "DL_Basic_2025_Competition_NYUv2_baseline.ipynb")  # change as needed
try:
    model_path
except NameError:
    # Fallback when no earlier cell has defined model_path.
    model_path = "model_latest.pt"

# --- Make sure the output directory exists ---
os.makedirs(output_dir, exist_ok=True)

# --- Build the ZIP ---
# One row per candidate file: (source path, name inside the archive,
# message when added, message when missing).
zip_entries = (
    (
        submission_npy_path,
        "submission.npy",
        f"✅ submission.npy を追加",
        "❌ submission.npy が存在しません",
    ),
    (
        model_path,
        os.path.basename(model_path),
        f"✅ モデル重み {model_path} を追加",
        "❌ モデルファイルが見つかりません",
    ),
    (
        notebook_path,
        os.path.basename(notebook_path),
        f"✅ ノートブック {notebook_path} を追加",
        "⚠️ ノートブックファイルが見つかりません（任意）",
    ),
)

with ZipFile(submission_zip_path, mode="w", compression=ZIP_DEFLATED, compresslevel=9) as zf:
    for src_path, arc_name, added_msg, missing_msg in zip_entries:
        # Missing files are reported and skipped; they never abort the archive.
        if not os.path.exists(src_path):
            print(missing_msg)
            continue
        zf.write(src_path, arcname=arc_name)
        print(added_msg)

print(f"✅ 完成: ZIPファイルを保存しました → {submission_zip_path}")
