In [43]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Networks and loss functions
def weights_init(m):
    """Initialise one layer: Xavier-uniform weights and a bias filled with 0.1.

    Written to be passed to ``nn.Module.apply``. Modules that are neither
    ``nn.Conv2d`` nor ``nn.Linear`` are left untouched.

    Parameters
    ----------
    m : nn.Module
        The module to (possibly) initialise in place.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        # The underscore-suffixed initialisers are the supported in-place
        # API; the un-suffixed names are deprecated aliases.
        nn.init.xavier_uniform_(m.weight)
        # A layer created with bias=False has ``bias is None``.
        if m.bias is not None:
            nn.init.constant_(m.bias, 0.1)



class LossFn:
    """Classification, box-regression and landmark losses, each scaled by a factor.

    Samples are routed to a loss by their ground-truth label:

    * ``cls_loss``      uses samples whose label is >= 0,
    * ``box_loss``      uses samples whose label is != 0,
    * ``landmark_loss`` uses samples whose label is == -2.

    Labels are flattened to one value per sample and value tensors to one row
    per sample, so a batch containing a single sample is handled the same way
    as a larger batch.
    """

    def __init__(self, cls_factor=1, box_factor=1, landmark_factor=1):
        # Multipliers applied to the corresponding loss value.
        self.cls_factor = cls_factor
        self.box_factor = box_factor
        self.land_factor = landmark_factor
        self.loss_cls = nn.BCELoss()  # binary cross entropy
        self.loss_box = nn.MSELoss()  # mean squared error
        self.loss_landmark = nn.MSELoss()

    @staticmethod
    def _masked_rows(keep, gt_value, pred_value):
        """Return the rows of both tensors selected by the per-sample mask ``keep``.

        Each tensor is first flattened to (N, D); flattening from dim 1 (rather
        than squeezing) keeps the batch axis even when N == 1.
        """
        gt_rows = torch.flatten(gt_value, start_dim=1)
        pred_rows = torch.flatten(pred_value, start_dim=1)
        return gt_rows[keep], pred_rows[keep]

    def cls_loss(self, gt_label, pred_label):
        """Binary cross entropy over samples with label >= 0, times ``cls_factor``."""
        labels = gt_label.reshape(-1)
        scores = pred_label.reshape(-1)
        # Negative labels do not take part in the classification loss.
        keep = torch.ge(labels, 0)
        valid_gt_label = torch.masked_select(labels, keep)
        valid_pred_label = torch.masked_select(scores, keep)
        return self.loss_cls(valid_pred_label, valid_gt_label) * self.cls_factor

    def box_loss(self, gt_label, gt_offset, pred_offset):
        """Mean squared error over samples with label != 0, times ``box_factor``."""
        keep = torch.ne(gt_label.reshape(-1), 0)
        valid_gt_offset, valid_pred_offset = self._masked_rows(keep, gt_offset, pred_offset)
        return self.loss_box(valid_pred_offset, valid_gt_offset) * self.box_factor

    def landmark_loss(self, gt_label, gt_landmark, pred_landmark):
        """Mean squared error over samples with label == -2, times ``land_factor``."""
        keep = torch.eq(gt_label.reshape(-1), -2)
        valid_gt_landmark, valid_pred_landmark = self._masked_rows(keep, gt_landmark, pred_landmark)
        return self.loss_landmark(valid_pred_landmark, valid_gt_landmark) * self.land_factor





class PNet(nn.Module):
    """First-stage, fully convolutional network.

    ``forward`` returns a sigmoid face-score map (1 channel) and a
    box-offset map (4 channels) computed from the shared conv trunk.
    """

    def __init__(self, is_train=False, use_cuda=True):
        super(PNet, self).__init__()
        # Stored flags; ``forward`` returns the same pair regardless of is_train.
        self.is_train = is_train
        self.use_cuda = use_cuda

        # backend
        self.pre_layer = nn.Sequential(
            nn.Conv2d(3, 10, kernel_size=3, stride=1),  # conv1
            nn.PReLU(),  # PReLU1
            nn.MaxPool2d(kernel_size=2, stride=2),  # pool1
            nn.Conv2d(10, 16, kernel_size=3, stride=1),  # conv2
            nn.PReLU(),  # PReLU2
            nn.Conv2d(16, 32, kernel_size=3, stride=1),  # conv3
            nn.PReLU()  # PReLU3
        )
        # detection
        self.conv4_1 = nn.Conv2d(32, 1, kernel_size=1, stride=1)
        # bounding box regression
        self.conv4_2 = nn.Conv2d(32, 4, kernel_size=1, stride=1)
        # landmark localization
        # NOTE(review): not used by forward(); presumably kept so state-dict
        # keys stay compatible with saved checkpoints — confirm before removing.
        self.conv4_3 = nn.Conv2d(32, 10, kernel_size=1, stride=1)

        # weight initialisation with xavier
        self.apply(weights_init)

    def forward(self, x):
        """Return ``(label, offset)`` for the input batch ``x``."""
        x = self.pre_layer(x)
        # torch.sigmoid replaces the deprecated F.sigmoid and matches RNet/ONet.
        label = torch.sigmoid(self.conv4_1(x))
        offset = self.conv4_2(x)
        return label, offset





class RNet(nn.Module):
    """Second-stage network: conv trunk, one hidden linear layer, linear heads.

    ``forward`` returns ``(det, box)``: a sigmoid face score and four box
    offsets per input sample.
    """

    def __init__(self,is_train=False, use_cuda=True):
        super().__init__()
        self.is_train, self.use_cuda = is_train, use_cuda

        # Convolutional trunk; the order fixes the Sequential indices 0..7.
        trunk = [
            nn.Conv2d(3, 28, kernel_size=3, stride=1),   # conv1
            nn.PReLU(),                                  # prelu1
            nn.MaxPool2d(kernel_size=3, stride=2),       # pool1
            nn.Conv2d(28, 48, kernel_size=3, stride=1),  # conv2
            nn.PReLU(),                                  # prelu2
            nn.MaxPool2d(kernel_size=3, stride=2),       # pool2
            nn.Conv2d(48, 64, kernel_size=2, stride=1),  # conv3
            nn.PReLU(),                                  # prelu3
        ]
        self.pre_layer = nn.Sequential(*trunk)

        # Hidden fully connected layer on the flattened 64 x 2 x 2 features.
        self.conv4 = nn.Linear(64*2*2, 128)
        self.prelu4 = nn.PReLU()

        # Output heads: face score, box offsets, landmarks.
        self.conv5_1 = nn.Linear(128, 1)
        self.conv5_2 = nn.Linear(128, 4)
        # The landmark head is registered but not evaluated in forward().
        self.conv5_3 = nn.Linear(128, 10)

        # Xavier initialisation of every conv / linear layer.
        self.apply(weights_init)

    def forward(self, x):
        """Return ``(det, box)``; the result does not depend on ``is_train``."""
        features = self.pre_layer(x)
        flat = features.view(features.size(0), -1)
        hidden = self.prelu4(self.conv4(flat))
        det = torch.sigmoid(self.conv5_1(hidden))
        box = self.conv5_2(hidden)
        return det, box




class ONet(nn.Module):
    """Third-stage network (ONet): conv trunk, hidden linear layer, three heads.

    ``forward`` returns ``(det, box, landmark)``: a sigmoid face score, four
    box offsets and ten landmark values per input sample.
    """

    def __init__(self,is_train=False, use_cuda=True):
        super().__init__()
        self.is_train, self.use_cuda = is_train, use_cuda

        # Convolutional trunk; the order fixes the Sequential indices 0..10.
        trunk = [
            nn.Conv2d(3, 32, kernel_size=3, stride=1),    # conv1
            nn.PReLU(),                                   # prelu1
            nn.MaxPool2d(kernel_size=3, stride=2),        # pool1
            nn.Conv2d(32, 64, kernel_size=3, stride=1),   # conv2
            nn.PReLU(),                                   # prelu2
            nn.MaxPool2d(kernel_size=3, stride=2),        # pool2
            nn.Conv2d(64, 64, kernel_size=3, stride=1),   # conv3
            nn.PReLU(),                                   # prelu3
            nn.MaxPool2d(kernel_size=2,stride=2),         # pool3
            nn.Conv2d(64,128,kernel_size=2,stride=1),     # conv4
            nn.PReLU(),                                   # prelu4
        ]
        self.pre_layer = nn.Sequential(*trunk)

        # Hidden fully connected layer on the flattened 128 x 2 x 2 features.
        self.conv5 = nn.Linear(128*2*2, 256)
        self.prelu5 = nn.PReLU()

        # Output heads: face score, box offsets, landmarks.
        self.conv6_1 = nn.Linear(256, 1)
        self.conv6_2 = nn.Linear(256, 4)
        self.conv6_3 = nn.Linear(256, 10)

        # Xavier initialisation of every conv / linear layer.
        self.apply(weights_init)

    def forward(self, x):
        """Return ``(det, box, landmark)``; the result does not depend on ``is_train``."""
        features = self.pre_layer(x)
        flat = features.view(features.size(0), -1)
        hidden = self.prelu5(self.conv5(flat))
        det = torch.sigmoid(self.conv6_1(hidden))
        box = self.conv6_2(hidden)
        landmark = self.conv6_3(hidden)
        return det, box, landmark





# Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a skip connection.

    Parameters
    ----------
    in_channels, out_channels : int
        Channel counts of the input and of the block output.
    stride : int
        Stride of the first convolution (the second always uses stride 1).
    downsample : nn.Module or None
        Applied to the input to produce the skip branch when the main branch
        changes the channel count or spatial size.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        # padding=1 keeps the spatial size at stride 1, so the main branch and
        # the skip branch have matching shapes when they are added.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        # Only the first conv strides; striding twice would shrink the main
        # branch more than the single-conv downsample branch.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out



# ResNet Module
class ResNet(nn.Module):
    """Small ResNet: a stem conv, three stages of three blocks each, and a linear head.

    Parameters
    ----------
    block : callable
        Block factory called as ``block(in_channels, out_channels, stride, downsample)``
        for the first block of a stage and ``block(out_channels, out_channels)``
        for the rest.
    num_classes : int
        Output size of the final linear layer.
    """

    def __init__(self, block, num_classes=10):
        super(ResNet, self).__init__()
        self.in_channels = 16
        self.conv = nn.Conv2d(3, 16,kernel_size=3)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, 3)
        self.layer2 = self.make_layer(block, 32, 3, 2)
        self.layer3 = self.make_layer(block, 64, 3, 2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` blocks; only the first may stride or widen."""
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            # Same kernel/stride/padding as the first conv of ResidualBlock so
            # the skip branch has the same output shape as the main branch.
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for i in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        # Flatten to (batch, features) for the linear head.
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [44]:
import cv2
import time
import numpy as np
import torch
from torch.autograd.variable import Variable
# create the whole mtcnn
def create_mtcnn_net(p_model_path=None, r_model_path=None, o_model_path=None, use_cuda=True):
    """Build the P/R/O networks for which a checkpoint path is given.

    Parameters
    ----------
    p_model_path, r_model_path, o_model_path : str or None
        State-dict checkpoint for PNet / RNet / ONet. A network whose path is
        ``None`` is not created and is returned as ``None``.
    use_cuda : bool
        When true, each created network is moved to the GPU after loading;
        otherwise the checkpoint is mapped to the CPU while loading.

    Returns
    -------
    tuple
        ``(pnet, rnet, onet)``; every created network is in eval mode.
    """

    def _load(net, model_path, tag):
        # One loading routine for all three networks, so they cannot drift apart.
        # SECURITY: torch.load unpickles the file — only load trusted checkpoints.
        if use_cuda:
            print('{0}_model_path:{1}'.format(tag, model_path))
            net.load_state_dict(torch.load(model_path))
            net.cuda()
        else:
            net.load_state_dict(torch.load(model_path, map_location='cpu'))
        net.eval()
        return net

    # The network classes are only touched when the matching path is provided.
    pnet = _load(PNet(use_cuda=use_cuda), p_model_path, 'p') if p_model_path is not None else None
    rnet = _load(RNet(use_cuda=use_cuda), r_model_path, 'r') if r_model_path is not None else None
    onet = _load(ONet(use_cuda=use_cuda), o_model_path, 'o') if o_model_path is not None else None

    return pnet, rnet, onet


