In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install facenet_pytorch

In [None]:
!pip install pytelegrambotapi

In [4]:
import torch
import csv
import numpy as np
import os
from PIL import Image
from torchvision import transforms
from scipy.spatial.distance import euclidean
from facenet_pytorch import MTCNN
import math
import cv2
import PIL.Image
import functools
import telebot
from telebot.types import InputMediaPhoto
import shutil
from torch import nn
import torch.nn.functional as F

In [32]:
!pip freeze | grep torch

facenet-pytorch==2.5.2
torch @ https://download.pytorch.org/whl/cu113/torch-1.12.1%2Bcu113-cp38-cp38-linux_x86_64.whl
torchaudio @ https://download.pytorch.org/whl/cu113/torchaudio-0.12.1%2Bcu113-cp38-cp38-linux_x86_64.whl
torchsummary==1.5.1
torchtext==0.13.1
torchvision @ https://download.pytorch.org/whl/cu113/torchvision-0.13.1%2Bcu113-cp38-cp38-linux_x86_64.whl


In [5]:
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

def cosine_distance(x, y):
    if x.ndim == 1:
        x_norm = np.linalg.norm(x)
        y_norm = np.linalg.norm(y)
    elif x.ndim == 2:
        x_norm = np.linalg.norm(x, axis=1, keepdims=True)
        y_norm = np.linalg.norm(y, axis=1, keepdims=True)

    np.seterr(divide='ignore', invalid='ignore')
    s = np.dot(x, y.T)/(x_norm*y_norm)
    s *= -1
    dist = s + 1
    dist = np.clip(dist, 0, 2)
    if x is y or y is None:
        dist[np.diag_indices_from(dist)] = 0.0
    if np.any(np.isnan(dist)):
        if x.ndim == 1:
            dist = 1.
        else:
            dist[np.isnan(dist)] = 1.
    return dist

