efficientnet_fall_classification_baseline

In [None]:
import argparse
import os
import re
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import models, transforms
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import random

# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Root of the image dataset; main() reads its "train" and "val" sub-folders.
IMAGES_ROOT = PROJECT_ROOT.joinpath("fall_dataset", "images")
# File that receives the state dict of the best model found during training.
MODEL_PATH = PROJECT_ROOT.joinpath("saved_model", "efficientnet_baseline_fall_model.pth")

def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch (CPU and every CUDA device) RNGs with ``seed``."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def label_from_name(path: Path) -> int:
    """
    Derive the binary label from a file name: 1 for FALL, 0 for NOT-FALL.

    The stem is lower-cased and every run of non-word characters or
    underscores becomes a single space. Matching is plain substring search,
    so a NOT-FALL marker wins even when it is part of a longer word
    (e.g. "sit" inside "position"). Names with no marker at all map to 0.
    """
    cleaned = re.sub(r"[\W_]+", " ", path.stem.lower()).strip()

    negative_markers = (
        "not fall", "notfall", "not fallen", "notfallen", "no fall", "nofall",
        "standing", "walk", "walking", "sit", "sitting", "upright",
    )
    # Negative markers are checked first because "not fall" also contains "fall".
    for marker in negative_markers:
        if marker in cleaned:
            return 0

    # Any name starting with "fallen" necessarily contains "fall".
    return 1 if "fall" in cleaned else 0

class NameLabelDataset(Dataset):
    """
    Image dataset whose binary labels come from the file names.

    Images are collected recursively below ``directory`` and labelled with
    ``label_from_name``. ``class_counts`` maps each label (0, 1) to its number
    of images and ``labels`` holds the label of every entry of ``paths``.

    Raises:
        RuntimeError: if no image file is found below ``directory``.
    """

    def __init__(self, directory: Path, transform=None):
        self.directory = Path(directory)
        self.transform = transform
        self.paths = self._gather_images(self.directory)

        if not self.paths:
            raise RuntimeError(f"No images found in: {self.directory}")

        # Labels depend only on the file name, so compute them once up front
        # instead of re-running the regex on every __getitem__ call.
        self.labels = [label_from_name(p) for p in self.paths]
        self.class_counts = {0: self.labels.count(0), 1: self.labels.count(1)}

    @staticmethod
    def _gather_images(d: Path) -> List[Path]:
        """Return the sorted image files directly in ``d`` and below its sub-directories."""
        exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}

        def is_image(candidate: Path) -> bool:
            # is_file() keeps directories that merely carry an image suffix out.
            return candidate.suffix.lower() in exts and candidate.is_file()

        entries = list(d.iterdir())
        found = [p for p in entries if is_image(p)]
        for sub in entries:
            if sub.is_dir():
                found.extend(p for p in sub.rglob("*") if is_image(p))
        return sorted(found)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, self.labels[idx]

def build_model(model_name="efficientnet_b0", num_classes: int = 2, freeze_backbone: bool = True):
    """
    Create an EfficientNet with pretrained weights and a new classification layer.

    Args:
        model_name: "efficientnet_b0" or "efficientnet_b3".
        num_classes: output size of the replacement linear layer.
        freeze_backbone: when True, only the classifier parameters stay trainable.

    Raises:
        ValueError: for any other ``model_name``.
    """
    if model_name not in ("efficientnet_b0", "efficientnet_b3"):
        raise ValueError(f"Unsupported model: {model_name}")

    if model_name == "efficientnet_b0":
        net = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
    else:
        net = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)

    # Swap the pretrained output layer for one sized to this task.
    head_inputs = net.classifier[1].in_features
    net.classifier[1] = nn.Linear(head_inputs, num_classes)

    if freeze_backbone:
        # Freeze everything, then re-enable gradients for the classifier only.
        net.requires_grad_(False)
        net.classifier.requires_grad_(True)

    return net

def get_transforms():
    """Return the preprocessing pipeline: resize to 224x224, to tensor, per-channel normalize."""
    # Per-channel statistics conventionally used with ImageNet-pretrained torchvision models.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)

def epoch_loop(model, loader, device, optimizer=None, criterion=None) -> Tuple[float, float, float, float, float, np.ndarray]:
    """
    Run one pass over ``loader``.

    If optimizer is None => evaluation mode (no gradients, no weight updates);
    otherwise the model is trained with ``optimizer`` and ``criterion``.
    In evaluation mode a missing ``criterion`` yields a reported loss of 0.

    Returns: (loss, acc, precision, recall, f1, confusion_matrix)
        Precision/recall/F1 are computed for the positive class (label 1);
        the confusion matrix uses label order [0, 1].

    Raises:
        ValueError: if an optimizer is given without a criterion.
    """
    is_train = optimizer is not None
    if is_train and criterion is None:
        # Without this check the failure surfaces later as an opaque autograd
        # error when backward() is called on a constant tensor.
        raise ValueError("criterion is required when an optimizer is given (training mode)")
    model.train(is_train)

    all_preds, all_labels = [], []
    running_loss = 0.0
    total = 0

    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = labels.to(device, dtype=torch.long)

        with torch.set_grad_enabled(is_train):
            logits = model(imgs)
            if criterion is not None:
                loss = criterion(logits, labels)
            else:
                loss = torch.tensor(0.0, device=device)
            if is_train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Weight the batch loss by batch size so the epoch average is per sample.
        running_loss += loss.item() * labels.size(0)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.detach().cpu().numpy().tolist())
        all_labels.extend(labels.detach().cpu().numpy().tolist())
        total += labels.size(0)

    avg_loss = running_loss / max(total, 1)
    acc = accuracy_score(all_labels, all_preds)
    prec, rec, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="binary", zero_division=0)
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    return avg_loss, acc, prec, rec, f1, cm

