In [4]:
import time
import numpy as np
import cv2
from collections import defaultdict, deque
from PIL import Image

import mediapipe as mp #face detector
import math

import warnings
warnings.simplefilter("ignore", UserWarning)

import torch
import torch.nn as  nn
import torch.nn.functional as F
from torchvision import transforms

In [5]:
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 conv -> 3x3 conv -> 1x1 conv, plus a skip connection.

    The final 1x1 convolution widens the feature map to
    ``out_channels * expansion`` channels. Any stride is applied by the first
    1x1 convolution.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Width of the two inner convolutions.
        i_downsample: Optional module applied to the skip branch so that it
            matches the shape of the main branch; ``None`` leaves it untouched.
        stride: Stride of the first convolution.
    """
    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()
        widened = out_channels * self.expansion
        # NOTE(review): PyTorch's BatchNorm `momentum` is the weight given to the
        # newest batch statistic; 0.99 looks like a Keras-convention value.
        # It has no effect in eval mode -- confirm before training with this block.
        bn_args = dict(eps=0.001, momentum=0.99)

        # Attribute names double as state_dict keys, so they must not change.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                               stride=stride, padding=0, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels, **bn_args)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               padding='same', bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels, **bn_args)

        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1,
                               stride=1, padding=0, bias=False)
        self.batch_norm3 = nn.BatchNorm2d(widened, **bn_args)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(main_branch(x) + skip(x))``."""
        shortcut = x.clone()

        out = self.conv1(x)
        out = self.relu(self.batch_norm1(out))

        out = self.conv2(out)
        out = self.relu(self.batch_norm2(out))

        out = self.batch_norm3(self.conv3(out))

        # Project the skip branch only when a projection module was supplied.
        if self.i_downsample is not None:
            shortcut = self.i_downsample(shortcut)

        out += shortcut
        return self.relu(out)

class Conv2dSame(torch.nn.Conv2d):
    """Conv2d that pads its input at call time so the output spans ``ceil(size / stride)``.

    The padding is derived from the actual input height/width on every forward
    pass; when the required amount is odd, the extra pixel goes to the
    bottom/right side. The layer's own ``padding`` setting is still passed to
    the convolution afterwards.
    """

    def calc_same_pad(self, i: int, k: int, s: int, d: int) -> int:
        """Total padding needed along one axis.

        Args:
            i: Input size along the axis.
            k: Kernel size.
            s: Stride.
            d: Dilation.

        Returns:
            Non-negative number of pixels to add (both sides combined).
        """
        target_out = math.ceil(i / s)
        span_needed = (target_out - 1) * s + (k - 1) * d + 1
        return max(span_needed - i, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Pad ``x`` as required, then run the regular convolution."""
        in_h, in_w = x.size()[-2:]
        k_h, k_w = self.kernel_size[0], self.kernel_size[1]

        pad_h = self.calc_same_pad(i=in_h, k=k_h, s=self.stride[0], d=self.dilation[0])
        pad_w = self.calc_same_pad(i=in_w, k=k_w, s=self.stride[1], d=self.dilation[1])

        if pad_h > 0 or pad_w > 0:
            left, top = pad_w // 2, pad_h // 2
            # F.pad order for the last two dims: (left, right, top, bottom).
            x = F.pad(x, [left, pad_w - left, top, pad_h - top])

        return F.conv2d(x, self.weight, self.bias, self.stride,
                        self.padding, self.dilation, self.groups)

class ResNet(nn.Module):
    """ResNet backbone with a two-layer fully connected head.

    Layout: 7x7 stride-2 stem convolution (``Conv2dSame``) -> batch norm ->
    ReLU -> 3x3 stride-2 max pool -> four residual stages -> global average
    pool -> ``fc1`` (512 features) -> ReLU -> ``fc2`` (``num_classes``).

    Args:
        ResBlock: Residual block class; must expose an ``expansion`` attribute
            and accept ``(in_channels, planes, i_downsample=..., stride=...)``.
        layer_list: Number of blocks in each of the four stages.
        num_classes: Size of the final output layer.
        num_channels: Number of channels of the input image.
    """

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super().__init__()
        # Running channel count; _make_layer updates it after every stage.
        self.in_channels = 64

        # Stem. Attribute names are state_dict keys and must stay as they are.
        self.conv_layer_s2_same = Conv2dSame(num_channels, 64, 7, stride=2, groups=1, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64, eps=0.001, momentum=0.99)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2)

        # Residual stages.
        self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64, stride=1)
        self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2)
        self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2)
        self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2)

        # Head.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(512 * ResBlock.expansion, 512)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(512, num_classes)

    def extract_features(self, x):
        """Return the ``fc1`` output (before its ReLU) for a batch of images."""
        x = self.conv_layer_s2_same(x)
        x = self.relu(self.batch_norm1(x))
        x = self.max_pool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = self.avgpool(x)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc1(flat)

    def forward(self, x):
        """Return the ``fc2`` output for a batch of images."""
        features = self.extract_features(x)
        return self.fc2(self.relu1(features))

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Build one residual stage made of ``blocks`` consecutive ``ResBlock`` modules.

        Only the first block receives ``stride`` and, when the shape changes,
        a 1x1 conv + batch norm projection for its skip branch.
        """
        stage_width = planes * ResBlock.expansion

        projection = None
        if stride != 1 or self.in_channels != stage_width:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1,
                          stride=stride, bias=False, padding=0),
                nn.BatchNorm2d(stage_width, eps=0.001, momentum=0.99),
            )

        stage = [ResBlock(self.in_channels, planes, i_downsample=projection, stride=stride)]
        self.in_channels = stage_width

        for _ in range(blocks - 1):
            stage.append(ResBlock(self.in_channels, planes))

        return nn.Sequential(*stage)
        
