In [1]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
import torchvision
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import numpy as np
import os
import cv2
import math
from PIL import Image
import time
import pickle

In [2]:
# Root folder of the raw images; it is read with ImageFolder, so it holds one sub-folder per class.
data_dir = 'Faces'
# Mini-batch size used by the DataLoaders built from these settings.
batch_size = 8
# Number of training epochs.
epochs = 30
# DataLoader worker processes: none on Windows ('nt'), eight everywhere else.
workers = 8 if os.name != 'nt' else 0

In [3]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print('Running on device: {}'.format(device))

Running on device: cuda:0


In [4]:
class FastMTCNN(object):
    """Strided MTCNN face detector that also writes every detected face crop to disk."""

    def __init__(self, stride, resize=1, *args, **kwargs):
        """Constructor for FastMTCNN class.

        Arguments:
            stride (int): The detection stride. Faces will be detected every `stride` frames
                and remembered for `stride-1` frames.

        Keyword arguments:
            resize (float): Fractional frame scaling. [default: {1}]
            *args: Arguments to pass to the MTCNN constructor. See help(MTCNN).
            **kwargs: Keyword arguments to pass to the MTCNN constructor. See help(MTCNN).
        """
        self.stride = stride
        self.resize = resize
        self.mtcnn = MTCNN(*args, **kwargs)

    def __call__(self, frames, save_path):
        """Detect faces in frames using strided MTCNN and save each crop as a JPEG.

        Arguments:
            frames: Either a single image (anything `np.array` can convert, e.g. a PIL
                image or an HxWxC array) or a list/tuple of such images.
            save_path (str): Directory that receives the `cropped_*.jpg` files. It is
                created when it does not exist.

        Returns:
            list: One numpy array per detected face, sliced from the (optionally
                resized) frame. Empty when no face is found.
        """
        # Normalise the input to a list of arrays so one image and many frames share a code path.
        if isinstance(frames, (list, tuple)):
            frames_resized = [np.array(f) for f in frames]
        else:
            frames_resized = [np.array(frames)]

        if self.resize != 1:
            # Previously this branch never defined `frames_resized`, so any resize != 1 crashed.
            frames_resized = [
                cv2.resize(f, (int(f.shape[1] * self.resize), int(f.shape[0] * self.resize)))
                for f in frames_resized
            ]

        if not frames_resized:
            return []

        if save_path:
            os.makedirs(save_path, exist_ok=True)

        # Detection only runs on every `stride`-th frame; the probabilities are not used here.
        boxes, _ = self.mtcnn.detect(frames_resized[::self.stride])

        faces = []
        for i, frame in enumerate(frames_resized):
            # Frames between two detections reuse the boxes of the most recent detected frame.
            box_ind = int(i / self.stride)
            if boxes[box_ind] is None:
                continue
            # Build the PIL view once per frame instead of once per box.
            frame_pil = Image.fromarray(frame)
            height, width = frame.shape[:2]
            for j, box in enumerate(boxes[box_ind]):
                box = [int(b) for b in box]
                # Clamp for the numpy slice: a negative coordinate would otherwise be read
                # as an index from the end of the array and return the wrong region.
                x1, y1 = max(box[0], 0), max(box[1], 0)
                x2, y2 = min(box[2], width), min(box[3], height)
                faces.append(frame[y1:y2, x1:x2])
                cropped_image = frame_pil.crop((box[0], box[1], box[2], box[3]))
                # The frame and box indices keep names unique even when several crops are
                # written within the same millisecond (they used to overwrite each other).
                timestamp = int(time.time() * 1000)
                cropped_image.save(
                    os.path.join(save_path, f"cropped_{timestamp}_{i}_{j}.jpg")
                )

        return faces

In [5]:
# Detector used to crop the raw images; every argument except `stride` is forwarded
# by FastMTCNN to the underlying MTCNN constructor.
mtcnn = FastMTCNN(
    stride=1,
    device=device,
    image_size=160,
    margin=0,
    min_face_size=30,
    thresholds=[0.4, 0.4, 0.4],
    factor=0.709,
    post_process=True,
)

