# 442 Project
Rei Meguro (rmeguro), name, name

## 1. Data Processing

In [20]:
# Celebrity names in class-id order: a name's position in the tuple is the
# integer label it maps to.
labels = {
    name: class_id
    for class_id, name in enumerate(
        (
            "Angelina Jolie",
            "Brad Pitt",
            "Denzel Washington",
            "Hugh Jackman",
            "Jennifer Lawrence",
            "Johnny Depp",
            "Kate Winslet",
            "Leonardo DiCaprio",
            "Megan Fox",
            "Natalie Portman",
            "Nicole Kidman",
            "Robert Downey Jr",
            "Sandra Bullock",
            "Scarlett Johansson",
            "Tom Cruise",
            "Tom Hanks",
            "Will Smith",
        )
    )
}
n_classes = len(labels)  # number of distinct identities in the mapping
image_size = 256  # side length (pixels) used when resizing input images

import os
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image

class CustomDataset(Dataset):
    """Labelled image dataset read from a ``root_dir/<class name>/*.jpg`` tree.

    Each sub-directory of ``root_dir`` whose name appears in the class mapping
    contributes its ``.jpg`` files as samples of that class.  Samples are stored
    in ``self.data`` as ``(file_path, class_id)`` tuples.

    Args:
        root_dir: Directory containing one sub-directory per class.
        transform: Optional callable applied to the RGB ``PIL`` image before it
            is returned by ``__getitem__``.
        class_to_idx: Optional mapping from sub-directory name to integer class
            id.  When omitted, the module-level ``labels`` mapping is used.

    Raises:
        KeyError: If a non-hidden sub-directory has no entry in the mapping.
    """

    def __init__(self, root_dir, transform=None, class_to_idx=None):
        # Only touch the module-level mapping when the caller did not pass one.
        class_to_idx = labels if class_to_idx is None else class_to_idx

        self.root_dir = root_dir
        self.transform = transform
        self.data = []

        # os.listdir returns entries in arbitrary order; sorting keeps sample
        # indices stable across runs and machines (important for seeded splits).
        for folder in sorted(os.listdir(root_dir)):
            folder_path = os.path.join(root_dir, folder)
            if not os.path.isdir(folder_path):
                continue

            if folder not in class_to_idx:
                # Tooling directories such as ".ipynb_checkpoints" are not classes.
                if folder.startswith('.'):
                    continue
                raise KeyError(
                    f"directory {folder!r} in {root_dir!r} has no entry in the class mapping"
                )

            class_id = class_to_idx[folder]
            for file in sorted(os.listdir(folder_path)):
                # Case-insensitive so "IMG.JPG" is not silently dropped.
                if file.lower().endswith('.jpg'):
                    self.data.append((os.path.join(folder_path, file), class_id))

    def __len__(self):
        """Return the number of collected samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, class_id)``."""
        file_path, class_id = self.data[idx]
        # The context manager closes the file; convert() returns an independent
        # copy, so the image remains usable afterwards.
        with Image.open(file_path) as raw:
            image = raw.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, class_id

# dataset and image transformation
# Pipeline: resize to image_size x image_size, convert to a tensor, then
# normalise every channel with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)
dataset = CustomDataset('./Celebrity Faces Dataset', transform=transform)

# splitting dataset into training and testing sets (80% train, remainder test)
train_size = int(len(dataset) * 0.8)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset, lengths=[train_size, test_size]
)

# dataloaders: only the training split is reshuffled each epoch
train_loader, test_loader = (
    DataLoader(split, batch_size=16, shuffle=reshuffle)
    for split, reshuffle in ((train_dataset, True), (test_dataset, False))
)

print(len(dataset))

1800


## 2. Training a CNN on labeled faces

In [22]:
# import libraries
import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim

In [33]:
# define model

class FacialModel(nn.Module):
    """ResNet-50 network whose final layer outputs ``num_classes`` values."""

    def __init__(self, num_classes):
        super().__init__()
        # resnet50() is called without weights, so the backbone starts untrained.
        backbone = models.resnet50()
        # Swap the stock head for one sized to our label set; the head's input
        # width is read from the layer being replaced (2048 for ResNet-50).
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
        self.base_model = backbone

    def forward(self, x):
        """Feed ``x`` through the backbone and return its output."""
        scores = self.base_model(x)
        return scores