def main(args):
    """
    Train the EfficientNet baseline and keep the checkpoint with the best validation F1.

    Args:
        args: parsed CLI namespace providing ``seed``, ``batch_size``, ``model``,
            ``unfreeze_backbone``, ``lr`` and ``epochs``.
    """
    set_seed(args.seed)

    train_dir = IMAGES_ROOT / "train"
    val_dir = IMAGES_ROOT / "val"

    tfms = get_transforms()
    train_ds = NameLabelDataset(train_dir, transform=tfms)
    val_ds = NameLabelDataset(val_dir, transform=tfms)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=0)

    # Inverse-frequency class weights: total / (2 * class count); the max()
    # guards against division by zero when a class is absent.
    total_train = len(train_ds)
    w0 = total_train / (2.0 * max(train_ds.class_counts.get(0, 1), 1))
    w1 = total_train / (2.0 * max(train_ds.class_counts.get(1, 1), 1))
    class_weights = torch.tensor([w0, w1], dtype=torch.float32)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = build_model(args.model, num_classes=2, freeze_backbone=not args.unfreeze_backbone).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
    # Only parameters left trainable by build_model are handed to the optimizer.
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=args.lr)

    print(f"Project root: {PROJECT_ROOT}")
    print(f"Train dir:    {train_dir} (count={len(train_ds)} | class_counts={train_ds.class_counts})")
    print(f"Val dir:      {val_dir}   (count={len(val_ds)} | class_counts={val_ds.class_counts})")
    print(f"Saving model to: {MODEL_PATH}")
    print(f"Device: {device}\n")

    # torch.save does not create missing directories; do it once before training
    # so a full epoch is not wasted on a failing save.
    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

    best_val_f1 = -1.0
    for epoch in range(1, args.epochs + 1):
        tr_loss, tr_acc, tr_p, tr_r, tr_f1, tr_cm = epoch_loop(model, train_loader, device, optimizer, criterion)
        # Pass the criterion so the printed validation loss is the real loss
        # rather than the 0.0 placeholder used when no criterion is given.
        va_loss, va_acc, va_p, va_r, va_f1, va_cm = epoch_loop(model, val_loader, device, criterion=criterion)

        print(f"Epoch {epoch:02d}/{args.epochs} "
              f"| Train L {tr_loss:.4f} A {tr_acc:.3f} P {tr_p:.3f} R {tr_r:.3f} F1 {tr_f1:.3f} "
              f"| Val L {va_loss:.4f} A {va_acc:.3f} P {va_p:.3f} R {va_r:.3f} F1 {va_f1:.3f}")
        print(f"  Train CM:\n{tr_cm}")
        print(f"  Val   CM:\n{va_cm}\n")

        if va_f1 > best_val_f1:
            best_val_f1 = va_f1
            torch.save(model.state_dict(), MODEL_PATH)
            print("Saved best model")

    print("Done.")

if __name__ == "__main__":
    # Command-line entry point: collect the hyper-parameters and start training.
    cli = argparse.ArgumentParser()
    cli.add_argument("--epochs", type=int, default=10)
    cli.add_argument("--batch-size", type=int, default=16)
    cli.add_argument("--lr", type=float, default=1e-4)
    cli.add_argument("--seed", type=int, default=42)
    cli.add_argument(
        "--unfreeze-backbone",
        action="store_true",
        help="Fine-tune the whole EfficientNet instead of just the classifier.",
    )
    cli.add_argument(
        "--model",
        type=str,
        default="efficientnet_b0",
        choices=["efficientnet_b0", "efficientnet_b3"],
        help="Which EfficientNet variant to use",
    )
    main(cli.parse_args())


In [None]:
efficientnet_fall_classification

In [None]:
import argparse
import os
import re
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.swa_utils import AveragedModel, SWALR
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import models, transforms
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import random

# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Root of the image dataset; main() reads its "train" and "val" sub-folders.
IMAGES_ROOT = PROJECT_ROOT.joinpath("fall_dataset", "images")
# File that receives the state dict of the best model found during training.
MODEL_PATH = PROJECT_ROOT.joinpath("saved_model", "efficientnet_fall_model.pth")

def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch (CPU and every CUDA device) RNGs with ``seed``."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def label_from_name(path: Path) -> int:
    """
    Derive the binary label from a file name: 1 for FALL, 0 for NOT-FALL.

    The stem is lower-cased and every run of non-word characters or
    underscores becomes a single space. Matching is plain substring search,
    so a NOT-FALL marker wins even when it is part of a longer word.
    Names with no marker at all map to 0.
    """
    cleaned = re.sub(r"[\W_]+", " ", path.stem.lower()).strip()
    negative_markers = (
        "not fall", "notfall", "not fallen", "notfallen", "no fall", "nofall",
        "standing", "walk", "walking", "sit", "sitting", "upright",
    )
    # Negative markers are checked first because "not fall" also contains "fall".
    for marker in negative_markers:
        if marker in cleaned:
            return 0
    # Any name starting with "fallen" necessarily contains "fall".
    return 1 if "fall" in cleaned else 0

class NameLabelDataset(Dataset):
    """
    Image dataset whose binary labels come from the file names.

    Images are collected recursively below ``directory`` and labelled with
    ``label_from_name``. ``class_counts`` maps each label (0, 1) to its number
    of images and ``labels`` holds the label of every entry of ``paths``.

    Raises:
        RuntimeError: if no image file is found below ``directory``.
    """

    def __init__(self, directory: Path, transform=None):
        self.directory = Path(directory)
        self.transform = transform
        self.paths = self._gather_images(self.directory)
        if not self.paths:
            raise RuntimeError(f"No images found in: {self.directory}")
        # Labels depend only on the file name, so compute them once up front
        # instead of re-running the regex on every __getitem__ call.
        self.labels = [label_from_name(p) for p in self.paths]
        self.class_counts = {0: self.labels.count(0), 1: self.labels.count(1)}

    @staticmethod
    def _gather_images(d: Path) -> List[Path]:
        """Return the sorted image files directly in ``d`` and below its sub-directories."""
        exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}

        def is_image(candidate: Path) -> bool:
            # is_file() keeps directories that merely carry an image suffix out.
            return candidate.suffix.lower() in exts and candidate.is_file()

        entries = list(d.iterdir())
        found = [p for p in entries if is_image(p)]
        for sub in entries:
            if sub.is_dir():
                found.extend(p for p in sub.rglob("*") if is_image(p))
        return sorted(found)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, self.labels[idx]

