In [None]:
import cv2
import numpy as np
import math
from deepface import DeepFace
from collections import Counter
import mediapipe as mp
from tqdm import tqdm

# ----------------------------- Configuration Parameters -----------------------------

# Input video read by process_video().
VIDEO_PATH = "Unlocking Facial Recognition_ Diverse Activities Analysis.mp4"
# Annotated video written by process_video().
OUTPUT_VIDEO_PATH = "output_video.mp4"
# Plain-text report written by generate_summary().
SUMMARY_PATH = "video_summary.txt"
# Value forwarded to DeepFace.analyze(detector_backend=...).
EMOTION_BACKEND = "retinaface"
# A face box larger than this fraction of the frame area is counted as an anomaly.
ANOMALY_THRESHOLD_RATIO = 0.2  # proportion of frame area
# Only frames whose 1-based index is a multiple of this value are analysed.
FRAME_SKIP = 3  # process every Nth frame
# Factor applied to both axes of the frame before emotion analysis.
RESIZE_SCALE = 0.5  # scale for analysis

# ----------------------------- Initialization -----------------------------

# Single module-level pose estimator, reused by detect_activity() for every frame.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

def initialize_video(video_path, output_path):
    """Open the input video and create an MP4 writer with the same geometry.

    Args:
        video_path: Path of the video file to read.
        output_path: Path of the video file to create.

    Returns:
        A ``(cap, out, width, height)`` tuple: the opened ``cv2.VideoCapture``,
        the opened ``cv2.VideoWriter`` and the frame size in pixels.

    Raises:
        IOError: If the input cannot be opened or the writer cannot be created.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError("Erro ao abrir o vídeo.")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fps = cap.get(cv2.CAP_PROP_FPS)
    # The capture may report 0 or NaN when the rate is unknown; the writer
    # needs a positive rate, so fall back to a conventional default.
    if not fps > 0:
        fps = 30.0

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        # Do not leak the capture, and do not let later write() calls fail silently.
        cap.release()
        raise IOError("Erro ao criar o vídeo de saída.")
    return cap, out, width, height

# ----------------------------- Emotion Analysis -----------------------------

def analyze_emotions(small_frame, full_frame, emotion_summary, anomalies_detected, frame_area, scale_factor):
    """Detect faces and their dominant emotion, then annotate the full frame.

    Args:
        small_frame: Downscaled image handed to ``DeepFace.analyze``.
        full_frame: Full-resolution image; boxes and labels are drawn on it
            in place.
        emotion_summary: Counter-like mapping; the entry of each detected
            dominant emotion is incremented by one.
        anomalies_detected: Running anomaly count before this frame.
        frame_area: Area, in pixels, of ``full_frame``.
        scale_factor: Ratio between ``small_frame`` and ``full_frame`` sizes,
            used to map face boxes back onto ``full_frame``.

    Returns:
        The anomaly count after this frame. A face counts as an anomaly when
        its box covers more than ``ANOMALY_THRESHOLD_RATIO`` of the frame.
        Any failure is printed and the count gathered so far is returned.
    """
    try:
        analysis = DeepFace.analyze(
            small_frame,
            actions=['emotion'],
            enforce_detection=False,
            detector_backend=EMOTION_BACKEND
        )
        # Tolerate a single-dict result (returned by some DeepFace releases
        # instead of a list of per-face dicts).
        if isinstance(analysis, dict):
            analysis = [analysis]

        for face_data in analysis:
            # With enforce_detection=False, DeepFace reports a placeholder
            # "face" spanning the whole image with confidence 0 when nothing
            # is found. Counting it would inflate both the emotion statistics
            # and the oversized-face anomaly count, so skip it. Results that
            # carry no confidence value are kept as before.
            confidence = face_data.get('face_confidence')
            if confidence is not None and confidence <= 0:
                continue

            region = face_data['region']
            emotion = face_data['dominant_emotion']
            emotion_summary[emotion] += 1

            # Rescale coordinates to original frame
            x = int(region['x'] / scale_factor)
            y = int(region['y'] / scale_factor)
            w = int(region['w'] / scale_factor)
            h = int(region['h'] / scale_factor)

            if w * h > ANOMALY_THRESHOLD_RATIO * frame_area:
                anomalies_detected += 1

            cv2.rectangle(full_frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
            cv2.putText(full_frame, emotion, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.9, (36, 255, 12), 2)
    except Exception as e:
        # Deliberately best-effort: one bad frame must not abort the whole video.
        print(f"Falha na análise emocional: {e}")
    return anomalies_detected

# ----------------------------- Activity Detection -----------------------------

def detect_activity_from_landmarks(landmarks, image_shape):
    """Label coarse activities from 2D pose landmarks using fixed heuristics.

    Args:
        landmarks: Mapping from landmark name (e.g. ``'LEFT_WRIST'``) to an
            ``(x, y)`` position expressed in the same units as
            ``image_shape``. Rules whose landmarks are missing are skipped.
        image_shape: ``(height, width)`` of the image; every threshold is a
            fraction of one of these two values.

    Returns:
        List of activity labels whose rule matched, in rule order. Rules are
        independent, so several labels can be returned for one pose.
    """
    activities = []
    h, w = image_shape

    def distance(p1, p2):
        """Return the Euclidean distance between two (x, y) points."""
        return math.hypot(p1[0] - p2[0], p1[1] - p2[1])

    def mean_y(name_a, name_b):
        """Return the average y of two landmarks, or None if either is missing."""
        a = landmarks.get(name_a)
        b = landmarks.get(name_b)
        if a is None or b is None:
            return None
        return (a[1] + b[1]) / 2

    left_wrist = landmarks.get('LEFT_WRIST')
    right_wrist = landmarks.get('RIGHT_WRIST')
    left_hand = landmarks.get('LEFT_INDEX')
    right_hand = landmarks.get('RIGHT_INDEX')
    nose = landmarks.get('NOSE')
    mouth = landmarks.get('MOUTH_LEFT')
    left_eye = landmarks.get('LEFT_EYE')
    right_eye = landmarks.get('RIGHT_EYE')

    shoulder_y = mean_y('LEFT_SHOULDER', 'RIGHT_SHOULDER')
    hip_y = mean_y('LEFT_HIP', 'RIGHT_HIP')

    # Compare with None explicitly: a y of 0 (top edge of the image) is a
    # valid coordinate and must not be mistaken for "landmark missing".
    has_shoulders = shoulder_y is not None
    has_hips = hip_y is not None

    # NOTE(review): in image coordinates the hips of an upright person are
    # normally well below the shoulders, so the first branch likely fires for
    # standing poses too — confirm the intended sitting/standing rule.
    if has_shoulders and has_hips:
        if hip_y > shoulder_y + 0.05 * h:
            activities.append("sentado")
        elif abs(hip_y - shoulder_y) < 0.05 * h:
            activities.append("em pé")

    # Wrist clearly above the shoulder line (smaller y is higher in the image).
    if left_wrist is not None and has_shoulders and left_wrist[1] < shoulder_y - 0.1 * h:
        activities.append("acenando com a mão esquerda")
    if right_wrist is not None and has_shoulders and right_wrist[1] < shoulder_y - 0.1 * h:
        activities.append("acenando com a mão direita")

    # Wrist roughly level with the shoulders.
    if left_wrist is not None and has_shoulders and abs(left_wrist[1] - shoulder_y) < 0.05 * h:
        activities.append("gesticulando com a mão esquerda")
    if right_wrist is not None and has_shoulders and abs(right_wrist[1] - shoulder_y) < 0.05 * h:
        activities.append("gesticulando com a mão direita")

    # Wrists far apart horizontally.
    if left_wrist is not None and right_wrist is not None and abs(left_wrist[0] - right_wrist[0]) > 0.3 * w:
        activities.append("dançando")

    # Index fingers almost touching.
    if left_hand is not None and right_hand is not None and distance(left_hand, right_hand) < 0.05 * w:
        activities.append("apertando as mãos")

    if nose is not None and mouth is not None and distance(nose, mouth) > 0.08 * h:
        activities.append("bocejando")

    if mouth is not None and left_eye is not None and right_eye is not None and nose is not None:
        eye_avg_y = (left_eye[1] + right_eye[1]) / 2
        if distance(nose, mouth) > 0.06 * h and eye_avg_y < nose[1] - 0.02 * h:
            activities.append("rindo")

    # Nose below the shoulder line.
    if nose is not None and has_shoulders and nose[1] > shoulder_y + 0.05 * h:
        activities.append("escrevendo")

    return activities

def detect_activity(frame, activity_summary):
    """Run pose estimation on a frame, draw the skeleton and tally activities.

    Args:
        frame: BGR image; the pose skeleton is drawn on it in place.
        activity_summary: Counter-like mapping; the entry of every activity
            label detected in this frame is incremented by one.

    Nothing is drawn or counted when no pose is found.
    """
    detection = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if not detection.pose_landmarks:
        return

    mp.solutions.drawing_utils.draw_landmarks(
        frame, detection.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # Convert each landmark's normalised position into pixel coordinates,
    # keyed by the landmark's enum name.
    pixel_points = {
        mp_pose.PoseLandmark(index).name: (
            int(point.x * frame.shape[1]),
            int(point.y * frame.shape[0]),
        )
        for index, point in enumerate(detection.pose_landmarks.landmark)
    }

    for label in detect_activity_from_landmarks(pixel_points, frame.shape[:2]):
        activity_summary[label] += 1

# ----------------------------- Summary Generation -----------------------------

def generate_summary(summary_path, frame_count, anomalies_detected, emotion_summary, activity_summary):
    """Write the plain-text analysis report.

    Args:
        summary_path: Destination file; overwritten, encoded as UTF-8.
        frame_count: Value reported as the total number of frames.
        anomalies_detected: Value reported as the number of anomalies.
        emotion_summary: Mapping of emotion label to count, listed in
            iteration order.
        activity_summary: Mapping of activity label to count, listed in
            iteration order.
    """
    with open(summary_path, "w", encoding="utf-8") as report:
        lines = [
            "=== Resumo da Análise de Vídeo ===\n",
            f"Total de frames analisados: {frame_count}\n",
            f"Número de anomalias detectadas: {anomalies_detected}\n",
            "Distribuição de emoções:\n",
        ]
        lines.extend(f"  {label}: {total}\n" for label, total in emotion_summary.items())
        lines.append("Atividades detectadas:\n")
        lines.extend(f"  {label}: {total}\n" for label, total in activity_summary.items())
        report.writelines(lines)

# ----------------------------- Main Processing Loop -----------------------------

def process_video():
    """Analyse the configured video end to end and write both outputs.

    Reads ``VIDEO_PATH``, analyses every ``FRAME_SKIP``-th frame (emotions on
    a downscaled copy, pose activities on the full frame), writes the
    annotated frames to ``OUTPUT_VIDEO_PATH`` and the report to
    ``SUMMARY_PATH``.

    Raises:
        IOError: Propagated from ``initialize_video`` when the input video
            cannot be opened.
    """
    cap, out, width, height = initialize_video(VIDEO_PATH, OUTPUT_VIDEO_PATH)
    frame_area = width * height
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # The capture may not know the frame count; tqdm accepts None for that.
    pbar = tqdm(total=total_frames if total_frames > 0 else None, desc="Processando frames")

    # frame_count counts every frame read, including the skipped ones.
    frame_count = 0
    anomalies_detected = 0
    emotion_summary = Counter()
    activity_summary = Counter()

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % FRAME_SKIP != 0:
                # Skipped frames still advance the bar, whose total is the
                # number of frames in the video.
                pbar.update(1)
                continue

            small_frame = cv2.resize(frame, (0, 0), fx=RESIZE_SCALE, fy=RESIZE_SCALE)
            anomalies_detected = analyze_emotions(small_frame, frame, emotion_summary, anomalies_detected, frame_area, RESIZE_SCALE)
            detect_activity(frame, activity_summary)
            # NOTE(review): only analysed frames are written, so the output has
            # roughly 1/FRAME_SKIP of the source frames; if the writer uses the
            # source FPS, playback is presumably sped up — confirm intent.
            out.write(frame)
            pbar.update(1)
    finally:
        # Release the handles even if analysis raises or is interrupted, so
        # the output file is finalised and the input is not left locked.
        # No HighGUI window is opened here, so there is nothing to destroy.
        cap.release()
        out.release()
        pbar.close()

    generate_summary(SUMMARY_PATH, frame_count, anomalies_detected, emotion_summary, activity_summary)
    print(f"✅ Análise concluída. Resumo salvo em '{SUMMARY_PATH}' e vídeo gerado como '{OUTPUT_VIDEO_PATH}'.")

# ----------------------------- Run -----------------------------

# Start the full pipeline as soon as this code is executed.
process_video()



In [None]:
import cv2
import numpy as np
import math
from deepface import DeepFace
from collections import Counter
import mediapipe as mp
from tqdm import tqdm
import torch

# ----------------------------- Configuration Parameters -----------------------------

# Input video read by process_video().
VIDEO_PATH = "Unlocking Facial Recognition_ Diverse Activities Analysis.mp4"
# Annotated video written by process_video().
OUTPUT_VIDEO_PATH = "output_video.mp4"
# Plain-text report written by generate_summary().
SUMMARY_PATH = "video_summary.txt"
# Value forwarded to DeepFace.analyze(detector_backend=...).
EMOTION_BACKEND = "retinaface"
# A face box larger than this fraction of the frame area is counted as an anomaly.
ANOMALY_THRESHOLD_RATIO = 0.2  # proportion of frame area
# Only frames whose 1-based index is a multiple of this value are analysed.
FRAME_SKIP = 3  # process every Nth frame
# Factor applied to both axes of the frame before emotion analysis.
RESIZE_SCALE = 0.5  # scale for analysis

# ----------------------------- Initialization -----------------------------

# Single module-level pose estimator, reused by detect_activity() for every frame.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Check for GPU availability
# This is informational only: it prints a message and configures nothing.
# NOTE(review): the test asks PyTorch whether it can see a CUDA device, while
# the messages talk about DeepFace. DeepFace presumably runs on
# TensorFlow/Keras rather than PyTorch, so the message may not match the
# device DeepFace really uses — confirm.
if torch.cuda.is_available():
    print("✅ GPU disponível e será utilizada pelo DeepFace.")
else:
    print("⚠️ GPU não disponível. DeepFace usará CPU.")

def initialize_video(video_path, output_path):
    """Open the input video and create an MP4 writer with the same geometry.

    Args:
        video_path: Path of the video file to read.
        output_path: Path of the video file to create.

    Returns:
        A ``(cap, out, width, height)`` tuple: the opened ``cv2.VideoCapture``,
        the opened ``cv2.VideoWriter`` and the frame size in pixels.

    Raises:
        IOError: If the input cannot be opened or the writer cannot be created.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError("Erro ao abrir o vídeo.")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fps = cap.get(cv2.CAP_PROP_FPS)
    # The capture may report 0 or NaN when the rate is unknown; the writer
    # needs a positive rate, so fall back to a conventional default.
    if not fps > 0:
        fps = 30.0

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        # Do not leak the capture, and do not let later write() calls fail silently.
        cap.release()
        raise IOError("Erro ao criar o vídeo de saída.")
    return cap, out, width, height

