In [58]:
import cv2
import os
import torch
import torch.nn as nn
from torchvision import datasets, transforms
import cv2
import torch
import time
from tqdm import tqdm
from torchvision import models

### Generate Dataset

In [59]:
def generate_dataset(name, num_samples=100):
    """Capture cropped face images from the webcam into ``./data/{name}``.

    Frames are read from camera 0, the Haar cascade in
    ``./haarcascade_frontalface_default.xml`` locates a face, and each detected
    face is resized to 200x200 and written as ``{img_id}.jpg``.

    Args:
        name: Sub-folder name under ``./data`` (used as the class label folder).
        num_samples: Number of face images to save before stopping
            (defaults to 100, the value that used to be hard-coded).

    Returns:
        None. Progress and errors are reported with ``print``.

    Capturing stops when ``num_samples`` images were saved, when Enter is
    pressed in the preview window, or when the camera stops delivering frames.
    """
    dataset_dir = f"./data/{name}"

    # Check if the dataset directory already exists
    if os.path.exists(dataset_dir):
        print("Dataset already exists")
        return

    # Load the Haar cascade classifier for face detection
    face_classifier = cv2.CascadeClassifier("./haarcascade_frontalface_default.xml")
    if face_classifier.empty():
        print("Error: Haar cascade file could not be loaded")
        return

    def face_cropped(img):
        """Return the face region of ``img`` or None when no face is found."""
        faces = face_classifier.detectMultiScale(img, 1.3, 5)

        # detectMultiScale yields an empty sequence when nothing is detected
        if len(faces) == 0:
            return None

        # When several faces are found, keep the last one reported
        x, y, w, h = faces[-1]
        return img[y:y+h, x:x+w]

    # Open the video capture device (webcam)
    cap = cv2.VideoCapture(0)

    # Check if the webcam is opened successfully
    if not cap.isOpened():
        cap.release()
        print("Error: Camera not found")
        return

    # Create the folder only once the camera works, so a failed run does not
    # leave an empty folder that later looks like an existing dataset.
    # makedirs also creates ./data itself when it is missing.
    os.makedirs(dataset_dir)

    img_id = 0

    try:
        # Capture images until the requested number is reached or Enter is pressed
        while img_id < num_samples:
            # Capture frame-by-frame
            ret, frame = cap.read()
            if not ret or frame is None:
                print("Error: Failed to read frame from camera")
                break

            # Run the detector once per frame and reuse the result
            cropped = face_cropped(frame)
            if cropped is not None:
                img_id += 1
                # Resize the cropped face to a fixed size
                face = cv2.resize(cropped, (200, 200))
                # Save the cropped face as an image file (before drawing the id on it)
                file_name_path = f"{dataset_dir}/{img_id}.jpg"
                cv2.imwrite(file_name_path, face)
                # Display the cropped face with an ID
                cv2.putText(face, str(img_id), (50,50), cv2.FONT_HERSHEY_COMPLEX, 1, (0,255,0), 2)
                cv2.imshow("Cropped Face", face)

            # Poll the keyboard on every frame so Enter (key code 13) also
            # works while no face is in view
            if cv2.waitKey(1) & 0xFF == 13:
                break
    finally:
        # Release the video capture device and close all OpenCV windows,
        # even if capturing failed part-way through
        cap.release()
        cv2.destroyAllWindows()

    print("Collecting Samples Complete!!!")

In [60]:
# Capture face samples for the person 'parth'; this returns immediately with
# "Dataset already exists" when ./data/parth is already present.
generate_dataset('parth')

Dataset already exists


In [61]:
# Define transformations to be applied to the images
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.ToTensor(),           # Convert images to PyTorch tensors
    # You can add more transformations here if needed (e.g., normalization)
])

# Path to the root directory containing one sub-folder of images per person
data_dir = './data'

# Load images from the folders with specified transformations
dataset = datasets.ImageFolder(root=data_dir, transform=transform)

# Split the dataset into training (90%) and testing (10%) sets.
# The test size is derived by subtraction so the two lengths always add up to
# len(dataset); truncating both fractions independently makes random_split
# raise whenever the dataset size is not a multiple of 10.
train_size = int(len(dataset) * 0.9)
test_size = len(dataset) - train_size

# A seeded generator keeps the split identical across kernel restarts
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

# Get the class names (folders' names within the data directory)
class_names = dataset.classes

# Create data loaders to iterate over the datasets in batches
batch_size = 1  # Set batch size to 1 for demonstration

