In [2]:
import os
import random
from collections import Counter, defaultdict, deque
from glob import glob

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import albumentations as A
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from torchvision import models, transforms
from albumentations.pytorch import ToTensorV2
from scipy.ndimage import gaussian_filter1d
import torch.optim as optim
from torchvision import models
from torchvision import transforms

  from .autonotebook import tqdm as notebook_tqdm
  check_for_updates()


In [3]:
def load_image_sequences(root_dir):
    """Collect image sequences from a ``subject/label/action`` directory tree.

    The tree is walked three levels deep. Every third-level directory that
    contains at least one ``*.jpg`` file becomes one sequence. Non-directory
    entries at any level and action directories without ``.jpg`` files are
    skipped.

    Directory entries are visited in sorted order. ``os.listdir`` returns
    names in arbitrary order, so without sorting the returned list (and any
    seeded shuffle or split a caller applies to it) could differ from one
    machine or run to the next.

    Args:
        root_dir (str): Directory whose children are the subject directories.

    Returns:
        list[dict]: One dict per sequence with the keys
            ``'subject'``: name of the first-level directory,
            ``'label'``: name of the second-level directory,
            ``'action'``: third-level directory name with everything up to
                and including the first underscore removed (the full name
                when it has no underscore),
            ``'image_paths'``: sorted list of the ``.jpg`` paths.
    """
    sequences = []
    for subject in sorted(os.listdir(root_dir)):
        subject_path = os.path.join(root_dir, subject)
        if not os.path.isdir(subject_path):
            continue
        for label in sorted(os.listdir(subject_path)):
            label_path = os.path.join(subject_path, label)
            if not os.path.isdir(label_path):
                continue
            for action in sorted(os.listdir(label_path)):
                action_path = os.path.join(label_path, action)
                if not os.path.isdir(action_path):
                    continue
                image_paths = sorted(glob(os.path.join(action_path, '*.jpg')))
                if not image_paths:
                    continue
                # Strip the leading "<prefix>_" from the directory name, if any.
                action_name = action.split('_', 1)[1] if '_' in action else action
                sequences.append({
                    'subject': subject,
                    'label': label,
                    'action': action_name,
                    'image_paths': image_paths
                })
    return sequences

# Discover every labelled sequence below the training directory.
root_dir = '/kaggle/input/data-slayer123/train/train'
sequences = load_image_sequences(root_dir)

# Separate the two classes so that each one is split 80/20 on its own.
fall_sequences = list(filter(lambda seq: seq['label'] == 'fall', sequences))
non_fall_sequences = list(filter(lambda seq: seq['label'] == 'non_fall', sequences))

# Same hold-out fraction and seed for both classes.
split_options = dict(test_size=0.2, random_state=42)
train_fall, test_fall = train_test_split(fall_sequences, **split_options)
train_non_fall, test_non_fall = train_test_split(non_fall_sequences, **split_options)

# Recombine per-class splits: fall sequences first, then non-fall.
train_sequences = [*train_fall, *train_non_fall]
test_sequences = [*test_fall, *test_non_fall]

In [4]:
def apply_transformations(sequences, transform):
    """Eagerly load and transform every image of every sequence.

    Each path in a sequence's ``'image_paths'`` is read as grayscale,
    expanded to three channels, converted to a PIL image and passed through
    ``transform`` exactly once. Paths that ``cv2.imread`` cannot read are
    skipped silently.

    Args:
        sequences (list[dict]): Dicts with the keys ``'subject'``,
            ``'label'``, ``'action'`` and ``'image_paths'``.
        transform (callable): Called with a PIL image; its return value is
            stored as-is.

    Returns:
        list[dict]: One dict per input sequence carrying the original
            ``'subject'``, ``'label'`` and ``'action'`` plus
            ``'image_tensors'``, the list of transformed images.
    """

    def _read_as_rgb(path):
        # Returns an HxWx3 uint8 array, or None when the file is unreadable.
        gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            return None
        gray = np.expand_dims(gray, axis=2)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
        return np.array(rgb, dtype=np.uint8)

    results = []
    for seq in sequences:
        frames = []
        for img_path in seq['image_paths']:
            rgb = _read_as_rgb(img_path)
            if rgb is not None:
                pil_image = transforms.ToPILImage()(rgb)
                frames.append(transform(pil_image))

        results.append(dict(
            subject=seq['subject'],
            label=seq['label'],
            action=seq['action'],
            image_tensors=frames,
        ))
    return results

