<a href="https://colab.research.google.com/github/Heisnotanimposter/ObjectDetection_with_Server/blob/main/PersonSpeedvision.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!nvidia-smi

Sun Aug 18 13:10:55 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   45C    P8              12W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!pip install -q supervision ultralytics
!pip install roboflow

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.3/41.3 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.7/135.7 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m869.0/869.0 kB[0m [31m48.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting roboflow
  Downloading roboflow-1.1.40-py3-none-any.whl.metadata (9.4 kB)
Collecting python-dotenv (from roboflow)
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting requests-toolbelt (from roboflow)
  Downloading requests_toolbelt-1.0.0-py2.py3-none-any.whl.metadata (14 kB)
Collecting filetype (from roboflow)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Downloading roboflow-1.1.40-py3-none-any.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading filetype-1.2.0-py2.py3-n

In [5]:
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
from collections import defaultdict, deque
from google.colab.patches import cv2_imshow

# Load the YOLO model (You can switch to a different YOLO variant if needed)
model = YOLO("yolov8x.pt")  # You can use yolov8n.pt, yolov8m.pt, etc.

# Set up video paths
SOURCE_VIDEO_PATH = "/content/drive/MyDrive/Team7dataset/Team7Shared/140sNightShinjuku.mp4"
TARGET_VIDEO_PATH = "/content/drive/MyDrive/Team7dataset/Team7Shared/140sNightShinjuku_yolov8_result.mp4"

# Initialize video capture
cap = cv2.VideoCapture(SOURCE_VIDEO_PATH)

# Video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(TARGET_VIDEO_PATH, fourcc, fps, (width, height))

# Initialize ByteTrack tracker with updated parameters
byte_track = sv.ByteTrack(
    track_activation_threshold=0.3,
    lost_track_buffer=30,
    frame_rate=fps
)

# Initialize data structures to store past positions for speed estimation
past_positions = defaultdict(lambda: deque(maxlen=5))

# Process the video
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Run YOLO object detection
    results = model(frame, conf=0.5)  # Adjust confidence threshold as needed

    # Get detections
    detections = sv.Detections.from_ultralytics(results[0])

    # Filter detections by confidence
    detections = detections[detections.confidence > 0.3]

    # Update tracker with detections
    tracks = byte_track.update_with_detections(detections)

    # Annotate frame manually
    for track in tracks:
        track_id = track.id
        bbox = track.bbox  # Get bounding box coordinates

        if bbox is not None:
            # Draw the bounding box
            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Calculate speed estimation
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2

            if track_id in past_positions:
                past_positions[track_id].append((center_x, center_y))
                if len(past_positions[track_id]) > 1:
                    # Calculate the displacement between the first and last positions
                    x_start, y_start = past_positions[track_id][0]
                    x_end, y_end = past_positions[track_id][-1]
                    distance = np.sqrt((x_end - x_start)**2 + (y_end - y_start)**2)

                    # Categorize the speed
                    if distance > 50:  # Adjust these thresholds based on your data
                        speed_category = "High Speed"
                        color = (0, 0, 255)  # Red for high speed
                    elif 20 < distance <= 50:
                        speed_category = "Mid Speed"
                        color = (0, 255, 255)  # Yellow for mid speed
                    else:
                        speed_category = "Low Speed"
                        color = (0, 255, 0)  # Green for low speed

                    # Draw the speed category
                    cv2.putText(frame, f'{speed_category}', (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
            else:
                past_positions[track_id].append((center_x, center_y))

            # Draw the track ID
            cv2.putText(frame, f'ID: {track_id}', (x1, y1 - 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    # Write the annotated frame to the output video
    out.write(frame)

    # Display the frame (optional)
    cv2_imshow(frame)  # Use cv2_imshow instead of cv2.imshow
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
out.release()
cv2.destroyAllWindows()

# Save the results
print(f"Results saved to {TARGET_VIDEO_PATH}")


Downloading https://github.com/ultralytics/assets/releases/download/v8.2.0/yolov8x.pt to 'yolov8x.pt'...


100%|██████████| 131M/131M [00:03<00:00, 41.1MB/s]



0: 384x640 3 persons, 1 car, 75.6ms
Speed: 14.4ms preprocess, 75.6ms inference, 704.7ms postprocess per image at shape (1, 3, 384, 640)


AttributeError: 'tuple' object has no attribute 'id'

In [None]:
from roboflow import Roboflow
import shutil
import os
import cv2
import numpy as np
import supervision as sv
from tqdm import tqdm
from ultralytics import YOLO
from collections import defaultdict, deque
from scipy.spatial import distance

In [None]:
# Crosswalk, 2-wheel dataset
!curl -L "https://universe.roboflow.com/ds/p7t3Nx8tQM?key=Lq5jn3mTlf" > roboflow.zip; unzip roboflow.zip; rm roboflow.zip

# Car dataset
!curl -L "https://universe.roboflow.com/ds/CGpCt0Eh41?key=SK4lZaaD6B" > roboflow.zip; unzip roboflow.zip; rm roboflow.zip

# Person dataset
!curl -L "https://universe.roboflow.com/ds/JrT0ne1aM8?key=pejxGZTe7U" > roboflow.zip; unzip roboflow.zip; rm roboflow.zip


In [None]:
# Paths to the downloaded datasets
crosswalk_dataset = "/content/your_crosswalk_2wheel_dataset_path"
car_dataset = "/content/your_car_dataset_path"
person_dataset = "/content/your_person_dataset_path"

# Unified dataset path
unified_dataset_path = "/content/dataset"

# Create directories if they don't exist
os.makedirs(f"{unified_dataset_path}/train/images", exist_ok=True)
os.makedirs(f"{unified_dataset_path}/train/labels", exist_ok=True)
os.makedirs(f"{unified_dataset_path}/valid/images", exist_ok=True)
os.makedirs(f"{unified_dataset_path}/valid/labels", exist_ok=True)
os.makedirs(f"{unified_dataset_path}/test/images", exist_ok=True)
os.makedirs(f"{unified_dataset_path}/test/labels", exist_ok=True)

# Function to copy files
def copy_files(src, dst, subdir):
    for split in ["train", "valid", "test"]:
        images_src = f"{src}/{split}/images"
        labels_src = f"{src}/{split}/labels"

        images_dst = f"{dst}/{split}/images"
        labels_dst = f"{dst}/{split}/labels"

        if os.path.exists(images_src):
            for file_name in os.listdir(images_src):
                shutil.copy(f"{images_src}/{file_name}", f"{images_dst}/{file_name}")
        if os.path.exists(labels_src):
            for file_name in os.listdir(labels_src):
                shutil.copy(f"{labels_src}/{file_name}", f"{labels_dst}/{file_name}")

# Copy files from each dataset to the unified dataset
copy_files(crosswalk_dataset, unified_dataset_path, "crosswalk")
copy_files(car_dataset, unified_dataset_path, "car")
copy_files(person_dataset, unified_dataset_path, "person")


In [None]:
import os

# Check that the train, val, and test directories exist and list their contents
train_dir = "/content/People-Detection-8/train/images"
val_dir = "/content/People-Detection-8/valid/images"
test_dir = "/content/People-Detection-8/test/images"

print("Train Directory Exists:", os.path.exists(train_dir))
print("Validation Directory Exists:", os.path.exists(val_dir))
print("Test Directory Exists:", os.path.exists(test_dir))

if os.path.exists(train_dir):
    print("Train Directory Content:", os.listdir(train_dir)[:5])  # Show first 5 files
if os.path.exists(val_dir):
    print("Validation Directory Content:", os.listdir(val_dir)[:5])
if os.path.exists(test_dir):
    print("Test Directory Content:", os.listdir(test_dir)[:5])


In [None]:
from ultralytics import YOLO

model = YOLO("yolov8n.pt")  # Start with a pre-trained YOLOv8 model

# Train the model with your dataset
# Need to modify the data.yaml with specific target location
"""
data.yaml example:
names:
- person
nc: 1
roboflow:
  license: Private
  project: people-detection-o4rdr
  url: https://universe.roboflow.com/leo-ueno/people-detection-o4rdr/dataset/8
  version: 8
  workspace: leo-ueno
test: /content/People-Detection-8/test/images
train: /content/People-Detection-8/train/images
val: /content/People-Detection-8/valid/images
"""

model.train(data="/content/drive/MyDrive/Team7dataset/Team7Shared/data.yaml", epochs=3, imgsz=480, name="people_detection_model")

#/content/drive/MyDrive/Team7dataset/Team7Shared/data.yaml

In [None]:
import shutil
import os

# Define the source and destination paths
best_model_path = "/content/runs/detect/people_detection_model3/weights/best.pt"
destination_path = "/content/drive/MyDrive/Team7dataset/best.pt"

# Check if the source path exists
if os.path.exists(best_model_path):
    # Copy the best model to the desired location
    shutil.copy(best_model_path, destination_path)
    print(f"Model successfully copied to {destination_path}")
else:
    print(f"Best model not found at {best_model_path}")


## Imports

In [None]:
import cv2
import os

import numpy as np
import supervision as sv

from tqdm import tqdm
from ultralytics import YOLO
#from supervision.assets import VideoAssets, download_assets
from collections import defaultdict, deque

import matplotlib.pyplot as plt
import numpy as np
import cv2

HOME = os.getcwd()
print(HOME)

In [None]:
#download_assets(VideoAssets.VEHICLES)
#!pip install -q gdown
#%cd {HOME}
#!gdown '1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-'
#SOURCE_VIDEO_PATH = f"{HOME}/Day2024_Tokyo_Shinjuku_20240810_162047.mp4"

In [None]:
SOURCE_VIDEO_PATH = "/content/drive/MyDrive/Team7dataset/DayShinjuku.mp4"
TARGET_VIDEO_PATH = "/content/drive/MyDrive/Team7dataset/DayShinjuku.mp4_result.mp4"
CONFIDENCE_THRESHOLD = 0.3
IOU_THRESHOLD = 0.5
MODEL_NAME = "yolov8S.pt"
MODEL_RESOLUTION = 480

In [None]:
class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points

        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

# Define multiple source areas for crosswalks
source_areas = [
    np.array([[600, 300], [1200, 300], [1200, 600], [600, 600]]),  # Crosswalk 1
    np.array([[0, 450], [100, 450], [100, 550], [0, 550]]), # Crosswalk 2
    np.array([[150, 150], [450, 150], [450, 250], [150, 250]]),   # Crosswalk 3
    np.array([[600, 100], [700, 100], [700, 200], [600, 200]]),   # Crosswalk 4
    #np.array([[150, 250], [450, 250], [450, 350], [150, 350]]),   # Crosswalk 5
]

# Define a single target area (rectangle) for the perspective transformation
TARGET_WIDTH = 300  # Adjust to your needs
TARGET_HEIGHT = 200  # Adjust to your needs

frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)
frame_iterator = iter(frame_generator)
frame = next(frame_iterator)

target_area = np.array([
    [0, 0],
    [TARGET_WIDTH - 1, 0],
    [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
    [0, TARGET_HEIGHT - 1],
], dtype=np.float32)

# Assuming you have a 'frame' loaded, for example, from a video or image
#frame = cv2.imread(SOURCE_VIDEO_PATH)

# Process each crosswalk
for i, source in enumerate(source_areas):
    view_transformer = ViewTransformer(source=source, target=target_area)

    # Apply perspective transformation to the entire frame for the current crosswalk
    transformed_frame = cv2.warpPerspective(frame, view_transformer.m, (TARGET_WIDTH, TARGET_HEIGHT))

    # Display the transformed frame (only in Colab)
    from google.colab.patches import cv2_imshow
    print(f"Transformed Crosswalk {i+1}:")
    cv2_imshow(transformed_frame)

    # Example operation: print the transformed points
    print(f"Crosswalk {i+1} transformed points:")
    print(view_transformer.transform_points(source))


In [None]:
#frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)
#frame_iterator = iter(frame_generator)
#frame = next(frame_iterator)
# Annotate the original frame with the polygon for the current source area
annotated_frame = frame.copy()
color = (0, 0, 255)  # RGB
for src in source_areas:
    cv2.polylines(annotated_frame, [src.astype(np.int32)], isClosed=True, color=color, thickness=4)

# Display the annotated frame
cv2_imshow(annotated_frame)

## Transform Perspective

In [None]:
class ViewTransformer:

    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points

        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

In [None]:
#view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

In [None]:
from ultralytics import YOLO
model = YOLO("yolov8s.pt") # load the model
results = model.train(data="/content/People-Detection-8/data.yaml", epochs=5)
results = model("/content/drive/MyDrive/Team7dataset/140sDayShinjuku.mp4")


model = YOLO(MODEL_NAME)

video_info = sv.VideoInfo.from_video_path(video_path=SOURCE_VIDEO_PATH)
frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)

# tracer initiation
byte_track = sv.ByteTrack(
    frame_rate=video_info.fps, track_thresh=CONFIDENCE_THRESHOLD
)

# annotators configuration
thickness = sv.calculate_dynamic_line_thickness(
    resolution_wh=video_info.resolution_wh
)
text_scale = sv.calculate_dynamic_text_scale(
    resolution_wh=video_info.resolution_wh
)
bounding_box_annotator = sv.BoundingBoxAnnotator(
    thickness=thickness
)
label_annotator = sv.LabelAnnotator(
    text_scale=text_scale,
    text_thickness=thickness,
    text_position=sv.Position.BOTTOM_CENTER
)
trace_annotator = sv.TraceAnnotator(
    thickness=thickness,
    trace_length=video_info.fps * 2,
    position=sv.Position.BOTTOM_CENTER
)

polygon_zone = sv.PolygonZone(
    polygon=SOURCE,
    frame_resolution_wh=video_info.resolution_wh
)

coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

# open target video
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:

    # loop over source video frame
    for frame in tqdm(frame_generator, total=video_info.total_frames):

        result = model(frame, imgsz=MODEL_RESOLUTION, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(result)

        # filter out detections by class and confidence
        detections = detections[detections.confidence > CONFIDENCE_THRESHOLD]
        detections = detections[detections.class_id != 0]

        # filter out detections outside the zone
        detections = detections[polygon_zone.trigger(detections)]

        # refine detections using non-max suppression
        detections = detections.with_nms(IOU_THRESHOLD)

        # pass detection through the tracker
        detections = byte_track.update_with_detections(detections=detections)

        points = detections.get_anchors_coordinates(
            anchor=sv.Position.BOTTOM_CENTER
        )

        # calculate the detections position inside the target RoI
        points = view_transformer.transform_points(points=points).astype(int)

        # store detections position
        for tracker_id, [_, y] in zip(detections.tracker_id, points):
            coordinates[tracker_id].append(y)

        # format labels
        labels = []

        for tracker_id in detections.tracker_id:
            if len(coordinates[tracker_id]) < video_info.fps / 2:
                labels.append(f"#{tracker_id}")
            else:
                # calculate speed
                coordinate_start = coordinates[tracker_id][-1]
                coordinate_end = coordinates[tracker_id][0]
                distance = abs(coordinate_start - coordinate_end)
                time = len(coordinates[tracker_id]) / video_info.fps
                speed = distance / time * 3.6
                labels.append(f"#{tracker_id} {int(speed)} km/h")

        # annotate frame
        annotated_frame = frame.copy()
        annotated_frame = trace_annotator.annotate(
            scene=annotated_frame, detections=detections
        )
        annotated_frame = bounding_box_annotator.annotate(
            scene=annotated_frame, detections=detections
        )
        annotated_frame = label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )

        # add frame to target video
        sink.write_frame(annotated_frame)

In [None]:
# prompt: tensorboard

%load_ext tensorboard
%tensorboard --logdir runs/detect/train


In [None]:
# Load the best model
model = YOLO("/content/drive/MyDrive/Team7dataset/Team7Shared/person_best2.pt")

# Run validation on the model (using the validation data)
results = model.val()


In [None]:
#model.train(data="/content/drive/MyDrive/Team7dataset/Team7Shared/data.yaml", epochs=32, imgsz=480, name="people_detection_model")

# Test the model on a new image
results = model.predict("/content/drive/MyDrive/Team7dataset/Team7Shared/140sDayShinjuku.mp4", imgsz=320, conf=0.5, iou=0.5, batch=16)

# Display results
results.show()

# Save results
results.save("/content/drive/MyDrive/Team7dataset/DayShinjuku_result.mp4")