In [6]:
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(BasicBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [7]:
class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(Bottleneck, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        width = int(planes * (base_width / 64.)) * groups
        # Both self.conv2 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

In [8]:
class ResNet(nn.Module):

    def __init__(self, block, layers, num_classes=1000, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None):
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2,
                                       dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
                                       dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
                                       dilate=replace_stride_with_dilation[2])

        # define margin cnn
        self.conv1_d = nn.Conv1d(101, 64, kernel_size=7, stride=2, padding=3, bias=False)  # 614/2 64
        self.bn1_d = nn.BatchNorm1d(64)
        # 150 64
        self.layer1_d = nn.Sequential(
            nn.MaxPool1d(kernel_size=3, stride=2, padding=1),
            nn.Conv1d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True)
        )
        # output 75 128
        self.layer2_d = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=3, stride=2, padding=1),  # conv replacing pooling
            nn.Conv1d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True)
        )
        # output 37 256
        self.layer3_d = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=3, stride=2, padding=1),  # conv replacing pooling
            nn.Conv1d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True)
        )
        # output 20 512
        self.layer4_d = nn.Sequential(
            nn.Conv1d(256, 512, kernel_size=3, stride=2, padding=1),  # conv replacing pooling
            nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True)
        )
        self.avgpool_d = nn.AdaptiveAvgPool1d(1)
        self.fc_d = nn.Linear(512, 101)  # the number of margin_l=101

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Conv1d)):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm, nn.BatchNorm1d)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )
        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def margin_long_tail(self, x):
        x = self.conv1_d(x)
        x = self.bn1_d(x)
        x = self.relu(x)
        x = self.layer1_d(x)
        x = self.layer2_d(x)
        x = self.layer3_d(x)
        x = self.layer4_d(x)
        x = self.avgpool_d(x)
        x = torch.flatten(x, 1)
        x = self.fc_d(x)
        return x

    def margin_miu(self, x):
        x = self.conv1_d(x)
        x = self.bn1_d(x)
        x = self.relu(x)
        x = self.layer1_d(x)
        x = self.layer2_d(x)
        x = self.layer3_d(x)
        x = self.layer4_d(x)
        x = self.avgpool_d(x)
        x = torch.flatten(x, 1)
        x = self.fc_d(x)
        x = (x-torch.min(x))/(torch.max(x)-torch.min(x))*101
        return x

    def margin_sigma(self, x):
        x = self.conv1_d(x)
        x = self.bn1_d(x)
        x = self.relu(x)
        x = self.layer1_d(x)
        x = self.layer2_d(x)
        x = self.layer3_d(x)
        x = self.layer4_d(x)
        x = self.avgpool_d(x)
        x = torch.flatten(x, 1)
        x = self.fc_d(x)
        return x

    @staticmethod
    def gaussian(z, age, m_p_miu, m_p_sigma):  # Calculate the Gaussian distribution for each age
        x = torch.linspace(0, 100, 101).expand([z.shape[0], -1]).cuda()
        pi = torch.Tensor([3.1415926]).cuda()
        u = m_p_miu.T[age]
        sig = m_p_sigma.T[age]
        m_p = torch.exp(-torch.pow((x - u), 2)/(2 * torch.pow(sig, 2))) / (torch.sqrt(2 * pi) * sig)
        return m_p

    @staticmethod
    def distributed_softmax(x, margin):
        a = torch.ones(x.shape[1]).cuda()
        mask = torch.diag(a).cuda()  # 1D vector  Output a 2D square matrix with input as diagonal elements
        mask = (1 - mask).expand((x.shape[0], -1, -1))  # diagonal is 0, the others are 1

        b = x.expand([x.shape[1], -1, -1]).permute(1, 0, 2)  # batch_size, multi, score
        b = b * mask
        b = torch.sum(torch.exp(b), dim=2) - 1  # eliminate exp(0) sum of the negative score
        y = torch.exp(x - margin) / (b + torch.exp(x - margin))
        return y

    def _forward_impl(self, x, age, pro, intra, inter):
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        # propotype update
        z = x.cpu().clone().detach().numpy()
        age = age.cpu().clone().detach().numpy()
        pro_t = pro[0].copy()
        intra_t = intra.copy()
        inter_t = inter.copy()
        for i in range(z.shape[0]):
            temp = pro[0][age[i], :].copy()  # shallow copy
            pro[0][age[i]] = pro[0][age[i], :] + (z[i] - pro[0][age[i], :]) / (pro[1][age[i]] + 1)  # update Prototype
            pro[1][age[i]] += 1  # update instance number
            # cosine_distances 0~2 Divisor 0 is set to 1 for unknown relationship
            intra[age[i]] = intra[age[i]] + cosine_distance(z[i], temp) * cosine_distance(z[i], pro[0][age[i]])
        # Calculate distance between two matrices and multi-row vectors
        inter = cosine_distance(pro[0], pro[0])
        delta_pro = np.concatenate((pro[0]-pro_t, intra-intra_t, inter-inter_t), axis=1)[np.newaxis, :]
        pro_input = np.concatenate((pro[0], intra), axis=1)[np.newaxis, :]
        m_l = self.margin_long_tail(torch.from_numpy(delta_pro).cuda())
        m_p_miu = self.margin_miu(torch.from_numpy(pro_input).cuda())
        m_p_sigma = self.margin_sigma(torch.from_numpy(pro_input).cuda())
        m_p = self.gaussian(z, age, m_p_miu, m_p_sigma)
        margin = m_p + 0.1*m_l
        # The margin is normalized to 0 as the mean and 1 as the variance
        margin = (margin-torch.mean(margin, dim=1).expand([101, -1]).permute(1, 0))/torch.std(margin, dim=1).expand([101, -1]).permute(1, 0)
        margin = margin*0.01
        if not False: #cfg.model.margin: эта штука False у них стоит
            margin = margin*0
        x = self.fc(x)
        if margin.requires_grad:
            x = F.softmax(x-margin, dim=1)  # make a baseline
            # x = F.softmax(x-margin, dim=1)
        else:
            x =F.softmax(x, dim=1)
        # L1 normalize
        # x = F.normalize(x-margin, p=1, dim=1)  # negative logarithm possible
        return x, pro, intra, inter

    def forward(self, x, age, proc, intra, inter):
        return self._forward_impl(x, age, proc, intra, inter)