class MtcnnDetector(object):
    """
        P,R,O net face detection and landmarks align
    """
    def  __init__(self,
                 pnet = None,
                 rnet = None,
                 onet = None,
                 min_face_size=12,
                 stride=2,
                 threshold=[0.6, 0.7, 0.7],
                 scale_factor=0.709,
                 ):

        self.pnet_detector = pnet
        self.rnet_detector = rnet
        self.onet_detector = onet
        self.min_face_size = min_face_size
        self.stride=stride
        self.thresh = threshold
        self.scale_factor = scale_factor


    def unique_image_format(self,im):
        if not isinstance(im,np.ndarray):
            if im.mode == 'I':
                im = np.array(im, np.int32, copy=False)
            elif im.mode == 'I;16':
                im = np.array(im, np.int16, copy=False)
            else:
                im = np.asarray(im)
        return im

    def square_bbox(self, bbox):
        
        square_bbox = bbox.copy()

        
        
        h = bbox[:, 3] - bbox[:, 1] + 1# x2 - x1
        w = bbox[:, 2] - bbox[:, 0] + 1# y2 - y1
        l = np.maximum(h,w)
        square_bbox[:, 0] = bbox[:, 0] + w*0.5 - l*0.5
        square_bbox[:, 1] = bbox[:, 1] + h*0.5 - l*0.5

        # x2 = x1 + l - 1
        # y2 = y1 + l - 1
        square_bbox[:, 2] = square_bbox[:, 0] + l - 1
        square_bbox[:, 3] = square_bbox[:, 1] + l - 1
        return square_bbox


    def generate_bounding_box(self, map, reg, scale, threshold):
        
        stride = 2
        cellsize = 12 # receptive field

        t_index = np.where(map[:,:,0] > threshold)
        
        if t_index[0].size == 0:
            return np.array([])

        
        dx1, dy1, dx2, dy2 = [reg[0, t_index[0], t_index[1], i] for i in range(4)]
        
        reg = np.array([dx1, dy1, dx2, dy2])
        
        score = map[t_index[0], t_index[1], 0]

      
        boundingbox = np.vstack([np.round((stride * t_index[1]) / scale),            # x1 of prediction box in original image
                                 np.round((stride * t_index[0]) / scale),            # y1 of prediction box in original image
                                 np.round((stride * t_index[1] + cellsize) / scale), # x2 of prediction box in original image
                                 np.round((stride * t_index[0] + cellsize) / scale), # y2 of prediction box in original image
                                                                                     # reconstruct the box in original image
                                 score,
                                 reg,
                                
                                 ])

        return boundingbox.T


    def resize_image(self, img, scale):
        """Return ``img`` resized by ``scale`` using bilinear interpolation.

        ``img`` must be a 3-D array (height, width, channels); the target
        height and width are the scaled values truncated to int.
        """
        src_rows, src_cols, _ = img.shape
        # cv2.resize takes the target size as (width, height).
        target_size = (int(src_cols * scale), int(src_rows * scale))
        return cv2.resize(img, target_size, interpolation=cv2.INTER_LINEAR)


    def pad(self, bboxes, w, h):
        
        # width and height
        tmpw = (bboxes[:, 2] - bboxes[:, 0] + 1).astype(np.int32)
        tmph = (bboxes[:, 3] - bboxes[:, 1] + 1).astype(np.int32)
        numbox = bboxes.shape[0]

        dx = np.zeros((numbox, ))
        dy = np.zeros((numbox, ))
        edx, edy  = tmpw.copy()-1, tmph.copy()-1
        
        x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]

        tmp_index = np.where(ex > w-1)
        edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index]
        ex[tmp_index] = w - 1

        tmp_index = np.where(ey > h-1)
        edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index]
        ey[tmp_index] = h - 1

        tmp_index = np.where(x < 0)
        dx[tmp_index] = 0 - x[tmp_index]
        x[tmp_index] = 0

        tmp_index = np.where(y < 0)
        dy[tmp_index] = 0 - y[tmp_index]
        y[tmp_index] = 0

        return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph]
        return_list = [item.astype(np.int32) for item in return_list]

        return return_list


    def detect_pnet(self, im):
        """First stage: run PNet over an image pyramid and collect candidate boxes.

        Parameters
        ----------
        im : ndarray (height, width, channels)
            The image to scan.

        Returns
        -------
        (boxes, boxes_align)
            Two arrays with one row per surviving candidate:
            ``boxes`` holds x1, y1, x2, y2, score of the raw window and
            ``boxes_align`` the same window after applying the regressed
            offsets. ``(None, None)`` when no pyramid level yields a candidate.
        """
        h, w, _ = im.shape

        net_size = 12

        # Initial scale so that a min_face_size face maps onto the 12-pixel window.
        current_scale = float(net_size) / self.min_face_size
        im_resized = self.resize_image(im, current_scale)
        current_height, current_width, _ = im_resized.shape

        all_boxes = list()

        while min(current_height, current_width) > net_size:
            feed_imgs = torch.stack([convert_image_to_tensor(im_resized)])
            if self.pnet_detector.use_cuda:
                feed_imgs = feed_imgs.cuda()

            # Inference only: do not record an autograd graph.
            with torch.no_grad():
                cls_map, reg = self.pnet_detector(feed_imgs)

            cls_map_np = convert_chwTensor_to_hwcNumpy(cls_map.cpu())
            reg_np = convert_chwTensor_to_hwcNumpy(reg.cpu())

            boxes = self.generate_bounding_box(cls_map_np[0, :, :], reg_np, current_scale, self.thresh[0])

            # Prepare the next (smaller) pyramid level before any `continue`.
            current_scale *= self.scale_factor
            im_resized = self.resize_image(im, current_scale)
            current_height, current_width, _ = im_resized.shape

            if boxes.size == 0:
                continue

            # Per-level non-maximum suppression.
            keep = nms(boxes[:, :5], 0.5, 'Union')
            all_boxes.append(boxes[keep])

        if len(all_boxes) == 0:
            return None, None

        all_boxes = np.vstack(all_boxes)

        # Merge the detections of all pyramid levels.
        keep = nms(all_boxes[:, 0:5], 0.7, 'Union')
        all_boxes = all_boxes[keep]

        bw = all_boxes[:, 2] - all_boxes[:, 0] + 1
        bh = all_boxes[:, 3] - all_boxes[:, 1] + 1

        # all_boxes columns: x1, y1, x2, y2, score, then the four regressed offsets.
        boxes = all_boxes[:, 0:5].copy()

        # Refine each window with its offsets, scaled by the window size.
        boxes_align = np.column_stack([
            all_boxes[:, 0] + all_boxes[:, 5] * bw,
            all_boxes[:, 1] + all_boxes[:, 6] * bh,
            all_boxes[:, 2] + all_boxes[:, 7] * bw,
            all_boxes[:, 3] + all_boxes[:, 8] * bh,
            all_boxes[:, 4],
        ])

        # Drop refined boxes that are tiny or lie outside the image.
        too_small = ((boxes_align[:, 2] - boxes_align[:, 0] <= 3)
                     | (boxes_align[:, 3] - boxes_align[:, 1] <= 3))
        outside = ((boxes_align[:, 2] < 1) | (boxes_align[:, 0] > w - 2)
                   | (boxes_align[:, 3] < 1) | (boxes_align[:, 1] > h - 2))
        outside &= ~too_small  # a box is reported under one reason only
        for is_small, is_out in zip(too_small, outside):
            if is_small:
                print('pnet has one smaller than 3')
            elif is_out:
                print('pnet has one out')
        valid = ~(too_small | outside)

        return boxes[valid, :], boxes_align[valid, :]

    def detect_rnet(self, im, dets):
       #seect the face box candidate
        # im: an input image
        h, w, c = im.shape

        if dets is None:
            return None,None
        if dets.shape[0]==0:
            return None, None

        # return square boxes
        dets = self.square_bbox(dets)
        # rounds
        dets[:, 0:4] = np.round(dets[:, 0:4])

        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        num_boxes = dets.shape[0]

        # cropped_ims_tensors = np.zeros((num_boxes, 3, 24, 24), dtype=np.float32)
        cropped_ims_tensors = []
        for i in range(num_boxes):
            try:
                tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
                tmp[dy[i]:edy[i]+1, dx[i]:edx[i]+1, :] = im[y[i]:ey[i]+1, x[i]:ex[i]+1, :]
            except:    
                print(dy[i],edy[i],dx[i],edx[i],y[i],ey[i],x[i],ex[i],tmpw[i],tmph[i])
                print(dets[i])
                print(detss[i])
                print(detsss[i])
                print(h,w)
                exit()
            crop_im = cv2.resize(tmp, (24, 24))
            crop_im_tensor = convert_image_to_tensor(crop_im)
            
            cropped_ims_tensors.append(crop_im_tensor)
        feed_imgs = Variable(torch.stack(cropped_ims_tensors))

        if self.rnet_detector.use_cuda:
            feed_imgs = feed_imgs.cuda()

        cls_map, reg = self.rnet_detector(feed_imgs)

        cls_map = cls_map.cpu().data.numpy()
        reg = reg.cpu().data.numpy()

        keep_inds = np.where(cls_map > self.thresh[1])[0]

        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            cls = cls_map[keep_inds]
            reg = reg[keep_inds]
           
        else:
            return None, None

        keep = nms(boxes, 0.7)

        if len(keep) == 0:
            return None, None

        keep_cls = cls[keep]
        keep_boxes = boxes[keep]
        keep_reg = reg[keep]
     


        bw = keep_boxes[:, 2] - keep_boxes[:, 0] + 1
        bh = keep_boxes[:, 3] - keep_boxes[:, 1] + 1


        boxes = np.vstack([ keep_boxes[:,0],
                              keep_boxes[:,1],
                              keep_boxes[:,2],
                              keep_boxes[:,3],
                              keep_cls[:,0],
                             
                            ])

        align_topx = keep_boxes[:,0] + keep_reg[:,0] * bw
        align_topy = keep_boxes[:,1] + keep_reg[:,1] * bh
        align_bottomx = keep_boxes[:,2] + keep_reg[:,2] * bw
        align_bottomy = keep_boxes[:,3] + keep_reg[:,3] * bh

        boxes_align = np.vstack([align_topx,
                               align_topy,
                               align_bottomx,
                               align_bottomy,
                               keep_cls[:, 0],
                               
                             ])

        boxes = boxes.T
        boxes_align = boxes_align.T
        #remove invalid box
        valindex = [True for _ in range(boxes_align.shape[0])]   
        for i in range(boxes_align.shape[0]):
            if boxes_align[i][2]-boxes_align[i][0]<=3 or boxes_align[i][3]-boxes_align[i][1]<=3:
                valindex[i]=False
                print('rnet has one smaller than 3')
            else:
                if boxes_align[i][2]<1 or boxes_align[i][0]>w-2 or boxes_align[i][3]<1 or boxes_align[i][1]>h-2:
                    valindex[i]=False
                    print('rnet has one out')
        boxes_align=boxes_align[valindex,:]
        boxes = boxes[valindex,:]

        return boxes, boxes_align

    def detect_onet(self, im, dets):
        # get the final face box
        h, w, c = im.shape

        if dets is None:
            return None, None
        if dets.shape[0]==0:
            return None, None

        dets = self.square_bbox(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])

        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        num_boxes = dets.shape[0]


        # cropped_ims_tensors = np.zeros((num_boxes, 3, 24, 24), dtype=np.float32)
        cropped_ims_tensors = []
        for i in range(num_boxes):
            try:
                tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
                # crop input image
                tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            except:
                print(dy[i],edy[i],dx[i],edx[i],y[i],ey[i],x[i],ex[i],tmpw[i],tmph[i])
                print(dets[i])
                print(detss[i])
                print(h,w)
            crop_im = cv2.resize(tmp, (48, 48))
            crop_im_tensor = convert_image_to_tensor(crop_im)
           
            cropped_ims_tensors.append(crop_im_tensor)
        feed_imgs = Variable(torch.stack(cropped_ims_tensors))
        
        if self.rnet_detector.use_cuda:
            feed_imgs = feed_imgs.cuda()

        cls_map, reg, landmark = self.onet_detector(feed_imgs)

        cls_map = cls_map.cpu().data.numpy()
        reg = reg.cpu().data.numpy()
        landmark = landmark.cpu().data.numpy()
        

        keep_inds = np.where(cls_map > self.thresh[2])[0]

        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            cls = cls_map[keep_inds]
            reg = reg[keep_inds]
            landmark = landmark[keep_inds]
        else:
            return None, None

        keep = nms(boxes, 0.7, mode="Minimum")

        if len(keep) == 0:
            return None, None

        keep_cls = cls[keep]
        keep_boxes = boxes[keep]
        keep_reg = reg[keep]
        keep_landmark = landmark[keep]

        bw = keep_boxes[:, 2] - keep_boxes[:, 0] + 1
        bh = keep_boxes[:, 3] - keep_boxes[:, 1] + 1


        align_topx = keep_boxes[:, 0] + keep_reg[:, 0] * bw
        align_topy = keep_boxes[:, 1] + keep_reg[:, 1] * bh
        align_bottomx = keep_boxes[:, 2] + keep_reg[:, 2] * bw
        align_bottomy = keep_boxes[:, 3] + keep_reg[:, 3] * bh

        align_landmark_topx = keep_boxes[:, 0]
        align_landmark_topy = keep_boxes[:, 1]




        boxes_align = np.vstack([align_topx,
                                 align_topy,
                                 align_bottomx,
                                 align_bottomy,
                                 keep_cls[:, 0],
                                 ])

        boxes_align = boxes_align.T

        landmark =  np.vstack([
                                 align_landmark_topx + keep_landmark[:, 0] * bw,
                                 align_landmark_topy + keep_landmark[:, 1] * bh,
                                 align_landmark_topx + keep_landmark[:, 2] * bw,
                                 align_landmark_topy + keep_landmark[:, 3] * bh,
                                 align_landmark_topx + keep_landmark[:, 4] * bw,
                                 align_landmark_topy + keep_landmark[:, 5] * bh,
                                 align_landmark_topx + keep_landmark[:, 6] * bw,
                                 align_landmark_topy + keep_landmark[:, 7] * bh,
                                 align_landmark_topx + keep_landmark[:, 8] * bw,
                                 align_landmark_topy + keep_landmark[:, 9] * bh,
                                 ])

        landmark_align = landmark.T
        

        return boxes_align, landmark_align


    def detect_face(self,img):
        # detect the face
        boxes_align = np.array([])
        landmark_align =np.array([])

        t = time.time()

        # pnet
        if self.pnet_detector:
            boxes, boxes_align = self.detect_pnet(img)
            if boxes_align is None:
                return np.array([]), np.array([])

            t1 = time.time() - t
            t = time.time()

        # rnet
        if self.rnet_detector:
            boxes, boxes_align = self.detect_rnet(img, boxes_align)
            if boxes_align is None:
                return np.array([]), np.array([])

            t2 = time.time() - t
            t = time.time()

        # onet
        if self.onet_detector:
            boxes_align, landmark_align = self.detect_onet(img, boxes_align)

            if boxes_align is None:
                return np.array([]), np.array([])

            t3 = time.time() - t
            t = time.time()
            print("time cost " + '{:.3f}'.format(t1+t2+t3) + '  pnet {:.3f}  rnet {:.3f}  onet {:.3f}'.format(t1, t2, t3))

        return boxes_align, landmark_align

