In [None]:
from flask import Flask, request, jsonify
import os
from datetime import datetime
import cv2
import numpy as np
import torch
from transformers import AutoImageProcessor, AutoModelForImageClassification
from mediapipe.python.solutions import face_detection as mp_face_detection
import math

# Flask application object; the HTTP route below is registered on it.
app = Flask(__name__)

# --- Settings ---
# NOTE(review): LOG_INTERVAL is not referenced anywhere in the visible part of this file.
LOG_INTERVAL = 0.3
# Seconds between emotion predictions; multiplied by the frame rate to get a frame step.
PREDICT_INTERVAL = 1.0
# Default divisor applied to the logits before softmax in predict_emotion_probs.
SOFTMAX_TEMPERATURE = 0.7
# Default factor the "neutral" logit is multiplied by in predict_emotion_probs.
NEUTRAL_DAMPING = 0.85
# Relative padding added around a detected face box in get_boxes.
BBOX_PAD_RATIO = 0.15
# When True, get_boxes returns padded square crops instead of padded rectangles.
SQUARE_CROP = True

# Run the classifier on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_name = "trpakov/vit-face-expression"
# Image pre-processor and classification model are loaded once at import time.
processor = AutoImageProcessor.from_pretrained(model_name, use_fast=True)
emo_model = AutoModelForImageClassification.from_pretrained(model_name).to(device)
# Switch the model to evaluation mode (inference only).
emo_model.eval()
# Mapping from class index to label name, taken from the model config.
id2label = emo_model.config.id2label

# Label names in class-index order; assumes id2label is keyed by 0..K-1 — TODO confirm.
LABEL_ORDER = [id2label[i] for i in range(len(id2label))]
# Shared MediaPipe face detector used by get_boxes.
face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)

# Per-label valence weights. computeVAC multiplies each label's probability
# by its weight here; labels missing from this table count as 0.0 there.
VALENCE_MAP = dict(
    angry=-0.9,
    disgust=-0.7,
    fear=-0.8,
    sad=-0.9,
    happy=0.9,
    surprise=0.3,
    neutral=0.0,
)

# Per-label arousal weights. computeVAC multiplies each label's probability
# by its weight here; labels missing from this table count as 0.5 there.
AROUSAL_MAP = dict(
    angry=0.8,
    disgust=0.4,
    fear=0.9,
    sad=0.2,
    happy=0.7,
    surprise=1.0,
    neutral=0.1,
)

def clamp01(x):
    """Limit ``x`` to the closed interval [0.0, 1.0]."""
    # Cap from above first, then from below; the order matches the original
    # nesting so boundary and non-finite inputs resolve the same way.
    capped = min(1.0, x)
    return max(0.0, capped)

def toProbArrayFromObject(obj, labels=LABEL_ORDER):
    """Turn a label -> value mapping into a normalized probability list.

    Each value is looked up with ``obj.get(label, 0.0)``, divided by 100,
    clamped to [0, 1], and the resulting list is rescaled to sum to 1.

    Args:
        obj: Mapping from label name to a numeric value.
        labels: Label names that fix the order of the output list.

    Returns:
        A list with one float per label. If the clamped values do not add up
        to a positive, finite number, a list of zeros is returned instead.
    """
    scaled = []
    for label in labels:
        raw_value = obj.get(label, 0.0)
        scaled.append(clamp01(raw_value / 100.0))

    total = sum(scaled)
    normalizable = total > 0 and math.isfinite(total)
    if not normalizable:
        return [0.0] * len(labels)
    return [value / total for value in scaled]

def entropy(probs):
    """Return the Shannon entropy of ``probs`` using the natural logarithm.

    Every probability is floored at 1e-12 before taking the logarithm so that
    zero entries do not raise a math domain error.
    """
    floor = 1e-12
    total = 0.0
    # Accumulate term by term in input order (a plain running sum).
    for prob in probs:
        clipped = max(prob, floor)
        total -= clipped * math.log(clipped)
    return total

