In [None]:
"""import os
import random
import pandas as pd

me_path = r"D:\DB\face_reg\mtcnn_mod\my_face"
other_path = r"D:\DB\face_reg\mtcnn_mod\others"

me_images = [os.path.join(me_path, f) for f in os.listdir(me_path) if f.endswith(".jpg")]
other_images = [os.path.join(other_path, f) for f in os.listdir(other_path) if f.endswith(".jpg")]

positive_pairs = [(me_images[i], me_images[j], 1) 
                  for i in range(len(me_images)) for j in range(i+1, len(me_images))]

negative_pairs = [(random.choice(me_images), random.choice(other_images), 0) for _ in range(len(positive_pairs))]

pairs = positive_pairs + negative_pairs
df = pd.DataFrame(pairs, columns=["Image1", "Image2", "Label"])
df.to_csv("train.csv", index=False)

print("CSV file created successfully!")"""


CSV file created successfully!


In [None]:
"""import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle

class FaceDataset(Dataset):
    def __init__(self, known_dir, unknown_dir):
        self.detector = MTCNN()
        self.transform = transforms.Compose([
            transforms.Resize((112, 112)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        self.data = []
        self.labels = []
        self.names = []
        
        # Process known faces (label 1)
        for img_name in os.listdir(known_dir):
            if img_name.endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(known_dir, img_name)
                face_tensor = self._process_image(img_path)
                if face_tensor is not None:
                    self.data.append(face_tensor)
                    self.labels.append(1)
                    self.names.append(img_name.split('.')[0])
        
        # Process unknown faces (label 0)
        for img_name in os.listdir(unknown_dir):
            if img_name.endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(unknown_dir, img_name)
                face_tensor = self._process_image(img_path)
                if face_tensor is not None:
                    self.data.append(face_tensor)
                    self.labels.append(0)
                    self.names.append("unknown")
    
    def _process_image(self, img_path):
        img = cv2.imread(img_path)
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        faces = self.detector.detect_faces(img_rgb)
        if faces:
            x, y, w, h = faces[0]['box']
            face = img_rgb[y:y+h, x:x+w]
            face_pil = Image.fromarray(face)
            face_tensor = self.transform(face_pil)
            return face_tensor
        return None
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx], self.names[idx]

class FaceRecognitionModel(nn.Module):
    def __init__(self):
        super(FaceRecognitionModel, self).__init__()
        # Load pretrained MagFace model
        self.backbone = torch.hub.load('deepinsight/insightface', 'magface_iresnet50')
        
        # Freeze backbone layers
        for param in self.backbone.parameters():
            param.requires_grad = False
            
        # Add classification head
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 2)  # 2 classes: known vs unknown
        )
    
    def forward(self, x):
        features = self.backbone(x)
        output = self.classifier(features)
        return output

def train_model(model, train_loader, val_loader, num_epochs=10):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
    
    train_losses = []
    val_losses = []
    precisions = []
    recalls = []
    
    best_val_loss = float('inf')
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        train_preds = []
        train_labels = []
        
        for faces, labels, _ in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            faces, labels = faces.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(faces)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())
        
        epoch_loss = running_loss / len(train_loader)
        train_losses.append(epoch_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_labels = []
        
        with torch.no_grad():
            for faces, labels, _ in val_loader:
                faces, labels = faces.to(device), labels.to(device)
                outputs = model(faces)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                val_preds.extend(predicted.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())
        
        val_loss = val_loss / len(val_loader)
        val_losses.append(val_loss)
        
        # Calculate metrics
        precision, recall, _, _ = precision_recall_fscore_support(val_labels, val_preds, average='binary')
        precisions.append(precision)
        recalls.append(recall)
        
        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Training Loss: {epoch_loss:.4f}')
        print(f'Validation Loss: {val_loss:.4f}')
        print(f'Precision: {precision:.4f}')
        print(f'Recall: {recall:.4f}')
        
        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'epoch': epoch,
                'val_loss': val_loss,
            }, 'best_face_recognition_model.pth')
    
    # Plot training curves
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss')
    
    plt.subplot(1, 2, 2)
    plt.plot(precisions, label='Precision')
    plt.plot(recalls, label='Recall')
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.legend()
    plt.title('Precision and Recall')
    
    plt.tight_layout()
    plt.savefig('training_curves.png')
    plt.close()

def main():
    # Create datasets
    dataset = FaceDataset(known_dir=r"D:\DB\face_reg\mtcnn_mod\my_face", unknown_dir=r"D:\DB\face_reg\mtcnn_mod\others")
    
    # Split dataset
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    # Initialize model
    model = FaceRecognitionModel()
    
    # Train model
    train_model(model, train_loader, val_loader, num_epochs=10)
    
    model.eval()
    known_embeddings = {}
    
    with torch.no_grad():
        for face, label, name in dataset:
            if label == 1:  
                face = face.unsqueeze(0)
                if torch.cuda.is_available():
                    face = face.cuda()
                embedding = model.backbone(face).cpu().numpy()
                known_embeddings[name] = embedding
    
    with open('known_face_embeddings.pkl', 'wb') as f:
        pickle.dump(known_embeddings, f)

if __name__ == "__main__":
    main()"""

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle
import insightface
from insightface.app import FaceAnalysis