In [6]:
# Every raw image is resized to 512x512 before face detection.
dataset = datasets.ImageFolder(data_dir, transform=transforms.Resize((512, 512)))
# Replace each sample's class index with the path its crop should be written to, so the
# loader yields (image, output_path) pairs. Only the root folder is swapped for
# `<data_dir>_cropped`; a plain str.replace would also rewrite any later occurrence of
# `data_dir` inside a sub-folder or file name.
dataset.samples = [
    (p, os.path.join(data_dir + '_cropped', os.path.relpath(p, data_dir)))
    for p, _ in dataset.samples
]
loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    collate_fn=training.collate_pil
)

In [7]:
# Run the detector over every image; crops go into the folder of that image's target path.
for batch_no, (images, out_paths) in enumerate(loader, start=1):
    for img, path in zip(images, out_paths):
        save_dir = os.path.dirname(path)
        os.makedirs(save_dir, exist_ok=True)
        mtcnn(img, save_path=save_dir)
    # '\r' rewrites the same console line to act as a progress indicator.
    print('\rBatch {} of {}'.format(batch_no, len(loader)), end='')

Batch 58 of 58

In [8]:
# Remove mtcnn to reduce GPU memory usage.
# NOTE(review): `del` only drops this name; the memory is presumably returned once no
# other reference to the detector remains — confirm if GPU memory stays high.
del mtcnn

In [9]:
# Augmentation pipeline: random geometric and colour perturbations, then a fixed
# 160x160 output converted to a tensor. The former second `Compose([transform])`
# wrapper added nothing and has been removed.
transform = transforms.Compose([
    transforms.RandomRotation(degrees=15),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomResizedCrop(size=(128, 128), scale=(0.8, 1.0)),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.GaussianBlur(kernel_size=3),
    transforms.Resize(size=(160, 160)),
    transforms.ToTensor()
])

In [10]:
# Read the cropped faces through the augmentation pipeline. The folder name and batch
# size come from the configuration cell instead of being repeated as literals.
dataset = ImageFolder(root=data_dir + '_cropped', transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [11]:
save_dir = 'Faces_cropped'
os.makedirs(save_dir, exist_ok=True)

# Save augmented images
# NOTE(review): the augmented files are written into the same tree the dataset reads
# from, so re-running this cell augments already-augmented images — confirm intended.
saved_count = 0
for images, labels in dataloader:
    for image, label in zip(images, labels):
        # Look the folder name up from the dataset instead of hard-coding two people;
        # an unexpected label can no longer reuse a stale `label_dir`.
        label_dir = os.path.join(save_dir, dataset.classes[int(label)])
        os.makedirs(label_dir, exist_ok=True)
        # A running counter keeps file names unique. The old `i * len(images) + j`
        # restarted from a smaller multiplier on the final short batch and overwrote
        # images saved earlier.
        img_path = os.path.join(label_dir, f'image_{saved_count}.jpg')
        torchvision.utils.save_image(image, img_path)
        saved_count += 1

print("Images saved successfully.")

Images saved successfully.


In [12]:
# Pretrained ('vggface2') network in classification mode, with one output per class
# folder found by the current dataset.
resnet = InceptionResnetV1(
    pretrained='vggface2',
    classify=True,
    num_classes=len(dataset.class_to_idx),
)
resnet = resnet.to(device)

In [13]:
optimizer = optim.Adam(resnet.parameters(), lr=0.001)
# Learning-rate milestones at epochs 5 and 10.
scheduler = MultiStepLR(optimizer, [5, 10])

# Training-time preprocessing: fixed 160x160 input, float32 pixels, then the
# standardisation helper shipped with facenet_pytorch.
trans = transforms.Compose([
    transforms.Resize((160, 160)),
    np.float32,
    transforms.ToTensor(),
    fixed_image_standardization
])
dataset = datasets.ImageFolder(data_dir + '_cropped', transform=trans)

# 80/20 train/validation split. A seeded generator makes the split identical on every
# run; the previous np.random.shuffle produced a different split each time.
# NOTE(review): the folder also holds the augmented copies saved earlier, so variants of
# one source image may land in both splits and inflate validation accuracy — confirm.
split_seed = 42
img_inds = np.random.default_rng(split_seed).permutation(len(dataset))
n_train = int(0.8 * len(img_inds))
train_inds = img_inds[:n_train]
val_inds = img_inds[n_train:]

train_loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(train_inds)
)
val_loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(val_inds)
)

In [14]:
# Classification loss plus the per-batch metrics passed to training.pass_epoch:
# a batch timer (reported as 'fps') and accuracy.
loss_fn = torch.nn.CrossEntropyLoss()
metrics = dict(fps=training.BatchTimer(), acc=training.accuracy)

