In [1]:
import cv2
import tkinter as tk
from tkinter import filedialog
from PIL import Image, ImageTk

class VideoCroppingApp:
    def __init__(self, root, video_path):
        self.root = root
        self.video_path = video_path

        self.cap = cv2.VideoCapture(video_path)
        self.width = int(self.cap.get(3))
        self.height = int(self.cap.get(4))

        self.canvas = tk.Canvas(root, width=self.width, height=self.height)
        self.canvas.pack()

        self.rect_id = None
        self.start_x = None
        self.start_y = None

        self.canvas.bind("<ButtonPress-1>", self.on_press)
        self.canvas.bind("<B1-Motion>", self.on_drag)
        self.canvas.bind("<ButtonRelease-1>", self.on_release)

        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

        self.update_preview()

    def update_preview(self):
        ret, frame = self.cap.read()

        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(frame)
            img_tk = ImageTk.PhotoImage(img)
            self.canvas.img_tk = img_tk  # Keep a reference to avoid garbage collection
            self.canvas.create_image(0, 0, anchor=tk.NW, image=img_tk)
            self.root.after(10, self.update_preview)
        else:
            self.cap.release()

    def on_press(self, event):
        self.start_x = self.canvas.canvasx(event.x)
        self.start_y = self.canvas.canvasy(event.y)

        if self.rect_id:
            self.canvas.delete(self.rect_id)

        # Draw the video frame
        ret, frame = self.cap.read()
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(frame)
        img_tk = ImageTk.PhotoImage(img)
        self.canvas.img_tk = img_tk
        self.canvas.create_image(0, 0, anchor=tk.NW, image=img_tk)

        # Draw the red selection box on the video frame
        self.rect_id = self.canvas.create_rectangle(
            self.start_x, self.start_y, self.start_x, self.start_y, outline="red"
        )
        self.canvas.tag_raise(self.rect_id)  # Ensure the red box is on top

    def on_drag(self, event):
        cur_x = self.canvas.canvasx(event.x)
        cur_y = self.canvas.canvasy(event.y)

        self.canvas.coords(self.rect_id, self.start_x, self.start_y, cur_x, cur_y)
        self.canvas.tag_raise(self.rect_id)  # Raise the red box during dragging
        self.root.update()  # Update the Tkinter main loop during dragging


    def on_release(self, event):
        end_x = self.canvas.canvasx(event.x)
        end_y = self.canvas.canvasy(event.y)

        self.crop_video(int(self.start_x), int(self.start_y), int(end_x), int(end_y))

    def crop_video(self, x1, y1, x2, y2):
        self.cap.set(1, 0)  # Set video to the beginning
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter("cropped_video.mp4", fourcc, 30.0, (x2 - x1, y2 - y1))

        while True:
            ret, frame = self.cap.read()

            if not ret:
                break

            cropped_frame = frame[y1:y2, x1:x2]
            out.write(cropped_frame)

            cv2.imshow("Cropped Video", cropped_frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        self.cap.release()
        out.release()
        cv2.destroyAllWindows()

    def on_close(self):
        self.cap.release()
        self.root.destroy()

def main():
    root = tk.Tk()
    root.title("Video Cropping App")

    file_path = filedialog.askopenfilename(title="Select a video file", filetypes=[("Video files", "*.mp4;*.avi")])

    if file_path:
        app = VideoCroppingApp(root, file_path)
        root.mainloop()

if __name__ == "__main__":
    main()


In [2]:
import os
from ultralytics import YOLO
os.environ['KMP_DUPLICATE_LIB_OK']='True'


model = YOLO('yolov8n.pt')

# Open the video file
video_path = "cropped_video.mp4"
frameCount = -1
# Create a directory to store the keypoints files
output_directory = "detections"
cap = cv2.VideoCapture(video_path)

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    frameCount = frameCount + 1

    if success:
        # Run YOLOv8 tracking on the frame, persisting tracks between frames
        results = model.track(frame, persist=True)
        keypoints_path = os.path.join(output_directory, f"frame{frameCount}.txt")
        with open(keypoints_path, 'w') as file:
            for r in results:
                keypoint_JSON = r.tojson(normalize=True)
                file.write(keypoint_JSON)
                file.write('\n')
        # Visualize the results on the frame
        annotated_frame = results[0].plot()
        # save it in an image
        cv2.imwrite("frame.jpg",annotated_frame)

        # Display the annotated frame
        cv2.imshow("YOLOv8 Tracking", annotated_frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        # Break the loop if the end of the video is reached
        break

# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()


0: 384x640 7 cars, 1 motorcycle, 278.4ms
Speed: 11.9ms preprocess, 278.4ms inference, 19.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 motorcycle, 212.0ms
Speed: 0.0ms preprocess, 212.0ms inference, 6.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 motorcycle, 180.3ms
Speed: 6.2ms preprocess, 180.3ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 188.6ms
Speed: 0.0ms preprocess, 188.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 171.6ms
Speed: 6.4ms preprocess, 171.6ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 193.6ms
Speed: 8.9ms preprocess, 193.6ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 7 cars, 1 motorcycle, 208.7ms
Speed: 0.0ms preprocess, 208.7ms inference, 3.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person

In [3]:
import os
import json
import csv

# Function to read a detection file and extract unique detections
def process_detection_file(file_path, unique_detections):
    with open(file_path, 'r') as file:
        data = json.load(file)
        for detection in data:
            key = (detection['track_id'], detection['name'])
            if key not in unique_detections:
                unique_detections[key] = detection

# Function to iterate through detection files in a folder
def process_detection_folder(folder_path, output_csv_path):
    unique_detections = {}
    for filename in os.listdir(folder_path):
        if filename.endswith('.txt'):
            file_path = os.path.join(folder_path, filename)
            process_detection_file(file_path, unique_detections)

    # Write unique detections to a CSV file
    with open(output_csv_path, 'w', newline='') as csvfile:
        fieldnames = ['track_id', 'name', 'class', 'confidence', 'box']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for detection in unique_detections.values():
            writer.writerow({
                'track_id': detection['track_id'],
                'name': detection['name'],
                'class': detection['class'],
                'confidence': detection['confidence'],
                'box': detection['box']
            })

# Replace 'detections_folder_path' and 'output_csv_path' with your actual folder and output file paths
detections_folder_path = 'detections'
output_csv_path = 'output.csv'

process_detection_folder(detections_folder_path, output_csv_path)


In [4]:
# view the csv file
import pandas as pd
df = pd.read_csv('output.csv', header=None)
df.head(10)
    

Unnamed: 0,0,1,2,3,4
0,track_id,name,class,confidence,box
1,1,car,2,0.8836764693260193,"{'x1': 0.28173595428466797, 'y1': 0.2885417855..."
2,2,car,2,0.8626297116279602,"{'x1': 0.5321219253540039, 'y1': 0.24044532280..."
3,3,car,2,0.8476601839065552,"{'x1': 0.03666766166687012, 'y1': 0.5279136756..."
4,4,car,2,0.8375164270401001,"{'x1': 0.29856979370117187, 'y1': 0.6793429552..."
5,5,car,2,0.8217783570289612,"{'x1': 0.41848731994628907, 'y1': 0.3797470026..."
6,6,car,2,0.7771808505058289,"{'x1': 0.2364187240600586, 'y1': 0.38601175944..."
7,7,car,2,0.7722057700157166,"{'x1': 0.24698047637939452, 'y1': 0.2348987860..."
8,8,motorcycle,3,0.6146885752677917,"{'x1': 0.7165263366699218, 'y1': 0.44739681095..."
9,8,person,0,0.5050264596939087,"{'x1': 0.7372369384765625, 'y1': 0.39266019878..."


In [5]:
import pandas as pd

# Replace 'input_csv_path' and 'output_csv_path' with your actual input and output file paths
input_csv_path = 'output.csv'
output_csv_path = 'output_2.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(input_csv_path)

# Remove rows where the value in the "name" column is "person" and fire hydrant
df_filtered = df[~df['name'].isin(['person', 'fire hydrant'])]

# Write the filtered DataFrame to a new CSV file
df_filtered.to_csv(output_csv_path, index=False)


In [6]:
df_filtered.head(10)

# unique in name
df_filtered['name'].unique()

array(['car', 'motorcycle', 'bus', 'truck', 'traffic light'], dtype=object)

In [7]:
# now you have to count car', 'motorcycle', 'bus', 'truck
# count the number of each class
# Replace 'input_csv_path' and 'output_csv_path' with your actual input and output file paths
input_csv_path = 'output_2.csv'
output_csv_path = 'output_3.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(input_csv_path)

# Group the DataFrame by the "name" column and count the number of rows in each group
df_grouped = df.groupby('name').size().reset_index(name='counts')

# Write the grouped DataFrame to a new CSV file
df_grouped.to_csv(output_csv_path, index=False)

df_grouped.head(10)
    

Unnamed: 0,name,counts
0,bus,7
1,car,82
2,motorcycle,36
3,traffic light,1
4,truck,26


In [8]:
import os
import json
import csv
from collections import defaultdict

def calculate_distance(x1, y1, x2, y2):
    # Calculate Euclidean distance
    return ((x2 - x1)**2 + (y2 - y1)**2)**0.5

def calculate_speed(distance, time_elapsed):
    # Calculate speed (distance / time)
    return distance / time_elapsed

def process_frame_data(frame_data, frame_number, object_speeds, csv_writer):
    for obj in frame_data:
        track_id = obj['track_id']
        class_name = obj['name']
        confidence = obj['confidence']
        box = obj['box']

        if class_name == "person":
            # Skip objects with the name "person"
            continue

        if (track_id, class_name) not in object_speeds:
            # If object not encountered before, store current frame data
            object_speeds[(track_id, class_name)] = {
                'box': box,
                'confidence': confidence,
                'frame': frame_number
            }
        else:
            # If object encountered before, calculate speed and write to CSV
            prev_frame_data = object_speeds[(track_id, class_name)]
            time_elapsed = frame_number - prev_frame_data['frame']
            distance = calculate_distance(prev_frame_data['box']['x1'],
                                          prev_frame_data['box']['y1'],
                                          box['x1'], box['y1'])
            speed = calculate_speed(distance, time_elapsed)

            # Write information to CSV
            csv_writer.writerow([class_name, track_id, speed])

            # Update the stored data for the object with current frame data
            object_speeds[(track_id, class_name)] = {
                'box': box,
                'confidence': confidence,
                'frame': frame_number
            }

# Initialize a dictionary to store object data and speeds
object_speeds = defaultdict(dict)

# Specify the CSV file path
csv_file_path = 'object_speeds.csv'

# Open the CSV file for writing
with open(csv_file_path, 'w', newline='') as csv_file:
    # Create a CSV writer
    csv_writer = csv.writer(csv_file)

    # Write header to CSV
    csv_writer.writerow(['Name', 'ID', 'Speed'])

    # Iterate through each file in the 'detections' folder
    folder_path = 'detections'
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(folder_path, filename)
            frame_number = int(filename.split('.')[0][5:])  # Extract frame number from filename
            with open(file_path, 'r') as file:
                frame_data = json.load(file)
                process_frame_data(frame_data, frame_number, object_speeds, csv_writer)

print(f"Data has been written to {csv_file_path}")


Data has been written to object_speeds.csv


In [None]:
# import os
# import json
# import csv
# from collections import defaultdict

# def calculate_distance(x1, y1, x2, y2):
#     # Calculate Euclidean distance
#     return ((x2 - x1)**2 + (y2 - y1)**2)**0.5

# def calculate_speed(distance, time_elapsed):
#     # Calculate speed (distance / time)
#     return distance / time_elapsed

# def process_frame_data(frame_data, frame_number, object_speeds, csv_writer):
#     for obj in frame_data:
#         track_id = obj['track_id']
#         class_name = obj['name']
#         confidence = obj['confidence']
#         box = obj['box']

#         if class_name == "person":
#             # Skip objects with the name "person"
#             continue

#         if (track_id, class_name) not in object_speeds:
#             # If object not encountered before, store current frame data
#             object_speeds[(track_id, class_name)] = {
#                 'box': box,
#                 'confidence': confidence,
#                 'frame': frame_number
#             }
#         else:
#             # If object encountered before, calculate speed and write to CSV
#             prev_frame_data = object_speeds[(track_id, class_name)]
#             time_elapsed = frame_number - prev_frame_data['frame']
#             distance = calculate_distance(prev_frame_data['box']['x1'],
#                                           prev_frame_data['box']['y1'],
#                                           box['x1'], box['y1'])
#             speed = calculate_speed(distance, time_elapsed)

#             # Write information to CSV
#             csv_writer.writerow([class_name, track_id, frame_number, speed])

#             # Update the stored data for the object with current frame data
#             object_speeds[(track_id, class_name)] = {
#                 'box': box,
#                 'confidence': confidence,
#                 'frame': frame_number
#             }

# # Initialize a dictionary to store object data and speeds
# object_speeds = defaultdict(dict)

# # Specify the CSV file path
# csv_file_path = 'object_speeds.csv'

# # Open the CSV file for writing
# with open(csv_file_path, 'w', newline='') as csv_file:
#     # Create a CSV writer
#     csv_writer = csv.writer(csv_file)

#     # Write header to CSV
#     csv_writer.writerow(['Name', 'ID', 'Frame', 'Speed'])

#     # Iterate through each file in the 'detections' folder
#     folder_path = 'detections'
#     for filename in os.listdir(folder_path):
#         if filename.endswith(".txt"):
#             file_path = os.path.join(folder_path, filename)
#             frame_number = int(filename.split('.')[0][5:])  # Extract frame number from filename
#             with open(file_path, 'r') as file:
#                 frame_data = json.load(file)
#                 process_frame_data(frame_data, frame_number, object_speeds, csv_writer)

# print(f"Data has been written to {csv_file_path}")


In [9]:
df3 = pd.read_csv('object_speeds.csv')
df3.head(10)

Unnamed: 0,Name,ID,Speed
0,car,1,6.9e-05
1,car,2,0.000125
2,car,3,0.000386
3,car,4,0.000345
4,car,5,0.000173
5,car,6,0.000376
6,car,7,0.000149
7,motorcycle,8,0.000629
8,car,1,0.002312
9,car,2,0.001287


In [5]:
# write speeds acc to id on the video
import cv2
import pandas as pd
import numpy as np

# Replace 'video_path' and 'speeds_csv_path' with your actual video and speeds CSV file paths
video_path = 'cropped_video.mp4'
speeds_csv_path = 'object_speeds.csv'

# Read the speeds CSV file into a DataFrame
df = pd.read_csv(speeds_csv_path)

# Initialize a video capture object
cap = cv2.VideoCapture(video_path)

# Initialize a list to store speeds
speeds = []

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()

    if success:
        # Get the frame number
        frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))

        # Get the speeds for the current frame from the DataFrame
        frame_speeds = df[df['Frame'] == frame_number]

        # Loop through the detected objects in the current frame
        for index, row in frame_speeds.iterrows():
            # Extract the object ID and speed from the row
            obj_id = row['ID']
            speed = row['Speed']

            # Draw the object ID and speed on the frame
            cv2.putText(frame, f"ID: {obj_id}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            cv2.putText(frame, f"Speed: {speed:.2f} px/s", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Append the speed to the list
            speeds.append(speed)

        # Display the frame
        cv2.imshow("Frame", frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        # Break the loop if the end of the video is reached
        break
        
# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()