Aplikasi **People Flow** adalah sistem visi komputer yang mendeteksi orang melintasi garis vertikal imajiner untuk menghitung masuk (ke kanan) dan keluar (ke kiri), memantau kapasitas ruangan, dan menghasilkan laporan per menit dalam format CSV. Notebook ini bertujuan untuk:
- Mengeksplorasi MediaPipe Pose sebagai model deteksi, termasuk variasi parameter.
- Menyajikan kode model akhir berdasarkan kelas `PeopleCounter`.
- Mengevaluasi keberhasilan model (akurasi, FPS, stabilitas).
- Memberikan kode lengkap dengan penjelasan.

In [None]:
# Instal dependensi
!pip install opencv-python==4.10.0.84 mediapipe==0.10.14 numpy==1.26.4 scipy==1.14.1

In [None]:
!pip uninstall bokeh panel holoviews -y
#Dibutuhkan jika Import Error

In [None]:
# Impor pustaka
import cv2
import mediapipe as mp
import numpy as np
from scipy.spatial import distance
import matplotlib.pyplot as plt

In [None]:
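# Download the sample video used for testing.
# NOTE(review): the URL below ends in ".git" (a repository address), so wget
# most likely saves an HTML page rather than an MP4 file - confirm the raw video URL.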
!wget https://github.com/haijuf/video-sample.git -O sample_video.mp4

Arsitektur: MediaPipe Pose menggunakan model BlazePose untuk mendeteksi 33 landmark tubuh secara real-time. Kami mengeksplorasi parameter seperti `min_detection_confidence` dan `min_tracking_confidence` untuk mengoptimalkan akurasi dan stabilitas.

Kelebihan:

- Ringan dan cepat (~30 FPS di CPU).
- API Python sederhana, terintegrasi dengan OpenCV.
- Akurat untuk deteksi satu orang.

Kekurangan:

- Hanya mendeteksi satu pose per frame di Python (v0.10.14).
- Sensitif terhadap pencahayaan rendah dan occlusion.

In [None]:
class PeopleCounterExperiment:
    """Count crossings of a vertical line by the pose detected in each frame.

    The line at ``line_x`` splits the frame into a left half-plane
    (``x <= line_x``) and a right half-plane (``x > line_x``). A centroid
    moving from left to right is an entry, from right to left an exit.
    Detection and tracking confidences are constructor parameters so that
    different settings can be compared.

    The instance owns a MediaPipe ``Pose`` object. Call :meth:`close`, or use
    the instance as a context manager, to release it.
    """

    def __init__(self, detection_confidence=0.5, tracking_confidence=0.5,
                 max_capacity=None, line_x=400, max_match_distance=100):
        """Create the pose estimator and reset all counters.

        Args:
            detection_confidence: Passed as ``min_detection_confidence``.
            tracking_confidence: Passed as ``min_tracking_confidence``.
            max_capacity: Room capacity, or ``None`` for no limit. ``0`` is a
                valid capacity (the room is always reported as full).
            line_x: X coordinate, in pixels, of the counting line.
            max_match_distance: A detection closer than this many pixels to a
                tracked centroid keeps that centroid's ID.
        """
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=detection_confidence,
            min_tracking_confidence=tracking_confidence,
            model_complexity=1,
        )
        self.count_in = 0
        self.count_out = 0
        # Tracked separately from count_in - count_out so that exits of people
        # who were already inside cannot drive the occupancy below zero.
        self.occupancy = 0
        self.max_capacity = max_capacity
        self.tracked_people = {}  # {person_id: (centroid_x, centroid_y)}
        self.next_id = 1
        self.line_x = line_x
        self.max_match_distance = max_match_distance

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Release the MediaPipe pose object; safe to call more than once."""
        if self.pose is not None:
            self.pose.close()
            self.pose = None

    def get_centroid(self, landmarks, frame_shape):
        """Return the pixel midpoint between the left shoulder and left hip.

        Landmark ``x``/``y`` values are scaled by the frame width/height taken
        from ``frame_shape`` (``(height, width, ...)``).
        """
        h, w = frame_shape[:2]
        shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        hip = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_HIP]
        cx = int((shoulder.x + hip.x) / 2 * w)
        cy = int((shoulder.y + hip.y) / 2 * h)
        return cx, cy

    def _match_person(self, cx, cy):
        """Return the ID of the first tracked centroid near (cx, cy), else a new ID."""
        for pid, (px, py) in self.tracked_people.items():
            if distance.euclidean((cx, cy), (px, py)) < self.max_match_distance:
                return pid
        person_id = self.next_id
        self.next_id += 1
        return person_id

    def _update_counts(self, prev_x, cx):
        """Update the counters when the centroid changed half-plane.

        The line itself belongs to the left side, so a centroid that merely
        touches the line and returns where it came from is not counted.
        """
        was_right = prev_x > self.line_x
        is_right = cx > self.line_x
        if is_right and not was_right:
            # NOTE(review): entries are not counted while the room is full,
            # but the matching exit later is - confirm this is intended.
            if self.max_capacity is None or self.occupancy < self.max_capacity:
                self.count_in += 1
                self.occupancy += 1
        elif was_right and not is_right:
            self.count_out += 1
            self.occupancy = max(0, self.occupancy - 1)

    def _build_status(self):
        """Return ``(status_text, is_full)`` for the current occupancy."""
        if self.max_capacity is not None:
            if self.occupancy >= self.max_capacity:
                return "Ruangan Penuh", True
            if self.occupancy == 0:
                return "Ruangan Kosong", False
            return f"Sisa Kapasitas: {self.max_capacity - self.occupancy}", False
        if self.occupancy == 0:
            return "Ruangan Kosong", False
        return f"Total Orang: {self.occupancy}", False

    def process_frame(self, frame):
        """Detect a pose in ``frame``, update the counters and draw overlays.

        Args:
            frame: BGR image; it is annotated in place.

        Returns:
            ``(frame, count_in, count_out, status, is_full)``.
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.pose.process(frame_rgb)
        current_people = {}

        if results.pose_landmarks:
            cx, cy = self.get_centroid(results.pose_landmarks, frame.shape)
            person_id = self._match_person(cx, cy)
            # A newly seen person has no previous position, so no crossing.
            prev_x = self.tracked_people.get(person_id, (cx, cy))[0]
            current_people[person_id] = (cx, cy)
            self._update_counts(prev_x, cx)

            cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)
            cv2.putText(frame, f"ID: {person_id}", (cx, cy - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Only people seen in this frame stay tracked.
        self.tracked_people = current_people
        cv2.line(frame, (self.line_x, 0), (self.line_x, frame.shape[0]), (255, 0, 0), 2)

        status, is_full = self._build_status()
        if is_full:
            cv2.putText(frame, "Ruangan Penuh!", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        return frame, self.count_in, self.count_out, status, is_full

# Run the same clip through several confidence settings and compare the counts.
configs = [
    {"detection_confidence": 0.5, "tracking_confidence": 0.5, "name": "Default"},
    {"detection_confidence": 0.7, "tracking_confidence": 0.7, "name": "Tinggi"},
    {"detection_confidence": 0.3, "tracking_confidence": 0.3, "name": "Rendah"}
]

for config in configs:
    print(f"\nMenguji konfigurasi: {config['name']}")
    cap = cv2.VideoCapture('sample_video.mp4')
    # Without this check an unreadable file silently yields "Masuk=0, Keluar=0".
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(
            "Gagal membuka 'sample_video.mp4'; periksa apakah berkas video berhasil diunduh."
        )
    counter = PeopleCounterExperiment(
        detection_confidence=config["detection_confidence"],
        tracking_confidence=config["tracking_confidence"],
        max_capacity=5
    )
    frame_count = 0
    try:
        while cap.isOpened() and frame_count < 100:  # cap the run at 100 frames
            ret, frame = cap.read()
            if not ret:
                break
            frame, count_in, count_out, status, is_full = counter.process_frame(frame)
            frame_count += 1
    finally:
        # Release the capture and the pose object even if a frame fails.
        cap.release()
        counter.pose.close()
    print(f"Hasil {config['name']}: Masuk={counter.count_in}, Keluar={counter.count_out}")


Menguji konfigurasi: Default
Hasil Default: Masuk=0, Keluar=0

Menguji konfigurasi: Tinggi
Hasil Tinggi: Masuk=0, Keluar=0

Menguji konfigurasi: Rendah
Hasil Rendah: Masuk=0, Keluar=0


In [None]:
class PeopleCounter:
    """Final people counter: counts crossings of a vertical line.

    The line at ``line_x`` splits the frame into a left half-plane
    (``x <= line_x``) and a right half-plane (``x > line_x``). Moving from
    left to right is an entry, from right to left an exit. The pose estimator
    uses fixed confidences of 0.7.

    The instance owns a MediaPipe ``Pose`` object. Call :meth:`close`, or use
    the instance as a context manager, to release it.
    """

    def __init__(self, max_capacity=None, line_x=400, max_match_distance=100):
        """Create the pose estimator and reset all counters.

        Args:
            max_capacity: Room capacity, or ``None`` for no limit. ``0`` is a
                valid capacity (the room is always reported as full).
            line_x: X coordinate, in pixels, of the vertical counting line.
            max_match_distance: A detection closer than this many pixels to a
                tracked centroid keeps that centroid's ID.
        """
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7,
            model_complexity=1,
        )
        self.count_in = 0
        self.count_out = 0
        # Kept apart from count_in - count_out so that exits of people who
        # were already inside cannot push the occupancy below zero.
        self.occupancy = 0
        self.max_capacity = max_capacity
        self.tracked_people = {}  # Format: {id: (centroid_x, centroid_y)}
        self.next_id = 1
        self.line_x = line_x  # Imaginary vertical line
        self.max_match_distance = max_match_distance

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Release the MediaPipe pose object; safe to call more than once."""
        if self.pose is not None:
            self.pose.close()
            self.pose = None

    def get_centroid(self, landmarks, frame_shape):
        """Compute the centroid of one person's landmarks, in pixels.

        The centroid is the midpoint of the left shoulder and the left hip;
        landmark ``x``/``y`` values are scaled by the frame width/height taken
        from ``frame_shape`` (``(height, width, ...)``).
        """
        h, w = frame_shape[:2]
        shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        hip = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_HIP]
        cx = int((shoulder.x + hip.x) / 2 * w)
        cy = int((shoulder.y + hip.y) / 2 * h)
        return cx, cy

    def _match_person(self, cx, cy):
        """Return the ID of the first tracked centroid near (cx, cy), else a new ID."""
        for pid, (px, py) in self.tracked_people.items():
            if distance.euclidean((cx, cy), (px, py)) < self.max_match_distance:
                return pid
        person_id = self.next_id
        self.next_id += 1
        return person_id

    def _update_counts(self, prev_x, cx):
        """Update the counters when the centroid changed half-plane.

        The line itself belongs to the left side, so a centroid that only
        touches the line and goes back where it came from is not counted.
        """
        was_right = prev_x > self.line_x
        is_right = cx > self.line_x
        if is_right and not was_right:
            # NOTE(review): entries are not counted while the room is full,
            # but the matching exit later is - confirm this is intended.
            if self.max_capacity is None or self.occupancy < self.max_capacity:
                self.count_in += 1
                self.occupancy += 1
        elif was_right and not is_right:
            self.count_out += 1
            self.occupancy = max(0, self.occupancy - 1)

    def _build_status(self):
        """Return ``(status_text, is_full)`` for the current occupancy."""
        if self.max_capacity is not None:
            if self.occupancy >= self.max_capacity:
                return "Ruangan Penuh", True
            if self.occupancy == 0:
                return "Ruangan Kosong", False
            return f"Sisa Kapasitas: {self.max_capacity - self.occupancy}", False
        if self.occupancy == 0:
            return "Ruangan Kosong", False
        return f"Total Orang: {self.occupancy}", False

    def process_frame(self, frame):
        """Detect a pose in ``frame``, update the counters and draw overlays.

        Args:
            frame: BGR image; it is annotated in place.

        Returns:
            ``(frame, count_in, count_out, status, is_full)``.
        """
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.pose.process(frame_rgb)
        current_people = {}

        if results.pose_landmarks:
            cx, cy = self.get_centroid(results.pose_landmarks, frame.shape)
            person_id = self._match_person(cx, cy)
            # A newly seen person has no previous position, so no crossing.
            prev_x = self.tracked_people.get(person_id, (cx, cy))[0]
            current_people[person_id] = (cx, cy)
            self._update_counts(prev_x, cx)

            cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)
            cv2.putText(frame, f"ID: {person_id}", (cx, cy - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Only people seen in this frame stay tracked.
        self.tracked_people = current_people
        cv2.line(frame, (self.line_x, 0), (self.line_x, frame.shape[0]), (255, 0, 0), 2)

        status, is_full = self._build_status()
        if is_full:
            cv2.putText(frame, "Ruangan Penuh!", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)

        return frame, self.count_in, self.count_out, status, is_full

# Run the final model on the sample video.
cap = cv2.VideoCapture('sample_video.mp4')
# Without this check an unreadable file silently yields "Masuk=0, Keluar=0".
if not cap.isOpened():
    cap.release()
    raise RuntimeError(
        "Gagal membuka 'sample_video.mp4'; periksa apakah berkas video berhasil diunduh."
    )
counter = PeopleCounter(max_capacity=5)
frame_count = 0
try:
    while cap.isOpened() and frame_count < 100:  # cap the run at 100 frames
        ret, frame = cap.read()
        if not ret:
            break
        frame, count_in, count_out, status, is_full = counter.process_frame(frame)
        cv2.putText(frame, f"Masuk: {count_in} | Keluar: {count_out} | {status}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        frame_count += 1
        # Show the 50th frame as a visual sanity check.
        if frame_count == 50:
            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            plt.title(f"Masuk: {count_in} | Keluar: {count_out} | {status}")
            plt.axis('off')
            plt.show()
finally:
    # Release the capture and the pose object even if a frame fails.
    cap.release()
    counter.pose.close()
print(f"Hasil Model Akhir: Masuk={counter.count_in}, Keluar={counter.count_out}")

Hasil Model Akhir: Masuk=0, Keluar=0
