<a href="https://colab.research.google.com/github/Caio-Giacomelli/FIAP-3IADT-TC-04/blob/main/FIAP_3IADT_TC04.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# FIAP - IA Para Devs - Turma 3IADT 2024 - Trabalho 4
### Caio Henrique Giacomelli         - RM 358131
### Rafael Pereira Alonso            - RM 358127
### Wagner Dominike Eugênio de Mello - RM 358565

### Link do Github: https://github.com/Caio-Giacomelli/FIAP-3IADT-TC-04
### Link do Youtube: https://www.youtube.com/watch?v=C2iriFcsIgw

# Resumo

Neste Tech Challenge, realizamos o desafio de implementar reconhecimento facial, análise de expressões emocionais, detecção de atividades em tempo real e geração de resumo destas atividades. Em conjunto, definimos uma anomalia e à detectamos em vídeo.


O reconhecimento facial foi realizado de duas formas. Utilizamos haarcascades como classificador da biblioteca de cv2 em conjunto com Deep Face para identificar o rosto e mapear as emoções associadas ao rosto detectado, já também solucionando o segundo desafio, de análise de expressões emocionais. O método detect_face_expressions é o responsável por esta lógica. Também utilizamos o MediaPipe para identificação de rosto, porém utilizamos seus landmarks para definir anomalias, escolhendo a distância entre o nariz e a boca superiores ou inferiores à um limiar, para ser identificada como anomalia.

A geração de resumo em tempo real foi realizada utilizando BlipProcessor e BlipForConditionalGeneration, da biblioteca de transformers.E para finalizar, resumimos as Captions e Emoções utilizando o Summarizer

**Total de Frames analisados:** 3326

**Total de Anomalias encontradas:** 35

**Tempo total de processamento:** 32 minutos

**Principais emoções encontradas:** Neutra, medo, felicidade, raiva, tristeza, surpresa


# Importação de bibliotecas e Conexão com Drive

In [None]:
!pip install opencv-python mediapipe tqdm deepface transformers --quiet --use-deprecated=legacy-resolver

In [None]:
import cv2
from tqdm import tqdm
import mediapipe as mp
from deepface import DeepFace
from transformers import BlipProcessor, BlipForConditionalGeneration


In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Métodos para Detecção de Poses, expressões faciais e anomalias

In [None]:
def detect_face_expressions(frame, face_cascade):
  gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=10, minSize=(30, 30))

  emotions_in_frame = []
  for (x, y, w, h) in faces:
    face_roi = frame[y:y+h, x:x+w]
    try:
      analysis = DeepFace.analyze(face_roi, actions=['emotion'], silent=True, enforce_detection=False)
      emotion = analysis[0]['dominant_emotion']
      cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
      cv2.putText(frame, emotion, (x, y-10), cv2.FONT_HERSHEY_DUPLEX, 0.9, (0, 255, 0), 2)
      emotions_in_frame.append(emotion)
    except ValueError as e:
      print(f"Error analyzing face: {e}")
  return emotions_in_frame

In [None]:
def describe_frames(frames, processor, model, frame_width, frame_height):
  inputs = processor(images=frames, return_tensors="pt")
  captions = model.generate(**inputs)
  return [processor.decode(caption, skip_special_tokens=True) for caption in captions]

In [None]:
def detect_face_anomalies(frame, mp_face_mesh, face_mesh, mp_drawing):
  rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

  results = face_mesh.process(rgb_frame)

  anomaly_count = 0

  if results.multi_face_landmarks:
    for face_landmarks in results.multi_face_landmarks:
      # Obter as coordenadas do bounding box do rosto a partir dos landmarks
      x_coords = [landmark.x for landmark in face_landmarks.landmark]
      y_coords = [landmark.y for landmark in face_landmarks.landmark]
      x_min, x_max = min(x_coords), max(x_coords)
      y_min, y_max = min(y_coords), max(y_coords)

      frame_height, frame_width, _ = frame.shape
      face_width = (x_max - x_min) * frame_width

      # Anomalia: verificar a distância entre a boca e o nariz
      # Acesse os landmarks usando os índices numéricos
      nose_tip = face_landmarks.landmark[4]        # Ponto da ponta do nariz
      upper_lip = face_landmarks.landmark[13]     # Ponto médio do lábio superior

      # Converter coordenadas normalizadas para coordenadas de pixel
      nose_tip_coords = (int(nose_tip.x * frame_width), int(nose_tip.y * frame_height))
      upper_lip_coords = (int(upper_lip.x * frame_width), int(upper_lip.y * frame_height))

      # Calcular a distância entre a ponta do nariz e o ponto médio do lábio superior
      distance_nose_lip = math.dist(nose_tip_coords, upper_lip_coords)

      # Normalizar a distância pela largura do rosto
      normalized_distance = distance_nose_lip / face_width if face_width > 0 else 0

      # Definir um limiar para anomalia
      anomaly_threshold_min = 0.15
      anomaly_threshold_max = 0.293

      # print(f"normalized_distance: " + str(normalized_distance))
      # print(f"anomaly_threshold: " + str(anomaly_threshold))

      if normalized_distance <= anomaly_threshold_min or normalized_distance > anomaly_threshold_max:
        # Marcar a anomalia
        mid_point_x = int((nose_tip_coords[0] + upper_lip_coords[0]) / 2)
        mid_point_y = int((nose_tip_coords[1] + upper_lip_coords[1]) / 2)
        cv2.circle(frame, (mid_point_x, mid_point_y), radius=10, color=(0, 0, 255), thickness=-1)
        print(f"Anomalia na distância do nariz e boca detectada: {normalized_distance:.2f}).")
        anomaly_count += 1

      # Desenhar os landmarks faciais
      mp_drawing.draw_landmarks(
          frame,
          face_landmarks,
          mp_face_mesh.FACEMESH_CONTOURS, # Ou outra opção de conexão
          mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1),
          mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1))

  return anomaly_count

