#  YOLOv8n + CBAM for Pothole Detection (Full Notebook)

End-to-end pipeline with:
- Custom **CBAM** attention integrated into YOLOv8n (C2f + SPPF)
- Train/Val on your pothole dataset
- Curves, metrics, t‑SNE (optional), sample inference, ONNX/TorchScript export

**Paths assume Kaggle dataset layout**:
`/kaggle/input/pothole-dataset-6knew/{train,valid,test}/{images,labels}`


In [None]:

!pip install -q ultralytics==8.3.0
!pip install -q torch torchvision

import os, sys, glob, math, json, random, shutil, time, pathlib
from pathlib import Path
print('Python:', sys.version)


In [None]:
import torch, torch.nn as nn
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import pandas as pd
from ultralytics import YOLO
print('Torch:', torch.__version__)
try:
    import torchvision
    print('TorchVision:', torchvision.__version__)
except Exception as e:
    print('TorchVision not found:', e)
import ultralytics
print('Ultralytics:', ultralytics.__version__)


## 2) Define CBAM + Custom Blocks
We build `CBAM`, `C2fCBAM`, and `SPPF_CBAM` as drop-in modules.


In [None]:
import torch
import torch.nn as nn
from ultralytics.nn.modules import Conv, Bottleneck
try:
    from ultralytics.nn.modules.block import SPPF
except Exception:
    from ultralytics.nn.modules import SPPF

