# Tugas 1 - Sinyal Respirasi dan Post-Processing

Berdasarkan apa yang telah kita coba mengenai metode pengambilan sinyal respirasi menggunakan mediapipe. Menggunakan video yang kalian ambil masing-masing pada tahap Pre-requisite modul ini. Lakukan proses pengambilan sinyal respirasi dan juga post-processingnya dalam menentukan laju pernafasan per menit.

In [None]:
import cv2
import numpy as np
import pandas as pd
from scipy import signal
from scipy.signal import find_peaks
import mediapipe as mp
import time

import matplotlib.pyplot as plt

# Load the stopwatch data
stopwatch_df = pd.read_csv('stopwatch.csv')

# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, 
                                  max_num_faces=1,
                                  min_detection_confidence=0.5,
                                  min_tracking_confidence=0.5)

# Function to extract respiratory signal
def extract_respiratory_signal(video_path):
    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        print("Error: Could not open video file")
        return None
    
    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Nose position tracking for respiratory signal
    nose_y_positions = []
    timestamps = []
    frame_count = 0
    
    while cap.isOpened():
        ret, frame = cap.read()
        
        if not ret:
            break
        
        # Convert to RGB for MediaPipe
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Process the image
        results = face_mesh.process(frame_rgb)
        
        current_time = frame_count / fps
        timestamps.append(current_time)
        
        if results.multi_face_landmarks:
            face_landmarks = results.multi_face_landmarks[0]
            
            # Extract nose position (using point 1)
            nose_landmark = face_landmarks.landmark[1]
            nose_y = nose_landmark.y
            nose_y_positions.append(nose_y)
        else:
            # If no face is detected, use the last known position or a default
            if nose_y_positions:
                nose_y_positions.append(nose_y_positions[-1])
            else:
                nose_y_positions.append(0)
        
        frame_count += 1
        
        # Display progress
        if frame_count % 30 == 0:
            print(f"Processing frame {frame_count}/{total_frames}")
    
    cap.release()
    
    return np.array(nose_y_positions), np.array(timestamps), fps

# Process the video
resp_signal, timestamps, fps = extract_respiratory_signal('dsp_ho3.mp4')

# Post-processing
def process_respiratory_signal(signal_data, timestamps, fps):
    # Convert to inverted normalized signal
    signal_inverted = -1 * (signal_data - np.mean(signal_data)) / np.std(signal_data)
    
    # Apply bandpass filter to isolate respiratory frequencies (0.1-0.5 Hz)
    b, a = signal.butter(3, [0.1, 0.5], 'bandpass', fs=fps)
    filtered_signal = signal.filtfilt(b, a, signal_inverted)
    
    # Find peaks for breath counting
    peaks, _ = find_peaks(filtered_signal, height=0.1, distance=fps)
    
    # Calculate breaths per minute at different time points
    breaths_per_minute = []
    window_size = 30  # 30 seconds window
    
    for i in range(len(timestamps)):
        if timestamps[i] < window_size:
            continue
        
        # Count peaks in the last window_size seconds
        start_time = timestamps[i] - window_size
        window_start_idx = np.argmax(timestamps >= start_time)
        
        peaks_in_window = np.sum((timestamps[peaks] >= start_time) & 
                                 (timestamps[peaks] <= timestamps[i]))
        
        # Convert to breaths per minute
        bpm = (peaks_in_window / window_size) * 60
        breaths_per_minute.append((timestamps[i], bpm))
    
    return filtered_signal, peaks, np.array(breaths_per_minute)

# Process the respiratory signal
filtered_signal, resp_peaks, breaths_per_min = process_respiratory_signal(resp_signal, timestamps, fps)

# Visualization
plt.figure(figsize=(15, 10))

# Plot respiratory signal
plt.subplot(3, 1, 1)
plt.plot(timestamps, resp_signal)
plt.title('Raw Respiratory Signal (Nose Y-Position)')
plt.xlabel('Time (s)')
plt.ylabel('Nose Y-Position')

# Plot filtered signal with peaks
plt.subplot(3, 1, 2)
plt.plot(timestamps, filtered_signal)
plt.plot(timestamps[resp_peaks], filtered_signal[resp_peaks], 'ro')
plt.title('Filtered Respiratory Signal with Detected Breaths')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')

# Plot breaths per minute over time
plt.subplot(3, 1, 3)
if len(breaths_per_min) > 0:
    plt.plot(breaths_per_min[:, 0], breaths_per_min[:, 1])
    plt.title('Respiratory Rate (Breaths per Minute)')
    plt.xlabel('Time (s)')
    plt.ylabel('Breaths per Minute')

plt.tight_layout()
plt.show()

# Compare with stopwatch data
if not stopwatch_df.empty:
    print("Comparing with stopwatch data:")
    avg_bpm = np.mean(breaths_per_min[:, 1]) if len(breaths_per_min) > 0 else 0
    print(f"Average calculated respiratory rate: {avg_bpm:.2f} BPM")
    print(f"Stopwatch data: {stopwatch_df.to_string(index=False)}")

# Tugas 2 - Metode Lucas-Kanade Optical Flow

Kita menggunakan banyak metode dalam melakukan proses pengambilan sinyal pernafasan ini seperti apa itu Lucas-Kanade Optical Flow, tetapi kita tidak cukup detail dalam membahasnya kali ini. Anggap diri kamu sebagai periset independen, lakukan riset mandiri dan jelaskan konsep-konsep dan step yang kamu gunakan sampai kamu bisa mendapatkan laju pernafasan pada video kamu, dan jangan lupa untuk membandingkan hasil data dari video kamu dengan metode ini. 