def build_model(model_name="efficientnet_b0", num_classes: int = 2, freeze_backbone: bool = True):
    """
    Create an EfficientNet with pretrained weights and a new classification layer.

    Args:
        model_name: "efficientnet_b0" or "efficientnet_b3".
        num_classes: output size of the replacement linear layer.
        freeze_backbone: when True, only the classifier parameters stay trainable.

    Raises:
        ValueError: for any other ``model_name``.
    """
    if model_name not in ("efficientnet_b0", "efficientnet_b3"):
        raise ValueError(f"Unsupported model: {model_name}")

    if model_name == "efficientnet_b0":
        net = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
    else:
        net = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)

    # Swap the pretrained output layer for one sized to this task.
    head_inputs = net.classifier[1].in_features
    net.classifier[1] = nn.Linear(head_inputs, num_classes)

    if freeze_backbone:
        # Freeze everything, then re-enable gradients for the classifier only.
        net.requires_grad_(False)
        net.classifier.requires_grad_(True)
    return net

def get_transforms():
    """Return the preprocessing pipeline: resize to 224x224, to tensor, per-channel normalize."""
    # Per-channel statistics conventionally used with ImageNet-pretrained torchvision models.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)

class LabelSmoothingCrossEntropy(nn.Module):
    """
    Cross-entropy against a smoothed target distribution.

    The true class gets probability ``1 - smoothing``; the remaining
    ``smoothing`` mass is split evenly over the other ``n_class - 1`` classes.
    ``forward`` takes raw logits of shape (batch, n_class) and integer class
    targets of shape (batch,), and returns the mean loss over the batch.
    """

    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, pred, target):
        num_classes = pred.size(1)
        log_probs = torch.log_softmax(pred, dim=1)
        # The soft targets are constants; keep them out of the autograd graph.
        with torch.no_grad():
            off_value = self.smoothing / (num_classes - 1)
            soft_targets = torch.full_like(pred, off_value)
            soft_targets.scatter_(1, target.unsqueeze(1), 1 - self.smoothing)
        per_sample = -(soft_targets * log_probs).sum(dim=1)
        return per_sample.mean()

def mixup_data(x, y, alpha=0.4):
    """
    Blend each sample of a batch with a randomly chosen partner from the same batch.

    Args:
        x: batch of inputs, indexed along dimension 0.
        y: labels aligned with ``x``.
        alpha: Beta(alpha, alpha) parameter for the mixing coefficient.
            ``alpha <= 0`` disables mixing (``lam = 1``).

    Returns:
        (mixed_x, y_a, y_b, lam): the blended inputs, the original labels, the
        partners' labels, and the weight ``lam`` given to the original sample.
    """
    # Beta(alpha, alpha) is undefined for alpha <= 0 and np.random.beta raises;
    # treat that case as "no mixup" instead of crashing.
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1.0
    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index]
    return mixed_x, y, y[index], lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Return the loss against ``y_a`` and ``y_b`` blended with weights ``lam`` and ``1 - lam``."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

def epoch_loop(model, loader, device, optimizer=None, criterion=None, mixup=False) -> Tuple[float, float, float, float, float, np.ndarray]:
    """
    Run one pass over ``loader``.

    If optimizer is None => evaluation mode (no gradients, no weight updates);
    otherwise the model is trained, optionally on mixup-blended batches.
    In evaluation mode a missing ``criterion`` yields a reported loss of 0.

    Returns: (loss, acc, precision, recall, f1, confusion_matrix)
        Precision/recall/F1 are computed for the positive class (label 1).
        With mixup enabled, the training metrics compare predictions made on
        blended images against the un-mixed labels.

    Raises:
        ValueError: if an optimizer is given without a criterion.
    """
    is_train = optimizer is not None
    if is_train and criterion is None:
        # Without this check the failure surfaces later as an opaque error.
        raise ValueError("criterion is required when an optimizer is given (training mode)")
    model.train(is_train)
    all_preds, all_labels = [], []
    running_loss, total = 0.0, 0

    for imgs, labels in loader:
        imgs, labels = imgs.to(device), labels.to(device, dtype=torch.long)

        # Disable autograd during evaluation so no graph is built for
        # validation batches.
        with torch.set_grad_enabled(is_train):
            if is_train and mixup:
                imgs, y_a, y_b, lam = mixup_data(imgs, labels)
                logits = model(imgs)
                loss = mixup_criterion(criterion, logits, y_a, y_b, lam)
            else:
                logits = model(imgs)
                if criterion is not None:
                    loss = criterion(logits, labels)
                else:
                    loss = torch.tensor(0.0, device=device)

            if is_train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Weight the batch loss by batch size so the epoch average is per sample.
        running_loss += loss.item() * labels.size(0)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.detach().cpu().numpy().tolist())
        all_labels.extend(labels.detach().cpu().numpy().tolist())
        total += labels.size(0)

    avg_loss = running_loss / max(total, 1)
    acc = accuracy_score(all_labels, all_preds)
    prec, rec, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="binary", zero_division=0)
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    return avg_loss, acc, prec, rec, f1, cm

