In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import matplotlib.pyplot as plt
from datetime import datetime
from torchinfo import summary 
import os
import cv2
import numpy as np
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import f1_score
from tqdm import tqdm
import time

In [None]:
# Pick the compute device once: use CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU. Kept as a plain string for use with `.to(device)`.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

In [None]:
import torch
import torch.nn as nn
from torchvision.models import mobilenet_v3_small

class MobileNetV3Feature(nn.Module):
    """Per-frame feature extractor built on a pretrained MobileNetV3-Small.

    The backbone is truncated (its last two child modules are dropped), its
    output is pooled to a 2x2 grid, flattened, and projected to a 128-dim
    embedding by a linear layer.

    NOTE(review): the projection expects 576 channels from the truncated
    backbone (576 * 2 * 2 inputs); confirm this if the backbone is swapped.
    NOTE(review): newer torchvision releases deprecate ``pretrained=True`` in
    favour of the ``weights=`` argument; consider migrating once the minimum
    supported torchvision version is known.
    """

    def __init__(self):
        super().__init__()
        pretrained_net = mobilenet_v3_small(pretrained=True)

        # Keep everything except the backbone's last two children
        # (presumably its own pooling and classifier heads -- TODO confirm).
        kept_stages = list(pretrained_net.children())[:-2]

        # Replacement head: 2x2 spatial pooling -> flatten -> 128-dim projection.
        projection_head = [
            nn.AdaptiveAvgPool2d((2, 2)),
            nn.Flatten(),
            nn.Linear(576 * 2 * 2, 128),
        ]
        self.features = nn.Sequential(*kept_stages, *projection_head)

    def forward(self, x):
        """Map a batch of frames to their 128-dim embeddings."""
        frame_embedding = self.features(x)
        return frame_embedding

class ActionRecognitionModel(nn.Module):
    """Clip-level action classifier: per-frame features -> GRU -> MLP head.

    Args:
        feature_extractor: module applied to every frame independently. It
            receives a (B*T, C, H, W) batch and must return (B*T, 128)
            embeddings, because the GRU is built with ``input_size=128``.
        num_classes: number of output logits.
        num_frames: accepted for backward compatibility but not used; the GRU
            handles any sequence length T.
    """

    def __init__(self, feature_extractor, num_classes=7, num_frames=15):
        super().__init__()
        self.feature_extractor = feature_extractor
        self.temporal = nn.GRU(
            input_size=128,
            hidden_size=64,
            num_layers=2,
            batch_first=True,
            bidirectional=False
        )
        self.classifier = nn.Sequential(
            nn.Linear(64, 32),
            nn.Hardswish(),
            nn.Dropout(0.3),
            nn.Linear(32, num_classes),
        )

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: tensor of shape (B, T, C, H, W).

        Returns:
            Logits of shape (B, num_classes).

        Raises:
            ValueError: if ``x`` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (B, T, C, H, W), got {tuple(x.shape)}"
            )
        B, T = x.shape[:2]
        # reshape (not view): also works for non-contiguous inputs such as
        # permuted or sliced clip tensors, copying only when necessary.
        x = x.reshape(B * T, *x.shape[2:])
        spatial_feat = self.feature_extractor(x)
        temporal_in = spatial_feat.reshape(B, T, -1)
        temporal_out, _ = self.temporal(temporal_in)
        # Average the GRU outputs over time to get one vector per clip.
        return self.classifier(temporal_out.mean(1))

# Build the full network (frame encoder + temporal head) with its default
# hyper-parameters and place it on the selected device in one step.
model = ActionRecognitionModel(MobileNetV3Feature()).to(device)


In [None]:
# Layer-by-layer overview of the network for a dummy batch shaped
# (batch=2, frames=10, channels=3, height=224, width=224).
summary(
    model,
    input_size=(2, 10, 3, 224, 224),
    row_settings=["var_names"],
    col_names=["input_size", "output_size", "num_params", "trainable"],
    col_width=20,
)