# Per-channel normalization statistics used by both pipelines
# (presumably the ImageNet values matching the pretrained backbone — confirm).
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, random geometric and colour perturbations,
# then tensor conversion and normalization.
train_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=list(channel_mean), std=list(channel_std)),
]
augmentation = transforms.Compose(train_steps)

# Evaluation pipeline: deterministic resize, tensor conversion, normalization.
eval_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=list(channel_mean), std=list(channel_std)),
]
test_transform = transforms.Compose(eval_steps)

# Both calls transform every image once, up front; the random training
# augmentations are therefore drawn a single time per image here.
train_augmented = apply_transformations(train_sequences, augmentation)
test_transformed = apply_transformations(test_sequences, test_transform)

In [5]:
class SingleFrameActionDataset(Dataset):
    """Flattens transformed sequences into independent ``(frame, label)`` samples.

    Every element of a sequence's ``'image_tensors'`` becomes one sample.
    The label is ``1`` when the sequence's ``'label'`` equals ``'fall'`` and
    ``0`` for any other value.

    Args:
        sequences (iterable[dict]): Dicts providing ``'label'`` and
            ``'image_tensors'``.
    """

    def __init__(self, sequences):
        self.data = []
        for seq in sequences:
            target = int(seq['label'] == 'fall')
            self.data.extend((frame, target) for frame in seq['image_tensors'])

    def __len__(self):
        """Return the total number of frames across all sequences."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(frame, label)`` pair stored at position ``idx``."""
        frame, target = self.data[idx]
        return frame, target

# Wrap the pre-transformed sequences as frame-level datasets.
train_single_frame_dataset = SingleFrameActionDataset(train_augmented)
test_single_frame_dataset = SingleFrameActionDataset(test_transformed)

# Only the training loader shuffles; the test loader keeps dataset order.
batch_size = 4
train_single_frame_loader = DataLoader(
    dataset=train_single_frame_dataset,
    shuffle=True,
    batch_size=batch_size,
)
test_single_frame_loader = DataLoader(
    dataset=test_single_frame_dataset,
    shuffle=False,
    batch_size=batch_size,
)

# Sanity check on a single training batch: every image must be 3x224x224.
first_batch = next(iter(train_single_frame_loader), None)
if first_batch is not None:
    images, labels = first_batch
    assert images.shape[1:] == (3, 224, 224), f"Shape mismatch in batch: {images.shape}"
    print(f"Batch size: {images.shape[0]}, Image shape: {images.shape[1:]}")

print(f"Total single frame train samples: {len(train_single_frame_dataset)}")
print(f"Total single frame test samples: {len(test_single_frame_dataset)}")

Batch size: 4, Image shape: torch.Size([3, 224, 224])
Total single frame train samples: 3347
Total single frame test samples: 884


In [6]:
class MobileNetV3Model(nn.Module):
    """MobileNetV3-Large feature extractor followed by a two-class head.

    The torchvision network's own classifier is replaced with ``nn.Identity``
    so the backbone returns the features that used to feed that classifier.
    A new head (Linear -> BatchNorm1d -> ReLU -> Dropout -> Linear) maps
    those features to two logits.

    Args:
        pretrained (bool): Forwarded to ``models.mobilenet_v3_large``.
    """

    def __init__(self, pretrained=True):
        super().__init__()

        # Backbone: read the classifier's input width before discarding it.
        backbone = models.mobilenet_v3_large(pretrained=pretrained)
        feature_dim = backbone.classifier[0].in_features
        backbone.classifier = nn.Identity()
        self.mobilenet = backbone

        # Head producing one logit per class (two classes).
        head_layers = [
            nn.Linear(feature_dim, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 2),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the head's two-logit output for the image batch ``x``."""
        return self.classifier(self.mobilenet(x))

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = MobileNetV3Model(pretrained=True)
model = model.to(device)
criterion = nn.CrossEntropyLoss()

# AdamW over all model parameters; StepLR multiplies the learning rate by
# 0.7 after every 3 calls to scheduler.step().
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.7, step_size=3)