def main(args):
    """
    Train EfficientNet with label smoothing, cosine annealing, mixup and SWA.

    The per-epoch model with the best validation F1 is saved to MODEL_PATH.
    After training, the SWA-averaged model is evaluated as well and replaces
    the checkpoint if its validation F1 is higher.

    Args:
        args: parsed CLI namespace providing ``seed``, ``batch_size``, ``model``,
            ``unfreeze_backbone``, ``lr`` and ``epochs``.
    """
    # Imported locally: the module header only imports AveragedModel and SWALR.
    from torch.optim.swa_utils import update_bn

    set_seed(args.seed)
    train_dir, val_dir = IMAGES_ROOT / "train", IMAGES_ROOT / "val"
    tfms = get_transforms()
    train_ds, val_ds = NameLabelDataset(train_dir, tfms), NameLabelDataset(val_dir, tfms)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = build_model(args.model, num_classes=2, freeze_backbone=not args.unfreeze_backbone).to(device)

    criterion = LabelSmoothingCrossEntropy(smoothing=0.1)  # Novel strategy 1
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=args.lr)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=args.epochs)  # Novel strategy 2

    # SWA: average the weights over the last quarter of the epochs.
    swa_model = AveragedModel(model)
    swa_start = int(0.75 * args.epochs)
    swa_scheduler = SWALR(optimizer, swa_lr=1e-4)
    swa_updates = 0

    # torch.save does not create missing directories; do it once before training.
    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

    best_val_f1 = -1.0
    for epoch in range(1, args.epochs + 1):
        tr_loss, tr_acc, tr_p, tr_r, tr_f1, tr_cm = epoch_loop(model, train_loader, device, optimizer, criterion, mixup=True)  # Novel strategy 3
        va_loss, va_acc, va_p, va_r, va_f1, va_cm = epoch_loop(model, val_loader, device, criterion=criterion)

        # Exactly one scheduler drives the learning rate per epoch: cosine
        # annealing before the SWA phase, SWALR during it.
        if epoch > swa_start:
            swa_model.update_parameters(model)
            swa_scheduler.step()
            swa_updates += 1
        else:
            scheduler.step()

        print(f"Epoch {epoch:02d}/{args.epochs} "
              f"| Train L {tr_loss:.4f} A {tr_acc:.3f} P {tr_p:.3f} R {tr_r:.3f} F1 {tr_f1:.3f} "
              f"| Val L {va_loss:.4f} A {va_acc:.3f} P {va_p:.3f} R {va_r:.3f} F1 {va_f1:.3f}")
        print(f"  Train CM:\n{tr_cm}")
        print(f"  Val   CM:\n{va_cm}\n")

        if va_f1 > best_val_f1:
            best_val_f1 = va_f1
            torch.save(model.state_dict(), MODEL_PATH)
            print("Saved best model")

    if swa_updates:
        # Averaged weights need refreshed BatchNorm statistics before they
        # can be evaluated.
        update_bn(train_loader, swa_model, device=device)
        sw_loss, sw_acc, sw_p, sw_r, sw_f1, sw_cm = epoch_loop(swa_model, val_loader, device, criterion=criterion)
        print(f"SWA model | Val L {sw_loss:.4f} A {sw_acc:.3f} P {sw_p:.3f} R {sw_r:.3f} F1 {sw_f1:.3f}")
        print(f"  Val   CM:\n{sw_cm}\n")
        if sw_f1 > best_val_f1:
            best_val_f1 = sw_f1
            # Save the wrapped module so the checkpoint keeps the plain
            # EfficientNet key layout (no AveragedModel "module." prefix).
            torch.save(swa_model.module.state_dict(), MODEL_PATH)
            print("Saved best model")

    print("Done.")

if __name__ == "__main__":
    # Command-line entry point: collect the hyper-parameters and start training.
    cli = argparse.ArgumentParser()
    cli.add_argument("--epochs", type=int, default=10)
    cli.add_argument("--batch-size", type=int, default=16)
    cli.add_argument("--lr", type=float, default=1e-4)
    cli.add_argument("--seed", type=int, default=42)
    cli.add_argument("--unfreeze-backbone", action="store_true")
    cli.add_argument(
        "--model",
        type=str,
        default="efficientnet_b0",
        choices=["efficientnet_b0", "efficientnet_b3"],
    )
    main(cli.parse_args())


resnet_fall_classification_baseline:

import argparse
import os
import re
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import models, transforms
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import random

# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Root of the image dataset; main() reads its "train" and "val" sub-folders.
IMAGES_ROOT = PROJECT_ROOT.joinpath("fall_dataset", "images")
# File that receives the state dict of the best model found during training.
MODEL_PATH = PROJECT_ROOT.joinpath("saved_model", "resnet_baseline_fall_model.pth")

def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch (CPU and every CUDA device) RNGs with ``seed``."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def label_from_name(path: Path) -> int:
    """
    Derive the binary label from a file name: 1 for FALL, 0 for NOT-FALL.

    The stem is lower-cased and every run of non-word characters or
    underscores becomes a single space. Matching is plain substring search,
    so a NOT-FALL marker wins even when it is part of a longer word.
    Names with no marker at all map to 0.
    """
    cleaned = re.sub(r"[\W_]+", " ", path.stem.lower()).strip()

    negative_markers = (
        "not fall", "notfall", "not fallen", "notfallen", "no fall", "nofall",
        "standing", "walk", "walking", "sit", "sitting", "upright",
    )
    # Negative markers are checked first because "not fall" also contains "fall".
    for marker in negative_markers:
        if marker in cleaned:
            return 0

    # Any name starting with "fallen" necessarily contains "fall".
    return 1 if "fall" in cleaned else 0