In [None]:
def detect_poses(frame, mp_pose, pose, mp_drawing):
  rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

  results = pose.process(rgb_frame)

  if results.pose_landmarks:
      mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

# Implementação do Laço para coleta e processamento dos Frames

In [None]:
import math

def process_video(video_path, output_path):
  cap = cv2.VideoCapture(video_path)

  if not cap.isOpened():
    print("Erro ao abrir o vídeo.")
    return

  width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  fps = int(cap.get(cv2.CAP_PROP_FPS))
  total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

  fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

  # frame describer
  processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
  model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

  # face recognition
  face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

  # pose recognition
  mp_pose = mp.solutions.pose
  pose = mp_pose.Pose()
  mp_drawing = mp.solutions.drawing_utils

  print("Iniciando coleta dos frames...")
  frames = []
  for _ in tqdm(range(total_frames), desc="Coletando frames"):
    ret, frame = cap.read()

    if not ret:
      break

    frames.append(frame)

  print("Iniciando processamento...")
  batch_size = fps
  total_batches = math.ceil(len(frames) / batch_size)
  processed_seconds = []
  total_anomaly_count = 0


  mp_face_mesh = mp.solutions.face_mesh
  face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5)

  for i in range(0, len(frames), batch_size):
    batch_frames = frames[i:i + batch_size]
    current_batch = (i // batch_size) + 1

    # get description for a second of video
    print(f"Gerando descrições de frames...")
    captions = describe_frames(batch_frames[0], processor, model, width, height)
    caption_text = captions[0]
    emotions_in_fps = []

    for frame in tqdm(batch_frames, desc=f"Processando batch {current_batch}/{total_batches}"):

      # detect face expressions per frame
      emotions_in_frame = detect_face_expressions(frame, face_cascade)
      emotions_in_fps += emotions_in_frame

      # detect anomaly
      anomaly_count = detect_face_anomalies(frame, mp_face_mesh, face_mesh, mp_drawing)
      total_anomaly_count += anomaly_count

      # print description in frame
      cv2.rectangle(frame, (0, height), (width, height - 70), (0, 0, 0), -1)
      cv2.putText(frame, caption_text, (10, height - 30), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 255, 255), 2)

      # save frame in output video
      out.write(frame)

    processed_seconds.append({ 'caption': caption_text, 'emotions': set(emotions_in_fps) })
  print("\nProcessamento concluído!")

  cap.release()
  out.release()
  cv2.destroyAllWindows()
  return { 'total_frames': len(frames), 'results_per_second': processed_seconds, 'anomaly_count': total_anomaly_count }

# Manipulação dos vídeos e inicialização do processamento

In [None]:
import os
import time

folder_path = '/content/drive/MyDrive/FIAP/Trabalho04'
input_video_path = os.path.join(folder_path, 'video', 'Unlocking Facial Recognition_ Diverse Activities Analysis.mp4')
output_video_path = os.path.join(folder_path, 'output', 'output_video.mp4')

if not os.path.exists(input_video_path):
  raise FileNotFoundError(f"O vídeo de entrada não existe em {input_video_path}")

start_time = time.time()
processed_frames = process_video(input_video_path, output_video_path)
end_time = time.time()
print(f"Tempo de processamento: {(end_time - start_time) // 60} minutos")
print(f"Total de frames processados: {processed_frames['total_frames']}")
print(f"Total de Anomalias: {processed_frames['anomaly_count']}")

if not os.path.exists(output_video_path):
  raise FileNotFoundError(f"Erro ao gerar video de saída em {output_video_path}")

In [None]:
from google.colab import files
files.download(output_video_path)

# Summarização das Captions e Emoções do vídeo

In [None]:
processed_frames['results_per_second']

In [None]:
from transformers import pipeline

summarizer = pipeline("summarization")

# Extrai captions e emoções únicas
all_captions = [d['caption'] for d in processed_frames['results_per_second']]
all_emotions = []
for d in processed_frames['results_per_second']:
    emotions = d.get('emotions')
    if emotions:
        all_emotions.extend(list(emotions))

unique_emotions = list(set(all_emotions))
unique_captions = list(set(all_captions))

# Sumarização das Captions
caption_input_text = f"Captions:\n{', '.join(unique_captions)}"
caption_summary = summarizer(caption_input_text, max_length=100, min_length=30, do_sample=True)
print("Sumário das Captions:")
print(caption_summary[0]['summary_text'])

print("-" * 20)

# Sumarização das Emoções
emotion_input_text = f"Emotions:\n{', '.join(unique_emotions)}"
emotion_summary = summarizer(emotion_input_text, max_length=100, min_length=30, do_sample=True)
print("Sumário das Emoções:")
print(emotion_summary[0]['summary_text'])