In [9]:
from torch.hub import load_state_dict_from_url
from torch.utils.model_zoo import load_url as load_state_dict_from_url
model_urls = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
    'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth',
    'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
    'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth',
    'resnet152': 'https://download.pytorch.org/models/resnet152-b121ed2d.pth',
    'resnext50_32x4d': 'https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth',
    'resnext101_32x8d': 'https://download.pytorch.org/models/resnext101_32x8d-8ba56ff5.pth',
    'wide_resnet50_2': 'https://download.pytorch.org/models/wide_resnet50_2-95faca4d.pth',
    'wide_resnet101_2': 'https://download.pytorch.org/models/wide_resnet101_2-32ee1156.pth',
}
def _resnet(arch, block, layers, pretrained, progress, **kwargs):
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch],
                                              progress=progress)
        # partial load pretrained model
        model_dict = model.state_dict()
        pretrained_dict = {k: v for k, v in state_dict.items() if k in model_dict}
        model_dict.update(pretrained_dict)
        model.load_state_dict(model_dict)
    return model

In [10]:
def resnet34(pretrained=False, progress=True, **kwargs):
    r"""ResNet-34 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    return _resnet('resnet34', BasicBlock, [3, 4, 6, 3], pretrained, progress,
                   **kwargs)

In [11]:
def preprocess(img):
  img = Image.open(img).convert('RGB')
  imgs = [img, img.transpose(Image.FLIP_LEFT_RIGHT)]
  transform_list = [
      transforms.Resize((224, 224), interpolation=3),
      transforms.ToTensor(),
      transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
  ]
  transform = transforms.Compose(transform_list)
  imgs = [transform(i) for i in imgs]
  imgs = [torch.unsqueeze(i, dim=0) for i in imgs]

  return imgs

In [12]:
model = resnet34(pretrained=True, progress=True)
fc_in_features = model.fc.in_features
model.fc = torch.nn.Linear(fc_in_features, 101)

Downloading: "https://download.pytorch.org/models/resnet34-333f7ec4.pth" to /root/.cache/torch/hub/checkpoints/resnet34-333f7ec4.pth


  0%|          | 0.00/83.3M [00:00<?, ?B/s]

In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mtcnn = MTCNN(post_process=False, device=device)

In [14]:
device

device(type='cuda')

In [None]:
state = torch.load('/content/drive/MyDrive/AgeEstimation/best.pth')
model.to(device)

In [None]:
# state = torch.load('/content/best.pth')
model.load_state_dict(state)
# model = model.to(device)
model.eval()

In [17]:
def alignment_procedure(img, left_eye, right_eye):
    #this function aligns given face in img based on left and right eye coordinates
    
    left_eye_x, left_eye_y = left_eye
    right_eye_x, right_eye_y = right_eye
    
    #-----------------------
    #find rotation direction
    
    if left_eye_y > right_eye_y:
        point_3rd = (right_eye_x, left_eye_y)
        direction = -1 #rotate same direction to clock
    else:
        point_3rd = (left_eye_x, right_eye_y)
        direction = 1 #rotate inverse direction of clock
    
    #-----------------------
    #find length of triangle edges
    
    a = euclidean(left_eye, point_3rd)
    b = euclidean(right_eye, point_3rd)
    c = euclidean(right_eye, left_eye)
    
    #-----------------------
    
    #apply cosine rule
    
    if b != 0 and c != 0: #this multiplication causes division by zero in cos_a calculation
    
        cos_a = (b*b + c*c - a*a)/(2*b*c)
        angle = np.arccos(cos_a) #angle in radian
        angle = (angle * 180) / math.pi #radian to degree
    
        #-----------------------
        #rotate base image
    
        if direction == -1:
            angle = 90 - angle
    
        img = Image.fromarray(img)
        img = np.array(img.rotate(direction * angle))
    
    #-----------------------
    
    return img #return img anyway

In [18]:
def detect_face(input_path):
    img = cv2.imread(input_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    bbox, _, landmarks = mtcnn.detect(img, landmarks=True)

    if bbox is None:
        # self.non_detect.append(self.path + img_path)
        return "can't detect"
    if landmarks is None:
        # self.non_align.append(self.path + img_path)
        return "can't detect"


    bbox = list(map(int, bbox[0]))
    bbox = [max(0, int(x)) for x in bbox]
    img = img[bbox[1]: bbox[3], bbox[0]: bbox[2], :]
    align = alignment_procedure(img, (int(landmarks[0][0][0]),
                    int(landmarks[0][0][1])),
                    (int(landmarks[0][1][0]), 
                    int(landmarks[0][1][1])))
    
    return align, img

In [19]:
def inference(img_path, uid):
    
    rank = torch.Tensor([i for i in range(101)]).cuda()

    age = 20
    age = torch.IntTensor([int(age)])
    age = age.to(device)

    p = detect_face(img_path)

    if type(p) == str:
        return "MTCNN can't detect face"
    else:
        align, not_align = p
        align = cv2.cvtColor(align, cv2.COLOR_BGR2RGB)
        not_align = cv2.cvtColor(not_align, cv2.COLOR_BGR2RGB)
        cv2.imwrite(str(uid) + 'align.jpg', align) 
        cv2.imwrite(str(uid) + 'not_align.jpg', not_align)
        imgs = preprocess(str(uid) + 'align.jpg')
        imgs2 = preprocess(str(uid) + 'not_align.jpg')
        predict_age_align = 0
        predict_age_not_align = 0
        prototype = np.zeros([101, 512], dtype=np.float32)
        instance_num = np.zeros([101, 1], dtype=np.float32)
        intra = np.zeros([101, 1], dtype=np.float32)
        inter = np.zeros([101, 101], dtype=np.float32)
        pro = [prototype, instance_num]

        for img in imgs:
            img = img.to(device)
            output, pro, intra, inter = model(img, age, pro, intra, inter)
            predict_age_align += torch.sum(output * rank, dim=1).item() / 2

        for img in imgs2:
            img = img.to(device)
            output, pro, intra, inter = model(img, age, pro, intra, inter)
            predict_age_not_align += torch.sum(output * rank, dim=1).item() / 2

    # print(predict_age_align)
    # print(predict_age_not_align)

    return predict_age_align, predict_age_not_align

In [20]:
def preprocess_photo(photo, uid, postfix):
    fileID = photo.photo[-1].file_id

    file_info = bot.get_file(fileID)

    downloaded_file = bot.download_file(file_info.file_path)
    path = '../' + str(uid) + postfix + ".jpg"
    with open(path, 'wb') as new_file:
        new_file.write(downloaded_file)
    return path#open(str(uid) + postfix + ".jpg", 'rb')

In [22]:
bot = telebot.TeleBot('5984227231:AAFpG9jBO6QePoVf9suamH0O0cjLfhQpw08') # insert ur token here


@bot.message_handler(commands=['start'])
def start_message(message):
    user_id = message.from_user.id
    bot.send_message(message.chat.id, 'Привет! Тут можно оценить свой возраст\nНапиши /help, если ты не знаешь как со мной работать.')


@bot.message_handler(commands=['help'])
def start_message(message):
    user_id = message.from_user.id
    bot.send_message(message.chat.id, 'Отправь мне фотографию, я оценю твой возраст! :)')



@bot.message_handler(content_types=['photo'])
def send_photo(photo):
    user_id = photo.from_user.id

    img_path = preprocess_photo(photo, user_id, 'age')

    p = inference(img_path, user_id)
    if type(p[0]) == str:
        message = p
    else:
        p1, p2 = p
        message = f'alignment face prediction: {p1}\nnot alignment face prediction: {p2}'
    

    bot.send_message(photo.chat.id, message)



bot.polling()