class NameLabelDataset(Dataset):
    """
    Image dataset whose binary labels come from the file names.

    Images are collected recursively below ``directory`` and labelled with
    ``label_from_name``. ``class_counts`` maps each label (0, 1) to its number
    of images and ``labels`` holds the label of every entry of ``paths``.

    Raises:
        RuntimeError: if no image file is found below ``directory``.
    """

    def __init__(self, directory: Path, transform=None):
        self.directory = Path(directory)
        self.transform = transform
        self.paths = self._gather_images(self.directory)

        if not self.paths:
            raise RuntimeError(f"No images found in: {self.directory}")

        # Labels depend only on the file name, so compute them once up front
        # instead of re-running the regex on every __getitem__ call.
        self.labels = [label_from_name(p) for p in self.paths]
        self.class_counts = {0: self.labels.count(0), 1: self.labels.count(1)}

    @staticmethod
    def _gather_images(d: Path) -> List[Path]:
        """Return the sorted image files directly in ``d`` and below its sub-directories."""
        exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}

        def is_image(candidate: Path) -> bool:
            # is_file() keeps directories that merely carry an image suffix out.
            return candidate.suffix.lower() in exts and candidate.is_file()

        entries = list(d.iterdir())
        found = [p for p in entries if is_image(p)]
        for sub in entries:
            if sub.is_dir():
                found.extend(p for p in sub.rglob("*") if is_image(p))
        return sorted(found)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, self.labels[idx]

def build_model(num_classes: int = 2, freeze_backbone: bool = True):
    """
    Create a ResNet-18 with pretrained weights and a new final linear layer.

    Args:
        num_classes: output size of the replacement ``fc`` layer.
        freeze_backbone: when True, the pretrained parameters are frozen; the
            new ``fc`` layer is created afterwards and therefore stays trainable.
    """
    net = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
    if freeze_backbone:
        net.requires_grad_(False)
    net.fc = nn.Linear(net.fc.in_features, num_classes)
    return net


def get_transforms():
    """Return the preprocessing pipeline: resize to 224x224, to tensor, per-channel normalize."""
    # Per-channel statistics conventionally used with ImageNet-pretrained torchvision models.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)


def epoch_loop(model, loader, device, optimizer=None, criterion=None) -> Tuple[float, float, float, float, float, np.ndarray]:
    """
    Run one pass over ``loader``.

    If optimizer is None => evaluation mode (no gradients, no weight updates);
    otherwise the model is trained with ``optimizer`` and ``criterion``.
    In evaluation mode a missing ``criterion`` yields a reported loss of 0.

    Returns: (loss, acc, precision, recall, f1, confusion_matrix)
        Precision/recall/F1 are computed for the positive class (label 1);
        the confusion matrix uses label order [0, 1].

    Raises:
        ValueError: if an optimizer is given without a criterion.
    """
    is_train = optimizer is not None
    if is_train and criterion is None:
        # Without this check the failure surfaces later as an opaque autograd
        # error when backward() is called on a constant tensor.
        raise ValueError("criterion is required when an optimizer is given (training mode)")
    model.train(is_train)

    all_preds, all_labels = [], []
    running_loss = 0.0
    total = 0

    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = labels.to(device, dtype=torch.long)

        with torch.set_grad_enabled(is_train):
            logits = model(imgs)
            if criterion is not None:
                loss = criterion(logits, labels)
            else:
                loss = torch.tensor(0.0, device=device)
            if is_train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Weight the batch loss by batch size so the epoch average is per sample.
        running_loss += loss.item() * labels.size(0)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.detach().cpu().numpy().tolist())
        all_labels.extend(labels.detach().cpu().numpy().tolist())
        total += labels.size(0)

    avg_loss = running_loss / max(total, 1)
    acc = accuracy_score(all_labels, all_preds)
    prec, rec, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="binary", zero_division=0)
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    return avg_loss, acc, prec, rec, f1, cm


def main(args):
    """
    Train the ResNet-18 baseline and keep the checkpoint with the best validation F1.

    Args:
        args: parsed CLI namespace providing ``seed``, ``batch_size``,
            ``unfreeze_backbone``, ``lr`` and ``epochs``.
    """
    set_seed(args.seed)

    train_dir = IMAGES_ROOT / "train"
    val_dir = IMAGES_ROOT / "val"

    tfms = get_transforms()
    train_ds = NameLabelDataset(train_dir, transform=tfms)
    val_ds = NameLabelDataset(val_dir, transform=tfms)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=0)

    # Inverse-frequency class weights: total / (2 * class count); the max()
    # guards against division by zero when a class is absent.
    total_train = len(train_ds)
    w0 = total_train / (2.0 * max(train_ds.class_counts.get(0, 1), 1))
    w1 = total_train / (2.0 * max(train_ds.class_counts.get(1, 1), 1))
    class_weights = torch.tensor([w0, w1], dtype=torch.float32)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = build_model(num_classes=2, freeze_backbone=not args.unfreeze_backbone).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
    # Only parameters left trainable by build_model are handed to the optimizer.
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=args.lr)

    print(f"Project root: {PROJECT_ROOT}")
    print(f"Train dir:    {train_dir} (count={len(train_ds)} | class_counts={train_ds.class_counts})")
    print(f"Val dir:      {val_dir}   (count={len(val_ds)} | class_counts={val_ds.class_counts})")
    print(f"Saving model to: {MODEL_PATH}")
    print(f"Device: {device}\n")

    # torch.save does not create missing directories; do it once before training
    # so a full epoch is not wasted on a failing save.
    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

    best_val_f1 = -1.0
    for epoch in range(1, args.epochs + 1):
        tr_loss, tr_acc, tr_p, tr_r, tr_f1, tr_cm = epoch_loop(model, train_loader, device, optimizer, criterion)
        # Pass the criterion so the printed validation loss is the real loss
        # rather than the 0.0 placeholder used when no criterion is given.
        va_loss, va_acc, va_p, va_r, va_f1, va_cm = epoch_loop(model, val_loader, device, criterion=criterion)

        print(f"Epoch {epoch:02d}/{args.epochs} "
              f"| Train L {tr_loss:.4f} A {tr_acc:.3f} P {tr_p:.3f} R {tr_r:.3f} F1 {tr_f1:.3f} "
              f"| Val L {va_loss:.4f} A {va_acc:.3f} P {va_p:.3f} R {va_r:.3f} F1 {va_f1:.3f}")
        print(f"  Train CM (rows true [0,1], cols pred [0,1]):\n{tr_cm}")
        print(f"  Val   CM (rows true [0,1], cols pred [0,1]):\n{va_cm}\n")

        if va_f1 > best_val_f1:
            best_val_f1 = va_f1
            torch.save(model.state_dict(), MODEL_PATH)
            print(f"Saved best model (val F1={best_val_f1:.3f}) → {MODEL_PATH}\n")

    print("Done.")