# Create data loaders for training and testing sets
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Combine data loaders into a dictionary for easy access
data_loader = {
    "train": train_loader,
    "val": test_loader  # The held-out split is used as the validation set
}

# Sanity check: fetch a single training batch and print the image/label shapes
images, labels = next(iter(data_loader['train']))
print(images.shape, labels.shape)


torch.Size([1, 3, 224, 224]) torch.Size([1])


In [62]:
# Show the class labels taken from dataset.classes (the folder names under ./data)
class_names

['kartik', 'parth', 'shrikant']

In [63]:
# Load MobileNetV2 with torchvision's default pretrained weights and print the
# architecture so the layers (including the classifier head) can be inspected.
model = models.mobilenet_v2(weights="DEFAULT")
print(model)

MobileNetV2(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU6(inplace=True)
    )
    (1): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
          (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU6(inplace=True)
        )
        (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): InvertedResidual(
      (conv): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(96, eps=

In [64]:
# Show the stock classifier head before it is replaced
print(model.classifier)

# Sizes for the new head: one output per face class, and the same input width
# as the existing final linear layer
num_out_ftrs = len(class_names)
num_ftrs = model.classifier[1].in_features

# Freeze every pretrained weight; only the layers created below stay trainable,
# which keeps this first training stage fast
for param in model.parameters():
    param.requires_grad_(False)

# Swap in a stronger dropout and a freshly initialised linear layer
model.classifier[0] = nn.Dropout(p=0.4, inplace=True)
model.classifier[1] = nn.Linear(num_ftrs, num_out_ftrs)

# Show the head again to confirm the replacement
print("After modifying the last layer:")
print(model.classifier)

Sequential(
  (0): Dropout(p=0.2, inplace=False)
  (1): Linear(in_features=1280, out_features=1000, bias=True)
)
After modifying the last layer:
Sequential(
  (0): Dropout(p=0.4, inplace=True)
  (1): Linear(in_features=1280, out_features=3, bias=True)
)


In [65]:
def train_model(model,
                dataloaders,
                criterion = nn.CrossEntropyLoss(),
                prev_checkpoint=None,
                learning_rate=0.001,
                optimizer = None, 
                schedular=None, 
                num_epoch=10,
                save_checkpoint=False):
    """Train and validate ``model`` for ``num_epoch`` epochs.

    Args:
        model: Network to train; it is moved to CUDA when available, else CPU.
        dataloaders: Dict with ``"train"`` and ``"val"`` data loaders yielding
            ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        prev_checkpoint: Optional checkpoint dict previously returned by this
            function; model/optimizer state, epoch count, loss history and
            elapsed time are restored from it.
        learning_rate: Learning rate for the default Adam optimizer.
        optimizer: Optional optimizer; Adam over ``model.parameters()`` is
            created when omitted.
        schedular: Optional learning-rate scheduler, stepped once per epoch
            after the training phase.
        num_epoch: Number of additional epochs to run.
        save_checkpoint: When True, the checkpoint is also written to
            ``checkpoint_<timestamp>.pth`` in the working directory.

    Returns:
        Tuple ``(model, checkpoint)``. ``checkpoint["epoch_losses"]`` holds one
        average loss per completed epoch under ``"train"`` and ``"val"``, and
        ``checkpoint["time_taken"]`` is the accumulated training-phase time in
        seconds.
    """
    # Check if CUDA is available, and move the model to the appropriate device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Define default parameters for optimizer if not provided
    if optimizer is None:
        optimizer = torch.optim.Adam(
            params=model.parameters(),
            lr=learning_rate
        )

    phases = ("train", "val")

    # Number of samples per phase, used to average loss and accuracy
    dataset_sizes = {phase: len(dataloaders[phase].dataset) for phase in phases}

    # Wall-clock duration of each training phase run in this call
    train_time_list = []

    # Load previous checkpoint if provided
    if prev_checkpoint is not None:
        print(model.load_state_dict(prev_checkpoint["model_state"]))
        optimizer.load_state_dict(prev_checkpoint["optim_state"])
        epochs_completed = prev_checkpoint["epoch"]
        # Copy the history so the caller's checkpoint is not modified in place
        epoch_loss_list = {
            phase: list(prev_checkpoint["epoch_losses"].get(phase, []))
            for phase in phases
        }
        train_time = prev_checkpoint["time_taken"]
        print('Loaded checkpoint')
    else:
        epochs_completed = 0
        epoch_loss_list = {phase: [] for phase in phases}
        train_time = 0

    total_epochs = num_epoch + epochs_completed

    print(f"Training Started on {device}")

    # Loop through epochs
    for epoch in range(epochs_completed, total_epochs):

        # Each epoch has a training and a validation phase
        for phase in phases:
            time_start = time.time()
            is_training = phase == "train"

            # Set model to train mode during training, and eval mode during validation
            if is_training:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Use tqdm for a progress bar over the batches of this phase
            progress = tqdm(dataloaders[phase], desc=f'{phase.capitalize()} Epoch {epoch+1}/{total_epochs}')

            # Iterate over batches of data
            for images, labels in progress:
                images = images.to(device)
                labels = labels.to(device)

                # Gradients are only tracked while training
                with torch.set_grad_enabled(is_training):
                    outputs = model(images)
                    predictions = torch.argmax(outputs, dim=1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training loop
                    if is_training:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Weight the batch loss by batch size so the epoch value is a
                # per-sample average
                running_loss += loss.item() * images.size(0)
                running_corrects += (predictions == labels).sum().item()

            # Update learning rate scheduler if provided and in training phase
            if schedular is not None and is_training:
                schedular.step()

            # Calculate epoch loss and accuracy (guarding against an empty dataset)
            sample_count = max(dataset_sizes[phase], 1)
            epoch_loss = running_loss / sample_count
            epoch_acc = 100 * running_corrects / sample_count

            # Print loss and accuracy
            progress.write(f'loss: {epoch_loss} acc: {epoch_acc}')

            # Record the history inside the phase loop so both phases are
            # captured for every epoch
            epoch_loss_list[phase].append(epoch_loss)
            if is_training:
                train_time_list.append(time.time() - time_start)

    # Update total training time
    train_time += sum(train_time_list)

    # Create checkpoint dictionary
    checkpoint = {
        "epoch": total_epochs,
        "criterion": criterion,
        "model_state": model.state_dict(),
        "optim_state": optimizer.state_dict(),
        "epoch_losses": epoch_loss_list,
        "time_taken": train_time
    }

    # Save checkpoint if specified
    if save_checkpoint:
        file_name = f"checkpoint_{time.time()}.pth"
        torch.save(checkpoint, file_name)

    return model, checkpoint


In [66]:
# Stage 1: train for 5 epochs with the default loss, optimizer and learning rate.
# Every pretrained parameter was frozen before the classifier layers were
# replaced, so only the new classifier head is updated here.
model, checkpoint = train_model(model, data_loader, num_epoch=5)

Training Started on cpu


Train Epoch 1/5: 100%|██████████| 270/270 [00:07<00:00, 35.75it/s]


loss: 1.130750706019225 acc: 37.407407407407405


Val Epoch 1/5: 100%|██████████| 30/30 [00:00<00:00, 46.01it/s]


loss: 1.0484357794125876 acc: 60.0


Train Epoch 2/5: 100%|██████████| 270/270 [00:06<00:00, 40.69it/s]


loss: 1.0324967414140702 acc: 48.148148148148145


Val Epoch 2/5: 100%|██████████| 30/30 [00:00<00:00, 48.48it/s]


loss: 0.9894797921180725 acc: 50.0


Train Epoch 3/5: 100%|██████████| 270/270 [00:06<00:00, 41.63it/s]


loss: 0.9546482502310364 acc: 55.18518518518518


Val Epoch 3/5: 100%|██████████| 30/30 [00:00<00:00, 49.70it/s]


loss: 1.004696120818456 acc: 60.0


Train Epoch 4/5: 100%|██████████| 270/270 [00:06<00:00, 40.95it/s]


loss: 0.8818027510687158 acc: 66.29629629629629


Val Epoch 4/5: 100%|██████████| 30/30 [00:00<00:00, 45.49it/s]


loss: 0.9184776902198791 acc: 53.333333333333336


Train Epoch 5/5: 100%|██████████| 270/270 [00:06<00:00, 42.08it/s]


loss: 0.8120905207263098 acc: 72.96296296296296


Val Epoch 5/5: 100%|██████████| 30/30 [00:00<00:00, 49.86it/s]

loss: 0.8406905780235926 acc: 70.0





In [67]:
# Unfreeze the whole network: turn gradient tracking back on for every
# parameter so the next training run fine-tunes all layers.
for param in model.parameters():
    param.requires_grad_(True)


In [68]:
# Stage 2: train the already-trained model for 5 more epochs with all layers unfrozen.
# NOTE: no prev_checkpoint is passed, so training continues from the model's
# current weights but with a fresh optimizer, and the epoch counter and loss
# history start again from zero (the returned checkpoint replaces the old one).
model, checkpoint = train_model(model, data_loader, num_epoch=5)

Training Started on cpu


Train Epoch 1/5: 100%|██████████| 270/270 [00:57<00:00,  4.69it/s]


loss: 0.11844719879391724 acc: 97.03703703703704


Val Epoch 1/5: 100%|██████████| 30/30 [00:00<00:00, 43.38it/s]


loss: 0.048870255773363167 acc: 100.0


Train Epoch 2/5: 100%|██████████| 270/270 [00:56<00:00,  4.79it/s]


loss: 0.0007161589090106578 acc: 100.0


Val Epoch 2/5: 100%|██████████| 30/30 [00:00<00:00, 48.13it/s]


loss: 0.06623064047671505 acc: 96.66666666666667


Train Epoch 3/5: 100%|██████████| 270/270 [00:55<00:00,  4.82it/s]


loss: 0.0003377330281013942 acc: 100.0


Val Epoch 3/5: 100%|██████████| 30/30 [00:00<00:00, 50.57it/s]


loss: 0.039476608947734346 acc: 100.0


Train Epoch 4/5: 100%|██████████| 270/270 [00:55<00:00,  4.84it/s]


loss: 0.00019978626757316913 acc: 100.0


Val Epoch 4/5: 100%|██████████| 30/30 [00:00<00:00, 49.43it/s]


loss: 0.027144681539963737 acc: 100.0


Train Epoch 5/5: 100%|██████████| 270/270 [00:59<00:00,  4.51it/s]


loss: 0.00013217514301855032 acc: 100.0


Val Epoch 5/5: 100%|██████████| 30/30 [00:00<00:00, 41.38it/s]

loss: 0.02715660879512143 acc: 100.0





In [70]:
# Load the pre-trained face detection model.
facedetect = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# Check if CUDA (GPU) is available and set the device accordingly.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(torch.cuda.is_available())

# Make sure the model lives on the same device as the inputs and is in
# evaluation mode (disables dropout, uses BatchNorm running statistics).
model.to(device)
model.eval()

# Open the video capture device (webcam).
cap = cv2.VideoCapture(0)

# Set the resolution of the video capture.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Define the font for displaying text on the image.
font = cv2.FONT_HERSHEY_COMPLEX

# Define the image transformations to be applied.
# The size matches the 224x224 resize used when the training data was loaded.
transform = transforms.Compose([
    transforms.ToPILImage(),        # Convert numpy array to PIL Image
    transforms.Resize((224, 224)),  # Resize image to 224x224
    transforms.ToTensor(),           # Convert image to PyTorch tensor
])

try:
    while True:
        # Capture frame-by-frame from the webcam.
        success, imgOrignal = cap.read()
        if not success or imgOrignal is None:
            print("Error: Failed to read frame from camera")
            break

        # Detect faces in the captured frame.
        faces = facedetect.detectMultiScale(imgOrignal, 1.3, 5)

        # Loop over each detected face.
        for x, y, w, h in faces:
            # Crop the detected face (rows span the height, columns the width).
            crop_img = imgOrignal[y:y+h, x:x+w]

            # OpenCV frames are BGR, while the training images were loaded as
            # RGB; convert so the channel order matches what the model saw.
            rgb_face = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)

            # Apply the transformation, add a batch dimension and move the
            # tensor to the appropriate device (GPU or CPU).
            img = transform(rgb_face).unsqueeze(0).to(device)

            # Perform prediction using the trained model; no gradients are
            # needed for inference.
            with torch.no_grad():
                prediction = model(img)

            # Get the predicted class index as a plain Python int.
            classIndex = torch.argmax(prediction, dim=1).item()

            # Draw rectangle around the detected face and a filled label bar
            # above it (negative thickness means filled).
            cv2.rectangle(imgOrignal, (x, y), (x+w, y+h), (0, 255, 0), 2)
            cv2.rectangle(imgOrignal, (x, y-40), (x+w, y), (0, 255, 0), -2)

            # Display the predicted class label on the image.
            cv2.putText(imgOrignal, str(class_names[classIndex]), (x, y-10), font, 0.75, (255, 255, 255), 1, cv2.LINE_AA)

        # Display the resulting image.
        cv2.imshow("Result", imgOrignal)

        # Wait for the 'q' key to be pressed to exit the loop.
        k = cv2.waitKey(1)
        if k & 0xFF == ord('q'):
            break
finally:
    # Release the video capture device and close all OpenCV windows.
    cap.release()
    cv2.destroyAllWindows()
    print("Screen Closed")


False
Screen Closed