Downloading: "https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v3_large-8738ca79.pth
100%|██████████| 21.1M/21.1M [00:00<00:00, 185MB/s]


In [7]:
# Training loop: num_epochs full passes over the single-frame training loader.
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    running_loss, correct_train, total_train = 0.0, 0, 0

    # Iterate the single-frame loader (one image per sample).
    for images, labels in train_single_frame_loader:
        images, labels = images.to(device), labels.to(device)

        # Standard step: clear gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate the per-batch loss and the number of correct predictions.
        running_loss += loss.item()
        predicted = torch.max(outputs.data, 1)[1]
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    # Loss is the mean of the per-batch losses; accuracy is over all samples.
    epoch_loss = running_loss / len(train_single_frame_loader)
    epoch_accuracy = 100 * correct_train / total_train
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%')

    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [4, 960]

In [None]:
from PIL import Image

class TestingDataset(Dataset):
    """Unlabelled single-frame dataset over the files of one directory.

    Regular files directly inside ``image_dir`` are collected in sorted
    order, so sample indices are stable across runs and filesystems.
    Sub-directories are ignored; they could never be decoded as images.

    Args:
        image_dir (str): Directory containing the test images.
        transform (callable, optional): Applied to each PIL image before it
            is returned.
    """

    def __init__(self, image_dir, transform=None):
        candidates = glob(os.path.join(image_dir, '*'))
        # glob order is arbitrary and '*' also matches directories.
        self.image_paths = sorted(p for p in candidates if os.path.isfile(p))
        self.transform = transform

    def __len__(self):
        """Return the number of files found in the directory."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load one image.

        Returns:
            tuple: ``(img, label, img_path)`` where ``img`` is the
            (optionally transformed) three-channel image, ``label`` is always
            ``-1`` because no ground truth is available, and ``img_path`` is
            the file's path.

        Raises:
            ValueError: If ``cv2.imread`` cannot read the file.
        """
        img_path = self.image_paths[idx]
        label = -1  # Placeholder: the test set has no labels.

        # Read as grayscale, then replicate the channel to get an RGB image.
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError(f"Gambar tidak ditemukan atau tidak dapat dibaca: {img_path}")

        # cvtColor accepts the 2-D grayscale array directly and returns HxWx3 uint8.
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
        img = Image.fromarray(img)

        if self.transform is not None:
            img = self.transform(img)

        return img, label, img_path

# Directory holding the unlabelled test frames.
test_directory = '/kaggle/input/data-slayer123/test/test'
testing_dataset = TestingDataset(test_directory, test_transform)
# One image per batch, in dataset order.
testing_loader = DataLoader(dataset=testing_dataset, shuffle=False, batch_size=1)

