In [None]:
import ast
import os
import time

import cv2 as cv
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from torchvision.ops import nms

from src.models.retinaface import cfg_re50
from src.models.retinaface import retina50
from src.utils.arcface import estimate_norm, LANDS_TEMPLATE
from src.utils.retinaface import PriorBox


def decode(loc, priors, variances):
    """Turn regressed box offsets back into corner-form boxes.

    Reverses the offset encoding that was applied relative to the priors.

    Args:
        loc (tensor): predicted offsets, shape [num_priors, 4]
            (centre offsets in columns 0-1, log-size offsets in columns 2-3).
        priors (tensor): prior boxes in centre-size form (cx, cy, w, h),
            shape [num_priors, 4].
        variances (list[float]): variances[0] scales the centre offsets,
            variances[1] scales the size offsets.

    Returns:
        tensor of shape [num_priors, 4] holding (x_min, y_min, x_max, y_max).
    """
    prior_centers = priors[:, :2]
    prior_sizes = priors[:, 2:]

    # Centre: prior centre shifted by the offset, scaled by the prior size.
    centers = prior_centers + loc[:, :2] * variances[0] * prior_sizes
    # Size: prior size scaled by the exponentiated offset.
    sizes = prior_sizes * torch.exp(loc[:, 2:] * variances[1])

    # Convert centre-size form to corner form.
    top_left = centers - sizes / 2
    bottom_right = sizes + top_left
    return torch.cat((top_left, bottom_right), 1)
def decode_landm(pre, priors, variances):
    """Turn regressed landmark offsets back into landmark coordinates.

    Reverses the offset encoding that was applied relative to the priors.

    Args:
        pre (tensor): predicted landmark offsets, shape [num_priors, 10],
            read as five consecutive (x, y) pairs.
        priors (tensor): prior boxes in centre-size form (cx, cy, w, h),
            shape [num_priors, 4].
        variances (list[float]): only variances[0] is used, as the offset scale.

    Returns:
        tensor of shape [num_priors, 10] with the five decoded (x, y) pairs
        in the same order as in ``pre``.
    """
    anchor_xy = priors[:, :2]
    anchor_wh = priors[:, 2:]
    # Every landmark uses the same rule: prior centre + offset * variance * prior size.
    points = [
        anchor_xy + pre[:, start:start + 2] * variances[0] * anchor_wh
        for start in range(0, 10, 2)
    ]
    return torch.cat(points, dim=1)
def py_cpu_nms(dets, thresh):
    """Greedy non-maximum suppression implemented with NumPy.

    Args:
        dets (ndarray): one detection per row; columns 0-3 are
            x1, y1, x2, y2 and column 4 is the score.
        thresh (float): a box is dropped when its IoU with an already kept,
            higher-scoring box is greater than this value.

    Returns:
        list of row indices into ``dets`` that survive, highest score first.
    """
    left, top, right, bottom, conf = (dets[:, col] for col in range(5))

    # Widths and heights use the "+ 1" inclusive-pixel convention throughout.
    box_area = (right - left + 1) * (bottom - top + 1)
    remaining = conf.argsort()[::-1]

    kept = []
    while remaining.size > 0:
        best, rest = remaining[0], remaining[1:]
        kept.append(best)

        # Intersection of the best box with every other candidate.
        inter_left = np.maximum(left[best], left[rest])
        inter_top = np.maximum(top[best], top[rest])
        inter_right = np.minimum(right[best], right[rest])
        inter_bottom = np.minimum(bottom[best], bottom[rest])

        inter_w = np.maximum(0.0, inter_right - inter_left + 1)
        inter_h = np.maximum(0.0, inter_bottom - inter_top + 1)
        intersection = inter_w * inter_h
        iou = intersection / (box_area[best] + box_area[rest] - intersection)

        # Keep only candidates that do not overlap the best box too much.
        remaining = rest[iou <= thresh]

    return kept