def ResNet50(num_classes, channels=3):
    """Build a ``ResNet`` of ``Bottleneck`` blocks with stage depths 3, 4, 6 and 3.

    Args:
        num_classes: Size of the network's final output layer.
        channels: Number of input image channels.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes, channels)


class LSTMPyTorch(nn.Module):
    """Two stacked LSTMs followed by a 7-way linear layer and a softmax.

    Input is a batch-first sequence of 512-dimensional vectors; the output is
    one row of 7 values per sequence, already normalised by softmax.
    """

    def __init__(self):
        super().__init__()

        self.lstm1 = nn.LSTM(input_size=512, hidden_size=512,
                             batch_first=True, bidirectional=False)
        self.lstm2 = nn.LSTM(input_size=512, hidden_size=256,
                             batch_first=True, bidirectional=False)
        self.fc = nn.Linear(256, 7)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return softmax scores computed from the last time step of the sequence."""
        hidden_seq, _ = self.lstm1(x)
        hidden_seq, _ = self.lstm2(hidden_seq)

        # Only the final time step feeds the classifier.
        final_step = hidden_seq[:, -1, :]
        return self.softmax(self.fc(final_step))

In [11]:
def pth_processing(fp):
    """Turn a PIL image into the tensor layout fed to the PyTorch backbone.

    Steps: nearest-neighbour resize to 224x224, conversion to a
    channel-first tensor, channel-order reversal, per-channel mean
    subtraction, and a leading batch dimension.

    Args:
        fp: A PIL image. Three channels are expected, because the
            preprocessing indexes channels 0..2.

    Returns:
        A float32 tensor of shape ``(1, C, 224, 224)``.
    """
    class PreprocessInput(torch.nn.Module):
        """Reverse the channel order and subtract fixed per-channel means."""

        # The constructor was previously spelled `init` (and called
        # `super().init()`), so it was never a real constructor and would have
        # raised AttributeError if invoked.
        def __init__(self):
            super().__init__()

        def forward(self, x):
            x = x.to(torch.float32)
            # torch.flip returns a new tensor, so the in-place subtractions
            # below never touch the caller's data.
            x = torch.flip(x, dims=(0,))
            x[0, :, :] -= 91.4953
            x[1, :, :] -= 103.8827
            x[2, :, :] -= 131.0912
            return x

    def get_img_torch(img):
        ttransform = transforms.Compose([
            transforms.PILToTensor(),
            PreprocessInput()
        ])
        # `Image.Resampling` only exists on Pillow >= 9.1; older releases keep
        # the constants directly on the `Image` module.
        resampling = getattr(Image, "Resampling", Image)
        img = img.resize((224, 224), resampling.NEAREST)
        img = ttransform(img)
        img = torch.unsqueeze(img, 0)
        return img

    return get_img_torch(fp)

