In [1]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
import torchvision
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import numpy as np
import os
import cv2
import math
from PIL import Image
import pickle

In [2]:
data_dir = 'Faces'

batch_size = 32
epochs = 20
workers = 0 if os.name == 'nt' else 8

In [3]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Running on device: {}'.format(device))
# device = 'cpu'

Running on device: cuda:0


In [4]:
mtcnn = MTCNN(
    image_size=160, margin=0, min_face_size=30,
    thresholds=[0.4, 0.4, 0.4], factor=0.709, post_process=True,
    device=device
)

In [5]:
dataset = datasets.ImageFolder(data_dir, transform=transforms.Resize((512, 512)))
dataset.samples = [
    (p, p.replace(data_dir, data_dir + '_cropped'))
        for p, _ in dataset.samples
]
loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    collate_fn=training.collate_pil
)

In [6]:
for i, (x, y) in enumerate(loader):
    # print(len(x))
    for img,path in zip(x,y):
        mtcnn(img,save_path=path)
    print('\rBatch {} of {}'.format(i + 1, len(loader)), end='')

Batch 10 of 10

In [7]:
# Remove mtcnn to reduce GPU memory usage
del mtcnn

In [8]:
transform = transforms.Compose(
    [
        transforms.RandomRotation(degrees=15),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.RandomResizedCrop(size=(128, 128), scale=(0.8, 1.0)),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
        transforms.GaussianBlur(kernel_size=3),
        transforms.Resize(size=(160, 160)),
        transforms.ToTensor(),
    ]
)

transform = transforms.Compose([transform])

In [9]:
dataset = ImageFolder(root='Faces_cropped', transform=transform)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [10]:
save_dir = 'augmented/'
os.makedirs(save_dir, exist_ok=True)

# Save augmented images
for i, (images, _) in enumerate(dataloader):
    for j, image in enumerate(images):
        img_path = os.path.join(save_dir, f'image_{i * len(images) + j}.jpg')
        torchvision.utils.save_image(image, img_path)

print("Images saved successfully.")

Images saved successfully.


In [11]:
resnet = InceptionResnetV1(
    classify=True,
    pretrained='vggface2',
    num_classes=len(dataset.class_to_idx)
).to(device)

In [12]:
optimizer = optim.Adam(resnet.parameters(), lr=0.001)
scheduler = MultiStepLR(optimizer, [5, 10])

trans = transforms.Compose([
    np.float32,
    transforms.ToTensor(),
    fixed_image_standardization
])
dataset = datasets.ImageFolder(data_dir + '_cropped', transform=trans)
img_inds = np.arange(len(dataset))
np.random.shuffle(img_inds)
train_inds = img_inds[:int(0.8 * len(img_inds))]
val_inds = img_inds[int(0.8 * len(img_inds)):]

train_loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(train_inds)
)
val_loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(val_inds)
)

In [13]:
loss_fn = torch.nn.CrossEntropyLoss()
metrics = {
    'fps': training.BatchTimer(),
    'acc': training.accuracy
}

In [14]:
writer = SummaryWriter()
writer.iteration, writer.interval = 0, 10

print('\n\nInitial')
print('-' * 10)
resnet.eval()
training.pass_epoch(
    resnet, loss_fn, val_loader,
    batch_metrics=metrics, show_running=True, device=device,
    writer=writer
)

for epoch in range(epochs):
    print('\nEpoch {}/{}'.format(epoch + 1, epochs))
    print('-' * 10)

    resnet.train()
    training.pass_epoch(
        resnet, loss_fn, train_loader, optimizer, scheduler,
        batch_metrics=metrics, show_running=True, device=device,
        writer=writer
    )

    resnet.eval()
    training.pass_epoch(
        resnet, loss_fn, val_loader,
        batch_metrics=metrics, show_running=True, device=device,
        writer=writer
    )

writer.close()



Initial
----------
Valid |     1/1    | loss:    0.6948 | fps:   62.0697 | acc:    0.5185   

Epoch 1/20
----------
Train |     4/4    | loss:    0.5365 | fps:   37.2436 | acc:    0.7266   
Valid |     1/1    | loss:   67.0453 | fps:  157.1057 | acc:    0.2963   

Epoch 2/20
----------
Train |     4/4    | loss:    0.3829 | fps:   46.2814 | acc:    0.9219   
Valid |     1/1    | loss: 2986.6252 | fps:  171.0252 | acc:    0.2963   

Epoch 3/20
----------
Train |     4/4    | loss:    0.2077 | fps:   47.9488 | acc:    0.9516   
Valid |     1/1    | loss: 1836.7736 | fps:  155.5471 | acc:    0.2963   

Epoch 4/20
----------
Train |     4/4    | loss:    0.0161 | fps:   48.3089 | acc:    1.0000   
Valid |     1/1    | loss:  602.8787 | fps:  172.7942 | acc:    0.2963   

Epoch 5/20
----------
Train |     4/4    | loss:    0.6076 | fps:   49.3834 | acc:    0.8687   
Valid |     1/1    | loss:  122.6568 | fps:  172.8129 | acc:    0.4074   

Epoch 6/20
----------
Train |     4/4    | loss: 

In [15]:
mtcnn0=MTCNN(image_size=240,margin=0,keep_all=False,min_face_size=30)

In [16]:
mtcnn=MTCNN(image_size=240,margin=0,keep_all=True,min_face_size=30)

In [17]:
dataset=datasets.ImageFolder('Faces')

In [18]:
idx_to_class={i:c for c,i in dataset.class_to_idx.items()}