def check_keys(model, pretrained_state_dict):
    """Report how a checkpoint's keys line up with a model's state dict.

    Prints the number of missing, unused and used keys.

    Args:
        model: object exposing ``state_dict()``.
        pretrained_state_dict: mapping of parameter name to value.

    Returns:
        True when at least one checkpoint key matches a model key.

    Raises:
        AssertionError: when no checkpoint key matches any model key.
    """
    provided = set(pretrained_state_dict.keys())
    expected = set(model.state_dict().keys())

    matched = expected.intersection(provided)
    extra = provided.difference(expected)
    absent = expected.difference(provided)

    report = (
        ('Missing keys', absent),
        ('Unused checkpoint keys', extra),
        ('Used keys', matched),
    )
    for label, group in report:
        print('{}:{}'.format(label, len(group)))

    assert len(matched) > 0, 'load NONE from pretrained checkpoint'
    return True
def remove_prefix(state_dict, prefix):
    """Return a copy of ``state_dict`` whose keys have ``prefix`` stripped.

    Old-style checkpoints store every parameter name with a common
    'module.' prefix. Only one leading occurrence is removed, and keys
    that do not start with the prefix are kept unchanged.
    """
    print("remove prefix '{}'".format(prefix))
    renamed = {}
    for key, value in state_dict.items():
        if key.startswith(prefix):
            # Everything after the first occurrence, i.e. after the leading prefix.
            key = key.partition(prefix)[2]
        renamed[key] = value
    return renamed
def load_model(model, pretrained_path, load_to_cpu):
    """Load checkpoint weights from disk into ``model``.

    Args:
        model: module that receives the weights.
        pretrained_path: path of the checkpoint file.
        load_to_cpu: when truthy, storages are left where ``torch.load``
            deserialises them; otherwise each storage is moved to the
            current CUDA device.

    Returns:
        The same ``model`` object with the weights loaded (non-strict).
    """
    print('Loading pretrained model from {}'.format(pretrained_path))

    if load_to_cpu:
        def relocate(storage, loc):
            return storage
    else:
        gpu_index = torch.cuda.current_device()

        def relocate(storage, loc):
            return storage.cuda(gpu_index)

    # NOTE: torch.load unpickles the file; only use checkpoints from a trusted source.
    checkpoint = torch.load(pretrained_path, map_location=relocate)

    # Some checkpoints nest the weights under a "state_dict" entry.
    weights = checkpoint['state_dict'] if "state_dict" in checkpoint.keys() else checkpoint
    weights = remove_prefix(weights, 'module.')

    check_keys(model, weights)
    # strict=False: keys that do not match are tolerated rather than raising.
    model.load_state_dict(weights, strict=False)
    return model

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = retina50()
# load_model's third argument is a boolean flag, not a device. A torch.device
# object is always truthy, so the previous call took the CPU path regardless;
# state that explicitly and let `.to(device)` below do the device placement.
model = load_model(model, "checkpoints/RetinaFace-R50.pth", load_to_cpu=True)
model.eval()  # inference mode for layers that behave differently in training
model = model.to(device)

# 1. Extract warped faces

In [None]:
# --- Configuration -----------------------------------------------------------
image_path = "./testbench/img/testOmid.jpg"
# Relative output folder: the same "./testbench/img/faces" directory that the
# embedding cell lists, instead of an absolute path valid on one machine only.
faces_dir = "./testbench/img/faces"

SCORE_THRESHOLD = 0.02       # priors scoring at or below this are dropped before NMS
PRE_NMS_TOP_K = 5000         # at most this many candidates enter NMS
NMS_THRESHOLD = 0.4          # IoU above which a lower-scoring box is suppressed
POST_NMS_TOP_K = 11          # at most this many detections are kept after NMS
FACE_SCORE_THRESHOLD = 0.6   # detections below this score are not saved
FACE_SIZE = (112, 112)       # size of each aligned crop

# --- Load and preprocess -----------------------------------------------------
img_raw = cv.imread(image_path, cv.IMREAD_COLOR)
if img_raw is None:
    # cv.imread reports a missing or unreadable file by returning None.
    raise FileNotFoundError(f"Could not read image: {image_path}")

img = np.float32(img_raw)
resize = 1

