In [1]:
import os
import torch
import random
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import RandomHorizontalFlip, RandomRotation, ColorJitter,transforms
from FaceRecognitionModel import FaceRecognition,get_device

In [2]:
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

In [3]:
folder_path = 'lfw_funneled'

In [4]:
image_folders = os.listdir(folder_path)
data_path = []
for path in image_folders:
    if '.' not in path:
        image_path = os.path.join(folder_path,path)
    if len(os.listdir(image_path)) >= 2:
        data_path.append(image_path)
    # break

In [5]:
len(data_path),data_path[0]

(1680, 'lfw_funneled\\Aaron_Peirsol')

In [6]:
transform1 = transforms.Compose([
    transforms.Resize((200, 200)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Updated transform with data augmentation
transform2 = transforms.Compose([
    RandomHorizontalFlip(), 
    RandomRotation(10),
    ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.Resize((200, 200)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [7]:
training = []
for path1,path2 in zip(data_path[:len(data_path)-1], data_path[1:]):

    images1 = os.listdir(path1)
    images2 = os.listdir(path2)

    anchor_path = os.path.join(path1,images1[0])
    positive_path = os.path.join(path1,images1[1])
    negative_path = os.path.join(path2,images2[0])

    anchor_image = Image.open(anchor_path)
    positive_image = Image.open(positive_path)
    negative_image = Image.open(negative_path)

    anchor = transform1(anchor_image)
    positive = transform1(positive_image)
    negative = transform1(negative_image)
    training.append([anchor,positive,negative])

for path1,path2 in zip(data_path[:len(data_path)-1], data_path[1:]):

    images1 = os.listdir(path1)
    images2 = os.listdir(path2)

    anchor_path = os.path.join(path1,images1[0])
    positive_path = os.path.join(path1,images1[1])
    negative_path = os.path.join(path2,images2[0])

    anchor_image = Image.open(anchor_path)
    positive_image = Image.open(positive_path)
    negative_image = Image.open(negative_path)

    anchor = transform2(anchor_image)
    positive = transform2(positive_image)
    negative = transform2(negative_image)
    training.append([anchor,positive,negative])
        

In [8]:
random.shuffle(training)

In [9]:
training[0][0].shape,training[0][1].shape,training[0][2].shape

(torch.Size([3, 200, 200]),
 torch.Size([3, 200, 200]),
 torch.Size([3, 200, 200]))

In [10]:
len(training),(len(training)*0.25)

(3358, 839.5)

In [11]:
# test_size = int(len(training)*0.25)
# testing = training[len(training)-test_size:]
# training = training[:len(training)-test_size]
# len(training),len(testing)

In [12]:
faceRecognition = FaceRecognition().to(get_device())

  return x.to(get_device())


In [13]:
1e-4

0.0001

In [14]:
criterian = nn.TripletMarginLoss(margin=1.0,reduction='mean').to(get_device())
optimizer = torch.optim.SGD(params=faceRecognition.parameters(), lr=0.001, momentum=0.7, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.85, verbose=True)



In [15]:
EPOCHS = 100
BATCH_SIZE = 30

In [16]:
faceRecognition.train()

FaceRecognition(
  (embedding): CNN(
    (conv1): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(128, 256, kernel_size=(2, 2), stride=(1, 1), padding=(1, 1))
    (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (dropout): Dropout(p=0.2, inplace=False)
    (linear1): Linear(in_features=30976, out_features=1024, bias=True)
    (linear2): Linear(in_features=1024, out_features=512, bias=True)
    (output): Linear(in_features=512, out_features=256, bias=True)
  )
)

In [17]:
def dataFormate(data):
    anchor,positive,negative = [],[],[]
    for i in range(len(data)):
        anchor.append(data[i][0])
        positive.append(data[i][1])
        negative.append(data[i][2])
    
    return torch.stack(anchor).to(get_device()),torch.stack(positive).to(get_device()), torch.stack(negative).to(get_device())

In [18]:
# dataFormate([torch.randn(3,200,200),torch.randn(3,200,200),torch.randn(3,200,200)])

In [19]:
torch.cuda.empty_cache()

In [20]:
# for epoch in range(EPOCHS):
#     running_loss = 0.0

#     for i in range(0,len(training),BATCH_SIZE):
#         data = training[i:i+BATCH_SIZE]
#         anchor_tensor, positive_tensor, negative_tensor = dataFormate(data)
        
#         anchor_tensor = anchor_tensor.to(get_device())
#         positive_tensor = positive_tensor.to(get_device())
#         negative_tensor = negative_tensor.to(get_device())

#         anchor_embedding = faceRecognition(anchor_tensor)
#         positive_embedding = faceRecognition(positive_tensor)
#         negative_embedding = faceRecognition(negative_tensor)

#         loss = criterian(anchor_embedding,positive_embedding,negative_embedding)
#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item()
#     avg_loss = running_loss / (len(training)//BATCH_SIZE)
#     scheduler.step(loss)
#     print(f"Epoch {epoch} , loss :- {avg_loss:.4f}")

In [21]:
from sklearn.model_selection import KFold

# Assuming `training_data` is your dataset
k_folds = 5  # Number of folds
kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Store results
fold_results = []

for fold, (train_idx, val_idx) in enumerate(kf.split(training)):
    print(f'Fold {fold+1}/{k_folds}')
    
    # Create training and validation sets
    train_data = [training[i] for i in train_idx]
    val_data = [training[i] for i in val_idx]

    # Initialize your model, loss, optimizer, and scheduler
    faceRecognition = FaceRecognition().to(get_device())
    criterian = nn.TripletMarginLoss(margin=1.0, reduction='mean').to(get_device())
    optimizer = torch.optim.SGD(params=faceRecognition.parameters(), lr=0.001, momentum=0.7, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.7, verbose=True)
    
    for epoch in range(EPOCHS):
        running_loss = 0.0
        
        # Training loop
        faceRecognition.train()
        for i in range(0, len(train_data), BATCH_SIZE):
            data = train_data[i:i+BATCH_SIZE]
            anchor_tensor, positive_tensor, negative_tensor = dataFormate(data)

            # Move data tensors to the correct device
            anchor_tensor = anchor_tensor.to(get_device())
            positive_tensor = positive_tensor.to(get_device())
            negative_tensor = negative_tensor.to(get_device())

            # Forward pass
            anchor_embedding = faceRecognition(anchor_tensor)
            positive_embedding = faceRecognition(positive_tensor)
            negative_embedding = faceRecognition(negative_tensor)

            # Compute loss
            loss = criterian(anchor_embedding, positive_embedding, negative_embedding)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        avg_loss = running_loss / (len(train_data) // BATCH_SIZE)
        scheduler.step(avg_loss)
        print(f"Epoch {epoch+1} , loss :- {avg_loss:.4f}")

    # Validation loop
    faceRecognition.eval()
    val_loss = 0.0
    with torch.no_grad():
        for i in range(0, len(val_data), BATCH_SIZE):
            data = val_data[i:i+BATCH_SIZE]
            anchor_tensor, positive_tensor, negative_tensor = dataFormate(data)

            # Move data tensors to the correct device
            anchor_tensor = anchor_tensor.to(get_device())
            positive_tensor = positive_tensor.to(get_device())
            negative_tensor = negative_tensor.to(get_device())

            # Forward pass
            anchor_embedding = faceRecognition(anchor_tensor)
            positive_embedding = faceRecognition(positive_tensor)
            negative_embedding = faceRecognition(negative_tensor)

            # Compute loss
            loss = criterian(anchor_embedding, positive_embedding, negative_embedding)
            val_loss += loss.item()

    avg_val_loss = val_loss / (len(val_data) // BATCH_SIZE)
    print(f"Fold {fold+1} , Validation Loss: {avg_val_loss:.4f}")
    
    fold_results.append(avg_val_loss)

# Calculate average validation loss across all folds
average_val_loss = sum(fold_results) / k_folds
print(f'Average Validation Loss across {k_folds} folds: {average_val_loss:.4f}')


Fold 1/5




Epoch 1 , loss :- 1.0098
Epoch 2 , loss :- 1.0059
Epoch 3 , loss :- 1.0016
Epoch 4 , loss :- 0.9887
Epoch 5 , loss :- 0.9691
Epoch 6 , loss :- 0.9420
Epoch 7 , loss :- 0.9149
Epoch 8 , loss :- 0.8810
Epoch 9 , loss :- 0.8519
Epoch 10 , loss :- 0.8227
Epoch 11 , loss :- 0.7893
Epoch 12 , loss :- 0.7567
Epoch 13 , loss :- 0.7319
Epoch 14 , loss :- 0.7103
Epoch 15 , loss :- 0.6914
Epoch 16 , loss :- 0.6833
Epoch 17 , loss :- 0.6752
Epoch 18 , loss :- 0.6656
Epoch 19 , loss :- 0.6554
Epoch 20 , loss :- 0.6529
Epoch 21 , loss :- 0.6383
Epoch 22 , loss :- 0.6079
Epoch 23 , loss :- 0.5957
Epoch 24 , loss :- 0.6045
Epoch 25 , loss :- 0.5901
Epoch 26 , loss :- 0.6103
Epoch 27 , loss :- 0.5901
Epoch 28 , loss :- 0.5889
Epoch 29 , loss :- 0.5969
Epoch 30 , loss :- 0.5802
Epoch 31 , loss :- 0.5866
Epoch 32 , loss :- 0.5595
Epoch 33 , loss :- 0.5579
Epoch 34 , loss :- 0.5401
Epoch 35 , loss :- 0.5586
Epoch 36 , loss :- 0.5535
Epoch 37 , loss :- 0.5504
Epoch 38 , loss :- 0.5385
Epoch 39 , loss :- 0.

In [22]:
import torch.nn.functional as F

def calculate_distance(embedding1, embedding2):
    return F.pairwise_distance(embedding1, embedding2)


In [23]:
# Save only the model's state dictionary
# torch.save(faceRecognition.state_dict(), 'face_recognition_triplet.pth')
torch.save(faceRecognition.state_dict(), 'face_recognition_triplet2.pth')


In [28]:
# Example: Evaluate on a validation dataset
validation_pairs = [(anchor_tensor, positive_tensor), (anchor_tensor, negative_tensor)]  # Example pairs
labels = [1, 0]  # 1 for same person, 0 for different persons
validation_pairs, labels = [],[]

with torch.no_grad():
    for data in training:
        anchor,positive,negative = dataFormate(data)
        anchor_tensor = faceRecognition(anchor.unsqueeze(0))
        positive_tensor = faceRecognition(positive.unsqueeze(0))
        negative_tensor = faceRecognition(negative.unsqueeze(0))

        validation_pairs.append((anchor_tensor,positive_tensor))
        validation_pairs.append((anchor_tensor, negative_tensor))
        labels.append(1)
        labels.append(0)
distances = []
for (embedding1, embedding2) in validation_pairs:
    distances.append(calculate_distance(embedding1, embedding2).item())

# Try different thresholds and pick the best
thresholds = []  # Example thresholds
thr = 0.5
while thr <= 30:
    thresholds.append(thr)
    thr += 0.5
print(thresholds)
best_threshold = 0
best_accuracy = 0

for threshold in thresholds:
    predictions = [(d < threshold) for d in distances]
    accuracy = sum([p == l for p, l in zip(predictions, labels)]) / len(labels)
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_threshold = threshold

print(f"Best Threshold: {best_threshold} with accuracy {best_accuracy}")


[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.0, 10.5, 11.0, 11.5, 12.0, 12.5, 13.0, 13.5, 14.0, 14.5, 15.0, 15.5, 16.0, 16.5, 17.0, 17.5, 18.0, 18.5, 19.0, 19.5, 20.0, 20.5, 21.0, 21.5, 22.0, 22.5, 23.0, 23.5, 24.0, 24.5, 25.0, 25.5, 26.0, 26.5, 27.0, 27.5, 28.0, 28.5, 29.0, 29.5, 30.0]
Best Threshold: 0.5 with accuracy 0.5321620011911852