In [308]:
import torchvision.transforms as transforms
import torch
from torch.autograd.variable import Variable
import numpy as np

transform = transforms.ToTensor()

def convert_image_to_tensor(image):
    """Convert one image to a PyTorch tensor via the module-level ``transform``.

    Parameters
    ----------
    image : numpy array, h * w * c

    Returns
    -------
    image_tensor : tensor, c * h * w, as produced by ``transform``
    """
    image_tensor = transform(image)
    return image_tensor


def convert_chwTensor_to_hwcNumpy(tensor):
    """Convert a batch tensor (count * c * h * w) to a numpy array (count * h * w * c).

    Parameters
    ----------
    tensor : torch.Tensor with 4 dimensions, count * c * h * w
        Any dtype or device; it is detached and moved to the CPU first.

    Returns
    -------
    numpy array, count * h * w * c

    Raises
    ------
    TypeError
        If ``tensor`` is not a ``torch.Tensor``.
    ValueError
        If ``tensor`` does not have exactly 4 dimensions.
    """
    if not isinstance(tensor, torch.Tensor):
        raise TypeError(
            "convert b*c*h*w tensor to b*h*w*c numpy error: expected a torch.Tensor, got {0}".format(
                type(tensor).__name__))
    if tensor.dim() != 4:
        raise ValueError(
            "convert b*c*h*w tensor to b*h*w*c numpy error: this tensor must have 4 dimensions, got {0}".format(
                tensor.dim()))
    # detach() drops any autograd history; cpu() is a no-op for CPU tensors.
    return np.transpose(tensor.detach().cpu().numpy(), (0, 2, 3, 1))

