In [None]:
# Install the pinned PyTorch / torchvision builds from the cu118 wheel index.
!pip install torch==2.0.1 torchvision==0.15.2 --index-url https://download.pytorch.org/whl/cu118
# Pin NumPy explicitly.
!pip install numpy==1.26.4
# NOTE(review): ultralytics is not pinned, so a fresh run may resolve a different release.
!pip install ultralytics

In [None]:
# ───────────────────────────────
# 0. 掛載與資料準備
# ───────────────────────────────
import os, random, shutil
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights

from sklearn.metrics import f1_score, classification_report
from ultralytics import YOLO

from google.colab import drive
# Mount Google Drive under /content/drive.
drive.mount('/content/drive', force_remount=True)

# Path configuration: Drive sources, local working copies and output artifacts.
base_drive = '/content/drive/MyDrive/Competition2/Competition/'
path_dataset = '/content/dataset_beam/beam_damage/'
test_folder = '/content/dataset_beam/test/'
src_dir = '/content/drive/MyDrive/Competition2/Competition/dataset_beam/beam_crack'
dst_dir = '/content/dataset_beam/beam_crack'
cls_ckpt = '/content/best_multitask_model.pth'
yolo_ckpt = '/content/runs/detect/yolov8_beam_crack_aug/weights/best.pt'
output_csv = '/content/beam_submission.csv'

os.makedirs('/content/dataset_beam', exist_ok=True)

# Copy each dataset from Drive to local disk once; a local copy that already
# exists is left untouched.
for _drive_copy, _local_copy in (
    (base_drive + 'dataset_beam/beam_damage/', path_dataset),
    (base_drive + 'dataset_beam/test/', test_folder),
    (src_dir, dst_dir),
):
    if not os.path.exists(_local_copy):
        shutil.copytree(_drive_copy, _local_copy)


In [None]:
import os
from collections import Counter

# === Directory holding the annotation (.txt) files ===
label_folder = '/content/dataset_beam/beam_crack/train/labels'

# Tally of how many times each class_id appears across all annotation files.
class_counter = Counter()

# Visit every .txt file; the first whitespace-separated token of each
# non-blank line is read as the integer class id.
for label_name in os.listdir(label_folder):
    if label_name.endswith('.txt'):
        with open(os.path.join(label_folder, label_name), 'r') as label_file:
            class_counter.update(
                int(row.split()[0]) for row in label_file if row.strip()
            )

# Report the counts in ascending class_id order.
print("📊 各類別（class_id）出現次數：")
for cid, n_boxes in sorted(class_counter.items()):
    print(f"  class_id= {cid} ：{n_boxes} 次")


In [None]:
# ───────────────────────────────────────────────
# ✅ 1. CNN 架構與訓練 (EfficientNetV2-S)
# ───────────────────────────────────────────────
# Class folder name -> classifier output index.
CLASS2IDX = dict(A=0, B=1, C=2)
# Classifier output index -> label code written to the submission file.
IDX2LABEL18 = dict(enumerate((18, 19, 20)))
# Criterion ids listed per classifier index.
# NOTE(review): not referenced elsewhere in the visible code.
CRIT_BY_CLS_BEAM = {0: [0], 1: [3, 4, 6, 8], 2: [1]}
# NOTE(review): presumably the total number of criterion ids; unused in the visible code.
N_CRIT = 11

# Per-channel statistics used to normalize inputs for the pretrained backbone.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _tensor_and_normalize():
    """Return the tensor-conversion + normalization tail shared by both pipelines."""
    return [transforms.ToTensor(), transforms.Normalize(_NORM_MEAN, _NORM_STD)]


# Training pipeline: random crop/flip augmentation at 224x224.
tf_train = transforms.Compose(
    [
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
    ]
    + _tensor_and_normalize()
)

# Evaluation pipeline: deterministic resize and center crop at 224x224.
tf_test = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
    ]
    + _tensor_and_normalize()
)

class BeamDataset(Dataset):
    """Image-classification dataset read from one sub-folder per class.

    ``root_dir`` is scanned for sub-directories whose names are keys of
    ``CLASS2IDX``; every accepted image file inside them becomes one sample
    labelled with the mapped class index.

    Args:
        root_dir: Directory containing the class sub-folders.
        transform: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned.
        extensions: File suffix (or iterable of suffixes) accepted as images.
            Matching is case-insensitive, so ``photo.JPG`` is included.

    Attributes:
        paths: Image file paths, ordered by class folder name, then file name.
        cls_labels: Class index for the entry at the same position in ``paths``.
    """

    def __init__(self, root_dir, transform=None, extensions=('.jpg', '.jpeg')):
        self.paths, self.cls_labels = [], []
        self.transform = transform

        if isinstance(extensions, str):
            extensions = (extensions,)
        suffixes = tuple(ext.lower() for ext in extensions)

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order (and therefore any seeded split of it) reproducible.
        for cls_name in sorted(os.listdir(root_dir)):
            cls_folder = os.path.join(root_dir, cls_name)
            # Skip unknown names and stray files that share a class name.
            if cls_name not in CLASS2IDX or not os.path.isdir(cls_folder):
                continue
            cls_idx = CLASS2IDX[cls_name]
            for img_name in sorted(os.listdir(cls_folder)):
                if not img_name.lower().endswith(suffixes):
                    continue
                self.paths.append(os.path.join(cls_folder, img_name))
                self.cls_labels.append(cls_idx)

    def __len__(self):
        """Return the number of samples."""
        return len(self.paths)

    def __getitem__(self, i):
        """Return ``(image, class_index)`` for sample ``i``."""
        # The context manager releases the file handle once the pixels have
        # been decoded into the converted copy.
        with Image.open(self.paths[i]) as raw:
            img = raw.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.cls_labels[i]