In [19]:
def collate_fn(x):
  return x[0]

In [20]:
loader=DataLoader(dataset,collate_fn=collate_fn)

In [22]:
name_list=[]
embedding_list=[]

resnet.to(device)

for img,idx in loader:
  # print(img)
  # print(idx)
  face,prob=mtcnn0(img,return_prob=True)
  if face is not None and prob>0.9:
    face = face.to(device)
    emb=resnet((face.unsqueeze(0)))
    embedding_list.append(emb)
    name_list.append(idx_to_class[idx])


In [23]:
# Saving embedding_list and name_list
with open('NamitAryan20_emb/embedding_list.pkl', 'wb') as f:
    pickle.dump(embedding_list, f)

with open('NamitAryan20_emb/name_list.pkl', 'wb') as f:
    pickle.dump(name_list, f)


In [29]:
# Loading embedding_list and name_list
with open('NamitAryan20_emb/embedding_list.pkl', 'rb') as f:
    embedding_list = pickle.load(f)

with open('NamitAryan20_emb/name_list.pkl', 'rb') as f:
    name_list = pickle.load(f)

In [24]:
# cv2.namedWindow("preview")

In [25]:
# video = cv2.VideoCapture('20240402_182307.mp4')

In [26]:
# fps = math.ceil(video.get(cv2.CAP_PROP_FPS))
# print('frames per second =',fps)

In [27]:
# if not video.isOpened():
#     print("Error: Could not open video.")

In [28]:
# interval_frames = int(fps * 1/30)
# print(interval_frames)

In [31]:
cv2.namedWindow("preview")
# video = cv2.VideoCapture('20240402_182307.mp4')
video = cv2.VideoCapture('20240418_122036.mp4')
# video = cv2.VideoCapture('20240418_124735.mp4')
fps = math.ceil(video.get(cv2.CAP_PROP_FPS))
print('frames per second =',fps)

if not video.isOpened():
    print("Error: Could not open video.")

interval_frames = int(fps * 1/30)
print(interval_frames)

target_size = (1536, 864)

fc=0
resnet.to(device)
while True:
  ret,frame=video.read()
#   print(frame.shape)
#   break
  if not ret:
    break
  
  resized_frame = cv2.resize(frame, target_size)
  
  if fc%interval_frames==0:
      img=Image.fromarray(resized_frame)
      img_cropped_list,prob_list=mtcnn(img,return_prob=True)
      if img_cropped_list is not None:
         boxes,_=mtcnn.detect(img)
         for i,prob in enumerate(prob_list):
             if prob>0.9:
                 face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device).detach()
                 emb = resnet(face_tensor).detach()
                #  emb=resnet(img_cropped_list[i].unsqueeze(0)).detach()
                 dist_list=[]
                 for idx,emb_db in enumerate(embedding_list):
                     dist=torch.dist(emb,emb_db).item()
                     dist_list.append(dist)
                 min_dist=min(dist_list)
                 box=boxes[i]
                 if min_dist<0.2:
                     min_dist_idx=dist_list.index(min_dist)
                     name=name_list[min_dist_idx]
                #  original_frame=resized_frame.copy()
                     resized_frame = cv2.putText(resized_frame, name+' '+str(min_dist), (int(box[0]),int(box[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0),1, cv2.LINE_AA)
                 resized_frame = cv2.rectangle(resized_frame, (int(box[0]),int(box[1])) , (int(box[2]),int(box[3])), (255,0,0), 2)
      cv2.imshow("preview",resized_frame)
      if cv2.waitKey(25) & 0xFF == ord('q'):
          break
  fc+= 1
video.release()
cv2.destroyAllWindows()
      

frames per second = 31
1


  face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device).detach()


In [55]:
# cv2.namedWindow("preview")
# cam=cv2.VideoCapture(4)

# target_size = (1000, 500)

# fc=0
# resnet.to(device)
# while True:
#   ret,frame=cam.read()
# #   print(frame.shape)
# #   break
#   if not ret:
#     break
  
#   resized_frame = cv2.resize(frame, target_size)
  
#   if fc%interval_frames==0:
#       img=Image.fromarray(resized_frame)
#       img_cropped_list,prob_list=mtcnn(img,return_prob=True)
#       if img_cropped_list is not None:
#          boxes,_=mtcnn.detect(img)
#          for i,prob in enumerate(prob_list):
#              if prob>0.9:
#                  face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device)
#                  emb = resnet(face_tensor).detach()
#                 #  emb=resnet(img_cropped_list[i].unsqueeze(0)).detach()
#                  dist_list=[]
#                  for idx,emb_db in enumerate(embedding_list):
#                      dist=torch.dist(emb,emb_db).item()
#                      dist_list.append(dist)
#                  min_dist=min(dist_list)
#                  min_dist_idx=dist_list.index(min_dist)
#                  name=name_list[min_dist_idx]
#                  box=boxes[i]
#                  original_frame=resized_frame.copy()
#                  if min_dist<0.9:
#                      resized_frame = cv2.putText(resized_frame, name+' '+str(min_dist), (int(box[0]),int(box[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0),1, cv2.LINE_AA)
#                  resized_frame = cv2.rectangle(resized_frame, (int(box[0]),int(box[1])) , (int(box[2]),int(box[3])), (255,0,0), 2)
#       cv2.imshow("preview",resized_frame)
#       if cv2.waitKey(25) & 0xFF == ord('q'):
#           break
#   fc+= 1
# video.release()
# cv2.destroyAllWindows()
      

  face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device)