im_height, im_width, _ = img.shape
scale = torch.Tensor([im_width, im_height, im_width, im_height])
img -= (104, 117, 123)  # per-channel offsets (presumably the training-set channel means)
img = img.transpose(2, 0, 1)  # HWC -> CHW
img = torch.from_numpy(img).unsqueeze(0)  # add the batch axis
img = img.to(device)
scale = scale.to(device)

# --- Forward pass ------------------------------------------------------------
tic = time.time()
with torch.no_grad():  # inference only: do not record an autograd graph
    loc, conf, landms = model(img)
print('model forward time: {:.4f}'.format(time.time() - tic))

# --- Decode predictions to pixel coordinates ---------------------------------
priorbox = PriorBox(cfg_re50, image_size=(im_height, im_width))
priors = priorbox.forward()
priors = priors.to(device)
prior_data = priors.data
boxes = decode(loc.data.squeeze(0), prior_data, cfg_re50['variance'])
boxes = boxes * scale / resize
boxes = boxes.cpu().numpy()
scores = conf.squeeze(0).data.cpu().numpy()[:, 1]
landms = decode_landm(landms.squeeze(0), prior_data, cfg_re50['variance'])
# Five (x, y) landmark pairs -> five (width, height) scale pairs.
scale1 = torch.Tensor([im_width, im_height] * 5)
scale1 = scale1.to(device)
landms = landms * scale1 / resize
landms = landms.cpu().detach().numpy()

# ignore low scores
inds = np.where(scores > SCORE_THRESHOLD)[0]
boxes = boxes[inds]
landms = landms[inds]
scores = scores[inds]

# keep top-K before NMS (highest score first)
order = scores.argsort()[::-1][:PRE_NMS_TOP_K]
boxes = boxes[order]
landms = landms[order]
scores = scores[order]

# do NMS
dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
keep = py_cpu_nms(dets, NMS_THRESHOLD)
dets = dets[keep, :]
landms = landms[keep]

# keep top-K after NMS
dets = dets[:POST_NMS_TOP_K, :]
landms = landms[:POST_NMS_TOP_K, :]

# Row layout from here on: x1, y1, x2, y2, score, then the 10 landmark values.
dets = np.concatenate((dets, landms), axis=1)

# --- Align and save every confident face --------------------------------------
os.makedirs(faces_dir, exist_ok=True)
i = 0  # running index of saved faces, used as the output file name
for d in dets:
    if d[4] < FACE_SCORE_THRESHOLD:
        continue

    # Read the landmarks from the detection row itself so they can never get
    # out of step with the box. Order: left eye, right eye, nose, mouth left,
    # mouth right.
    landmarks = d[5:15].reshape(5, 2).astype(np.float32)

    face_lands_norm = cv.estimateAffinePartial2D(landmarks, LANDS_TEMPLATE, method=cv.LMEDS)[0]
    if face_lands_norm is None:
        # No transform could be estimated for this face; skip it rather than
        # crash inside warpAffine.
        print(f"Skipping detection with score {d[4]:.4f}: alignment failed")
        continue

    image = cv.warpAffine(img_raw, face_lands_norm, FACE_SIZE, flags=cv.INTER_LINEAR)
    name = os.path.join(faces_dir, f"{i}.jpg")
    if not cv.imwrite(name, image):
        # cv.imwrite returns False instead of raising when it cannot write.
        raise OSError(f"Could not write face crop to {name}")
    i += 1

# 2. Insert vectors into Qdrant

In [None]:
import tensorrt as trt
import pycuda.driver as cuda
import torchvision.transforms as transforms
import pycuda.autoinit  # initializes CUDA driver
import numpy as np
import time
import torch
import cv2 as cv

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
ENGINE_PATH = "checkpoints/arcface-r100-glint360k_fp16.engine"

# 1. Load the engine
runtime = trt.Runtime(TRT_LOGGER)
with open(ENGINE_PATH, "rb") as f:
    engine = runtime.deserialize_cuda_engine(f.read())
if engine is None:
    # Deserialisation reports failure by returning None; fail here with a clear
    # message instead of an AttributeError further down.
    raise RuntimeError(f"Failed to deserialize TensorRT engine: {ENGINE_PATH}")

# 2. Create context
context = engine.create_execution_context()