def computeVAC(probs, labels=LABEL_ORDER):
    """Compute valence, arousal and a certainty value for a distribution.

    Args:
        probs: Probabilities, one per entry of ``labels`` and in the same order.
        labels: Label names used to look up weights in VALENCE_MAP and
            AROUSAL_MAP (missing labels weigh 0.0 and 0.5 respectively).

    Returns:
        A tuple ``(V, A, C)``: ``V`` is the probability-weighted valence
        (not clamped), ``A`` is the probability-weighted arousal clamped to
        [0, 1], and ``C`` is one minus the entropy of ``probs`` divided by
        ``log(len(probs))``, clamped to [0, 1].
    """
    valence_weights = [VALENCE_MAP.get(name, 0.0) for name in labels]
    arousal_weights = [AROUSAL_MAP.get(name, 0.5) for name in labels]

    valence = sum(prob * weight for prob, weight in zip(probs, valence_weights))
    arousal = sum(prob * weight for prob, weight in zip(probs, arousal_weights))

    spread = entropy(probs)
    # log(1) == 0 for zero or one class, so the ratio is defined as 0 there.
    max_spread = math.log(max(len(probs), 1))
    if max_spread > 0:
        relative_spread = spread / max_spread
    else:
        relative_spread = 0
    certainty = 1 - relative_spread

    return valence, clamp01(arousal), clamp01(certainty)

def computeAESFromProbsObject(obj, labels=LABEL_ORDER):
    """Compute the AES score in [0, 1] from a label -> value mapping.

    The mapping is normalized with toProbArrayFromObject, reduced to
    (V, A, C) by computeVAC, and blended as
    ``0.4 * (V + 1) / 2 + 0.4 * A + 0.2 * C``; the result is clamped to [0, 1].
    """
    distribution = toProbArrayFromObject(obj, labels)
    valence, arousal, certainty = computeVAC(distribution, labels)
    # Shift valence by +1 and halve it before blending.
    valence_unit = (valence + 1) / 2
    blended = 0.4 * valence_unit + 0.4 * arousal + 0.2 * certainty
    return clamp01(blended)

def get_boxes(frame_bgr):
    """Detect faces in a BGR frame and return padded pixel boxes.

    Args:
        frame_bgr: Image array whose first two dimensions are height and
            width; it is converted from BGR to RGB before detection.

    Returns:
        A list of ``(x1, y1, x2, y2)`` integer tuples, one per detection whose
        clipped box has positive width and height. With SQUARE_CROP enabled
        each box is a square around the face centre enlarged by
        BBOX_PAD_RATIO; otherwise the rectangle is padded on every side by
        half of BBOX_PAD_RATIO times its width/height. Coordinates are kept
        within ``[0, width - 1]`` and ``[0, height - 1]``.
    """
    height, width = frame_bgr.shape[:2]
    rgb_frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    result = face_detector.process(rgb_frame)
    detections = result.detections if (result and result.detections) else []

    found = []
    for detection in detections:
        rel = detection.location_data.relative_bounding_box
        # Relative box -> pixel coordinates, clipped to the frame.
        left = max(int(rel.xmin * width), 0)
        top = max(int(rel.ymin * height), 0)
        right = min(int((rel.xmin + rel.width) * width), width - 1)
        bottom = min(int((rel.ymin + rel.height) * height), height - 1)
        if right <= left or bottom <= top:
            continue

        box_w = right - left
        box_h = bottom - top

        if not SQUARE_CROP:
            pad_x = int(box_w * BBOX_PAD_RATIO * 0.5)
            pad_y = int(box_h * BBOX_PAD_RATIO * 0.5)
            found.append((
                max(left - pad_x, 0),
                max(top - pad_y, 0),
                min(right + pad_x, width - 1),
                min(bottom + pad_y, height - 1),
            ))
            continue

        # Square crop: the longer edge, enlarged by the padding ratio.
        side = int(max(box_w, box_h) * (1.0 + BBOX_PAD_RATIO))
        half = side // 2
        centre_x = (left + right) // 2
        centre_y = (top + bottom) // 2

        sq_left = max(centre_x - half, 0)
        sq_top = max(centre_y - half, 0)
        sq_right = min(sq_left + side, width - 1)
        sq_bottom = min(sq_top + side, height - 1)
        # If the far edge was clipped, pull the near edge back so the crop
        # keeps its size where the frame allows it.
        sq_left = max(sq_right - side, 0)
        sq_top = max(sq_bottom - side, 0)
        found.append((sq_left, sq_top, sq_right, sq_bottom))

    return found

