# ðŸš— Project: Traffic Flow Analysis - Tol Nagrak (Night Vision)

In [1]:
%pip install tqdm seaborn

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 26.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import cv2
import torch
import math
import time
import os
import warnings

warnings.filterwarnings("ignore")

In [3]:
# --- CLASS SIMPLE TRACKER ---
class SimpleTracker:
    def __init__(self):
        # Storage format { ID : ((x, y), vehicle_type) }
        self.stored_data = {} 
        self.id_count = 0

    def update(self, objects_rect_with_labels):
        objects_bbs_ids_labels = []
        
        for rect in objects_rect_with_labels:
            x, y, w, h, label = rect
            cx = (x + x + w) // 2
            cy = (y + y + h) // 2

            same_object_detected = False
            
            for id, data in self.stored_data.items():
                center_pt = data[0]
                dist = math.hypot(cx - center_pt[0], cy - center_pt[1])
                
                # Distance threshold
                if dist < 80: 
                    existing_label = data[1]
                    self.stored_data[id] = ((cx, cy), existing_label)
                    objects_bbs_ids_labels.append([x, y, w, h, id, existing_label]) 
                    same_object_detected = True
                    break

            if not same_object_detected:
                self.stored_data[self.id_count] = ((cx, cy), label)
                objects_bbs_ids_labels.append([x, y, w, h, self.id_count, label])
                self.id_count += 1

        new_stored_data = {}
        for obj in objects_bbs_ids_labels:
            id = obj[4]
            new_stored_data[id] = self.stored_data[id]
        self.stored_data = new_stored_data.copy()
        
        return objects_bbs_ids_labels

print("Tracker initialized.")

Tracker initialized.


## CELL 2: Load Model & Video

In [4]:
# --- FILE PATHS ---
base_dir = os.getcwd() 
input_video = os.path.join(base_dir, 'data', 'input', 'ext_tol_nagrak_night.mp4')
output_video = os.path.join(base_dir, 'data', 'output', 'ext_tol_nagrak_tracking_result.mp4')

if not os.path.exists(input_video):
    print(f"Video not found at: {input_video}")
else:
    print("Loading YOLOv5 Model & Video...")
    
    # Load Model
    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    model.conf = 0.35          
    model.classes = [2, 5, 7]  # 2=Car, 5=Bus, 7=Truck

    tracker = SimpleTracker()

    cap = cv2.VideoCapture(input_video)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    out = cv2.VideoWriter(output_video, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    # --- COUNTING LINE CONFIGURATION ---
    count_line_y = int(height * 0.45) 
    offset = 30 
    
    print(f"Configuration ready")

Loading YOLOv5 Model & Video...


Using cache found in C:\Users\user/.cache\torch\hub\ultralytics_yolov5_master


[31m[1mrequirements:[0m Ultralytics requirement ['gitpython>=3.1.30'] not found, attempting AutoUpdate...


YOLOv5  2026-2-6 Python-3.13.7 torch-2.10.0+cpu CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Configuration ready


### --- 3. Eksekusi Looping ---

In [5]:
# --- COUNTERS ---
count_out = 0  # Outbound
count_in = 0   # Inbound

# Vehicle Type Counters
total_car = 0
total_bus = 0
total_truck = 0

# History for direction logic
previous_positions = {}

frame_idx = 0
start_time = time.time()

print("processing video...")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        print("\nvideo Processing completed")
        break
    
    frame_idx += 1
    
    # 1. YOLO Detection
    results = model(frame)
    df = results.pandas().xyxy[0]
    
    detections = []
    for _, row in df.iterrows():
        x1, y1, x2, y2 = int(row['xmin']), int(row['ymin']), int(row['xmax']), int(row['ymax'])
        vehicle_type = row['name'].capitalize()
        detections.append([x1, y1, x2-x1, y2-y1, vehicle_type]) 
        
    # 2. Update Tracker
    tracker_results = tracker.update(detections)
    
    # 3. Draw Counting Line (Red)
    cv2.line(frame, (0, count_line_y), (width, count_line_y), (0, 0, 255), 2)
    cv2.putText(frame, "COUNTING LINE", (width - 200, count_line_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)
    
    # 4. Counting Logic
    for vehicle in tracker_results:
        x, y, w, h, id, v_type = vehicle
        
        cx = (x + x + w) // 2
        cy = (y + y + h) // 2
        
        if id in previous_positions:
            prev_y = previous_positions[id]
            
            if prev_y < count_line_y and cy >= count_line_y:
                count_out += 1
                
                if v_type == 'Car': total_car += 1
                elif v_type == 'Bus': total_bus += 1
                elif v_type == 'Truck': total_truck += 1
                
                cv2.line(frame, (0, count_line_y), (width, count_line_y), (0, 255, 255), 5) # Yellow Flash

            elif prev_y > count_line_y and cy <= count_line_y:
                count_in += 1
                
                if v_type == 'Car': total_car += 1
                elif v_type == 'Bus': total_bus += 1
                elif v_type == 'Truck': total_truck += 1
                
                cv2.line(frame, (0, count_line_y), (width, count_line_y), (0, 255, 0), 5) # Green Flash

        # Update Position
        previous_positions[id] = cy

        cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 255, 0), 2)
        cv2.putText(frame, f"{id}|{v_type}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

    # 5. --- DASHBOARD ---
    
    box_w = 300
    box_h = 240
    pad_x = 20
    
    x_base = width - box_w - pad_x 
    y_base = 20                    

    overlay = frame.copy()
    cv2.rectangle(overlay, (x_base, y_base), (x_base + box_w, y_base + box_h), (0, 0, 0), -1)
    frame = cv2.addWeighted(overlay, 0.6, frame, 0.4, 0)
    
    def draw_text(text, row_idx, color=(255,255,255), font_scale=0.6, thickness=1):
        pos_x = int(x_base + 20)
        pos_y = int(y_base + 30 + (row_idx * 30))
        cv2.putText(frame, text, (pos_x, pos_y), cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)

    # Dashboard Content
    draw_text("EXT Nagrak Traffic Monitor", 0, thickness=2)
    draw_text("-----------------", 0.5)
    
    draw_text(f"OUTBOUND : {count_out}", 1.5, color=(0, 255, 255), font_scale=0.7, thickness=2)
    draw_text(f"INBOUND  : {count_in}", 2.5, color=(0, 255, 0), font_scale=0.7, thickness=2)
    
    draw_text("-----------------", 3.2)
    draw_text(f"Cars     : {total_car}", 4.2)
    draw_text(f"Buses    : {total_bus}", 5.0)
    draw_text(f"Trucks   : {total_truck}", 5.8)
    
    # Write Frame
    out.write(frame)
    
    # Console Progress
    if frame_idx % 20 == 0:
        elapsed = time.time() - start_time
        fps_curr = frame_idx / (elapsed + 0.001)
        print(f"Frame {frame_idx}/{total_frames} | Out:{count_out} In:{count_in} | Car:{total_car} Buses:{total_bus} Trucks:{total_truck}", end='\r')

cap.release()
out.release()
print(f"\n\nResult saved to:\n{output_video}")

processing video...
Frame 14520/14530 | Out:139 In:64 | Car:148 Buses:21 Trucks:34
video Processing completed


Result saved to:
d:\Annisa\Project Pribadi\nagrak-vehicle-tracker\data\output\ext_tol_nagrak_tracking_result.mp4