def tf_processing(fp):
    """Turn an image array into a channels-last, mean-subtracted batch of one.

    Steps: nearest-neighbour resize to 224x224, float32 conversion,
    reversal of the last (channel) axis, per-channel mean subtraction, and a
    leading batch dimension.

    The original implementation called ``tf.keras.utils.img_to_array``, but
    ``tf`` is never imported in this notebook, so every call raised
    ``NameError``. The conversion is done with NumPy instead.

    Args:
        fp: Image as a NumPy array accepted by ``cv2.resize``; three channels
            are expected, because channels 0..2 are indexed.

    Returns:
        A float32 array of shape ``(1, 224, 224, 3)``.
    """
    def preprocess_input(x):
        # Reversed view of the channel axis on a private copy, then in-place
        # mean subtraction on that copy.
        x_temp = np.copy(x)
        x_temp = x_temp[..., ::-1]
        x_temp[..., 0] -= 91.4953
        x_temp[..., 1] -= 103.8827
        x_temp[..., 2] -= 131.0912
        return x_temp

    def get_img_tf(img):
        img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_NEAREST)
        # float32 is required so the in-place mean subtraction is valid.
        img = np.asarray(img, dtype=np.float32)
        img = preprocess_input(img)
        img = np.array([img])
        return img

    return get_img_tf(fp)

def norm_coordinates(normalized_x, normalized_y, image_width, image_height):
    """Scale normalised coordinates to pixel indices.

    Each coordinate is multiplied by the matching image dimension, floored,
    and capped at the last valid index (``dimension - 1``). No lower bound is
    applied, so negative inputs give negative indices.

    Returns:
        ``(x_px, y_px)`` as a tuple.
    """
    last_col = image_width - 1
    last_row = image_height - 1

    col = math.floor(normalized_x * image_width)
    row = math.floor(normalized_y * image_height)

    x_px = last_col if last_col < col else col
    y_px = last_row if last_row < row else row
    return x_px, y_px

def get_box(fl, w, h):
    """Compute the pixel bounding box that encloses a set of landmarks.

    Args:
        fl: Object with a ``landmark`` iterable whose items expose normalised
            ``x`` and ``y`` attributes.
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        ``(startX, startY, endX, endY)`` as plain ints, clipped to
        ``[0, w - 1]`` horizontally and ``[0, h - 1]`` vertically.

    Raises:
        IndexError: If ``fl.landmark`` is empty (there is nothing to bound).
    """
    # Convert every landmark once; the previous version rebuilt this array
    # four times and guarded it with a check that could never be false.
    coords = np.asarray(
        [norm_coordinates(lm.x, lm.y, w, h) for lm in fl.landmark]
    )
    xs, ys = coords[:, 0], coords[:, 1]

    startX = max(0, int(xs.min()))
    startY = max(0, int(ys.min()))
    endX = min(w - 1, int(xs.max()))
    endY = min(h - 1, int(ys.max()))

    return startX, startY, endX, endY