In [15]:
writer = SummaryWriter()
# Extra attributes attached to the writer; it is handed to training.pass_epoch below.
writer.iteration, writer.interval = 0, 10

# try/finally guarantees the TensorBoard writer is closed even if training is
# interrupted or raises part-way through.
try:
    print('\n\nInitial')
    print('-' * 10)
    resnet.eval()
    # Validation needs no gradients; no_grad avoids building autograd graphs.
    with torch.no_grad():
        training.pass_epoch(
            resnet, loss_fn, val_loader,
            batch_metrics=metrics, show_running=True, device=device,
            writer=writer
        )

    for epoch in range(epochs):
        print('\nEpoch {}/{}'.format(epoch + 1, epochs))
        print('-' * 10)

        resnet.train()
        training.pass_epoch(
            resnet, loss_fn, train_loader, optimizer, scheduler,
            batch_metrics=metrics, show_running=True, device=device,
            writer=writer
        )

        resnet.eval()
        with torch.no_grad():
            training.pass_epoch(
                resnet, loss_fn, val_loader,
                batch_metrics=metrics, show_running=True, device=device,
                writer=writer
            )
finally:
    writer.close()



Initial
----------
Valid |    16/16   | loss:    0.7317 | fps:   59.1714 | acc:    0.4557   

Epoch 1/30
----------
Train |    63/63   | loss:    0.3533 | fps:   26.2659 | acc:    0.8798   
Valid |    16/16   | loss:    1.6546 | fps:   87.9705 | acc:    0.7682   

Epoch 2/30
----------
Train |    63/63   | loss:    0.1829 | fps:   27.6840 | acc:    0.9444   
Valid |    16/16   | loss:    0.0227 | fps:   81.6568 | acc:    1.0000   

Epoch 3/30
----------
Train |    63/63   | loss:    0.1308 | fps:   29.3208 | acc:    0.9623   
Valid |    16/16   | loss:    0.0159 | fps:   91.0847 | acc:    1.0000   

Epoch 4/30
----------
Train |    63/63   | loss:    0.0475 | fps:   29.2221 | acc:    0.9921   
Valid |    16/16   | loss:    0.0053 | fps:   87.2514 | acc:    1.0000   

Epoch 5/30
----------
Train |    63/63   | loss:    0.1779 | fps:   27.0312 | acc:    0.9722   
Valid |    16/16   | loss:    0.1343 | fps:   89.7069 | acc:    0.9740   

Epoch 6/30
----------
Train |    63/63   | loss: 

In [16]:
# Detector with keep_all=False; it is used below when building the reference embeddings.
mtcnn0 = MTCNN(
    image_size=240,
    margin=0,
    keep_all=False,
    min_face_size=30,
)

In [17]:
# Detector with keep_all=True; it is used below on the video frames.
mtcnn = MTCNN(
    image_size=240,
    margin=0,
    keep_all=True,
    min_face_size=30,
)

In [18]:
# Raw (untransformed) images, one folder per person. Uses the configured `data_dir`
# rather than repeating the folder name as a literal.
dataset = datasets.ImageFolder(data_dir)

In [19]:
# Invert ImageFolder's name -> index mapping so an index can be turned back into a name.
idx_to_class = {
    class_index: class_name
    for class_name, class_index in dataset.class_to_idx.items()
}

In [20]:
def collate_fn(x):
    """Return the first element of the batch ``x`` unchanged.

    Intended for a DataLoader whose batches hold a single sample, so the sample is
    handed back as-is instead of being stacked into tensors.
    """
    first_sample = x[0]
    return first_sample

In [21]:
# No batch size is given, so the loader uses the DataLoader default; collate_fn
# unwraps each batch to its first sample.
loader = DataLoader(
    dataset,
    collate_fn=collate_fn,
)

In [22]:
# Ask PyTorch to release cached, currently unused GPU memory before the
# embedding pass that follows.
torch.cuda.empty_cache()

In [23]:
# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

In [None]:
name_list = []
embedding_list = []

resnet.to(device)
# Inference only: eval mode makes the network behave deterministically for the
# single-face batches fed below, regardless of the mode training left it in.
resnet.eval()

