In [7]:
import numpy as np
import cv2
from ultralytics import YOLO
model = YOLO('../best.pt')

In [8]:
# Define the entry/exit boundary line (e.g., a horizontal line at y=250)
boundary_y = 50
entry_count = 0
exit_count = 0

confidence_threshold = 0.9
previous_positions = {}

In [9]:
def process_image(byte_data):
    byteArray = bytearray()
    byteArray.extend(byte_data)
    flatNumpyArray = np.array(byteArray)
    img = flatNumpyArray.reshape(100,100)
    img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    return img

In [10]:
def process_and_track(byte_data):
    global entry_count, exit_count, previous_positions
    
    # Convert byte data to image
    frame = process_image(byte_data)
    
    # Ensure the frame was converted successfully
    if frame is None:
        return
    #
    results = model.track(source=frame,device=0)
    
    # Process the tracking results (e.g., draw bounding boxes)
    current_positions = {}
    
    for result in results:
        for box in result.boxes:
            confidence = box.conf[0].item()

            if confidence < confidence_threshold:
                continue

            x1, y1, x2, y2 = map(int, box.xyxy.tolist()[0])
            object_id = int(box.id[0])  # Assuming 'id' contains the unique ID
            
            # Calculate the center of the bounding box
            center_y = (y1 + y2) // 2
            
            # Store the current position
            current_positions[object_id] = center_y
            
            # Draw the bounding box on the frame
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 0), 2)
            cv2.putText(frame, f'{confidence:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
            #cv2.putText(frame, f'ID: {object_id}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
            #cv2.putText(frame, f'{confidence:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

    # Check for entries and exits
    for object_id, center_y in current_positions.items():
        if object_id in previous_positions:
            previous_y = previous_positions[object_id]
            
            # Check if the object has crossed the boundary
            if previous_y <= boundary_y < center_y:
                entry_count += 1
            elif previous_y >= boundary_y > center_y:
                exit_count += 1
    
    # Update previous positions
    previous_positions = current_positions
    
    # Draw the boundary line
    cv2.line(frame, (0, boundary_y), (frame.shape[1], boundary_y), (0, 255, 0), 2)

    resized_frame = cv2.resize(frame, (500, 500))
    cv2.putText(resized_frame, f'Entries: {entry_count}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 0, 0), 2)
    cv2.putText(resized_frame, f'Exits: {exit_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 2)

    return resized_frame


In [11]:
def show_image(byte_data):
    byteArray = bytearray()
    byteArray.extend(byte_data)
    flatNumpyArray = np.array(byteArray)
    grayImage = flatNumpyArray.reshape(100,100)
    grayImage = cv2.resize(grayImage, (500,500))
    return grayImage

In [12]:
import asyncio
import nest_asyncio
import websockets
nest_asyncio.apply()

async def handle_data(websocket, path):
    async for message in websocket:
        #img = show_image(message)
        #keep showing the imgs with cv2
        #cv2.imshow('image', img)
        #cv2.waitKey(1)
        img_inferred = process_and_track(message)

        if img_inferred is not None:
            cv2.imshow('image', img_inferred)
            cv2.waitKey(1)
        else:
            print('Error processing image')

#192.168.3.3 
# para testes locais pegar o IPV4
start_server = websockets.serve(handle_data, "192.168.3.3", 999)

# This ensures the server keeps running until it's explicitly stopped
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()


0: 640x640 1 person, 81.4ms
Speed: 6.0ms preprocess, 81.4ms inference, 574.2ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 75.3ms
Speed: 4.0ms preprocess, 75.3ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 73.6ms
Speed: 4.0ms preprocess, 73.6ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 73.5ms
Speed: 4.1ms preprocess, 73.5ms inference, 3.1ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 72.8ms
Speed: 4.1ms preprocess, 72.8ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 72.1ms
Speed: 6.9ms preprocess, 72.1ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 72.7ms
Speed: 6.0ms preprocess, 72.7ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 71.2ms
Speed: 4.0ms preprocess, 71.2ms inference, 2.1ms postprocess per image at shape (1, 3, 

connection handler failed
Traceback (most recent call last):
  File "c:\Users\lucca\Documents\Arduino\sketch_may2a\myenv\Lib\site-packages\websockets\legacy\protocol.py", line 1301, in close_connection
    await self.transfer_data_task
  File "C:\Users\lucca\AppData\Local\Programs\Python\Python311\Lib\asyncio\futures.py", line 287, in __await__
    yield self  # This tells Task to wait for completion.
    ^^^^^^^^^^
  File "C:\Users\lucca\AppData\Local\Programs\Python\Python311\Lib\asyncio\tasks.py", line 339, in __wakeup
    future.result()
  File "C:\Users\lucca\AppData\Local\Programs\Python\Python311\Lib\asyncio\futures.py", line 198, in result
    raise exc
  File "C:\Users\lucca\AppData\Local\Programs\Python\Python311\Lib\asyncio\tasks.py", line 269, in __step
    result = coro.throw(exc)
             ^^^^^^^^^^^^^^^
  File "c:\Users\lucca\Documents\Arduino\sketch_may2a\myenv\Lib\site-packages\websockets\legacy\protocol.py", line 963, in transfer_data
    message = await self.read