# Face Identification Model for Video Privacy System

This notebook implements a machine learning model for identifying faces in videos as part of an adaptive face-blur privacy system.

## Overview
- Uses face images and landmarks from YouTube dataset
- Trains a neural network to identify faces consistently
- Provides inference capabilities for video frames

## Step 1: Install Required Packages

In [10]:
# Load and explore dataset from .npz filesimport osimport glob# Define data directoriesdata_base_dir = "/Users/adityasirohi/BlurBuddy/Data"subdirs = [    "youtube_faces_with_keypoints_full_1/youtube_faces_with_keypoints_full_1",    "youtube_faces_with_keypoints_full_2/youtube_faces_with_keypoints_full_2",     "youtube_faces_with_keypoints_full_3/youtube_faces_with_keypoints_full_3",    "youtube_faces_with_keypoints_full_4/youtube_faces_with_keypoints_full_4"]# Find all .npz filesnpz_files = []for subdir in subdirs:    subdir_path = os.path.join(data_base_dir, subdir)    files = glob.glob(os.path.join(subdir_path, "*.npz"))    npz_files.extend(files)print(f"Found {len(npz_files)} .npz files")# Extract person names from filenamesperson_names = []for file_path in npz_files:    filename = os.path.basename(file_path)    # Extract person name (everything before the last underscore and number)    person_name = '_'.join(filename.split('_')[:-1])    person_names.append(person_name)# Create a simple DataFrame-like structureimport pandas as pddata_info = pd.DataFrame({    'file_path': npz_files,    'person_name': person_names})print(f"Dataset shape: {data_info.shape}")print(f"Unique persons: {data_info['person_name'].nunique()}")# Show top persons by sample countperson_counts = data_info['person_name'].value_counts()print(f"Top 10 persons by sample count:")print(person_counts.head(10))# Show data distributionprint(f"Dataset Info:")print(f"- Total samples: {len(data_info)}")print(f"- Unique persons: {data_info['person_name'].nunique()}")print(f"- Avg samples per person: {len(data_info) / data_info['person_name'].nunique():.1f}")print(f"- Persons with >= 4 samples: {(person_counts >= 4).sum()}")# Store for later usedf = data_info

Installation complete!


# Filter dataset - requiring more samples for proper stratificationperson_counts = df['person_name'].value_counts()df_filtered = df[df['person_name'].isin(person_counts[person_counts >= 6].index)]print(f"Filtered dataset: {len(df_filtered)} samples from {df_filtered['person_name'].nunique()} persons")# Split dataset - use simple random split instead of stratification to avoid class imbalance issuestrain_df, temp_df = train_test_split(df_filtered, test_size=0.3, random_state=42)val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)print(f"Train set: {len(train_df)} samples")print(f"Validation set: {len(val_df)} samples")print(f"Test set: {len(test_df)} samples")# Create datasetsdata_dir = "/Users/adityasirohi/BlurBuddy/Data"train_dataset = FaceDataset(train_df, data_dir)val_dataset = FaceDataset(val_df, data_dir)test_dataset = FaceDataset(test_df, data_dir)# Test loading a samplesample_image, sample_landmarks, sample_label = train_dataset[0]print(f"Sample data shapes:")print(f"- Image: {sample_image.shape}")print(f"- Landmarks: {sample_landmarks.shape}")print(f"- Label: {sample_label.item()}")# Get person nameperson_name = train_dataset.get_label_encoder().inverse_transform([sample_label.item()])[0]print(f"- Person: {person_name}")

In [11]:
# Import necessary libraries
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

# Import our custom model
from face_identification_model import *

print(f"Using device: {device}")
print("Libraries imported successfully!")

Using device: cpu
Libraries imported successfully!


## Step 3: Load and Explore Dataset

In [12]:
# Load dataset and explore
df = pd.read_csv("/Users/adityasirohi/BlurBuddy/Data/youtube_faces_with_keypoints_full.csv")

print(f"Dataset shape: {df.shape}")
print(f"Unique persons: {df["personName"].nunique()}")

# Show top persons by sample count
person_counts = df["personName"].value_counts()
print(f"\nTop 10 persons by sample count:")
print(person_counts.head(10))