In [309]:
import numpy as np
# the basic tools
def IoU(box, boxes):
    """Intersection-over-union of one box against every row of ``boxes``.

    Parameters
    ----------
    box : sequence (x1, y1, x2, y2)
    boxes : ndarray whose columns 0-3 are x1, y1, x2, y2

    Returns
    -------
    ndarray with one ratio per row of ``boxes``.
    """
    # Corners of the overlap rectangle between `box` and each row.
    left = np.maximum(box[0], boxes[:, 0])
    top = np.maximum(box[1], boxes[:, 1])
    right = np.minimum(box[2], boxes[:, 2])
    bottom = np.minimum(box[3], boxes[:, 3])

    # Overlap extent, clamped at 0 when the boxes do not intersect.
    overlap_w = np.maximum(0, right - left + 1)
    overlap_h = np.maximum(0, bottom - top + 1)
    inter = overlap_w * overlap_h

    single_area = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    row_areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)

    return np.true_divide(inter, (single_area + row_areas - inter))


def convert_to_square(bbox):
    """Return a copy of ``bbox`` with every box grown to a centred square.

    Columns 0-3 are read as x1, y1, x2, y2; the side of the square is the
    longer of the box's width and height. Other columns are copied unchanged.
    """
    result = bbox.copy()

    box_w = bbox[:, 2] - bbox[:, 0] + 1
    box_h = bbox[:, 3] - bbox[:, 1] + 1
    side = np.maximum(box_h, box_w)

    # Move the top-left corner so the centre is preserved, then derive the
    # bottom-right corner from the side length.
    result[:, 0] = bbox[:, 0] + box_w*0.5 - side*0.5
    result[:, 1] = bbox[:, 1] + box_h*0.5 - side*0.5
    result[:, 2] = result[:, 0] + side - 1
    result[:, 3] = result[:, 1] + side - 1
    return result

