In [None]:
import logging
import warnings
import cv2
import torch
from torchvision.ops import nms  # Import NMS from PyTorch
from ultralytics import YOLO
from pypylon import pylon
import httpx  # For sending HTTP requests
import os  # For creating directories
from concurrent.futures import ThreadPoolExecutor

# Parameters
fps = 30  # Desired frame rate
exposure_time = 500  # Exposure time in microseconds
width = 1900  # Width of the image
height = 500 # Height of the image
offset_x = 0  # Offset X
offset_y = 512  # Offset Y
area_threshold = 2500  # Area threshold for object detection

# Raspberry Pi API URL
raspberry_pi_url = "http://192.168.5.118:8000/control-valve/"

# Relaxation pixels
pixel_relaxation = 5

def match_with_relaxation(detection_coords, prev_coords, relaxation=pixel_relaxation):
    x1, y1, x2, y2 = detection_coords
    px1, py1, px2, py2 = prev_coords
    return (px1 - relaxation <= x1 <= px1 + relaxation and
            py1 - relaxation <= y1 <= py1 + relaxation and
            px2 - relaxation <= x2 <= px2 + relaxation and
            py2 - relaxation <= y2 <= py2 + relaxation)

# Suppress the specific warning about weights_only
warnings.filterwarnings("ignore", message="It is possible to construct malicious pickle data")

# Set up logging to a file
logging.basicConfig(filename='./server_requests.log',
                    level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')

device = torch.device("cuda:0")
print(device)

# Load the YOLO model
model = YOLO("512 - augmentation - V3.pt").to(device)

# Configure Basler camera
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
camera.Open()

# Set the camera parameters
camera.AcquisitionFrameRateEnable.SetValue(True)
camera.AcquisitionFrameRate.SetValue(fps)
camera.Width.SetValue(width)
camera.Height.SetValue(height)
camera.OffsetX.SetValue(offset_x)
camera.OffsetY.SetValue(offset_y)
camera.ExposureTime.SetValue(exposure_time)  # Set exposure time in microseconds

# Start image acquisition
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

# Image converter to OpenCV format
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

counter = 0
send_counter = 0

# Create a directory for saving images if it doesn't exist
if not os.path.exists('img'):
    os.makedirs('img')

# Function to save the detected image
def save_image(image, path):
    cv2.imwrite(path, image)
    logging.info(f"Image saved at {path}")

# Function to send the command to the Raspberry Pi
def send_valve_command(valve_id):
    try:
        response = httpx.get(f"{raspberry_pi_url}?valve_id={valve_id}")
        logging.info(f"Command sent to Raspberry Pi for valve {valve_id}: {response.status_code}")
    except Exception as e:
        logging.error(f"Error sending command to Raspberry Pi: {e}")

# Main loop for image acquisition and YOLO detection
with ThreadPoolExecutor() as executor:  # Thread pool for parallel tasks
    while camera.IsGrabbing():
        grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)

        if grabResult.GrabSucceeded():
            # Convert image to OpenCV format
            image = converter.Convert(grabResult)
            frame = image.GetArray()

            # Resize frame for processing
            height, width = frame.shape[:2]
            scaling_factor = 512 / max(width, height)
            new_w = int(width * scaling_factor)
            new_h = int(height * scaling_factor)
            middle_line_y = new_h // 2
            frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

            # Crop the frame to get only the left half
            left_half = frame[:, :new_w // 2]

            # Run YOLO for detection inference on the left half
            results = model.predict(left_half, show=False)

            boxes = results[0].boxes.xyxy.cpu()  # Extract bounding boxes
            confidences = results[0].boxes.conf.cpu()  # Extract confidences
            class_ids = results[0].boxes.cls.cpu()  # Extract class IDs

            # Convert confidences to a tensor
            confidences_tensor = torch.tensor(confidences)

            # Apply NMS (Non-Maximum Suppression)
            nms_indices = nms(boxes, confidences_tensor, iou_threshold=0.5)

            # Filter out the boxes, confidences, and class_ids using NMS results
            nms_boxes = boxes[nms_indices]
            nms_confidences = confidences_tensor[nms_indices]
            nms_class_ids = class_ids[nms_indices]

            # Draw lines on the left half
            left_half = cv2.line(left_half, (0, new_h // 2), (new_w // 2, new_h // 2), (255, 255, 255), 2)

            # Loop through NMS-filtered detections and draw them on the left half
            for i, (box, conf, class_id) in enumerate(zip(nms_boxes, nms_confidences, nms_class_ids)):
                x1, y1, x2, y2 = map(int, box)
                area = (x2 - x1) * (y2 - y1)
                area_label = f"Area: {area}"
                label = f"Class {int(class_id)} - {area_label}"

                # SETTING AREA THRESHOLD
                if area < area_threshold:
                    continue


                center_x = (x1 + x2) // 2
                center_y = (y1 + y2) // 2

                cv2.rectangle(left_half, (x1, y1), (x2, y2), (255, 255, 255), 1)
                cv2.putText(left_half, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 255), 1)

                if middle_line_y - 15 <= center_y <= middle_line_y + 15:
                # Save the image and send the command in parallel
                    image_path = f'img/{counter}.jpg'
                    executor.submit(save_image, left_half.copy(), image_path)

                    valve_id = int(str(3) + str(int(class_id) + 1))  # Create valve command
                    executor.submit(send_valve_command, valve_id)

                    counter += 1
                    print("=================> ", label)

            # Display the left half
            cv2.imshow("Left Half Stream", left_half)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        grabResult.Release()

# Cleanup
camera.StopGrabbing()
camera.Close()
cv2.destroyAllWindows()