def display_EMO_PRED(img, box, label='', color=(128, 128, 128), txt_color=(255, 255, 255), line_width=2, ):
    """Draw a magenta rectangle for ``box`` and a centred ``label`` above it.

    Args:
        img: Image array drawn on in place.
        box: Indexable with at least four values ``(x1, y1, x2, y2)``.
        label: Text to render above the rectangle.
        color: Accepted for interface compatibility; not used by the drawing code.
        txt_color: Accepted for interface compatibility; not used by the drawing code.
        line_width: Rectangle thickness; a falsy value derives one from the image shape.

    Returns:
        The same ``img`` object.
    """
    stroke = line_width or max(round(sum(img.shape) / 2 * 0.003), 2)
    magenta = (255, 0, 255)
    black = (0, 0, 0)

    left, top, right, bottom = (int(box[i]) for i in range(4))
    box_w = right - left
    cv2.rectangle(img, (left, top), (right, bottom), magenta, thickness=stroke, lineType=cv2.LINE_AA)

    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = stroke / 3
    text_thickness = max(stroke - 1, 1)

    # Label width, plus a small allowance proportional to the box width.
    (label_w, _), _ = cv2.getTextSize(label, font, scale, text_thickness)
    padded_w = label_w + round((box_w * 10) / 360)
    center_x = left + round(box_w / 2)

    origin = (center_x - round(padded_w / 2), top - round((box_w * 20) / 360))

    # The label is drawn twice at the same position: black first, magenta on top.
    for ink in (black, magenta):
        cv2.putText(img, label, origin, font,
                    scale, ink, thickness=text_thickness, lineType=cv2.LINE_AA)
    return img