def nms(dets, thresh, mode="Union"):
    """Greedy non-maximum suppression.

    Parameters
    ----------
    dets : ndarray
        One row per box; columns 0-4 are x1, y1, x2, y2, score.
    thresh : float
        A box is dropped when its overlap with an already kept box exceeds this.
    mode : {"Union", "Minimum"}
        "Union" divides the intersection by the union of the two areas,
        "Minimum" by the smaller of the two areas.

    Returns
    -------
    list
        Row indices of the kept boxes, highest score first.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the two supported values.
    """
    if mode not in ("Union", "Minimum"):
        raise ValueError('nms mode must be "Union" or "Minimum", got {0!r}'.format(mode))

    dets = np.asarray(dets)
    # Covers both a (0, k) array and a bare empty array.
    if dets.size == 0:
        return []

    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]

    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]  # indices by descending score

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]

        # Intersection of the best remaining box with all the others.
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h

        if mode == "Union":
            ovr = inter / (areas[i] + areas[rest] - inter)
        else:
            ovr = inter / np.minimum(areas[i], areas[rest])

        # `inds` indexes into `rest`, hence the +1 to index back into `order`.
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]

    return keep

In [310]:
# Checkpoint files for the three MTCNN stages.
p_model_path = "./model_store/pnet_epoch_10.pt"
r_model_path = "./model_store/rnet_epoch_10.pt"
o_model_path = "./model_store/onet_model_final.pt"#"./model_store/landmark/onet_epoch_10.pt"

# Load the networks on the CPU and wrap them in one detector object.
pnet, rnet, onet = create_mtcnn_net(p_model_path=p_model_path, r_model_path=r_model_path, o_model_path=o_model_path, use_cuda=False)
mtcnn_detector = MtcnnDetector(pnet=pnet, rnet=rnet, onet=onet, min_face_size=24,threshold=[0.6, 0.7, 0.7])


In [311]:
import cv2
import sys
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [312]:
# Webcam alternative to the video file: video_capture = cv2.VideoCapture(0)
# Capture settings and counters.
capture_interval, capture_num = 1, 100
capture_count = frame_count = 0
# Single-face mode: the target subject is being recorded, so each frame is
# expected to contain only one face.
detect_multiple_faces = False

In [317]:
# Capture settings and counters (reset here so the cell can be re-run on its own).
capture_interval = 1
capture_num = 100
capture_count = 0
frame_count = 0
detect_multiple_faces = False

cap = cv2.VideoCapture('./trial.mp4')
try:
    fps = cap.get(cv2.CAP_PROP_FPS)  # 29.97
    # NOTE(review): the first frame is read and discarded, as in the original
    # cell — confirm this is intended.
    ret, frame = cap.read()
    os.makedirs('./image_store', exist_ok=True)

    # Open each log once (truncating it); the handles are closed by `with`.
    with open("box.txt", "w") as f, open("landmark.txt", "w") as f1, open('number.txt', 'w') as f4:
        while True:
            ret, frame = cap.read()
            if not ret:
                # End of the video or a read failure: there is no frame to process.
                break

            # Sample one frame every `capture_interval` frames; the colour
            # frame is used directly, without grayscale conversion.
            if capture_count % capture_interval == 0:
                bboxs, landmarks = mtcnn_detector.detect_face(frame)
                nrof_faces = bboxs.shape[0]

                f.write(str(bboxs.tolist()) + '\n')
                f1.write(str(landmarks.tolist()) + '\n')
                f4.write(str(nrof_faces) + '\n')

                frame_h, frame_w = frame.shape[:2]
                # Only one face is expected, so this loop normally runs once.
                for face_position in bboxs:
                    face_position = face_position.astype(int)
                    # Clip to the frame: a negative start index would wrap around
                    # and yield an empty or wrong crop.
                    x1 = max(int(face_position[0]), 0)
                    y1 = max(int(face_position[1]), 0)
                    x2 = min(int(face_position[2]), frame_w)
                    y2 = min(int(face_position[3]), frame_h)
                    if x2 <= x1 or y2 <= y1:
                        continue  # nothing of the box lies inside the frame
                    cropped = frame[y1:y2, x1:x2, :]
                    # Same size as the negative samples.
                    scaled = cv2.resize(cropped, (160, 160), interpolation=cv2.INTER_CUBIC)
                    name = './image_store/try' + str(capture_count)
                    os.makedirs(name, exist_ok=True)
                    cv2.imwrite(os.path.join(name, str(frame_count) + '.jpg'), scaled)

                    frame_count += 1

            capture_count += 1

            # Stop after `fps` frames have been consumed.
            if capture_count >= fps:
                break
finally:
    # Release the capture even if detection or file output fails.
    cap.release()
    cv2.destroyAllWindows()
print('Done!')

