In [1]:
import cv2
import ultralytics
import csv
import numpy as np
from ultralytics import YOLO
from collections import OrderedDict
from scipy.spatial import distance as dist
from IPython import display
from datetime import datetime

In [2]:
display.clear_output()
ultralytics.checks()

Ultralytics YOLOv8.2.15  Python-3.11.5 torch-2.3.0+cpu CPU (AMD Ryzen 5 4600H with Radeon Graphics)
Setup complete  (12 CPUs, 15.4 GB RAM, 306.4/457.9 GB disk)


In [3]:
class CentroidTracker:
    def __init__(self, maxDisappeared=20):
        self.nextObjectID = 1
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.maxDisappeared = maxDisappeared
        self.objectTypes = {}
        self.history = []

    def register(self, centroid, bottle_type):
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.objectTypes[self.nextObjectID] = bottle_type
        self.nextObjectID += 1

    def deregister(self, objectID):
        del self.objects[objectID]
        del self.disappeared[objectID]
        del self.objectTypes[objectID]

    def update(self, rects, bottle_types):
        if len(rects) == 0:
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            return self.objects

        inputCentroids = np.zeros((len(rects), 2), dtype="int")

        for i, (startX, startY, endX, endY) in enumerate(rects):
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids[i] = (cX, cY)

        if len(self.objects) == 0:
            for i in range(len(inputCentroids)):
                self.register(inputCentroids[i], bottle_types[i])
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())
            D = dist.cdist(np.array(objectCentroids), inputCentroids)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows, usedCols = set(), set()
            for row, col in zip(rows, cols):
                if row in usedRows or col in usedCols:
                    continue
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0
                self.objectTypes[objectID] = bottle_types[col]
                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(D.shape[0])).difference(usedRows)
            unusedCols = set(range(D.shape[1])).difference(usedCols)

            if D.shape[0] >= D.shape[1]:
                for row in unusedRows:
                    objectID = objectIDs[row]
                    self.disappeared[objectID] += 1
                    if self.disappeared[objectID] > self.maxDisappeared:
                        self.deregister(objectID)
            else:
                for col in unusedCols:
                    self.register(inputCentroids[col], bottle_types[col])

        return self.objects


Last Edit on adding to csv File (each bottle frequency).