class FaceDataset(Dataset):
    def __init__(self, known_dir, unknown_dir):
        self.face_app = FaceAnalysis(providers=['CPUExecutionProvider'])
        self.face_app.prepare(ctx_id=0, det_size=(640, 640))
        
        self.transform = transforms.Compose([
            transforms.Resize((112, 112)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
        self.data = []
        self.labels = []
        self.names = []
        
        print("Processing known faces...")
        for img_name in os.listdir(known_dir):
            if img_name.endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(known_dir, img_name)
                face_tensor = self._process_image(img_path)
                if face_tensor is not None:
                    self.data.append(face_tensor)
                    self.labels.append(1)
                    self.names.append(img_name.split('.')[0])
        
        print("Processing unknown faces...")
        for img_name in os.listdir(unknown_dir):
            if img_name.endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(unknown_dir, img_name)
                face_tensor = self._process_image(img_path)
                if face_tensor is not None:
                    self.data.append(face_tensor)
                    self.labels.append(0)
                    self.names.append("unknown")
    
    def _process_image(self, img_path):
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Could not read image {img_path}")
            return None
            
        faces = self.face_app.get(img)
        if len(faces) > 0:
            bbox = faces[0].bbox.astype(int)
            x1, y1, x2, y2 = bbox
            face = img[y1:y2, x1:x2]
            if face.size == 0:
                return None
                
            face_rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
            face_pil = Image.fromarray(face_rgb)
            face_tensor = self.transform(face_pil)
            return face_tensor
        return None
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx], self.names[idx]

class FaceRecognitionModel(nn.Module):
    def __init__(self):
        super(FaceRecognitionModel, self).__init__()
        # Initialize InsightFace model for feature extraction
        self.face_app = FaceAnalysis(providers=['CPUExecutionProvider'])
        self.face_app.prepare(ctx_id=0, det_size=(640, 640))
        
        # Feature dimension from InsightFace model
        feature_dim = 512
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 2)  # 2 classes: known vs unknown
        )
    
    def forward(self, x):
        # Convert tensor to numpy for InsightFace
        batch_size = x.size(0)
        features = []
        
        for i in range(batch_size):
            img = (x[i].permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
            faces = self.face_app.get(img)
            if faces:
                feat = torch.from_numpy(faces[0].embedding).float()
                features.append(feat)
            else:
                # If no face detected, use zero vector
                features.append(torch.zeros(512))
        
        features = torch.stack(features)
        if x.is_cuda:
            features = features.cuda()
        
        output = self.classifier(features)
        return output

def train_model(model, train_loader, val_loader, num_epochs=10):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.classifier.parameters(), lr=0.001)
    
    train_losses = []
    val_losses = []
    precisions = []
    recalls = []
    
    best_val_loss = float('inf')
    
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        
        # Training phase
        model.train()
        running_loss = 0.0
        train_preds = []
        train_labels = []
        
        for faces, labels, _ in tqdm(train_loader, desc='Training'):
            try:
                faces, labels = faces.to(device), labels.to(device)
                
                optimizer.zero_grad()
                outputs = model(faces)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                
                running_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                train_preds.extend(predicted.cpu().numpy())
                train_labels.extend(labels.cpu().numpy())
            except Exception as e:
                print(f"Error in training batch: {str(e)}")
                continue
        
        epoch_loss = running_loss / len(train_loader)
        train_losses.append(epoch_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_labels = []
        
        with torch.no_grad():
            for faces, labels, _ in tqdm(val_loader, desc='Validation'):
                try:
                    faces, labels = faces.to(device), labels.to(device)
                    outputs = model(faces)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    
                    _, predicted = torch.max(outputs.data, 1)
                    val_preds.extend(predicted.cpu().numpy())
                    val_labels.extend(labels.cpu().numpy())
                except Exception as e:
                    print(f"Error in validation batch: {str(e)}")
                    continue
        
        val_loss = val_loss / len(val_loader)
        val_losses.append(val_loss)
        
        # Calculate metrics
        try:
            precision, recall, _, _ = precision_recall_fscore_support(val_labels, val_preds, average='binary')
            precisions.append(precision)
            recalls.append(recall)
            
            print(f'Training Loss: {epoch_loss:.4f}')
            print(f'Validation Loss: {val_loss:.4f}')
            print(f'Precision: {precision:.4f}')
            print(f'Recall: {recall:.4f}')
            
            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                print("Saving best model...")
                torch.save({
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'epoch': epoch,
                    'val_loss': val_loss,
                }, 'best_face_recognition_model.pth')
        except Exception as e:
            print(f"Error calculating metrics: {str(e)}")
            continue
    
    # Plot training curves
    try:
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label='Training Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.title('Training and Validation Loss')
        
        plt.subplot(1, 2, 2)
        plt.plot(precisions, label='Precision')
        plt.plot(recalls, label='Recall')
        plt.xlabel('Epoch')
        plt.ylabel('Score')
        plt.legend()
        plt.title('Precision and Recall')
        
        plt.tight_layout()
        plt.savefig('training_curves.png')
        plt.close()
    except Exception as e:
        print(f"Error plotting curves: {str(e)}")

def main():
    try:
        # Check if directories exist
        if not os.path.exists(r"D:\DB\face_reg\mtcnn_mod\my_face"):
            raise FileNotFoundError("Directory 'dataset/my' not found")
        if not os.path.exists(r"D:\DB\face_reg\mtcnn_mod\others"):
            raise FileNotFoundError("Directory 'dataset/other' not found")
            
        print("Creating datasets...")
        dataset = FaceDataset(known_dir=r"D:\DB\face_reg\mtcnn_mod\my_face", unknown_dir=r"D:\DB\face_reg\mtcnn_mod\others")
        
        if len(dataset) == 0:
            raise ValueError("No valid face images found in the dataset")
            
        print(f"Total dataset size: {len(dataset)}")
        
        # Split dataset
        train_size = int(0.8 * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
        
        print(f"Training set size: {len(train_dataset)}")
        print(f"Validation set size: {len(val_dataset)}")
        
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
        
        # Initialize model
        print("Initializing model...")
        model = FaceRecognitionModel()
        
        # Train model
        print("Starting training...")
        train_model(model, train_loader, val_loader, num_epochs=10)
        
        # Save the face embeddings for known faces
        print("Saving face embeddings...")
        model.eval()
        known_embeddings = {}
        
        with torch.no_grad():
            for face, label, name in dataset:
                if label == 1:  # Known face
                    face = face.unsqueeze(0)
                    if torch.cuda.is_available():
                        face = face.cuda()
                    # Get embeddings using InsightFace
                    img = (face[0].permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
                    faces = model.face_app.get(img)
                    if faces:
                        embedding = faces[0].embedding
                        known_embeddings[name] = embedding
        
        # Save embeddings
        with open('known_face_embeddings.pkl', 'wb') as f:
            pickle.dump(known_embeddings, f)
            
        print("Training completed successfully!")
        
    except Exception as e:
        print(f"Error in main: {str(e)}")

if __name__ == "__main__":
    main()

In [11]:
import torch
import torch.nn as nn
import numpy as np
import cv2
import pickle
from PIL import Image
from torchvision import transforms
from insightface.app import FaceAnalysis

# Load the trained model
class FaceRecognitionModel(nn.Module):
    def __init__(self):
        super(FaceRecognitionModel, self).__init__()
        self.face_app = FaceAnalysis(providers=['CPUExecutionProvider'])
        self.face_app.prepare(ctx_id=0, det_size=(640, 640))
        feature_dim = 512
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 2)
        )
    
    def forward(self, x):
        return self.classifier(x)

# Load model checkpoint
def load_model(model_path):
    model = FaceRecognitionModel()
    checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    return model

# Load known face embeddings
def load_known_embeddings(embedding_path):
    with open(embedding_path, 'rb') as f:
        known_embeddings = pickle.load(f)
    return known_embeddings

# Process image and extract face embedding
def extract_face_embedding(image_path, face_app):
    img = cv2.imread(image_path)
    if img is None:
        print(f"Error: Unable to read image {image_path}")
        return None, None
    
    faces = face_app.get(img)
    if len(faces) == 0:
        print("No face detected in the image.")
        return None, None
    
    embedding = faces[0].embedding
    return embedding, img

# Recognize face
def recognize_face(image_path, model, known_embeddings, threshold=0.6):
    face_app = FaceAnalysis(providers=['CPUExecutionProvider'])
    face_app.prepare(ctx_id=0, det_size=(640, 640))
    
    embedding, img = extract_face_embedding(image_path, face_app)
    if embedding is None:
        return "No face detected"
    
    # Compare with known embeddings
    min_dist = float('inf')
    best_match = "Unknown"
    
    for name, known_embedding in known_embeddings.items():
        dist = np.linalg.norm(embedding - known_embedding)
        if dist < min_dist and dist < threshold:
            min_dist = dist
            best_match = name
    
    return best_match

if __name__ == "__main__":
    model_path = "best_face_recognition_model.pth"
    embedding_path = "known_face_embeddings.pkl"
    image_path = r"C:\Users\Aswin Christo\Pictures\Camera Roll\WIN_20250206_15_27_15_Pro.jpg"  # Change to your test image path
    
    print("Loading model...")
    model = load_model(model_path)
    print("Loading known embeddings...")
    known_embeddings = load_known_embeddings(embedding_path)
    print("Recognizing face...")
    result = recognize_face(image_path, model, known_embeddings)
    print(f"Recognition Result: {result}")


Loading model...
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffal

  checkpoint = torch.load(model_path, map_location=torch.device('cpu'))


Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\Aswin Christo/.insightface\models\buffalo_l\w600k_r50.onn