time cost 0.844  pnet 0.400  rnet 0.419  onet 0.025
time cost 0.878  pnet 0.403  rnet 0.450  onet 0.025
time cost 0.859  pnet 0.393  rnet 0.444  onet 0.023
time cost 0.870  pnet 0.407  rnet 0.442  onet 0.021
time cost 0.832  pnet 0.387  rnet 0.414  onet 0.031
time cost 0.852  pnet 0.388  rnet 0.431  onet 0.032
time cost 0.774  pnet 0.368  rnet 0.381  onet 0.025
time cost 0.782  pnet 0.369  rnet 0.391  onet 0.022
time cost 0.762  pnet 0.374  rnet 0.361  onet 0.027
time cost 0.741  pnet 0.359  rnet 0.356  onet 0.026
time cost 0.770  pnet 0.368  rnet 0.381  onet 0.021
time cost 0.809  pnet 0.367  rnet 0.418  onet 0.025
time cost 0.866  pnet 0.391  rnet 0.449  onet 0.026
time cost 0.867  pnet 0.387  rnet 0.452  onet 0.028
time cost 0.926  pnet 0.417  rnet 0.483  onet 0.027
time cost 0.929  pnet 0.401  rnet 0.500  onet 0.028
time cost 0.860  pnet 0.399  rnet 0.445  onet 0.016
time cost 0.873  pnet 0.398  rnet 0.455  onet 0.020
time cost 0.882  pnet 0.400  rnet 0.464  onet 0.018
time cost 0.

In [129]:
import torch
import torch.nn as nn
import os
import torch
import itertools
import time
import torchvision
import numpy as np
from PIL import Image
%pylab inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from torchvision import transforms
import torch.nn as nn
from torch.autograd import Variable
import torchvision.models as models
from torch.autograd import Function
import math

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  "\n`%matplotlib` prevents importing * from pylab and numpy"


In [130]:
class Resnet152(nn.Module):
    """ResNet-152 backbone whose classifier head is swapped for an embedding layer.

    Args:
        embedding_dim: Number of outputs of the replacement ``fc`` layer.
        pretrained: Forwarded unchanged to ``models.resnet152``.
    """

    def __init__(self, embedding_dim = 512, pretrained = False):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.resnet152 = models.resnet152(pretrained=pretrained)
        # Replace the stock classifier with a linear projection to the
        # embedding space; the same layer is also reachable as ``self.linear``.
        self.linear = nn.Linear(self.resnet152.fc.in_features, embedding_dim)
        self.resnet152.fc = self.linear
        # A BatchNorm1d on the embedding was experimented with and is disabled.
        self.init_weights()

    def init_weights(self):
        """Draw the projection weights from N(0, 0.02) and zero its bias."""
        with torch.no_grad():
            self.linear.weight.normal_(0.0, 0.02)
            self.linear.bias.zero_()

    def forward(self, images):
        """Run ``images`` through the backbone and return the embeddings."""
        return self.resnet152(images)

In [131]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class TripletNet(nn.Module):
    """Thin wrapper that stores an embedding network as ``self.embedding``.

    Args:
        cnn: Module (or callable) mapping a batch of images to embeddings.
    """

    def __init__(self, cnn):
        super().__init__()
        self.embedding = cnn

    def forward(self, images_tensor):
        """Return whatever ``self.embedding`` produces for ``images_tensor``."""
        return self.embedding(images_tensor)

In [132]:
class TripletLoss(nn.Module):
    """Hinge-style triplet loss on squared Euclidean distances.

    For each row the loss is ``max(||a - p||^2 - ||a - n||^2 + alpha, 0)``;
    ``forward`` returns the mean over rows.

    Args:
        alpha: Margin added before the hinge.
    """

    def __init__(self, alpha = 0.2):
        super().__init__()
        self.alpha = alpha

    def forward(self, anchor, positive, negative):
        """Return the mean triplet loss; distances are summed over ``dim=1``."""
        pos_dist = (anchor - positive).pow(2).sum(dim=1)
        neg_dist = (anchor - negative).pow(2).sum(dim=1)
        hinge = torch.relu(pos_dist - neg_dist + self.alpha)
        return hinge.mean()

def triplet_loss(anchor, positive, negative, alpha=0.2):
    """Functional shortcut: build a ``TripletLoss(alpha)`` and apply it once."""
    criterion = TripletLoss(alpha)
    return criterion(anchor, positive, negative)

In [133]:
def shuffle_data(data, seed = 0):
    """Return ``(image_ids, labels)`` reordered by one shared random permutation.

    Note: this reseeds torch's global RNG with ``seed`` on every call, so the
    permutation is reproducible for a given ``seed`` and input length.

    Args:
        data: Pair ``(image_ids, labels)`` of equally long indexable sequences.
        seed: Value passed to ``torch.manual_seed``.

    Returns:
        Tuple of two new lists in permuted order; pairs stay aligned.
    """
    image_ids, labels = data
    torch.manual_seed(seed)
    order = torch.randperm(len(image_ids)).tolist()
    shuffled_image_ids = [image_ids[idx] for idx in order]
    shuffled_labels = [labels[idx] for idx in order]
    return shuffled_image_ids, shuffled_labels

def make_minibatches(data, minibatch_size = 16,  seed = 0, shuffle = 'random'):
    """Split ``(X, Y)`` into consecutive minibatches.

    Args:
        data: Pair ``(X, Y)`` of equally long sliceable sequences.
        minibatch_size: Maximum number of items per minibatch (must be >= 1).
        seed: Seed forwarded to ``shuffle_data`` when ``shuffle == 'random'``.
        shuffle: ``'sequential'`` keeps the input order; ``'random'`` permutes
            the data with ``shuffle_data`` first.

    Returns:
        List of ``(minibatch_X, minibatch_Y)`` tuples. Every minibatch is full
        except possibly the last one, which holds the remainder.

    Raises:
        ValueError: If ``minibatch_size`` is smaller than 1 or ``shuffle`` is
            not one of the supported modes.
    """
    if minibatch_size < 1:
        raise ValueError("minibatch_size must be >= 1, got %r" % (minibatch_size,))

    X, Y = data
    if shuffle == 'sequential':
        shuffled_X, shuffled_Y = X, Y
    elif shuffle == 'random':
        shuffled_X, shuffled_Y = shuffle_data(data, seed = seed)
    else:
        # Previously an unknown mode fell through and failed later with an
        # opaque UnboundLocalError on shuffled_X.
        raise ValueError("shuffle must be 'sequential' or 'random', got %r" % (shuffle,))

    # Stepping by minibatch_size yields the complete batches followed by the
    # (shorter) remainder batch, if any.
    return [
        (shuffled_X[start:start + minibatch_size],
         shuffled_Y[start:start + minibatch_size])
        for start in range(0, len(X), minibatch_size)
    ]

def batch2images_tensor(minibatch_X, dataloader, gpu_device):
    """Stack the images of one minibatch into a single tensor.

    Args:
        minibatch_X: Sequence of image ids understood by ``dataloader``.
        dataloader: Object exposing ``get_image(image_id)``.
        gpu_device: Device selected via ``torch.cuda.device`` when CUDA is
            available; ignored otherwise.

    Returns:
        Tensor of shape ``(len(minibatch_X), 3, 224, 224)``, moved to the GPU
        when ``torch.cuda.is_available()``.
    """
    # The buffer shape is fixed; each image is assigned into its row (and is
    # therefore subject to the usual broadcasting rules of tensor assignment).
    batch = torch.zeros(len(minibatch_X), 3, 224, 224)
    for row, image_id in enumerate(minibatch_X):
        batch[row] = dataloader.get_image(image_id)
    batch = Variable(batch)
    if not torch.cuda.is_available():
        return batch
    with torch.cuda.device(gpu_device):
        return batch.cuda()