# NOTE(review): resnet was constructed with classify=True, so the stored vectors are
# presumably class logits rather than face embeddings — confirm this is intended.
# no_grad stops each stored output from keeping its autograd graph alive, which
# otherwise grows (GPU) memory with every image.
with torch.no_grad():
    for img, idx in loader:
        # print(img)
        # print(idx)
        face, prob = mtcnn0(img, return_prob=True)
        # Keep only confident detections.
        if face is not None and prob > 0.9:
            face = face.to(device)
            emb = resnet(face.unsqueeze(0))
            embedding_list.append(emb)
            name_list.append(idx_to_class[idx])


In [None]:
# Saving embedding_list and name_list
# Create the output folder first; open(..., 'wb') fails if the directory is missing.
os.makedirs('NamitKush30_emb', exist_ok=True)

with open('NamitKush30_emb/embedding_list.pkl', 'wb') as f:
    pickle.dump(embedding_list, f)

with open('NamitKush30_emb/name_list.pkl', 'wb') as f:
    pickle.dump(name_list, f)


In [None]:
# Loading embedding_list and name_list
# SECURITY: pickle.load can execute arbitrary code; only load files you created yourself.
with open('NamitKush30_emb/embedding_list.pkl', 'rb') as emb_file, \
        open('NamitKush30_emb/name_list.pkl', 'rb') as names_file:
    embedding_list = pickle.load(emb_file)
    name_list = pickle.load(names_file)

In [41]:
# cv2.namedWindow("preview")
# # video = cv2.VideoCapture('Namit.mp4')
# # video = cv2.VideoCapture('20240418_122036.mp4')
# video = cv2.VideoCapture('video2.mp4')
# fps = math.ceil(video.get(cv2.CAP_PROP_FPS))
# print('frames per second =',fps)

# if not video.isOpened():
#     print("Error: Could not open video.")

# interval_frames = int(fps * 1/30)
# print(interval_frames)

# target_size = (1536, 864)

# fc=0
# resnet.to(device)
# while True:
#   ret,frame=video.read()
# #   print(frame.shape)
# #   break
#   if not ret:
#     break
  
#   resized_frame = cv2.resize(frame, target_size)
  
#   if fc%interval_frames==0:
#       img=Image.fromarray(resized_frame)
#       img_cropped_list,prob_list=mtcnn(img,return_prob=True)
#       if img_cropped_list is not None:
#          boxes,_=mtcnn.detect(img)
#          for i,prob in enumerate(prob_list):
#              if prob>0.9:
#                  face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device).detach()
#                  emb = resnet(face_tensor).detach()
#                 #  emb=resnet(img_cropped_list[i].unsqueeze(0)).detach()
#                  dist_list=[]
#                  for idx,emb_db in enumerate(embedding_list):
#                      dist=torch.dist(emb,emb_db).item()
#                      dist_list.append(dist)
#                  min_dist=min(dist_list)
#                  box=boxes[i]
#                  if min_dist<0.2:
#                      min_dist_idx=dist_list.index(min_dist)
#                      name=name_list[min_dist_idx]
#                 #  original_frame=resized_frame.copy()
#                      resized_frame = cv2.putText(resized_frame, name+' '+str(min_dist), (int(box[0]),int(box[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0),1, cv2.LINE_AA)
#                  resized_frame = cv2.rectangle(resized_frame, (int(box[0]),int(box[1])) , (int(box[2]),int(box[3])), (255,0,0), 2)
#       cv2.imshow("preview",resized_frame)
#       if cv2.waitKey(25) & 0xFF == ord('q'):
#           break
#   fc+= 1
# video.release()
# cv2.destroyAllWindows()
      

frames per second = 31
1


  face_tensor = torch.tensor(img_cropped_list[i], dtype=torch.float32).unsqueeze(0).to(device).detach()


In [None]:
# n: number of input videos (videos2/video1.mp4 ... video<n>.mp4 are read below).
# count: running counter incremented by the video loop.
n, count = 2, 0
# v: collects the (annotated frame, timestamp in ms) tuples produced by the video loop.
v = list()

In [None]:
# Inference only: eval mode is needed for the single-face batches used below.
resnet.eval()