class MultiTaskNet(nn.Module):
    """EfficientNetV2-S feature extractor followed by a 3-way classification head.

    The feature stages come from the torchvision model built with its default
    pretrained weights; the stock classifier is replaced by dropout plus a
    single linear layer with three outputs.
    """

    def __init__(self):
        super().__init__()
        effnet = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.DEFAULT)
        # Keep only the feature stages; the classifier is rebuilt below.
        self.backbone = nn.Sequential(*effnet.features)
        self.pool = nn.AdaptiveAvgPool2d(1)
        # Width of the pooled feature vector, read from the stock classifier.
        feat_dim = effnet.classifier[1].in_features
        self.cls_head = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(feat_dim, 3),
        )

    def forward(self, x):
        """Return the class logits (3 per sample) for the input batch ``x``."""
        feature_map = self.backbone(x)
        pooled = self.pool(feature_map)
        return self.cls_head(pooled.flatten(1))

def train_cnn(epochs=20, batch_size=16, lr=3e-4, train_ratio=0.8, seed=42):
    """Fine-tune ``MultiTaskNet`` on ``path_dataset`` and save the best weights to ``cls_ckpt``.

    The dataset is split once with a seeded generator. The training part uses
    ``tf_train``; the held-out part is evaluated after every epoch with the
    deterministic ``tf_test`` pipeline, and the checkpoint is written whenever
    validation accuracy matches or beats the best seen so far. If the held-out
    part is empty, the checkpoint is simply overwritten every epoch, so the
    final weights are kept.

    Args:
        epochs: Number of training epochs (also the cosine schedule length).
        batch_size: Mini-batch size for both loaders.
        lr: Initial Adam learning rate.
        train_ratio: Fraction of samples used for training.
        seed: Seed for the train/validation split.

    Raises:
        ValueError: If ``epochs`` is below 1 or the training split is empty.
    """
    import copy

    if epochs < 1:
        raise ValueError('epochs must be at least 1')

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    ds = BeamDataset(path_dataset, transform=tf_train)
    n_train = int(train_ratio * len(ds))
    if n_train == 0:
        raise ValueError(f'Not enough training images under {path_dataset}')

    split_rng = torch.Generator().manual_seed(seed)
    tr, va = random_split(ds, [n_train, len(ds) - n_train], generator=split_rng)

    # Validation must not see random augmentation: share the same file list
    # (shallow copy) but swap in the deterministic evaluation transform.
    eval_ds = copy.copy(ds)
    eval_ds.transform = tf_test
    va = torch.utils.data.Subset(eval_ds, va.indices)
    has_val = len(va) > 0

    tr_loader = DataLoader(tr, batch_size=batch_size, shuffle=True)
    va_loader = DataLoader(va, batch_size=batch_size)

    model = MultiTaskNet().to(device)
    loss_fn = nn.CrossEntropyLoss()
    opt = optim.Adam(model.parameters(), lr=lr)
    sch = CosineAnnealingLR(opt, T_max=epochs)

    best_acc = -1.0
    for ep in range(epochs):
        model.train()
        loss_sum, seen = 0.0, 0
        for x, y in tr_loader:
            x, y = x.to(device), y.to(device)
            opt.zero_grad()
            loss = loss_fn(model(x), y)
            loss.backward()
            opt.step()
            loss_sum += loss.item() * y.size(0)
            seen += y.size(0)
        sch.step()

        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for x, y in va_loader:
                x, y = x.to(device), y.to(device)
                correct += (model(x).argmax(dim=1) == y).sum().item()
                total += y.size(0)
        val_acc = correct / total if total else 0.0

        # ">=" lets a later epoch win ties, so a small validation set does not
        # freeze the checkpoint at an early, barely trained epoch.
        if not has_val or val_acc >= best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), cls_ckpt)

        print(f'[epoch {ep + 1}/{epochs}] loss={loss_sum / seen:.4f} val_acc={val_acc:.4f}')

    print('✓ CNN training finished')


In [None]:
# ───────────────────────────────────────────────
# ✅ 2. YOLOv8 訓練
# ───────────────────────────────────────────────
import yaml

# Dataset description that is written to YAML and passed to the YOLO trainer.
# NOTE(review): 'val' points at the same folder as 'train', so validation
# metrics are measured on training images — confirm this is intended.
_crack_names = ['3_xv', '4_cdiag', '6_cvert', '8_chori']
data_yaml = dict(
    path='/content/dataset_beam/beam_crack',
    train='train/images',
    val='train/images',
    nc=len(_crack_names),  # one detector class per name
    names=_crack_names,
)