In [None]:
class SlidingWindowDataset(Dataset):
    """Fixed-length frame windows (stride 1) cut from per-clip image folders.

    Expected layout::

        root_dir/<class_name>/<clip_dir>/<frame image files>

    Class labels are the indices of the sorted class directory names. Clips
    with fewer than ``sequence_length`` frames are skipped. Frames inside a
    clip are ordered by lexicographic filename sort, so frame names should be
    zero-padded for the temporal order to be correct.

    Args:
        root_dir: dataset root containing one directory per class.
        transform: optional callable applied to each PIL frame independently.
            It must return a tensor, because frames are combined with
            ``torch.stack``. Random transforms are drawn per frame, not per clip.
        sequence_length: number of consecutive frames per sample (>= 1).

    Raises:
        ValueError: if ``sequence_length`` is smaller than 1.
    """

    # Accepted frame extensions; compared case-insensitively.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None, sequence_length=15):
        if sequence_length < 1:
            raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")

        self.root_dir = root_dir
        self.transform = transform
        self.sequence_length = sequence_length

        # Only real, non-hidden directories are classes. Stray files
        # (.DS_Store, README, ...) used to crash the scan, and hidden folders
        # such as .ipynb_checkpoints would shift the label indices.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if not entry.startswith('.')
            and os.path.isdir(os.path.join(root_dir, entry))
        )
        self.data = []  # list of (sorted frame paths, class index), one per clip

        for class_idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)

            # Sorted so that the index -> sample mapping is deterministic
            # regardless of the filesystem's listing order.
            for sub_dir in sorted(os.listdir(class_dir)):
                sub_dir_path = os.path.join(class_dir, sub_dir)
                if not os.path.isdir(sub_dir_path):
                    continue

                valid_images = [
                    os.path.join(sub_dir_path, img)
                    for img in sorted(os.listdir(sub_dir_path))
                    if img.lower().endswith(self._IMAGE_EXTENSIONS)
                ]

                if len(valid_images) >= sequence_length:
                    self.data.append((valid_images, class_idx))

    def __len__(self):
        """Total number of sliding windows over all clips."""
        return sum(len(images) - self.sequence_length + 1 for images, _ in self.data)

    def _locate_window(self, idx):
        """Map a flat sample index to ``(frame_paths, label)``.

        Negative indices count from the end, like Python sequences.

        Raises:
            IndexError: if ``idx`` is outside ``[-len(self), len(self))``.
        """
        total = len(self)
        position = idx + total if idx < 0 else idx
        if not 0 <= position < total:
            raise IndexError(f"Index {idx} out of range")

        offset = 0  # number of windows contributed by the clips already passed
        for images, label in self.data:
            num_sequences = len(images) - self.sequence_length + 1
            if position < offset + num_sequences:
                start_idx = position - offset
                return images[start_idx:start_idx + self.sequence_length], label
            offset += num_sequences

        # Unreachable when self.data is consistent with __len__.
        raise IndexError(f"Index {idx} out of range")

    def __getitem__(self, idx):
        """Return ``(frames, label)``; ``frames`` is the stacked per-frame tensors."""
        sequence, label = self._locate_window(idx)

        processed_images = []
        for img_path in sequence:
            # Close the file handle right away; convert() returns a loaded copy.
            with Image.open(img_path) as raw:
                img = raw.convert("RGB")
            if self.transform:
                img = self.transform(img)
            processed_images.append(img)

        return torch.stack(processed_images), label

# Training-time preprocessing: resize, light random augmentation, tensor
# conversion and channel normalisation.
# NOTE: the dataset applies the transform to every frame separately, so the
# random rotation / jitter / erasing parameters differ between frames of a clip.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.2)
])

# Validation-time preprocessing: same resize and normalisation, but with no
# random augmentation, so validation loss/F1 (which drive the LR scheduler and
# best-checkpoint selection) are deterministic and comparable across epochs.
val_preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Placeholders: point these at the (distinct) training and validation roots.
train_folder = r"path/to/your/folder"
val_folder = r"path/to/your/folder"

sequence_length = 15
train_dataset = SlidingWindowDataset(root_dir=train_folder, transform=preprocess, sequence_length=sequence_length)
val_dataset = SlidingWindowDataset(root_dir=val_folder, transform=val_preprocess, sequence_length=sequence_length)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
print(len(train_dataset))
print(len(val_dataset))

In [None]:
# Objective: cross-entropy with label smoothing of 0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# AdamW over all model parameters; the learning rate is cut by 10x when the
# monitored value (stepped with the validation loss in the training cell) has
# not decreased for 5 epochs.
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

# Run-level trackers. Only best_f1 / best_epoch are updated by the training
# cell in this notebook; the others are kept for any cells that rely on them.
losses, test_losses, ap = [], [], []
best_ap, best_loss = 0.0, 10.0
best_f1, best_epoch = 0.0, 0

In [None]:
# Per-epoch metric log: one independent, initially empty list per metric,
# appended to once per epoch by the training cell.
history = {
    metric: []
    for metric in ('train_loss', 'val_loss', 'train_acc', 'train_f1', 'accuracy', 'f1')
}

In [None]:
num_epoch = 50

# NOTE: `history`, `best_f1` and `best_epoch` are created in earlier cells and
# are not reset here, so re-running this cell continues from their current
# values. Re-run those cells first for a fresh training run.