In [34]:
# loss function + optimizer
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the classifier for our label set and move its parameters to the device.
model = FacialModel(num_classes=n_classes)
model = model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [44]:
# training loop
def train_model(model, train_loader, optimizer, loss_fn, device, epochs, log_every=32):
    """Train ``model`` for ``epochs`` passes over ``train_loader``.

    For every batch the inputs and targets are moved to ``device``, the loss is
    computed with ``loss_fn``, back-propagated, and ``optimizer`` takes a step.
    The running mean loss is printed on the first batch of each epoch and every
    ``log_every`` batches after it, and once more at the end of each epoch.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(images, targets)`` batches.
        optimizer: Optimizer already bound to ``model``'s parameters.
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        device: Device the batches are moved to before the forward pass.
        epochs: Number of passes over ``train_loader``.
        log_every: Print the running loss every this many batches (default 32).

    Returns:
        A list with the mean batch loss of each epoch, in order.

    Raises:
        ValueError: If ``log_every`` is smaller than 1 or ``train_loader``
            yields no batches.
    """
    if log_every < 1:
        raise ValueError(f"log_every must be >= 1, got {log_every}")

    epoch_losses = []
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        # Count batches ourselves so the average is correct even for loaders
        # that do not implement __len__.
        num_batches = 0
        # "targets" rather than "labels" to avoid shadowing the module-level
        # class-name mapping of the same name.
        for num_batches, (images, targets) in enumerate(train_loader, start=1):
            images = images.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Zero-based batch index is num_batches - 1.
            if (num_batches - 1) % log_every == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss / num_batches:.4f}")

        if num_batches == 0:
            # Previously this surfaced as an opaque ZeroDivisionError.
            raise ValueError("train_loader yielded no batches; nothing to train on")

        # End-of-epoch summary, in case the epoch did not end on a logging step.
        epoch_loss = running_loss / num_batches
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}")
        epoch_losses.append(epoch_loss)

    return epoch_losses


In [43]:
# train model: ten passes over the training loader with the objects built above
train_model(
    model=model,
    train_loader=train_loader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    device=device,
    epochs=10,
)

0
Epoch [1/10], Loss: 2.6730
1


KeyboardInterrupt: 

## 3. Creating a generalized facial recognition model

In [None]:
# creating the generalized classifier

import chromadb
import uuid

class GeneralFaceRec:
    def __init__(self, model, db_name: str = "facerec"):
        self.client = chromadb.Client()
        self.collection = self.client.get_or_create_collection(
            db_name, metadata={"hnsw:space": "cosine"}
        )
        self.model = model
        
        # parameters
        self.threshold = 0.8  # how similar does it have to be to count as a match
        self.k = 5  # how many neighbors to consider
        self.eps = 0.0001  # small number to avoid dividing by 0
    
    def set_params(self, threshold: float = None, k: int = None, eps: float = None):
        assert(k is None or k >= 1)
        assert(eps is None or eps > 0)
        
        threshold = threshold or self.threshold
        k = k or self.k
        eps = eps or self.eps
    
    def embed(self, images):
        return self.model(images)
    
    def register_face(self, images, name: str):
        embeddings = self.embed(images)
        
        self.collection.upsert(
            ids = [uuid.uuid4() for _ in images],
            embeddings=embeddings,
            metadatas=[{"name": name} for _ in images]
        )
    
    def identify_face(self, image) -> str | None:
        embedding = self.embed(image)
        
        result = self.collection.query(
            query_embeddings=embedding,
            n_results=self.k,
            include=["metadatas", "distances"]
        )
        
        closest_distance = result["distances"][0]
        if closest_distance < self.threshold:
            return None
        
        scores = {}
        for metadata, distance in zip(result["metadatas"], result["distances"]):
            name = metadata["name"]
            scores.setdefault(name, 0)
            scores[name] += 1.0 / (self.eps + distance)  # smaller the distance, higher the score
        
        return max(scores, key=scores.get)