@torch.no_grad()
def predict_emotion_probs(face_bgr, temperature=SOFTMAX_TEMPERATURE, neutral_damping=NEUTRAL_DAMPING):
    """Classify the emotion shown in a cropped BGR face image.

    Args:
        face_bgr: Face crop in BGR channel order.
        temperature: Divisor applied to the logits before softmax; values
            below 1e-6 are replaced by 1e-6.
        neutral_damping: Factor the "neutral" logit is multiplied by before
            the temperature is applied (only if the model has that label).

    Returns:
        A tuple ``(top_label, top_score, probs_dict)`` where ``probs_dict``
        maps every label name to its softmax probability as a float.
    """
    rgb = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2RGB)
    pixels = np.ascontiguousarray(rgb).astype(np.uint8)

    batch = processor(images=[pixels], return_tensors="pt")
    batch = {name: tensor.to(device) for name, tensor in batch.items()}
    logits = emo_model(**batch).logits

    # Reverse lookup: label name -> class index.
    label_to_index = {label: index for index, label in id2label.items()}
    if "neutral" in label_to_index:
        logits[:, label_to_index["neutral"]] *= neutral_damping

    scaled_logits = logits / max(1e-6, temperature)
    probs = torch.softmax(scaled_logits, dim=-1)[0].detach().cpu().numpy()

    probs_dict = {id2label[index]: float(value) for index, value in enumerate(probs)}
    best = int(probs.argmax())
    return id2label[best], float(probs[best]), probs_dict

def analyze_video_to_list(video_path):
    """Sample a video every PREDICT_INTERVAL seconds and score the largest face.

    For each sampled frame the faces are located with get_boxes, the box with
    the largest area is classified by predict_emotion_probs, and the AES score
    is derived from the resulting probabilities. Sampled frames without a
    detected face produce no entry.

    Args:
        video_path: Path (or other source accepted by ``cv2.VideoCapture``)
            of the video to analyze.

    Returns:
        ``(logs, None)`` on success, where ``logs`` is a list of dicts holding
        ``"frame_idx"``, ``"score"`` (integer 0-100) and one percentage value
        (rounded to two decimals) per label in LABEL_ORDER.
        ``(None, message)`` when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None, f"Unable to open video: {video_path}"

        # Use the video's own frame rate so PREDICT_INTERVAL really is in
        # seconds; fall back to 30 fps when the backend reports no usable value.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not math.isfinite(fps) or fps <= 0:
            fps = 30.0
        step = max(1, int(round(PREDICT_INTERVAL * fps)))

        logs = []
        frame_idx = -1
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frame_idx += 1
            if frame_idx % step != 0:
                continue

            boxes = get_boxes(frame)
            if not boxes:
                continue

            # Keep only the largest face (first one wins on ties).
            x1, y1, x2, y2 = max(
                boxes, key=lambda box: (box[2] - box[0]) * (box[3] - box[1])
            )
            roi = frame[y1:y2, x1:x2].copy()
            if roi.size == 0:
                # A degenerate crop cannot be classified; skip this sample.
                continue

            _, _, probs_dict = predict_emotion_probs(roi)
            aes_score = computeAESFromProbsObject(probs_dict)

            log_entry = {
                "frame_idx": frame_idx,
                "score": int(round(aes_score * 100)),
            }
            for label in LABEL_ORDER:
                log_entry[label] = round(probs_dict.get(label, 0.0) * 100.0, 2)
            logs.append(log_entry)

        return logs, None
    finally:
        # Always free the capture handle, including on the open-failure path.
        cap.release()

@app.route('/analyze_video', methods=['POST'])
def analyze_video_api():
    """Handle POST /analyze_video: analyze a video and return its scores.

    Expects a JSON object with a string ``"video_path"``. Responds with
    ``{"average_score": int, "results": [...]}`` on success, HTTP 400 with an
    ``"error"`` message when the body is not a JSON object or ``video_path``
    is missing or not a string, and HTTP 500 with an ``"error"`` message when
    the analysis reports a failure.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so every malformed request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    video_path = data.get('video_path')
    if not video_path:
        return jsonify({"error": "Missing 'video_path' in request body."}), 400
    if not isinstance(video_path, str):
        # A non-string value (e.g. an integer) must not reach the video
        # capture backend, which would interpret it differently from a path.
        return jsonify({"error": "'video_path' must be a string."}), 400

    # NOTE(security): video_path comes straight from the client and is opened
    # on the server; restrict it to an allowed directory if this endpoint is
    # reachable by untrusted callers.
    logs, error = analyze_video_to_list(video_path)
    if error:
        return jsonify({"error": error}), 500

    # Average of the per-frame scores (0 when no frame produced a score).
    scores = [entry["score"] for entry in logs if "score" in entry]
    avg_score = int(round(sum(scores) / len(scores))) if scores else 0

    return jsonify({
        "average_score": avg_score,
        "results": logs
    })

# Start the Flask server on port 5001, bound to 0.0.0.0, only when this file
# is executed directly rather than imported.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)