for epoch in range(num_epoch):
    # ---------------- training phase ----------------
    model.train()
    train_loss = 0.0          # sum of per-sample losses over the epoch
    correct_train = 0
    total_train = 0           # number of clips seen
    total_train_frames = 0    # number of individual frames seen
    all_train_preds = []
    all_train_labels = []
    batch_times = []

    print(f"\nEpoch {epoch+1}/{num_epoch}")
    print("-" * 60)

    train_bar = tqdm(train_loader, desc="Training", unit="batch")

    for images, labels in train_bar:
        # The timer starts after the batch has been fetched, so the reported
        # speed covers transfer + forward + backward, not data loading.
        start_time = time.time()

        images, labels = images.to(device), labels.to(device)
        labels = labels.view(-1).long()
        batch_count = labels.size(0)
        # Each sample is a clip of T frames: count real images, and use the
        # actual batch size so a partial last batch is not overstated.
        frames_in_batch = images.size(0) * images.size(1)

        optimizer.zero_grad()
        outputs = model(images)

        # Restore the batch dimension if a single-sample batch was squeezed.
        if outputs.dim() == 1 and len(labels) == 1:
            outputs = outputs.unsqueeze(0)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        # Weight the batch-mean loss by batch size so the epoch average is a
        # true per-sample mean even when the last batch is smaller.
        train_loss += batch_loss * batch_count

        # Training accuracy / F1 bookkeeping (argmax of logits == argmax of softmax).
        preds = torch.argmax(outputs, dim=1)
        correct_train += (preds == labels).sum().item()
        total_train += batch_count
        total_train_frames += frames_in_batch
        all_train_preds.extend(preds.tolist())
        all_train_labels.extend(labels.tolist())

        # Training throughput for the progress bar.
        batch_time = time.time() - start_time
        batch_times.append(batch_time)
        train_bar.set_postfix(loss=batch_loss, speed=f"{frames_in_batch / max(batch_time, 1e-9):.1f} img/s")

    if total_train == 0:
        raise RuntimeError("train_loader yielded no samples; check the training folder")

    avg_train_loss = train_loss / total_train
    train_accuracy = 100. * correct_train / total_train
    train_f1 = 100. * f1_score(all_train_labels, all_train_preds, average='macro')
    avg_train_speed = total_train_frames / max(sum(batch_times), 1e-9)

    # ---------------- validation phase ----------------
    model.eval()
    val_loss = 0.0            # sum of per-sample losses over the validation set
    all_preds = []
    all_labels = []

    val_bar = tqdm(val_loader, desc="Validating", unit="batch", leave=False)
    with torch.no_grad():
        for images, labels in val_bar:
            images, labels = images.to(device), labels.to(device)
            labels = labels.view(-1).long()

            outputs = model(images)

            if outputs.dim() == 1:
                outputs = outputs.unsqueeze(0)

            val_loss += criterion(outputs, labels).item() * labels.size(0)

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.tolist())
            all_labels.extend(labels.tolist())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)
    if len(all_labels) == 0:
        raise RuntimeError("val_loader yielded no samples; check the validation folder")

    avg_val_loss = val_loss / len(all_labels)
    accuracy = 100. * np.sum(all_preds == all_labels) / len(all_labels)
    f1 = 100. * f1_score(all_labels, all_preds, average='macro')

    # Record this epoch's metrics.
    history['train_loss'].append(avg_train_loss)
    history['val_loss'].append(avg_val_loss)
    history['train_acc'].append(train_accuracy)
    history['train_f1'].append(train_f1)
    history['accuracy'].append(accuracy)
    history['f1'].append(f1)

    # The plateau scheduler monitors the validation loss.
    scheduler.step(avg_val_loss)

    # Save a checkpoint whenever the validation macro-F1 improves.
    if f1 > best_f1:
        best_f1 = f1
        best_epoch = epoch + 1
        torch.save(model.state_dict(), f"best_model_epoch{best_epoch}_f1{best_f1:.2f}.pth")

    print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
    print(f"Train Acc: {train_accuracy:.2f}% | Train F1: {train_f1:.2f}%")
    print(f"Val Acc: {accuracy:.2f}% | Val F1: {f1:.2f}%")
    print(f"Average Training Speed: {avg_train_speed:.1f} images/sec")
    print("-" * 60)

# Final checkpoint (weights after the last epoch, not necessarily the best).
torch.save(model.state_dict(), "final_model.pth")

print("\nTraining Summary:")
print(f"Best Val F1: {best_f1:.2f}% at epoch {best_epoch}")
print(f"Best Val Acc: {max(history['accuracy']):.2f}%")
print(f"Lowest Val Loss: {min(history['val_loss']):.4f}")