In [5]:
def update_csv_cells(csv_file, time_value, bool_stop_line, pepsi_count, brown_count, transparency_count):
    # Calculate total liters
    total_Liters = ((brown_count * 275) + (pepsi_count * 325) + (transparency_count * 360))/1000
    
    # Read the content of the CSV file
    with open(csv_file, mode='r', newline='') as file:
        reader = csv.reader(file)
        rows = list(reader)
    
    # Ensure the first row has at least 6 columns (adding one more for total_Liters)
    if len(rows[0]) < 6:
        rows[0].extend([''] * (6 - len(rows[0])))
    
    # Update the values of the first six cells
    if rows:  # Check if there are any rows
        rows[0][0] = time_value
        rows[0][1] = bool_stop_line
        rows[0][2] = pepsi_count
        rows[0][3] = brown_count
        rows[0][4] = transparency_count
        rows[0][5] = total_Liters
    
    # Write the updated content back to the CSV file
    with open(csv_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(rows)

model = YOLO("C:/Users/ASUS/Desktop/detect/100.pt")

# Open the video file
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Error: Could not open video file.")
    exit()

# Set up video writer
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('C:/Users/ASUS/Desktop/Video Saved/output.avi', fourcc, 20.0, (800, 500))

specific_x = 400  
confidence_threshold = 0.5

# Dictionary to store detection times
detection_times = {
    'Pepsi': 19,
    'Brown': 16,
    'transparency': 21
}

# CSV file setup
csv_file = 'C:/Users/ASUS/Desktop/detect/AI.csv'  # Updated path

ct = CentroidTracker()
bottle_count = {
    'Pepsi': 0,
    'Brown': 0,
    'transparency': 0
}
counted_bottles = set()  # Set to keep track of counted bottle IDs
previous_object_id = -1  # Variable to track previous object ID
bool_stop_line = 0
update_csv_cells(
                    csv_file, 
                    0, 
                    bool_stop_line, 
                    bottle_count['Pepsi'], 
                    bottle_count['Brown'], 
                    bottle_count['transparency']
                )
while True:
    ret, frame = cap.read()
    
    if not ret:
        print("Reached end of video stream or encountered an error.")
        break
    
    frame = cv2.resize(frame, (800, 500))
    results = model.predict(frame, device="cpu", save=False, verbose=False)
    boxes = results[0].boxes
    names = model.names
    
    # Draw a circle on the specific pixel we want to reach
    specific_pixel_y = frame.shape[0] // 2  # Center vertically for visualization
    cv2.circle(frame, (specific_x, specific_pixel_y), 5, (0, 255, 255), -1)  # Yellow circle

    rects = []
    bottle_types = []

    for box in boxes:
        # Get the confidence score of the detected object
        confidence = box.conf[0]
        if confidence < confidence_threshold:
            continue
        
        xyxy = box.xyxy.numpy()[0]  # Get bounding box coordinates
        x1, y1, x2, y2 = xyxy
        
        # Draw the bounding box
        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
        
        # Get the class index of the detected object
        class_index = int(box.cls[0])
        class_name = names[class_index]
        
        center_x = int((x1 + x2) / 2)
        cv2.putText(frame, class_name, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        
        center_coordinates = (center_x, int((y1 + y2) / 2))
        radius = 5
        color = (0, 0, 255)  
        thickness = -1  
        cv2.circle(frame, center_coordinates, radius, color, thickness)
        
        # Add bounding box and class name to lists
        rects.append((int(x1), int(y1), int(x2), int(y2)))
        bottle_types.append(class_name)
        
    objects = ct.update(rects, bottle_types)
    detected_bottles = len(objects)
    
    for objectID, centroid in objects.items():
        bottle_type = ct.objectTypes[objectID]
        ct.history.append((objectID, bottle_type))
        cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)
        #cv2.putText(frame, f"ID {objectID}: {bottle_type}", (centroid[0] - 10, centroid[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        if specific_x - 50 < centroid[0] < specific_x + 50:
            if objectID not in counted_bottles:
                bottle_count[bottle_type] += 1
                counted_bottles.add(objectID)
                line_color = (0, 255, 0)
                cv2.line(frame, (specific_x, 0), (specific_x, frame.shape[0]), line_color, 2)
                detection_time = detection_times[bottle_type]
                bool_stop_line = 1
                update_csv_cells(
                    csv_file, 
                    detection_time, 
                    bool_stop_line, 
                    bottle_count['Pepsi'], 
                    bottle_count['Brown'], 
                    bottle_count['transparency']
                )
    
    # Reset bool_stop_line to 0 when new object ID is detected
        if previous_object_id != -1 and previous_object_id != objectID:
            bool_stop_line = 0
            update_csv_cells(
                csv_file, 
                detection_time, 
                bool_stop_line, 
                bottle_count['Pepsi'], 
                bottle_count['Brown'], 
                bottle_count['transparency']
            )
        
        previous_object_id = objectID

    # Write the frame to the video file
    out.write(frame)

    # Display the resulting frame
    cv2.imshow('Frame', frame)
    
    # Exit if 'Esc' key is pressed
    if cv2.waitKey(1) & 0xFF == 27:
        break

# Release the video capture and writer objects
cap.release()
out.release()
cv2.destroyAllWindows()

print("******************")
print("ِAll Detected Bottles:", bottle_count['Pepsi']+bottle_count['Brown']+bottle_count['transparency'])
print("Pepsi Bottles: ", bottle_count['Pepsi'])
print("Brown Bottles: ", bottle_count['Brown'])
print("transparency Bottles: ", bottle_count['transparency'])


******************
ِAll Detected Bottles: 8
Pepsi Bottles:  4
Brown Bottles:  3
transparency Bottles:  1