# ----------------------------- Emotion Analysis -----------------------------

def analyze_emotions(small_frame, full_frame, emotion_summary, anomalies_detected, frame_area, scale_factor):
    """Detect faces and their dominant emotion, then annotate the full frame.

    Args:
        small_frame: Downscaled image handed to ``DeepFace.analyze``.
        full_frame: Full-resolution image; boxes and labels are drawn on it
            in place.
        emotion_summary: Counter-like mapping; the entry of each detected
            dominant emotion is incremented by one.
        anomalies_detected: Running anomaly count before this frame.
        frame_area: Area, in pixels, of ``full_frame``.
        scale_factor: Ratio between ``small_frame`` and ``full_frame`` sizes,
            used to map face boxes back onto ``full_frame``.

    Returns:
        The anomaly count after this frame. A face counts as an anomaly when
        its box covers more than ``ANOMALY_THRESHOLD_RATIO`` of the frame.
        Any failure is printed and the count gathered so far is returned.
    """
    try:
        analysis = DeepFace.analyze(
            small_frame,
            actions=['emotion'],
            enforce_detection=False,
            detector_backend=EMOTION_BACKEND
        )
        # Tolerate a single-dict result (returned by some DeepFace releases
        # instead of a list of per-face dicts).
        if isinstance(analysis, dict):
            analysis = [analysis]

        for face_data in analysis:
            # With enforce_detection=False, DeepFace reports a placeholder
            # "face" spanning the whole image with confidence 0 when nothing
            # is found. Counting it would inflate both the emotion statistics
            # and the oversized-face anomaly count, so skip it. Results that
            # carry no confidence value are kept as before.
            confidence = face_data.get('face_confidence')
            if confidence is not None and confidence <= 0:
                continue

            region = face_data['region']
            emotion = face_data['dominant_emotion']
            emotion_summary[emotion] += 1

            # Rescale coordinates to original frame
            x = int(region['x'] / scale_factor)
            y = int(region['y'] / scale_factor)
            w = int(region['w'] / scale_factor)
            h = int(region['h'] / scale_factor)

            if w * h > ANOMALY_THRESHOLD_RATIO * frame_area:
                anomalies_detected += 1

            cv2.rectangle(full_frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
            cv2.putText(full_frame, emotion, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.9, (36, 255, 12), 2)
    except Exception as e:
        # Deliberately best-effort: one bad frame must not abort the whole video.
        print(f"Falha na análise emocional: {e}")
    return anomalies_detected

# ----------------------------- Activity Detection -----------------------------

def detect_activity_from_landmarks(landmarks, image_shape):
    """Label coarse activities from 2D pose landmarks using fixed heuristics.

    Args:
        landmarks: Mapping from landmark name (e.g. ``'LEFT_WRIST'``) to an
            ``(x, y)`` position expressed in the same units as
            ``image_shape``. Rules whose landmarks are missing are skipped.
        image_shape: ``(height, width)`` of the image; every threshold is a
            fraction of one of these two values.

    Returns:
        List of activity labels whose rule matched, in rule order. Rules are
        independent, so several labels can be returned for one pose.
    """
    activities = []
    h, w = image_shape

    def distance(p1, p2):
        """Return the Euclidean distance between two (x, y) points."""
        return math.hypot(p1[0] - p2[0], p1[1] - p2[1])

    def mean_y(name_a, name_b):
        """Return the average y of two landmarks, or None if either is missing."""
        a = landmarks.get(name_a)
        b = landmarks.get(name_b)
        if a is None or b is None:
            return None
        return (a[1] + b[1]) / 2

    left_wrist = landmarks.get('LEFT_WRIST')
    right_wrist = landmarks.get('RIGHT_WRIST')
    left_hand = landmarks.get('LEFT_INDEX')
    right_hand = landmarks.get('RIGHT_INDEX')
    nose = landmarks.get('NOSE')
    mouth = landmarks.get('MOUTH_LEFT')
    left_eye = landmarks.get('LEFT_EYE')
    right_eye = landmarks.get('RIGHT_EYE')

    shoulder_y = mean_y('LEFT_SHOULDER', 'RIGHT_SHOULDER')
    hip_y = mean_y('LEFT_HIP', 'RIGHT_HIP')

    # Compare with None explicitly: a y of 0 (top edge of the image) is a
    # valid coordinate and must not be mistaken for "landmark missing".
    has_shoulders = shoulder_y is not None
    has_hips = hip_y is not None

    # NOTE(review): in image coordinates the hips of an upright person are
    # normally well below the shoulders, so the first branch likely fires for
    # standing poses too — confirm the intended sitting/standing rule.
    if has_shoulders and has_hips:
        if hip_y > shoulder_y + 0.05 * h:
            activities.append("sentado")
        elif abs(hip_y - shoulder_y) < 0.05 * h:
            activities.append("em pé")

    # Wrist clearly above the shoulder line (smaller y is higher in the image).
    if left_wrist is not None and has_shoulders and left_wrist[1] < shoulder_y - 0.1 * h:
        activities.append("acenando com a mão esquerda")
    if right_wrist is not None and has_shoulders and right_wrist[1] < shoulder_y - 0.1 * h:
        activities.append("acenando com a mão direita")

    # Wrist roughly level with the shoulders.
    if left_wrist is not None and has_shoulders and abs(left_wrist[1] - shoulder_y) < 0.05 * h:
        activities.append("gesticulando com a mão esquerda")
    if right_wrist is not None and has_shoulders and abs(right_wrist[1] - shoulder_y) < 0.05 * h:
        activities.append("gesticulando com a mão direita")

    # Wrists far apart horizontally.
    if left_wrist is not None and right_wrist is not None and abs(left_wrist[0] - right_wrist[0]) > 0.3 * w:
        activities.append("dançando")

    # Index fingers almost touching.
    if left_hand is not None and right_hand is not None and distance(left_hand, right_hand) < 0.05 * w:
        activities.append("apertando as mãos")

    if nose is not None and mouth is not None and distance(nose, mouth) > 0.08 * h:
        activities.append("bocejando")

    if mouth is not None and left_eye is not None and right_eye is not None and nose is not None:
        eye_avg_y = (left_eye[1] + right_eye[1]) / 2
        if distance(nose, mouth) > 0.06 * h and eye_avg_y < nose[1] - 0.02 * h:
            activities.append("rindo")

    # Nose below the shoulder line.
    if nose is not None and has_shoulders and nose[1] > shoulder_y + 0.05 * h:
        activities.append("escrevendo")

    return activities

def detect_activity(frame, activity_summary):
    """Run pose estimation on a frame, draw the skeleton and tally activities.

    Args:
        frame: BGR image; the pose skeleton is drawn on it in place.
        activity_summary: Counter-like mapping; the entry of every activity
            label detected in this frame is incremented by one.

    Nothing is drawn or counted when no pose is found.
    """
    detection = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    if not detection.pose_landmarks:
        return

    mp.solutions.drawing_utils.draw_landmarks(
        frame, detection.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # Convert each landmark's normalised position into pixel coordinates,
    # keyed by the landmark's enum name.
    pixel_points = {
        mp_pose.PoseLandmark(index).name: (
            int(point.x * frame.shape[1]),
            int(point.y * frame.shape[0]),
        )
        for index, point in enumerate(detection.pose_landmarks.landmark)
    }

    for label in detect_activity_from_landmarks(pixel_points, frame.shape[:2]):
        activity_summary[label] += 1

# ----------------------------- Summary Generation -----------------------------

def generate_summary(summary_path, frame_count, anomalies_detected, emotion_summary, activity_summary):
    """Write the plain-text analysis report.

    Args:
        summary_path: Destination file; overwritten, encoded as UTF-8.
        frame_count: Value reported as the total number of frames.
        anomalies_detected: Value reported as the number of anomalies.
        emotion_summary: Mapping of emotion label to count, listed in
            iteration order.
        activity_summary: Mapping of activity label to count, listed in
            iteration order.
    """
    with open(summary_path, "w", encoding="utf-8") as report:
        lines = [
            "=== Resumo da Análise de Vídeo ===\n",
            f"Total de frames analisados: {frame_count}\n",
            f"Número de anomalias detectadas: {anomalies_detected}\n",
            "Distribuição de emoções:\n",
        ]
        lines.extend(f"  {label}: {total}\n" for label, total in emotion_summary.items())
        lines.append("Atividades detectadas:\n")
        lines.extend(f"  {label}: {total}\n" for label, total in activity_summary.items())
        report.writelines(lines)

# ----------------------------- Main Processing Loop -----------------------------

def process_video():
    """Analyse the configured video end to end and write both outputs.

    Reads ``VIDEO_PATH``, analyses every ``FRAME_SKIP``-th frame (emotions on
    a downscaled copy, pose activities on the full frame), writes the
    annotated frames to ``OUTPUT_VIDEO_PATH`` and the report to
    ``SUMMARY_PATH``.

    Raises:
        IOError: Propagated from ``initialize_video`` when the input video
            cannot be opened.
    """
    cap, out, width, height = initialize_video(VIDEO_PATH, OUTPUT_VIDEO_PATH)
    frame_area = width * height
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # The capture may not know the frame count; tqdm accepts None for that.
    pbar = tqdm(total=total_frames if total_frames > 0 else None, desc="Processando frames")

    # frame_count counts every frame read, including the skipped ones.
    frame_count = 0
    anomalies_detected = 0
    emotion_summary = Counter()
    activity_summary = Counter()

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % FRAME_SKIP != 0:
                # Skipped frames still advance the bar, whose total is the
                # number of frames in the video.
                pbar.update(1)
                continue

            small_frame = cv2.resize(frame, (0, 0), fx=RESIZE_SCALE, fy=RESIZE_SCALE)
            anomalies_detected = analyze_emotions(small_frame, frame, emotion_summary, anomalies_detected, frame_area, RESIZE_SCALE)
            detect_activity(frame, activity_summary)
            # NOTE(review): only analysed frames are written, so the output has
            # roughly 1/FRAME_SKIP of the source frames; if the writer uses the
            # source FPS, playback is presumably sped up — confirm intent.
            out.write(frame)
            pbar.update(1)
    finally:
        # Release the handles even if analysis raises or is interrupted, so
        # the output file is finalised and the input is not left locked.
        # No HighGUI window is opened here, so there is nothing to destroy.
        cap.release()
        out.release()
        pbar.close()

    generate_summary(SUMMARY_PATH, frame_count, anomalies_detected, emotion_summary, activity_summary)
    print(f"✅ Análise concluída. Resumo salvo em '{SUMMARY_PATH}' e vídeo gerado como '{OUTPUT_VIDEO_PATH}'.")

# ----------------------------- Run -----------------------------

# Start the full pipeline as soon as this code is executed.
process_video()



⚠️ GPU não disponível. DeepFace usará CPU.


Processando frames:   0%|          | 15/3326 [01:31<5:29:47,  5.98s/it]