if __name__ == "__main__":
    # Command-line entry point: collect the hyper-parameters and start training.
    cli = argparse.ArgumentParser()
    cli.add_argument("--epochs", type=int, default=10)
    cli.add_argument("--batch-size", type=int, default=16)
    cli.add_argument("--lr", type=float, default=1e-4)
    cli.add_argument("--seed", type=int, default=42)
    cli.add_argument(
        "--unfreeze-backbone",
        action="store_true",
        help="Fine-tune the whole ResNet18 instead of just the final layer.",
    )
    main(cli.parse_args())


resnet_fall_classification

import argparse
import os
import re
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import models, transforms
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import random
import torch.nn.functional as F

# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Root of the image dataset; main() reads its "train" and "val" sub-folders.
IMAGES_ROOT = PROJECT_ROOT.joinpath("fall_dataset", "images")
# File that receives the state dict of the best model found during training.
MODEL_PATH = PROJECT_ROOT.joinpath("saved_model", "resnet_fall_model.pth")

class FocalLoss(nn.Module):
    """
    Focal loss on top of per-sample cross-entropy: ``alpha * (1 - p_t) ** gamma * CE``.

    ``reduction`` may be "mean" or "sum"; any other value returns the
    unreduced per-sample losses.
    """

    def __init__(self, alpha=1, gamma=2, reduction="mean"):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        per_sample_ce = F.cross_entropy(inputs, targets, reduction="none")
        # exp(-CE) recovers the probability the model assigned to the true class.
        true_class_prob = torch.exp(-per_sample_ce)
        weighted = self.alpha * (1 - true_class_prob) ** self.gamma * per_sample_ce

        if self.reduction == "mean":
            return weighted.mean()
        if self.reduction == "sum":
            return weighted.sum()
        return weighted


def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch (CPU and every CUDA device) RNGs with ``seed``."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def label_from_name(path: Path) -> int:
    """
    Derive the binary label from a file name: 1 for FALL, 0 for NOT-FALL.

    The stem is lower-cased and every run of non-word characters or
    underscores becomes a single space. Matching is plain substring search,
    so a NOT-FALL marker wins even when it is part of a longer word.
    Names with no marker at all map to 0.
    """
    cleaned = re.sub(r"[\W_]+", " ", path.stem.lower()).strip()

    negative_markers = (
        "not fall", "notfall", "not fallen", "notfallen", "no fall", "nofall",
        "standing", "walk", "walking", "sit", "sitting", "upright",
    )
    # Negative markers are checked first because "not fall" also contains "fall".
    for marker in negative_markers:
        if marker in cleaned:
            return 0

    # Any name starting with "fallen" necessarily contains "fall".
    return 1 if "fall" in cleaned else 0


class NameLabelDataset(Dataset):
    """
    Image dataset whose binary labels come from the file names.

    Images are collected recursively below ``directory`` and labelled with
    ``label_from_name``. ``class_counts`` maps each label (0, 1) to its number
    of images and ``labels`` holds the label of every entry of ``paths``.

    Raises:
        RuntimeError: if no image file is found below ``directory``.
    """

    def __init__(self, directory: Path, transform=None):
        self.directory = Path(directory)
        self.transform = transform
        self.paths = self._gather_images(self.directory)

        if not self.paths:
            raise RuntimeError(f"No images found in: {self.directory}")

        # Labels depend only on the file name, so compute them once up front
        # instead of re-running the regex on every __getitem__ call.
        self.labels = [label_from_name(p) for p in self.paths]
        self.class_counts = {0: self.labels.count(0), 1: self.labels.count(1)}

    @staticmethod
    def _gather_images(d: Path) -> List[Path]:
        """Return the sorted image files directly in ``d`` and below its sub-directories."""
        exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}

        def is_image(candidate: Path) -> bool:
            # is_file() keeps directories that merely carry an image suffix out.
            return candidate.suffix.lower() in exts and candidate.is_file()

        entries = list(d.iterdir())
        found = [p for p in entries if is_image(p)]
        for sub in entries:
            if sub.is_dir():
                found.extend(p for p in sub.rglob("*") if is_image(p))
        return sorted(found)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(self.paths[idx]) as handle:
            img = handle.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, self.labels[idx]

def build_model(num_classes: int = 2, freeze_backbone: bool = True):
    """
    Create a ResNet-18 with pretrained weights and a Dropout(0.5) + Linear head.

    Args:
        num_classes: output size of the new linear layer.
        freeze_backbone: when True, the pretrained parameters are frozen; the
            new head is created afterwards and therefore stays trainable.
    """
    net = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
    if freeze_backbone:
        net.requires_grad_(False)
    feature_dim = net.fc.in_features
    head_layers = [nn.Dropout(0.5), nn.Linear(feature_dim, num_classes)]
    net.fc = nn.Sequential(*head_layers)
    return net


def get_transforms():
    """Return the preprocessing pipeline: resize to 224x224, to tensor, per-channel normalize."""
    # Per-channel statistics conventionally used with ImageNet-pretrained torchvision models.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)