def display_FPS(img, text, margin=1.0, box_scale=1.0):
    """Render ``text`` on a half-transparent white patch in the top-right corner.

    Args:
        img: 3-dimensional image array, modified in place.
        text: String to display.
        margin: Gap between the patch and the image border, in multiples of the text height.
        box_scale: Padding added around the text, in multiples of twice the text height.

    Returns:
        The same ``img`` object.
    """
    img_h, img_w, _ = img.shape
    line_width = int(min(img_h, img_w) * 0.001)
    thickness = max(int(line_width / 3), 1)

    font_face = cv2.FONT_HERSHEY_SIMPLEX
    font_color = (0, 0, 0)
    font_scale = thickness / 1.5

    t_w, t_h = cv2.getTextSize(text, font_face, font_scale, None)[0]

    margin_n = int(t_h * margin)
    pad = int(2 * t_h * box_scale)

    # Rows/columns of the background patch.
    rows = slice(0 + margin_n, 0 + margin_n + t_h + pad)
    cols = slice(img_w - t_w - margin_n - pad, img_w - margin_n)

    patch = img[rows, cols]
    white = np.full(patch.shape, 255, dtype=np.uint8)

    # 50/50 blend of the existing pixels with white (plus the +1.0 offset).
    img[rows, cols] = cv2.addWeighted(patch, 0.5, white, .5, 1.0)

    text_origin = (img_w - t_w - margin_n - pad // 2,
                   0 + margin_n + t_h + pad // 2)
    cv2.putText(img=img,
                text=text,
                org=text_origin,
                fontFace=font_face,
                fontScale=font_scale,
                color=font_color,
                thickness=thickness,
                lineType=cv2.LINE_AA,
                bottomLeftOrigin=False)

    return img

def draw_label(frame, text, x, y, color=(0,255,0), font_scale=0.6, thickness=2, bg=True):
    """Write ``text`` at ``(x, y)`` on ``frame``, optionally over a filled black box.

    ``(x, y)`` is the bottom-left corner of the text; when ``bg`` is true a
    black rectangle sized from the measured text is painted first.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    (text_w, text_h), _baseline = cv2.getTextSize(text, font, font_scale, thickness)

    if bg:
        top_left = (x, y - text_h - 3)
        bottom_right = (x + text_w + 2, y + 3)
        cv2.rectangle(frame, top_left, bottom_right, (0, 0, 0), -1)

    cv2.putText(frame, text, (x, y), font, font_scale, color, thickness)

def annotate_id_and_emotion(frame, x1, y1, x2, y2, track_id, emo_label, emo_prob):
    """Draw the track ID above a box and the emotion label below it.

    Both labels are clamped so they stay inside the frame.

    Args:
        frame: Image array drawn on in place.
        x1, y1, x2, y2: Box corners in pixels.
        track_id: Identifier shown as ``"ID <track_id>"``.
        emo_label: Emotion name, or ``None`` when no prediction exists yet.
        emo_prob: Probability shown as a percentage next to ``emo_label``.
    """
    H, W = frame.shape[:2]

    # --- ID label: ABOVE the box, clamped ---
    id_text = f"ID {track_id}"
    y_id = max(14, y1 - 8)                   # a bit above top edge
    x_id = max(0, min(x1, W - 10))
    draw_label(frame, id_text, x_id, y_id, color=(0,255,0))

    # --- Emotion label: BELOW the box, clamped ---
    # ASCII dots on purpose: OpenCV's Hershey fonts cannot render the Unicode
    # ellipsis and would draw question marks instead.
    emo_text = "warming..." if emo_label is None else f"{emo_label} {emo_prob:.1%}"
    (tw, th), _ = cv2.getTextSize(emo_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
    x_emo = max(0, min(x1, W - tw - 6))
    y_emo = min(H - 6, y2 + th + 8)          # a bit below bottom edge
    draw_label(frame, emo_text, x_emo, y_emo, color=(0,255,255))

In [7]:
# ----------------------------
# 1) Models & utilities
# ----------------------------
# Class index -> emotion name, in the order the classifier outputs them.
DICT_EMO = dict(enumerate(
    ('Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger')
))

# file names
name_backbone_model = 'FER_static_ResNet50_AffectNet.pt'
name_LSTM_model = 'Aff-Wild2'  # -> file 'FER_dinamic_LSTM_Aff-Wild2.pt'

# device: prefer CUDA when PyTorch reports it as available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')



In [8]:
# instantiate + load weights
# Both models are moved to `device` and put in eval mode before and after the
# checkpoint is loaded (the second .eval() call is redundant but harmless).
# NOTE(review): torch.load unpickles the checkpoint file; only load files from
# a trusted source (consider `weights_only=True` where the installed PyTorch
# supports it).
pth_backbone_model = ResNet50(num_classes=7, channels=3).to(device).eval()
pth_backbone_model.load_state_dict(torch.load(name_backbone_model, map_location=device))
pth_backbone_model.eval()

pth_LSTM_model = LSTMPyTorch().to(device).eval()
pth_LSTM_model.load_state_dict(torch.load(f'FER_dinamic_LSTM_{name_LSTM_model}.pt', map_location=device))
pth_LSTM_model.eval()

# preprocessing
# Selects the `prepare_face` implementation: the bare name lookup below raises
# NameError when `pth_processing` has not been defined in this kernel, which
# switches to the torchvision-based fallback.
try:
    # use user's preprocessing if provided
    _ = pth_processing
    def prepare_face(rgb_np):
        """Convert an RGB array to a PIL image, run `pth_processing`, move the result to `device`."""
        return pth_processing(Image.fromarray(rgb_np)).to(device)
except NameError:
    # fallback preprocessing (adjust to your training pipeline if needed)
    # NOTE(review): this path scales to [0, 1] and normalises with different
    # constants than `pth_processing` (which subtracts fixed means on a 0-255
    # scale) -- confirm it matches how the loaded weights were trained.
    import torchvision.transforms as T
    _transform = T.Compose([
        T.Resize((224,224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
    ])
    def prepare_face(rgb_np):
        """Convert an RGB array to a PIL image, apply `_transform`, add a batch axis, move to `device`."""
        return _transform(Image.fromarray(rgb_np)).unsqueeze(0).to(device)


In [9]:

# per-ID temporal buffers (for LSTM)
# Number of feature vectors kept per track (the deque capacity).
T_SEQ = 10


def _new_feature_buffer():
    """Create an empty fixed-capacity buffer for one track."""
    return deque(maxlen=T_SEQ)


# Per-track state, all keyed by track ID.
id2buf = defaultdict(_new_feature_buffer)   # feature vectors per ID
id2label, id2prob, id2seen = {}, {}, {}     # last label / last probability / last-seen timestamp

@torch.no_grad()
def extract_feature(face_rgb_np: np.ndarray) -> np.ndarray:
    """Compute the backbone feature vector for one face crop.

    The crop goes through ``prepare_face``, the backbone's
    ``extract_features`` and a ReLU; the batch axis is then dropped and the
    result is returned as a NumPy array on the CPU.
    """
    batch = prepare_face(face_rgb_np)
    raw_features = pth_backbone_model.extract_features(batch)
    activated = torch.relu(raw_features)
    return activated.squeeze(0).detach().cpu().numpy()

@torch.no_grad()
def run_lstm_for_id(track_id: int):
    """Run the temporal model for one track once its buffer is full.

    Does nothing until ``id2buf`` holds ``T_SEQ`` feature vectors for
    ``track_id``. Otherwise the buffered features are stacked into a
    ``(1, T, D)`` tensor, passed through ``pth_LSTM_model``, and the top class
    name and its probability are stored in ``id2label`` / ``id2prob``.

    Args:
        track_id: Key of the track in the per-ID dictionaries.
    """
    # `.get` avoids the defaultdict side effect of creating (and never
    # pruning) an empty buffer for an ID that has not been seen yet.
    buf = id2buf.get(track_id)
    if buf is None or len(buf) < T_SEQ:
        return

    seq = np.vstack(list(buf))                         # (T, D)
    x = torch.from_numpy(seq).unsqueeze(0).to(device)  # (1, T, D)

    # LSTMPyTorch.forward already ends with a softmax, so the output is a
    # probability distribution. Applying softmax a second time (as the
    # previous version did) flattens it and under-reports the confidence.
    probs = pth_LSTM_model(x)                          # (1, C)
    prob, cls = probs.max(dim=1)

    id2label[track_id] = DICT_EMO[int(cls.item())]
    id2prob[track_id] = float(prob.item())

def annotate_emotion(frame, x1, y1, x2, y2, track_id):
    """Draw the last known emotion label and probability just above a box.

    Shows ``"warming..."`` when no prediction is stored for ``track_id``.

    Args:
        frame: Image array drawn on in place.
        x1, y1: Top-left corner of the box in pixels.
        x2, y2: Bottom-right corner (accepted for a uniform signature; unused).
        track_id: Key looked up in ``id2label`` / ``id2prob``.
    """
    label = id2label.get(track_id)
    if label is None:
        # ASCII dots on purpose: OpenCV's Hershey fonts cannot render the
        # Unicode ellipsis and would draw question marks instead.
        txt = "warming..."
    else:
        txt = f"{label} {id2prob.get(track_id, 0.0):.1%}"

    # putText takes the text's bottom-left corner; keep the baseline far
    # enough from the top edge that the glyphs stay inside the frame
    # (same 14 px floor that annotate_id_and_emotion uses).
    y_text = max(14, y1 - 6)
    cv2.putText(frame, txt, (x1, y_text),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

def clamp_box(x1, y1, x2, y2, W, H):
    """Truncate box corners to ints and clip them to a ``W`` x ``H`` frame.

    The top-left corner is limited to valid pixel indices (``W - 1`` /
    ``H - 1``); the bottom-right corner may reach ``W`` / ``H`` so it can be
    used as an exclusive slice end.

    Returns:
        ``(x1, y1, x2, y2)`` as ints.
    """
    def _clip(value, upper):
        return max(0, min(int(value), upper))

    return (
        _clip(x1, W - 1),
        _clip(y1, H - 1),
        _clip(x2, W),
        _clip(y2, H),
    )

def maybe_prune_stale(now_ts, ttl_sec=2.0):
    """Forget every track whose last-seen timestamp is older than ``ttl_sec``.

    Args:
        now_ts: Current timestamp, in the same unit as the values in ``id2seen``.
        ttl_sec: Maximum allowed age; tracks strictly older are removed from
            ``id2buf``, ``id2label``, ``id2prob`` and ``id2seen``.
    """
    # Collect first: the dictionaries cannot be resized while iterating.
    expired = []
    for tid, last_seen in id2seen.items():
        if now_ts - last_seen > ttl_sec:
            expired.append(tid)

    for tid in expired:
        for table in (id2buf, id2label, id2prob, id2seen):
            table.pop(tid, None)


In [22]:
# Input handed to `model.track(source=...)` in main(): here a video file path.
# The commented-out alternative (0) presumably selects the default camera -- confirm.
source = "data/dangoon.mp4"
# source = 0

In [23]:

# ----------------------------
# 2) Your YOLO + BoT-SORT loop with emotion overlay
# ----------------------------
from ultralytics import YOLO

def main():
    """Track faces in ``source`` and overlay a per-track emotion prediction.

    For every frame yielded by the tracker, each sufficiently large box is
    cropped, converted to RGB, embedded with the backbone, appended to the
    track's feature buffer and classified by the temporal model; the box, the
    track ID and the emotion are then drawn and the frame is shown in an
    OpenCV window. Press ``q`` or ``Esc``, or close the window, to stop.
    """
    model = YOLO("yolov12n-face.pt")   # your face weights
    win = "YOLO + BoT-SORT + Emotion (CPU)"
    cv2.namedWindow(win, cv2.WINDOW_NORMAL)

    try:
        for res in model.track(
            source=source,
            tracker='botsort_custom.yaml',
            stream=True,
            imgsz=640,
            conf=0.35,
            iou=0.5,
            device='cpu',
            verbose=False,
            persist=True
        ):
            frame = res.orig_img
            H, W = frame.shape[:2]
            now_ts = time.time()

            boxes = res.boxes
            if boxes is not None and len(boxes) > 0:
                xyxy = boxes.xyxy.cpu().numpy()               # (N,4)
                if boxes.id is not None:
                    ids = boxes.id.cpu().numpy()
                else:
                    ids = np.full((len(xyxy),), -1)

                for k, (x1, y1, x2, y2) in enumerate(xyxy.astype(int)):
                    # NOTE(review): the fallback reuses the detection index as
                    # an ID, which can coincide with a real tracker ID.
                    tid = int(ids[k]) if ids[k] != -1 else k   # fallback if no ID

                    x1, y1, x2, y2 = clamp_box(x1, y1, x2, y2, W, H)
                    if x2 <= x1 or y2 <= y1:
                        continue

                    # Faces under 20 px on a side are too small to classify:
                    # show the last known emotion and skip the model.
                    if (y2 - y1) < 20 or (x2 - x1) < 20:
                        annotate_emotion(frame, x1, y1, x2, y2, tid)
                        continue

                    # --- Emotion pipeline ---
                    # Convert BGR -> RGB crop for backbone
                    face_rgb = cv2.cvtColor(frame[y1:y2, x1:x2], cv2.COLOR_BGR2RGB)

                    # 1) feature extraction
                    feat = extract_feature(face_rgb)

                    # 2) per-ID temporal buffer
                    buf = id2buf[tid]
                    if len(buf) == 0:
                        # warm start: fill the buffer with the first feature
                        # so a prediction is available immediately
                        buf.extend([feat] * T_SEQ)
                    else:
                        buf.append(feat)

                    # 3) LSTM inference when warm
                    run_lstm_for_id(tid)

                    # 4) draw bbox and non-overlapping ID / emotion labels
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    emo_label = id2label.get(tid, None)
                    emo_prob = id2prob.get(tid, 0.0)
                    annotate_id_and_emotion(frame, x1, y1, x2, y2, tid, emo_label, emo_prob)

                    # 5) housekeeping
                    id2seen[tid] = now_ts

            # Prune on every frame, including frames without detections, so
            # buffers of vanished tracks are released once they expire.
            maybe_prune_stale(now_ts, ttl_sec=2.0)

            # show
            cv2.imshow(win, frame)
            k = cv2.waitKey(1) & 0xFF
            if k in (27, ord('q')) or cv2.getWindowProperty(win, cv2.WND_PROP_VISIBLE) < 1:
                break

    finally:
        try:
            cv2.destroyWindow(win)
        except cv2.error:
            # Expected when the user already closed the window (one of the
            # loop's exit conditions); there is nothing left to destroy.
            pass
        # Give the GUI backend a few event-loop turns to process the close.
        for _ in range(3):
            cv2.waitKey(1)
        time.sleep(0.05)



In [None]:
# Start the tracking + emotion demo when this code is executed as the top-level module.
if __name__ == "__main__":
    main()