def gen_triplets(minibatch, id2embeds, embedding_dim, device, mode = 'all'):
    """Enumerate every valid (anchor, positive, negative) triplet of a minibatch.

    A triplet is valid when anchor and positive share a label, the negative
    has a different label, and anchor and positive are different ids.

    Args:
        minibatch: Pair ``(X, Y)`` of ids and their labels.
        id2embeds: Mapping from id to its embedding vector.
        embedding_dim: Length of each embedding vector.
        device: Unused (the GPU transfer is disabled).
        mode: Unused.

    Returns:
        Three ``(num_triplets, embedding_dim)`` tensors: anchors, positives
        and negatives, row-aligned.
    """
    X, Y = minibatch
    id_triples = itertools.product(X, repeat=3)
    label_triples = itertools.product(Y, repeat=3)

    # Both products advance in lockstep, so position k of one corresponds to
    # position k of the other.
    triplet = []
    for (xa, xp, xn), (ya, yp, yn) in zip(id_triples, label_triples):
        if (ya == yp) and (ya != yn) and (xa != xp):
            triplet.append((xa, xp, xn))

    count = len(triplet)
    anchor = torch.zeros(count, embedding_dim)
    positive = torch.zeros(count, embedding_dim)
    negative = torch.zeros(count, embedding_dim)
    for row, (xa, xp, xn) in enumerate(triplet):
        anchor[row, :] = id2embeds[xa]
        positive[row, :] = id2embeds[xp]
        negative[row, :] = id2embeds[xn]

    return anchor, positive, negative

def label2embeds_list2dict(labels_list, embeds):
    """Map each label to the row of ``embeds`` at the same position.

    If a label occurs more than once, the row of its last occurrence wins.
    """
    return {label: embeds[row, :] for row, label in enumerate(labels_list)}

def final_label2embeds(triplet_net, train_dataloader, gpu_device):
    """Build a reference embedding per label from each label's first image.

    Args:
        triplet_net: Network whose ``embedding`` sub-module maps images to
            embeddings.
        train_dataloader: Object with an ``images_dict`` (label -> image ids)
            and a ``get_image`` method.
        gpu_device: Forwarded to ``batch2images_tensor``.

    Returns:
        Dict mapping every label to the embedding of its first listed image.
    """
    labels_list = []
    image_ids = []
    for label, images_list in train_dataloader.images_dict.items():
        # Only the first image of each label serves as its reference.
        image_ids.append(images_list[0])
        labels_list.append(label)

    images_tensor = batch2images_tensor(image_ids, train_dataloader, gpu_device)
    with torch.no_grad():
        reference_embeds = triplet_net.embedding(images_tensor)
    return label2embeds_list2dict(labels_list, reference_embeds)

def who_is_it(label2embeds, embed):
    """Return the label whose stored embedding is closest to ``embed``.

    Closeness is the squared Euclidean distance; on ties the label that
    ``torch.argmin`` selects is returned.

    Args:
        label2embeds: Mapping label -> embedding vector.
        embed: 1-D query embedding (its length sets the comparison width).
    """
    labels = []
    embeds = torch.zeros(len(label2embeds), embed.shape[0])
    for row, (label, cur_embed) in enumerate(label2embeds.items()):
        labels.append(label)
        embeds[row, :] = cur_embed
    squared_dist = (embeds - embed).pow(2).sum(dim = 1)
    nearest = torch.argmin(squared_dist).item()
    return labels[nearest]

def accuracy(data, dataloader, label2embeds, triplet_net, gpu_device):
    """Embed every image in ``data`` and classify it by nearest reference embedding.

    Args:
        data: Pair ``(image_ids, Y)`` of image ids and their true labels.
        dataloader: Object exposing ``get_image``; used by
            ``batch2images_tensor``.
        label2embeds: Mapping label -> reference embedding, searched by
            ``who_is_it``.
        triplet_net: Network to evaluate; ``triplet_net.embedding.embedding_dim``
            must give the embedding width.
        gpu_device: Forwarded to ``batch2images_tensor``.

    Returns:
        ``(acc, num_data, pred)``: the number of correct predictions, the
        number of samples, and the predicted label of every sample in input
        order (so ``len(pred) == num_data``).
    """
    _, Y = data
    num_data = len(Y)
    embedding_dim = triplet_net.embedding.embedding_dim
    embeds = torch.zeros(num_data, embedding_dim)
    minibatch_size = 32
    # Sequential batching keeps row i of `embeds` aligned with Y[i].
    minibatches = make_minibatches(data, minibatch_size = minibatch_size,  seed = 0, shuffle = 'sequential')
    start = 0
    for minibatch_X, _ in minibatches:
        end = start + len(minibatch_X)
        images_tensor = batch2images_tensor(minibatch_X, dataloader, gpu_device)
        with torch.no_grad():
            embeds[start:end, :] = triplet_net(images_tensor)
        start = end

    acc = 0
    pred = []
    for i in range(num_data):
        predicted_label = who_is_it(label2embeds, embeds[i])
        # Record every prediction; previously only the correct ones were
        # appended, so `pred` could not be lined up with the inputs.
        pred.append(predicted_label)
        if predicted_label == Y[i]:
            acc += 1
    return acc, num_data, pred

In [134]:
class DataLoader():
    """Eagerly loads a ``<dir_path>/<label>/<image file>`` tree into memory.

    Attributes are filled by ``load_images`` during construction:
      * ``labels``      - raw ``os.listdir(dir_path)`` result.
      * ``images_dict`` - label -> list of image file names in that folder.
      * ``id2image``    - image file name -> transformed image.

    NOTE(review): ``id2image`` is keyed by the bare file name, so identically
    named files under different labels overwrite each other - confirm that
    file names are unique across label folders.
    """

    def __init__(self, dir_path, transform):
        self.images_dict = {}
        self.id2image = {}
        self.labels = None
        self.dir_path = dir_path
        self.transform = transform
        self.load_images()

    def load_images(self):
        """Scan ``dir_path`` and populate ``labels``, ``images_dict`` and ``id2image``."""
        self.labels = os.listdir(self.dir_path)
        for label in self.labels:
            label_dir = os.path.join(self.dir_path, label)
            # Skip entries whose full path mentions DS_Store (macOS metadata).
            if 'DS_Store' in label_dir:
                continue
            file_names = os.listdir(label_dir)
            self.images_dict[label] = file_names
            for file_name in file_names:
                image_path = os.path.join(label_dir, file_name)
                self.id2image[file_name] = self.transform(Image.open(image_path))

    def gen_data(self):
        """Return ``(image_ids, labels)`` as two aligned flat lists."""
        image_ids = []
        labels = []
        for label, file_names in self.images_dict.items():
            image_ids.extend(file_names)
            labels.extend([label] * len(file_names))
        return image_ids, labels

    def get_image(self, image_id):
        """Look up the transformed image stored under ``image_id``."""
        return self.id2image[image_id]

In [135]:
# Load the label -> embedding mapping saved by an earlier run, mapping its tensors to the CPU.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only load checkpoints that come from a trusted source.
label2embeds = torch.load('./model_store/iter_1499_label2embeds.pkl',map_location='cpu')