for j in range(n):
    # print(j)
    video = cv2.VideoCapture(f'videos2/video{j+1}.mp4')
    # try/finally releases the capture even if processing fails part-way through.
    try:
        if not video.isOpened():
            print("Error: Could not open video.")
            continue  # skip this file instead of carrying on with fps == 0
        fps = math.ceil(video.get(cv2.CAP_PROP_FPS))
        print('frames per second =',fps)
        # Process every `interval_frames`-th frame. Clamped to at least 1: for sources
        # below 30 fps the integer division gave 0 and `fc % 0` raised ZeroDivisionError.
        interval_frames = max(1, int(fps * 1/30))
        print('interval_frames=',interval_frames)
        fc = 0
        while video.isOpened():
            ret, frame = video.read()
            if not ret:
                break
            if fc % interval_frames == 0:
                # OpenCV decodes frames as BGR; convert to RGB so the detector sees the
                # same channel order as the PIL images the reference list was built from.
                img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                img_cropped_list, prob_list = mtcnn(img, return_prob=True)
                if img_cropped_list is not None:
                    # Second detection pass to obtain the boxes for the crops above.
                    boxes, _ = mtcnn.detect(img)
                    annotated = False
                    for i, prob in enumerate(prob_list):
                        if prob > 0.9:
                            # Reuse the crop tensor directly; torch.tensor(tensor) copies
                            # it and emits a UserWarning.
                            face_tensor = img_cropped_list[i].unsqueeze(0).to(
                                device=device, dtype=torch.float32
                            )
                            with torch.no_grad():
                                emb = resnet(face_tensor)
                            dist_list = [
                                torch.dist(emb, emb_db).item()
                                for emb_db in embedding_list
                            ]
                            box = boxes[i]
                            # Label the face with the closest reference only when it is
                            # within the 0.5 distance threshold; an empty reference list
                            # no longer crashes min().
                            if dist_list:
                                min_dist = min(dist_list)
                                if min_dist < 0.5:
                                    name = name_list[dist_list.index(min_dist)]
                                    frame = cv2.putText(frame, name+' '+str(min_dist), (int(box[0]),int(box[1])), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0),1, cv2.LINE_AA)
                            frame = cv2.rectangle(frame, (int(box[0]),int(box[1])) , (int(box[2]),int(box[3])), (255,0,0), 2)
                            annotated = True
                    # Store the frame once, after all faces are drawn. Appending inside
                    # the face loop stored one copy per face, duplicating frames.
                    if annotated:
                        ts = video.get(cv2.CAP_PROP_POS_MSEC)
                        resized_frame = cv2.resize(frame, (3840,2160))
                        print("j=",j+1)
                        v.append((resized_frame,ts))
                        count += 1
                # cv2.imshow("preview",frame)
                # Pressing 'q' aborts the current video.
                # NOTE(review): with imshow disabled there is presumably no window to
                # receive the key press — confirm whether this is still needed.
                if cv2.waitKey(25) & 0xFF == ord('q'):
                    break
            fc += 1
    finally:
        video.release()
        cv2.destroyAllWindows()

In [None]:
# Order the collected (frame, timestamp) tuples by timestamp.
sv = sorted(v, key=lambda entry: entry[1])

In [None]:
# Empty `v` in place so it can be refilled from `sv` (the sorted copy) with frames only.
# NOTE(review): this mutates `v`, so the sort/clear/refill cells are not safe to re-run
# out of order.
v.clear()

In [None]:
# Keep only the frame (first element) of every sorted entry, dropping the timestamp.
v.extend(entry[0] for entry in sv)

In [None]:
def create_video_from_frames(frames, output_video_path, frame_rate):
    """Write a sequence of frames to an 'mp4v'-encoded video file.

    Arguments:
        frames: Non-empty sequence of image arrays. The output size is taken from the
            first frame's height and width.
        output_video_path (str): Destination file path.
        frame_rate: Frames per second passed to cv2.VideoWriter.

    Raises:
        ValueError: If `frames` is empty (previously an opaque IndexError).
        RuntimeError: If the video writer cannot be opened for `output_video_path`.
    """
    if len(frames) == 0:
        raise ValueError("frames must contain at least one frame")

    height, width = frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # VideoWriter takes the frame size as (width, height).
    out = cv2.VideoWriter(output_video_path, fourcc, frame_rate, (width, height))
    try:
        # Fail loudly instead of silently producing no file.
        if not out.isOpened():
            raise RuntimeError(f"Could not open video writer for {output_video_path!r}")
        for frame in frames:
            out.write(frame)
    finally:
        # Always release the writer, including when a write raises.
        out.release()


# Export the timestamp-ordered frames in `v` as a 60 fps video.
frame_rate = 60
output_video_path = "output2_video.mp4"
create_video_from_frames(
    frames=v,
    output_video_path=output_video_path,
    frame_rate=frame_rate,
)