class ChannelAttention(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super().__init__()
        mid = max(1, in_channels // reduction)
        self.avg = nn.AdaptiveAvgPool2d(1)
        self.max = nn.AdaptiveMaxPool2d(1)
        self.fc1 = nn.Conv2d(in_channels, mid, 1, bias=False)
        self.fc2 = nn.Conv2d(mid, in_channels, 1, bias=False)
    def forward(self, x):
        a = self.fc2(torch.relu(self.fc1(self.avg(x))))
        m = self.fc2(torch.relu(self.fc1(self.max(x))))
        s = torch.sigmoid(a + m)
        return x * s

class SpatialAttention(nn.Module):
    def __init__(self, k=7):
        super().__init__()
        self.conv = nn.Conv2d(2, 1, kernel_size=k, padding=k//2, bias=False)
    def forward(self, x):
        avg = torch.mean(x, 1, keepdim=True)
        mx, _ = torch.max(x, 1, keepdim=True)
        s = torch.sigmoid(self.conv(torch.cat([avg, mx], 1)))
        return x * s

class CBAM(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super().__init__()
        self.ca = ChannelAttention(in_channels, reduction)
        self.sa = SpatialAttention(7)
    def forward(self, x):
        return self.sa(self.ca(x))

class C2fCBAM(nn.Module):
    """C2f-like block with CBAM applied to the output."""
    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5, reduction=16):
        super().__init__()
        c_ = int(c2 * e)
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c1, c_, 1, 1)
        self.m = nn.ModuleList([Bottleneck(c_, c_, shortcut, g, k=(3, 3)) for _ in range(n)])
        self.cv3 = Conv(2 * c_, c2, 1, 1)
        self.cbam = CBAM(c2, reduction)
    def forward(self, x):
        y1 = self.cv1(x)
        y2 = self.cv2(x)
        for m in self.m:
            y2 = m(y2)
        y = self.cv3(torch.cat((y1, y2), 1))
        return self.cbam(y)

class SPPF_CBAM(nn.Module):
    def __init__(self, c1, c2, k=5, reduction=16):
        super().__init__()
        self.sppf = SPPF(c1, c2, k)
        self.cbam = CBAM(c2, reduction)
    def forward(self, x):
        return self.cbam(self.sppf(x))

print('CBAM modules declared')


## 3) Write Model YAML with CBAM blocks
This swaps deeper C2f and SPPF with CBAM variants.


In [None]:
os.makedirs('models', exist_ok=True)
yaml_content = '''
nc: 1
names: ['Pothole']
depth_multiple: 0.33
width_multiple: 0.25

backbone:
  - [-1, 1, Conv, [64, 3, 2]]
  - [-1, 1, Conv, [128, 3, 2]]
  - [-1, 3, C2fCBAM, [128, True]]
  - [-1, 1, Conv, [256, 3, 2]]
  - [-1, 6, C2fCBAM, [256, True]]
  - [-1, 1, Conv, [512, 3, 2]]
  - [-1, 6, C2fCBAM, [512, True]]
  - [-1, 1, SPPF_CBAM, [512, 5]]

neck:
  - [-1, 1, Conv, [256, 1, 1]]
  - [[-1, 6], 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 4], 1, Concat, [1]]
  - [-1, 3, C2fCBAM, [256]]

  - [-1, 1, Conv, [128, 1, 1]]
  - [[-1, 2], 1, nn.Upsample, [None, 2, 'nearest']]
  - [[-1, 2], 1, Concat, [1]]
  - [-1, 3, C2fCBAM, [128]]

  - [-1, 1, Conv, [128, 3, 2]]
  - [[-1, 11], 1, Concat, [1]]
  - [-1, 3, C2fCBAM, [256]]

  - [-1, 1, Conv, [256, 3, 2]]
  - [[-1, 7], 1, Concat, [1]]
  - [-1, 3, C2fCBAM, [512]]

head:
  - [[15, 18, 21], 1, Detect, [nc]]
'''
with open('models/yolov8n_cbam.yaml', 'w') as f:
    f.write(yaml_content)
print('Wrote models/yolov8n_cbam.yaml')


## 4) Dataset YAML & quick checks
Confirm dataset folders exist and show a small summary.


In [None]:
DATA_YAML = 'pothole_dataset.yaml'
dataset_yaml = '''# auto-generated
train: /kaggle/input/pothole-dataset-6knew/train
val:   /kaggle/input/pothole-dataset-6knew/valid
test:  /kaggle/input/pothole-dataset-6knew/test
nc: 1
names: ['Pothole']
'''
open(DATA_YAML, 'w').write(dataset_yaml)
print('Wrote', DATA_YAML)

for split in ['train','valid','test']:
    img_dir = f'/kaggle/input/pothole-dataset-6knew/{split}/images'
    lbl_dir = f'/kaggle/input/pothole-dataset-6knew/{split}/labels'
    exists = os.path.isdir(img_dir)
    cnt = len(glob.glob(os.path.join(img_dir, '*.jpg'))) + len(glob.glob(os.path.join(img_dir, '*.png')))
    print(f'{split}: images={cnt} dir_exists={exists}')


## 5) Build model & warm-start from `yolov8n.pt`
We load our YAML model and then partially load official YOLOv8n weights for faster convergence.


In [None]:
import cbam_modules 
model = YOLO('models/yolov8n_cbam.yaml')
try:
    model.load('yolov8n.pt')
    print('Base weights loaded')
except Exception as e:
    print('Could not load base weights:', e)


## 6) Train
Training knobs are tuned for small single-class datasets.


In [None]:
train_args = dict(
    data=DATA_YAML,
    epochs=350,
    imgsz=640,
    batch=16,
    device=0,
    lr0=0.005,
    lrf=0.1,
    momentum=0.937,
    weight_decay=5e-4,
    warmup_epochs=3.0,
    cos_lr=True,
    patience=80,
    optimizer='SGD',
    amp=True,
    ema=True,
    augment=True,
)
print('Training args:', train_args)
results = model.train(**train_args)
print('training done')


## 7) Validate & Plot Curves
Generate metrics plots from `results.csv` in the latest run.


In [None]:
val = model.val(split='val', plots=True, save_json=True)
print('val:', val)

runs_root = 'runs/detect'
latest = None
if os.path.isdir(runs_root):
    subdirs = [os.path.join(runs_root, d) for d in os.listdir(runs_root) if d.startswith('train')]
    if subdirs:
        latest = max(subdirs, key=os.path.getmtime)
print('latest run:', latest)

if latest:
    csv_path = os.path.join(latest, 'results.csv')
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
        epochs = np.arange(len(df))

        plt.figure(figsize=(9,5))
        for k in ['train/box_loss','val/box_loss','train/cls_loss','val/cls_loss','val/dfl_loss']:
            if k in df.columns:
                plt.plot(epochs, df[k], label=k)
        plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.title('Loss curves'); plt.legend(); plt.tight_layout()
        plt.savefig(os.path.join(latest, 'loss_curves_cbam.png'))
        plt.show()

        plt.figure(figsize=(9,5))
        metric_cols = [
            'metrics/precision(B)', 'metrics/recall(B)',
            'metrics/mAP50(B)', 'metrics/mAP50-95(B)',
            'metrics/mAP_0.5(B)', 'metrics/mAP_0.5:0.95(B)'
        ]
        plotted = False
        for col in metric_cols:
            if col in df.columns:
                plt.plot(epochs, df[col], label=col); plotted=True
        if plotted:
            plt.xlabel('Epoch'); plt.ylabel('Metric'); plt.title('Validation metrics over epochs'); plt.legend(); plt.tight_layout()
            plt.savefig(os.path.join(latest, 'val_metrics_cbam.png'))
            plt.show()
        else:
            print('Metric columns not found in results.csv; skipping metric plot')

        df.iloc[-1].to_csv(os.path.join(latest, 'final_metrics_cbam.csv'))
        print('Saved final metrics to final_metrics_cbam.csv')
    else:
        print('results.csv not found in run dir')
else:
    print('No train run dir found; skip plotting')


## 8) Sample Inference & Visualization
Run inference on a few validation images and render predictions.


In [None]:
sample_imgs = sorted(glob.glob('/kaggle/input/pothole-dataset-6knew/valid/images/*.jpg'))[:8]
if not sample_imgs:
    sample_imgs = sorted(glob.glob('/kaggle/input/pothole-dataset-6knew/valid/images/*.png'))[:8]
print('sample count:', len(sample_imgs))

if len(sample_imgs):
    preds = model.predict(sample_imgs, save=True, imgsz=640, conf=0.25)
    print('Saved predicted images under the latest runs folder.')
else:
    print('No sample images found to run inference.')


## 9) Export Models (ONNX / TorchScript)


In [None]:
try:
    model.export(format='onnx', opset=12, simplify=True)
    print('✅ Exported ONNX')
except Exception as e:
    print('ONNX export issue:', e)
try:
    model.export(format='torchscript')
    print('✅ Exported TorchScript')
except Exception as e:
    print('TorchScript export issue:', e)


## 10) (Optional) t‑SNE of SPPF features
Useful for sanity‑checking clustering of features; may be slow. You can skip if not needed.


In [None]:
try:
    from sklearn.manifold import TSNE
    from torchvision import transforms
    from ultralytics.nn.modules.block import SPPF
    feats = []
    sppf_layer = None
    for m in model.model.modules():
        if isinstance(m, SPPF):
            sppf_layer = m
            break
    if sppf_layer is None:
        raise RuntimeError('SPPF not found')
    def hook_fn(_, __, out):
        feats.append(out.detach().flatten(1).cpu())
    h = sppf_layer.register_forward_hook(hook_fn)
    tfm = transforms.Compose([transforms.Resize((640,640)), transforms.ToTensor()])
    paths = sorted(glob.glob('/kaggle/input/pothole-dataset-6knew/valid/images/*.jpg'))[:150]
    if not paths:
        paths = sorted(glob.glob('/kaggle/input/pothole-dataset-6knew/valid/images/*.png'))[:150]
    for p in paths:
        img = Image.open(p).convert('RGB')
        t = tfm(img).unsqueeze(0)
        with torch.no_grad():
            _ = model(t)
    h.remove()
    if len(feats) > 1:
        X = torch.cat(feats, 0).numpy()
        tsne = TSNE(n_components=2, random_state=0, perplexity=30)
        X2 = tsne.fit_transform(X)
        plt.figure(figsize=(7,6))
        plt.scatter(X2[:,0], X2[:,1], s=10, alpha=0.6)
        plt.title('t-SNE of SPPF features (val subset)')
        plt.tight_layout()
        out_path = os.path.join(latest if latest else '.', 'tsne_sppf.png')
        plt.savefig(out_path)
        plt.show()
        print('Saved t-SNE image at', out_path)
    else:
        print('Not enough features for t-SNE')
except Exception as e:
    print('t-SNE skipped:', e)