def epoch_loop(model, loader, device, optimizer=None, criterion=None, scheduler=None) -> Tuple[float, float, float, float, float, np.ndarray]:
    """
    Run one pass over ``loader``.

    If optimizer is None => evaluation mode (no gradients, no weight updates);
    otherwise the model is trained and ``scheduler`` (if given) is stepped
    after every optimizer step, i.e. once per batch.
    In evaluation mode a missing ``criterion`` yields a reported loss of 0.

    Returns: (loss, acc, precision, recall, f1, confusion_matrix)
        Precision/recall/F1 are computed for the positive class (label 1);
        the confusion matrix uses label order [0, 1].

    Raises:
        ValueError: if an optimizer is given without a criterion.
    """
    is_train = optimizer is not None
    if is_train and criterion is None:
        # Without this check the failure surfaces later as an opaque autograd
        # error when backward() is called on a constant tensor.
        raise ValueError("criterion is required when an optimizer is given (training mode)")
    model.train(is_train)

    all_preds, all_labels = [], []
    running_loss = 0.0
    total = 0

    for imgs, labels in loader:
        imgs = imgs.to(device)
        labels = labels.to(device, dtype=torch.long)

        with torch.set_grad_enabled(is_train):
            logits = model(imgs)
            if criterion is not None:
                loss = criterion(logits, labels)
            else:
                loss = torch.tensor(0.0, device=device)
            if is_train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                if scheduler is not None:
                    scheduler.step()

        # Weight the batch loss by batch size so the epoch average is per sample.
        running_loss += loss.item() * labels.size(0)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.detach().cpu().numpy().tolist())
        all_labels.extend(labels.detach().cpu().numpy().tolist())
        total += labels.size(0)

    avg_loss = running_loss / max(total, 1)
    acc = accuracy_score(all_labels, all_preds)
    prec, rec, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="binary", zero_division=0)
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    return avg_loss, acc, prec, rec, f1, cm


def main(args):
    """
    Train ResNet-18 with focal loss, AdamW and a one-cycle learning-rate schedule.

    The model with the best validation F1 is saved to MODEL_PATH.

    Args:
        args: parsed CLI namespace providing ``seed``, ``batch_size``,
            ``unfreeze_backbone``, ``lr`` and ``epochs``.
    """
    set_seed(args.seed)

    train_dir = IMAGES_ROOT / "train"
    val_dir = IMAGES_ROOT / "val"

    tfms = get_transforms()
    train_ds = NameLabelDataset(train_dir, transform=tfms)
    val_ds = NameLabelDataset(val_dir, transform=tfms)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = build_model(num_classes=2, freeze_backbone=not args.unfreeze_backbone).to(device)

    criterion = FocalLoss(alpha=1, gamma=2)

    # Only parameters left trainable by build_model are handed to the optimizer.
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=args.lr, weight_decay=1e-4)
    # The schedule is sized in optimizer steps (batches), matching epoch_loop,
    # which steps the scheduler once per training batch.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer, max_lr=args.lr, steps_per_epoch=len(train_loader), epochs=args.epochs
    )

    print(f"Project root: {PROJECT_ROOT}")
    print(f"Train dir:    {train_dir} (count={len(train_ds)} | class_counts={train_ds.class_counts})")
    print(f"Val dir:      {val_dir}   (count={len(val_ds)} | class_counts={val_ds.class_counts})")
    print(f"Saving model to: {MODEL_PATH}")
    print(f"Device: {device}\n")

    # torch.save does not create missing directories; do it once before training
    # so a full epoch is not wasted on a failing save.
    MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

    best_val_f1 = -1.0
    for epoch in range(1, args.epochs + 1):
        tr_loss, tr_acc, tr_p, tr_r, tr_f1, tr_cm = epoch_loop(model, train_loader, device, optimizer, criterion, scheduler)
        va_loss, va_acc, va_p, va_r, va_f1, va_cm = epoch_loop(model, val_loader, device, None, criterion)

        print(f"Epoch {epoch:02d}/{args.epochs} "
              f"| Train L {tr_loss:.4f} A {tr_acc:.3f} P {tr_p:.3f} R {tr_r:.3f} F1 {tr_f1:.3f} "
              f"| Val L {va_loss:.4f} A {va_acc:.3f} P {va_p:.3f} R {va_r:.3f} F1 {va_f1:.3f}")
        print(f"  Train CM (rows true [0,1], cols pred [0,1]):\n{tr_cm}")
        print(f"  Val   CM (rows true [0,1], cols pred [0,1]):\n{va_cm}\n")

        if va_f1 > best_val_f1:
            best_val_f1 = va_f1
            torch.save(model.state_dict(), MODEL_PATH)
            print(f"Saved best model (val F1={best_val_f1:.3f}) → {MODEL_PATH}\n")

    print("Done.")


if __name__ == "__main__":
    # Command-line entry point: collect the hyper-parameters and start training.
    cli = argparse.ArgumentParser()
    cli.add_argument("--epochs", type=int, default=10)
    cli.add_argument("--batch-size", type=int, default=16)
    cli.add_argument("--lr", type=float, default=1e-4)
    cli.add_argument("--seed", type=int, default=42)
    cli.add_argument(
        "--unfreeze-backbone",
        action="store_true",
        help="Fine-tune the whole ResNet18 instead of just the final layer.",
    )
    main(cli.parse_args())


main.py

import gradio as gr
from ultralytics import YOLO
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image
import os
import math

# Resolve against the absolute location of this file: __file__ can be a
# relative path, in which case two dirname() calls would yield "" and the
# model paths would silently depend on the current working directory.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
RESNET_MODEL_PATH = os.path.join(BASE_DIR, "saved_model", "resnet_fall_model.pth")
EFFICIENTNET_MODEL_PATH = os.path.join(BASE_DIR, "saved_model", "efficientnet_fall_model.pth")
HAZARD_MODEL_PATH = os.path.join(BASE_DIR, "saved_model", "hazard_yolov83", "weights", "best.pt")

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Index i of the classifier output corresponds to fall_classes[i].
fall_classes = ["Not Fall", "Fall"]

# ---------- Load ResNet ----------
# The head must match the architecture that produced the checkpoint
# (Dropout + Linear), otherwise load_state_dict fails on the key names.
resnet_model = models.resnet18(weights=None)
resnet_model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(resnet_model.fc.in_features, len(fall_classes))
)
# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state dict holds, and avoids executing arbitrary pickled code.
resnet_model.load_state_dict(torch.load(RESNET_MODEL_PATH, map_location=device, weights_only=True))
resnet_model.to(device)
resnet_model.eval()

