Example code for capturing input from a camera feed.

In [None]:
# Upstream project: https://github.com/phamquiluan/ResidualMaskingNetwork
from rmn import RMN
# Build the model with its default constructor arguments.
m = RMN()
# Run the library's built-in demo.
# NOTE(review): presumably this reads from the default webcam and opens a
# preview window -- confirm against the rmn documentation.
m.video_demo()

In [None]:
# Detect emotions from an image
import cv2
from rmn import RMN
m = RMN()

# cv2.imread does not raise when the file is missing or unreadable; it returns
# None. Fail here with a clear message instead of deep inside the model.
image = cv2.imread("test.png")
if image is None:
    raise FileNotFoundError("Could not read image 'test.png'.")

results = m.detect_emotion_for_single_frame(image)
print(results)
# Example output (one dict per detected face):
# [{'xmin': 161, 'ymin': 81, 'xmax': 440, 'ymax': 360, 'emo_label': 'surprise', 'emo_proba': 0.7028718590736389, 'proba_list': [{'angry': 0.011575396172702312}, {'disgust': 0.0002630411763675511}, {'fear': 0.00482788635417819}, {'happy': 0.015042794868350029}, {'sad': 0.0018715509213507175}, {'surprise': 0.7028718590736389}, {'neutral': 0.26354748010635376}]}]

# Draw the detections onto the image and save the annotated copy.
image = m.draw(image, results)
# cv2.imwrite reports failure through its boolean return value, not an exception.
if not cv2.imwrite("output.png", image):
    raise OSError("Could not write image 'output.png'.")

Final version of the code. For additional details, refer to `Prototype/FER/Models/Info.md`.

In [6]:
import cv2
import os
import time
from collections import deque
from rmn import RMN

# Make sure the "logs" output directory exists before any log file is opened.
# exist_ok=True makes this a no-op when the directory is already there, so the
# cell can be re-run safely.
os.makedirs("logs", exist_ok=True)

def get_log_filename(log_dir="logs", prefix="emotion_log"):
    """Return the first unused log path of the form ``<log_dir>/<prefix>_<n>.txt``.

    ``n`` starts at 0 and is incremented until no file with that name exists.
    The file itself is not created; only the directory is.

    Args:
        log_dir: Directory that holds the log files. Created if missing.
        prefix: File-name stem placed before the numeric suffix.

    Returns:
        The candidate path as a string, e.g. ``"logs/emotion_log_0.txt"`` for
        the default arguments.
    """
    # Create the directory here so the returned path can always be opened,
    # even if the module-level setup was skipped or the directory was removed.
    os.makedirs(log_dir, exist_ok=True)

    base_name = f"{log_dir}/{prefix}"
    counter = 0
    while os.path.exists(f"{base_name}_{counter}.txt"):
        counter += 1
    return f"{base_name}_{counter}.txt"

def process_video(video_file, show_preview=True):
    """Run RMN emotion recognition over a video source and log the results.

    Every frame is passed to ``RMN.detect_emotion_for_single_frame``. Each
    detected face adds one ``(label, confidence)`` pair to a 15-entry rolling
    window; the window is reduced with ``aggregate_predictions`` and the
    smoothed result is written as a CSV row to a new file obtained from
    ``get_log_filename()``. A frame/FPS summary is printed and appended to the
    log when the loop ends normally.

    Args:
        video_file: Path to a video file, or the string ``"camera"`` to read
            from capture device 0.
        show_preview: When True, annotated frames are shown in an OpenCV
            window; pressing ``q`` or closing the window stops processing.

    Returns:
        None. If the source cannot be opened an error is printed and the
        function returns without creating a log file.
    """
    is_camera = video_file == "camera"
    video_source = 0 if is_camera else video_file
    cap = cv2.VideoCapture(video_source)
    if not cap.isOpened():
        print(f"Error: Could not open video source '{video_source}'.")
        cap.release()
        return

    window_name = "RMN Emotion Recognition"

    # try/finally: the capture handle and preview window must be released even
    # when the model raises or the kernel is interrupted mid-loop.
    try:
        # Initialize RMN model
        print("Loading RMN model...")
        m = RMN()

        # Smoothing buffer for predictions.
        # NOTE(review): one buffer is shared by every face in every frame, so
        # with several faces on screen their predictions are mixed together.
        recent_scores = deque(maxlen=15)

        frame_count = 0

        # The context manager closes the log file on every exit path.
        with open(get_log_filename(), "w") as log_file:
            log_file.write("Timestamp,Emotion,Confidence\n")

            start_time = time.time()

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Detect emotions
                results = m.detect_emotion_for_single_frame(frame)
                if results:
                    for result in results:
                        xmin, ymin = result["xmin"], result["ymin"]
                        xmax, ymax = result["xmax"], result["ymax"]

                        # Smooth predictions
                        recent_scores.append((result["emo_label"], result["emo_proba"]))
                        smoothed_emotion, smoothed_confidence = aggregate_predictions(recent_scores)

                        # Camera input is stamped with wall-clock time; file
                        # input uses the playback position in seconds.
                        if is_camera:
                            timestamp = time.time()
                        else:
                            timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0

                        # Log results
                        log_file.write(f"{timestamp:.2f},{smoothed_emotion},{smoothed_confidence:.2f}\n")

                        # Annotate frame
                        if show_preview:
                            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
                            cv2.putText(
                                frame,
                                f"{smoothed_emotion} ({smoothed_confidence:.2f})",
                                (xmin, ymin - 10),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.7,
                                (0, 255, 0),
                                2,
                            )

                # Display the video
                if show_preview:
                    cv2.imshow(window_name, frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break
                    if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
                        break

                frame_count += 1

            # Calculate FPS; guard against a zero elapsed time (no frames read
            # on a coarse clock) so the summary never raises ZeroDivisionError.
            total_time = time.time() - start_time
            fps = frame_count / total_time if total_time > 0 else 0.0
            print(f"Processed {frame_count} frames in {total_time:.2f} seconds. FPS: {fps:.2f}")
            log_file.write(f"\nProcessed {frame_count} frames in {total_time:.2f} seconds.\n")
            log_file.write(f"FPS: {fps:.2f}\n")
    finally:
        cap.release()
        if show_preview:
            cv2.destroyAllWindows()
            # Give the GUI event loop one tick so the window actually closes.
            cv2.waitKey(1)

def aggregate_predictions(recent_scores):
    """Reduce a window of ``(emotion, confidence)`` pairs to one prediction.

    The winning label is the one that appears most often in the window; when
    several labels are tied, the one encountered first wins. The returned
    confidence is the mean confidence of the winning label's entries only.
    An empty window yields ``("unknown", 0.0)``.
    """
    if not recent_scores:
        return "unknown", 0.0

    # label -> [occurrences, summed confidence]; dict order is first-seen order.
    tallies = {}
    for label, score in recent_scores:
        tally = tallies.setdefault(label, [0, 0.0])
        tally[0] += 1
        tally[1] = tally[1] + score

    winner = max(tallies, key=lambda label: tallies[label][0])
    occurrences, summed = tallies[winner]
    return winner, summed / occurrences

if __name__ == "__main__":
    # Run recognition on the example clip with the preview window enabled.
    # Pass "camera" instead of a file path to read from the webcam.
    process_video(
        video_file="../ExampleVideos/surry.mp4",
        show_preview=True,
    )


Loading RMN model...
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num faces: 1
num 