In [136]:
# Folder scanned by DataLoader: one sub-folder per label, each holding image files.
image_dir = './image_store'
# Resize to 224x224 (the size of the buffer batch2images_tensor allocates),
# convert to a tensor, then normalise each channel with mean 0.5 and std 0.5.
transform = transforms.Compose([transforms.Resize((224, 224)), 
                                    transforms.ToTensor(),
                                    transforms.Normalize((0.5, 0.5, 0.5),
                                                        (0.5, 0.5, 0.5))
                                    ])
# Constructing the loader reads and transforms every image immediately.
image_dataloader = DataLoader(image_dir, transform)
# (image_ids, labels) as two aligned lists.
image_data = image_dataloader.gen_data()

In [137]:
def trial(data, dataloader, label2embeds, triplet_net, gpu_device):
    """Compute the embedding of every image in ``data`` without classifying it.

    Args:
        data: Pair ``(image_ids, Y)``; only its length and the ids are used.
        dataloader: Object exposing ``get_image``; used by
            ``batch2images_tensor``.
        label2embeds: Unused (the classification step is disabled).
        triplet_net: Network to run; ``triplet_net.embedding.embedding_dim``
            must give the embedding width.
        gpu_device: Forwarded to ``batch2images_tensor``.

    Returns:
        Tensor of shape ``(len(Y), embedding_dim)``; row i belongs to the
        i-th image id because batches are taken sequentially.
    """
    _, Y = data
    embeds = torch.zeros(len(Y), triplet_net.embedding.embedding_dim)
    minibatches = make_minibatches(data, minibatch_size = 32,  seed = 0, shuffle = 'sequential')

    offset = 0
    for minibatch_X, _ in minibatches:
        upper = offset + len(minibatch_X)
        images_tensor = batch2images_tensor(minibatch_X, dataloader, gpu_device)
        with torch.no_grad():
            embeds[offset:upper, :] = triplet_net(images_tensor)
        offset = upper

    return embeds

In [247]:
embedding_dim = 64
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
cnn = Resnet152(embedding_dim = embedding_dim, pretrained = False)
# NOTE(review): no trained weights are loaded into `cnn` in this cell, so the
# embeddings come from a randomly initialised network - confirm this is intended.
triplet_net = TripletNet(cnn)
# batch2images_tensor moves the inputs to CUDA whenever it is available, so the
# model has to live on the same device or the forward pass fails.
triplet_net.to(device)
# Inference only: switch BatchNorm layers to their running statistics so an
# embedding does not depend on which other images share its minibatch.
triplet_net.eval()
embeds=trial(image_data, image_dataloader, label2embeds, triplet_net, device)

In [248]:
embeds.size()

torch.Size([38, 64])

In [318]:
# Persist the embeddings as text: one Python-style list literal per line.
with open('./embedding.txt','w') as f2:
    for i in embeds:
        f2.write(str(i.tolist()) + '\n')

In [1]:
def strList(s):
    '''
    Parse one text line holding a bracketed, comma-separated list of numbers.

    :param s: line such as "[0.1, -2.0, 3]"; all square brackets and newline
              characters are stripped before the text is split on commas
    :return: flat list of floats; an empty list when the line holds no values
             (for example "[]" or a blank line)
    :raises ValueError: if a comma-separated field is not a valid float
    '''
    s = s.replace('[', '').replace(']', '').replace('\n', '')
    if not s.strip():
        # "[]" or a blank line: float('') would raise, so report "no values".
        return []
    # Split on commas into a list of floats.
    return [float(token) for token in s.split(',')]

In [2]:
embed = []
# Read the file line by line; the context manager closes the handle, which the
# previous bare open() inside the for statement never did.
with open("embedding.txt","r") as embedding_file:
    for line in embedding_file:
        embed.append(strList(line))

In [3]:
len(embed)

38

In [4]:
def strListDemo(s):
    '''
    Parse one text line of numbers and regroup them into rows of five values.

    :param s: line such as "[[x1, y1, x2, y2, score], ...]"; all square
              brackets and newline characters are stripped before the text is
              split on commas
    :return: list of lists of floats, five values per inner list (the last
             one is shorter if the count is not a multiple of five); an empty
             list when the line holds no values (for example "[]")
    :raises ValueError: if a comma-separated field is not a valid float
    '''
    s = s.replace('[', '').replace(']', '').replace('\n', '')
    if not s.strip():
        # "[]" or a blank line: float('') would raise, so report "no rows".
        return []
    # Split on commas into a flat list of floats.
    values = [float(token) for token in s.split(',')]
    # Regroup the flat values into consecutive chunks of five.
    return [values[start:start + 5] for start in range(0, len(values), 5)]


In [6]:
box = []

# Read the file line by line; the context manager closes the handle, which the
# previous bare open() inside the for statement never did.
with open("box.txt","r") as box_file:
    for line in box_file:
        box.append(strListDemo(line))

In [7]:
len(box)

30

In [8]:
box[13]

[[894.4193812608719,
  116.81876841187477,
  1149.4248317480087,
  473.6086235642433,
  0.942638635635376]]

In [9]:
def strListland(s):
    '''
    Parse one text line of numbers and regroup them into rows of ten values.

    :param s: line of bracketed, comma-separated numbers; all square brackets
              and newline characters are stripped before the text is split on
              commas
    :return: list of lists of floats, ten values per inner list (the last one
             is shorter if the count is not a multiple of ten); an empty list
             when the line holds no values (for example "[]")
    :raises ValueError: if a comma-separated field is not a valid float
    '''
    s = s.replace('[', '').replace(']', '').replace('\n', '')
    if not s.strip():
        # "[]" or a blank line: float('') would raise, so report "no rows".
        return []
    # Split on commas into a flat list of floats.
    values = [float(token) for token in s.split(',')]
    # Regroup the flat values into consecutive chunks of ten.
    return [values[start:start + 10] for start in range(0, len(values), 10)]

In [10]:
landmark = []
# Read the file line by line; the context manager closes the handle, which the
# previous bare open() inside the for statement never did.
with open("landmark.txt","r") as landmark_file:
    for line in landmark_file:
        landmark.append(strListland(line))

In [11]:
len(landmark)

30

In [12]:
number = []
# Read one integer per line; the context manager closes the handle, which the
# previous bare open() inside the for statement never did.
with open("number.txt","r") as number_file:
    for line in number_file:
        number.append(int(line))

In [13]:
len(number)

30

In [14]:
# Assemble one record per parsed line of box.txt: its boxes, its landmarks and
# the next `number[i]` rows of `embed`.
# NOTE(review): this presumes the rows of `embed` are stored in the same order
# in which the per-line counts in `number` were written - confirm.
image_list=[]
n=0  # index of the first not-yet-consumed row of `embed`
for i in range(len(box)):
    nu=number[i]
    image_dic={
        'box': box[i],
        'landmark': landmark[i],
        'embedding': embed[n:n+nu],
    }
    image_list.append(image_dic)
    n+=nu
    
    

In [15]:
import json
# Serialise the assembled per-line records (image_list) to image.json as UTF-8 text.
filename='image.json'
with open(filename,'w',encoding='utf-8') as file_obj:
    json.dump(image_list,file_obj)