# ---------- Load EfficientNet ----------
# NOTE(review): this always builds EfficientNet-B0; a checkpoint trained with a
# different variant will not load here.
efficientnet_model = models.efficientnet_b0(weights=None)
efficientnet_model.classifier[1] = nn.Linear(efficientnet_model.classifier[1].in_features, len(fall_classes))
efficientnet_model.load_state_dict(torch.load(EFFICIENTNET_MODEL_PATH, map_location=device, weights_only=True))
efficientnet_model.to(device)
efficientnet_model.eval()

# ---------- Load hazard detector ----------
hazard_model = YOLO(HAZARD_MODEL_PATH)

# Preprocessing applied to every image before either fall classifier sees it.
fall_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])

def predict_resnet(image):
    """
    Classify an image with the ResNet fall model and format the result as text.

    Args:
        image: image as a numpy array, or None when nothing was uploaded.

    Returns:
        A multi-line string with the predicted label, its confidence, the
        per-class probabilities and the entropy of the distribution (in nats);
        a short notice instead when ``image`` is None.
    """
    if image is None:
        # Running inference without an upload would otherwise crash in
        # Image.fromarray.
        return "[ResNet]\nNo image provided. Please upload an image first."

    # Force three channels so Normalize (3-channel mean/std) always applies.
    img = Image.fromarray(image).convert("RGB")
    img_t = fall_transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = resnet_model(img_t)
        probs = torch.softmax(outputs, dim=1)[0]

    conf, pred = torch.max(probs, 0)
    label = fall_classes[pred.item()]

    probs_dict = {fall_classes[i]: f"{probs[i].item()*100:.1f}%" for i in range(len(fall_classes))}

    # Shannon entropy; the epsilon keeps log() finite for zero probabilities.
    entropy = -torch.sum(probs * torch.log(probs + 1e-9)).item()

    return (
        f"[ResNet]\n"
        f"Prediction: {label}\n"
        f"Confidence: {conf.item()*100:.1f}%\n"
        f"Probabilities: {probs_dict}\n"
        f"Uncertainty (entropy): {entropy:.3f}"
    )

def predict_efficientnet(image):
    """
    Classify an image with the EfficientNet fall model and format the result as text.

    Args:
        image: image as a numpy array, or None when nothing was uploaded.

    Returns:
        A multi-line string with the predicted label, its confidence, the
        per-class probabilities and the entropy of the distribution (in nats);
        a short notice instead when ``image`` is None.
    """
    if image is None:
        # Running inference without an upload would otherwise crash in
        # Image.fromarray.
        return "[EfficientNet]\nNo image provided. Please upload an image first."

    # Force three channels so Normalize (3-channel mean/std) always applies.
    img = Image.fromarray(image).convert("RGB")
    img_t = fall_transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = efficientnet_model(img_t)
        probs = torch.softmax(outputs, dim=1)[0]

    conf, pred = torch.max(probs, 0)
    label = fall_classes[pred.item()]

    probs_dict = {fall_classes[i]: f"{probs[i].item()*100:.1f}%" for i in range(len(fall_classes))}
    # Shannon entropy; the epsilon keeps log() finite for zero probabilities.
    entropy = -torch.sum(probs * torch.log(probs + 1e-9)).item()

    return (
        f"[EfficientNet]\n"
        f"Prediction: {label}\n"
        f"Confidence: {conf.item()*100:.1f}%\n"
        f"Probabilities: {probs_dict}\n"
        f"Uncertainty (entropy): {entropy:.3f}"
    )

def predict_hazard(image):
    """
    Run the YOLO hazard detector and return the annotated image for display.

    Args:
        image: RGB image as a numpy array (as delivered by the Gradio image
            input), or None when nothing was uploaded.

    Returns:
        The annotated image as an RGB numpy array, or None when ``image`` is None.
    """
    if image is None:
        return None

    # Ultralytics interprets numpy inputs as BGR, while the UI supplies RGB;
    # reverse the channel axis so the detector sees the colours it expects.
    is_rgb = getattr(image, "ndim", 0) == 3 and image.shape[-1] == 3
    source = image[..., ::-1].copy() if is_rgb else image

    results = hazard_model.predict(source, conf=0.25)

    # plot() returns a BGR array; flip it back to RGB for the output widget.
    annotated = results[0].plot()
    return annotated[..., ::-1].copy()

# Gradio UI: a mode selector, an image upload, a run button and two outputs
# (text for the fall classifiers, an annotated image for hazard detection).
with gr.Blocks() as demo:
    gr.Markdown("Fall & Hazard Detection")

    mode = gr.Dropdown(
        label="Select Mode",
        value="ResNet Fall Detection",
        choices=["ResNet Fall Detection", "EfficientNet Fall Detection", "Hazard Detection"],
    )

    img = gr.Image(label="Upload Image", type="numpy")
    run = gr.Button("Run Inference", variant="primary")

    out_txt = gr.Textbox(lines=2, label="Prediction")
    out_img = gr.Image(label="Hazard Detection Output", type="numpy")

    def inference(image, mode):
        """Dispatch to the selected model; returns (text_output, image_output)."""
        if mode == "ResNet Fall Detection":
            return predict_resnet(image), None
        if mode == "EfficientNet Fall Detection":
            return predict_efficientnet(image), None
        # Every other mode value is treated as hazard detection.
        return None, predict_hazard(image)

    run.click(fn=inference, inputs=[img, mode], outputs=[out_txt, out_img])

if __name__ == "__main__":
    # share=True asks Gradio to also create a publicly reachable link.
    demo.launch(share=True)


requirements.txt

gradio
gradio_client
torch==2.3.1
torchvision==0.18.1
mediapipe==0.10.14
ultralytics==8.3.26
opencv-python==4.10.0.84
numpy==1.26.4
cvzone
tensorflow
scikit-learn
label-studio