# Show data distribution
print(f"\nDataset Info:")
print(f"- Total samples: {len(df)}")
print(f"- Unique persons: {df["personName"].nunique()}")
print(f"- Avg samples per person: {len(df) / df["personName"].nunique():.1f}")
print(f"- Persons with >= 4 samples: {(person_counts >= 4).sum()}")

SyntaxError: f-string: unmatched '[' (4291465703.py, line 5)

## Step 4: Create and Test Data Pipeline

In [None]:
# Filter dataset - requiring more samples for proper stratification
person_counts = df["personName"].value_counts()
df_filtered = df[df["personName"].isin(person_counts[person_counts >= 6].index)]

print(f"Filtered dataset: {len(df_filtered)} samples from {df_filtered["personName"].nunique()} persons")

# Split dataset - use simple random split instead of stratification to avoid class imbalance issues
train_df, temp_df = train_test_split(df_filtered, test_size=0.3, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

print(f"Train set: {len(train_df)} samples")
print(f"Validation set: {len(val_df)} samples")
print(f"Test set: {len(test_df)} samples")

# Create datasets
data_dir = "/Users/adityasirohi/BlurBuddy/Data"
train_dataset = FaceDataset(train_df, data_dir)
val_dataset = FaceDataset(val_df, data_dir)
test_dataset = FaceDataset(test_df, data_dir)

# Test loading a sample
sample_image, sample_landmarks, sample_label = train_dataset[0]
print(f"\nSample data shapes:")
print(f"- Image: {sample_image.shape}")
print(f"- Landmarks: {sample_landmarks.shape}")
print(f"- Label: {sample_label.item()}")

# Get person name
person_name = train_dataset.get_label_encoder().inverse_transform([sample_label.item()])[0]
print(f"- Person: {person_name}")

## Step 5: Create Model

In [None]:
# Create model
num_classes = train_dataset.get_label_encoder().classes_.shape[0]
model = FaceIdentificationModel(num_classes).to(device)

print(f"Model created with {num_classes} classes")
print(f"Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# Test model with sample data
test_image = torch.zeros((1, 3, 64, 64)).to(device)
test_landmarks = torch.zeros((1, 136)).to(device)

with torch.no_grad():
    output = model(test_image, test_landmarks)
    print(f"Model output shape: {output.shape}")

## Step 6: Visualize Sample Data

In [None]:
# Visualize sample faces
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

for i in range(min(8, len(train_dataset))):
    image, landmarks, label_idx = train_dataset[i]
    person_name = train_dataset.get_label_encoder().inverse_transform([label_idx.item()])[0]
    
    # Convert image for display
    image_display = image.permute(1, 2, 0).cpu().numpy()
    
    # Plot images
    axes[i].imshow(image_display)
    axes[i].set_title(f"{person_name}", fontsize=10)
    axes[i].axis("off")

plt.tight_layout()
plt.show()
print(f"\nShown {min(8, len(train_dataset))} sample faces from training set")

## Step 7: Train Model

In [None]:
# Initialize training setup
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5, factor=0.5)

# Create data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

print("Training setup complete!")
print(f"Training in {len(train_loader)} batches of {batch_size} samples each")

# Training variables
best_val_acc = 0
patience = 10
no_improve = 0
train_losses = []
val_accs = []

print("\nStarting training...")
print("Note: This may take several minutes depending on your hardware")

# Training loop
for epoch in range(20):  # Reduced epochs for demonstration
    print(f"\nEpoch {epoch + 1}/20:")
    
    # Train
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)
    
    # Evaluate
    val_loss, val_acc, val_preds, val_labels = evaluate(model, val_loader, criterion, device)
    val_accs.append(val_acc)
    
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    
    # Update learning rate
    scheduler.step(val_loss)
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "val_acc": val_acc,
            "epoch": epoch,
            "label_encoder": train_dataset.get_label_encoder()
        }, "face_identification_model.pth")
        no_improve = 0
        print(f"  -> Best model saved! (Val Acc: {val_acc:.2f}%)")
    else:
        no_improve += 1
    
    # Early stopping check
    if no_improve >= patience:
        print(f"  -> Early stopping triggered after {epoch + 1} epochs")
        break

print(f"\nTraining completed! Best validation accuracy: {best_val_acc:.2f}%")