In [None]:
def evaluate_testing(model, testing_loader, device, num_samples=150, csv_path='predictions_single_frame.csv'):
    """Predict every test image, save the predictions and plot a summary.

    The function puts the model in eval mode, runs it over the whole loader
    without gradients, writes a CSV with the columns ``id`` (file basename)
    and ``label`` (index of the largest logit), shows a bar chart of the
    predicted class counts and finally shows a grid of randomly chosen
    samples with their predictions.

    Args:
        model (torch.nn.Module): Trained model returning one row of class
            scores per image.
        testing_loader (DataLoader): Yields ``(images, labels, img_paths)``
            batches; any batch size is supported. The labels are ignored.
        device (torch.device): Device the image batches are moved to.
        num_samples (int): Upper bound on the number of random samples to
            visualize; clamped to ``[0, len(testing_loader.dataset)]``.
        csv_path (str): Path of the CSV file to write.

    Returns:
        None
    """
    model.eval()

    all_preds = []
    all_paths = []

    sample_images = []
    sample_preds = []
    sample_paths = []

    total_samples = len(testing_loader.dataset)
    num_samples = max(0, min(num_samples, total_samples))
    # Positions (in iteration order) of the samples kept for visualization.
    random_indices = set(random.sample(range(total_samples), num_samples))

    # Statistics used to undo the input normalization for display.
    # NOTE(review): assumes the loader's transform normalized with these
    # same values — confirm against the transform that was used.
    norm_mean = np.array([0.485, 0.456, 0.406])
    norm_std = np.array([0.229, 0.224, 0.225])

    sample_index = 0
    with torch.no_grad():
        for images, _, img_paths in testing_loader:
            images = images.to(device)

            outputs = model(images)
            batch_preds = torch.max(outputs, 1)[1].tolist()

            # Walk the batch item by item so any batch size works.
            for offset, (img_path, pred_label) in enumerate(zip(img_paths, batch_preds)):
                img_filename = os.path.basename(img_path)
                all_preds.append(pred_label)
                all_paths.append(img_filename)

                if sample_index in random_indices:
                    img = images[offset].cpu().numpy()
                    img = np.transpose(img, (1, 2, 0))  # CHW -> HWC
                    img = np.clip(img * norm_std + norm_mean, 0, 1)
                    sample_images.append(img)
                    sample_preds.append(pred_label)
                    sample_paths.append(img_filename)
                sample_index += 1

    # Save predictions to CSV
    df = pd.DataFrame({
        'id': all_paths,
        'label': all_preds
    })
    df.to_csv(csv_path, index=False)
    print(f'Predictions saved to {csv_path}')

    # Plot distribution of predictions. Reindexing on both class ids keeps
    # the counts aligned with the names even when a class is never predicted.
    labels = ['NonFall', 'Fall']
    counts = (
        df['label']
        .value_counts()
        .reindex(list(range(len(labels))), fill_value=0)
        .tolist()
    )
    plt.figure(figsize=(8, 6))
    plt.bar(labels, counts, color=['blue', 'orange'])
    plt.title('Distribution of Predictions')
    plt.xlabel('Class')
    plt.ylabel('Count')
    for i, count in enumerate(counts):
        plt.text(i, count, str(count), ha='center', va='bottom')
    plt.show()

    # Visualize random samples
    num_plots = len(sample_images)
    if num_plots == 0:
        # A grid with zero rows cannot be created; nothing to show.
        return

    cols = 15
    rows = (num_plots + cols - 1) // cols  # ceil(num_plots / cols)

    # squeeze=False always yields a 2-D array of axes, whatever rows/cols are.
    fig, axes = plt.subplots(rows, cols, figsize=(20, 4 * rows), squeeze=False)
    axes = axes.flatten()

    for i in range(num_plots):
        axes[i].imshow(sample_images[i])
        axes[i].set_title(f'Path: {sample_paths[i]}\nPredicted: {sample_preds[i]}')
        axes[i].axis('off')

    # Hide the unused cells of the last row.
    for i in range(num_plots, len(axes)):
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

# Run inference over the unlabelled test frames, write the predictions CSV
# and display up to 150 randomly chosen samples.
evaluate_testing(
    model=model,
    testing_loader=testing_loader,
    device=device,
    num_samples=150,
    csv_path='predictions_single_frame9.csv',
)

In [None]:
# Build the submission file: keep the ids (and their order) from the sample
# submission and attach the predicted label to each of them.
predictions_path = "/kaggle/working/predictions_single_frame9.csv"
sample_submission_path = "/kaggle/input/sample/sample_submission.csv"
merged_output_path = "P1F9.csv"

predictions = pd.read_csv(predictions_path)
sample_submission = pd.read_csv(sample_submission_path)

# Join on the bare 'id' column. The result then always has exactly the
# columns ('id', 'label'), whether or not the sample submission carries its
# own placeholder 'label' column (which would otherwise force the
# 'label_x'/'label_y' suffixes and a rename).
merged = pd.merge(
    sample_submission[['id']],
    predictions[['id', 'label']],
    on='id',
    how='inner',
)

# An inner join drops ids that have no prediction; make that visible instead
# of silently writing a shorter submission.
missing_ids = sample_submission.loc[~sample_submission['id'].isin(predictions['id']), 'id']
if len(missing_ids) > 0:
    print(f'Warning: {len(missing_ids)} ids from the sample submission have no prediction and were dropped')

merged.to_csv(merged_output_path, index=False)

print(f'Merged CSV saved to {merged_output_path}')