# 3. Allocate buffers
# inputs / outputs hold (host_buffer, device_buffer) pairs; bindings holds the
# device addresses in engine tensor order, as passed to execute_v2 by infer().
inputs, outputs, bindings, stream = [], [], [], cuda.Stream()

for tensor_index in range(engine.num_io_tensors):
    tensor_name = engine.get_tensor_name(tensor_index)

    # Element count and dtype come from the shape the engine reports.
    # NOTE(review): assumes that shape has no dynamic (-1) dimensions - confirm.
    size = trt.volume(engine.get_tensor_shape(tensor_name))
    dtype = trt.nptype(engine.get_tensor_dtype(tensor_name))

    # Page-locked host buffer plus a device buffer of the same byte size.
    host_mem = cuda.pagelocked_empty(size, dtype)
    device_mem = cuda.mem_alloc(host_mem.nbytes)

    bindings.append(int(device_mem))
    if engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT:
        inputs.append((host_mem, device_mem))
    else:
        outputs.append((host_mem, device_mem))

def infer(input_numpy):
    """Run the TensorRT engine on one input array.

    Args:
        input_numpy (ndarray): input data. Its shape is passed to the context
            for the tensor named "input", and its flattened contents must fit
            the first input host buffer exactly.
            NOTE(review): the result of set_input_shape is not checked -
            confirm the shape passed by callers matches the engine's rank.

    Returns:
        The module-level ``outputs`` list of (host_buffer, device_buffer)
        pairs. ``outputs[0][0]`` holds the result of this call. The buffers
        are reused, so copy the data out before calling ``infer`` again.
    """
    host_in, device_in = inputs[0]
    host_out, device_out = outputs[0]

    context.set_input_shape("input", input_numpy.shape)
    # Copy input data to the page-locked host buffer
    np.copyto(host_in, input_numpy.ravel())

    # Transfer to GPU. The copy is asynchronous on `stream`, while execute_v2
    # is not issued on that stream, so wait for the copy to finish before
    # running the engine.
    cuda.memcpy_htod_async(device_in, host_in, stream)
    stream.synchronize()

    # Execute (synchronous)
    context.execute_v2(bindings)

    # Transfer outputs back and wait until the host buffer is filled
    cuda.memcpy_dtoh_async(host_out, device_out, stream)
    stream.synchronize()
    return outputs


In [None]:
from src.constants import QDRANT_PORT, QDRANT_HOST
from qdrant_client import QdrantClient
from qdrant_client.http import models
import os

# Name of the Qdrant collection that the face embeddings are upserted into.
collection_name = "faces"
# Client for the Qdrant instance at QDRANT_HOST, with QDRANT_PORT given as the gRPC port.
# NOTE(review): prefer_grpc is not set here - confirm the client actually uses
# grpc_port rather than its default REST port.
client = QdrantClient(QDRANT_HOST, grpc_port=QDRANT_PORT)

In [None]:
faces_dir = "./testbench/img/faces"

# Embed every saved face crop and upsert it into the collection.
# os.listdir returns entries in arbitrary order; sorting makes the point ids
# (position in the listing + 1) reproducible from run to run.
for i, filename in enumerate(sorted(os.listdir(faces_dir))):
    # Strip only the final extension, so a name containing dots is kept whole.
    name = os.path.splitext(filename)[0]

    image = cv.imread(os.path.join(faces_dir, filename))
    if image is None:
        # cv.imread returns None for files it cannot decode (e.g. non-images).
        print(f"Skipping unreadable file: {filename}")
        continue

    image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
    # Scale pixels from [0, 255] to [-1, 1] and reorder HWC -> CHW.
    # NOTE(review): no batch axis is added before infer() - confirm this matches
    # the shape the engine expects for its "input" tensor.
    image = np.transpose(cv.resize(image, (112, 112)) / 127.5 - 1.0, (2, 0, 1)).astype(np.float32)

    vec = infer(image)
    # First output's host buffer; tolist() copies it out of the reused buffer.
    vec = vec[0][0].tolist()

    client.upsert(
        collection_name=collection_name,
        points=[models.PointStruct(id=i + 1, vector=vec, payload={"name": name})])