yaml_path = '/content/beam_crack.yaml'
with open(yaml_path, 'w') as yaml_file:
    yaml.dump(data_yaml, yaml_file)

def train_yolo():
    """Train the YOLOv8-s crack detector so its best weights land at ``yolo_ckpt``.

    Uses the ultralytics Python API instead of the ``!yolo`` shell magic, so
    the function is plain Python. The run directory is derived from
    ``yolo_ckpt`` and reused on re-runs (``exist_ok=True``); otherwise a second
    run would be written to a suffixed directory and ``yolo_ckpt`` would keep
    pointing at the stale first-run weights.
    """
    # yolo_ckpt has the form <project>/<name>/weights/best.pt; recover both
    # parts so the trainer writes exactly where predict_beam() reads.
    run_dir = os.path.dirname(os.path.dirname(yolo_ckpt))

    detector = YOLO('yolov8s.pt')
    detector.train(
        data=yaml_path,
        epochs=100,
        imgsz=640,
        batch=16,
        project=os.path.dirname(run_dir),
        name=os.path.basename(run_dir),
        exist_ok=True,
        pretrained=True,
    )


In [None]:
# ───────────────────────────────────────────────
# ✅ 3. 推論：CNN + YOLO 整合後輸出 CSV
# ───────────────────────────────────────────────
def predict_beam(num_images=50):
    """Classify every test image, refine class 19 with YOLO, and write the submission CSV.

    For each ``<i>.jpg`` (``i`` from 1 to ``num_images``) in ``test_folder``:

    * The CNN picks a label code (18, 19 or 20) via ``IDX2LABEL18``.
    * Labels 18 and 20 carry a fixed criterion (0 and 1 respectively).
    * For label 19 the YOLO detector is run; each detection that meets its
      per-class confidence threshold contributes its criterion id (3, 4, 6
      or 8). If nothing passes, the row falls back to label 20, criterion 1.
    * A missing image also yields the fallback row.

    The rows are written to ``output_csv`` with columns ``ID`` and ``class``,
    where ``class`` is ``"<label>,<crit>[,<crit>...]"``.

    Args:
        num_images: Number of consecutively numbered test images to process.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # === Load models ===
    model_cnn = MultiTaskNet().to(device)
    # map_location lets a GPU-trained checkpoint load on whatever device is available.
    model_cnn.load_state_dict(torch.load(cls_ckpt, map_location=device))
    model_cnn.eval()
    model_yolo = YOLO(yolo_ckpt)

    # Detector class index -> criterion id, and the minimum confidence per detector class.
    yolo_cls_to_crit = {0: 3, 1: 4, 2: 6, 3: 8}
    class_thresholds = {0: 0.10, 1: 0.05, 2: 0.10, 3: 0.10}
    # Labels whose criterion is fixed and needs no detector pass.
    fixed_criteria = {18: [0], 20: [1]}
    fallback_label, fallback_criteria = 20, [1]

    def format_row(label, crits):
        """Render the submission value, e.g. ``"19,3,6"``."""
        return f"{label}," + ",".join(str(c) for c in crits)

    submission = []

    for i in range(1, num_images + 1):
        img_path = os.path.join(test_folder, f"{i}.jpg")
        if not os.path.exists(img_path):
            print(f"❌ 缺少圖片：{img_path}")
            submission.append([i, format_row(fallback_label, fallback_criteria)])
            continue

        # Close the file handle as soon as the pixels are decoded.
        with Image.open(img_path) as raw:
            img = raw.convert('RGB')
        x = tf_test(img).unsqueeze(0).to(device)
        with torch.no_grad():
            cls_idx = torch.argmax(model_cnn(x), dim=1).item()
        cls_label = IDX2LABEL18[cls_idx]

        if cls_label in fixed_criteria:
            criteria = list(fixed_criteria[cls_label])
        else:
            # Very low global confidence; the real filtering is per class below.
            preds = model_yolo(img_path, conf=0.001)[0]
            detected = set()
            for box in preds.boxes:
                yolo_cls = int(box.cls[0])
                conf = float(box.conf[0])
                if yolo_cls in yolo_cls_to_crit and conf >= class_thresholds[yolo_cls]:
                    detected.add(yolo_cls_to_crit[yolo_cls])

            if detected:
                # Every detector criterion belongs to label 19, so the label is unchanged.
                criteria = sorted(detected)
            else:
                cls_label, criteria = fallback_label, list(fallback_criteria)

        submission.append([i, format_row(cls_label, criteria)])

    df = pd.DataFrame(submission, columns=["ID", "class"])
    df.to_csv(output_csv, index=False)
    print(f"✅ 輸出成功：{output_csv}")



In [None]:
# ───────────────────────────────────────────────
# ✅ 執行流程（需放在 __main__）
# ───────────────────────────────────────────────
if __name__ == '__main__':
    # Full pipeline in dependency order: classifier, detector, then inference.
    for _stage in (train_cnn, train_yolo, predict_beam):
        _stage()
