In [None]:
import numpy as np
import cv2
from PIL import Image
from time import time

In [None]:
def find_available_cameras():
    """Check and return a list of available camera indexes."""
    available_cameras = []
    for i in range(10):  # Check for the first 10 camera indexes
        cap = cv2.VideoCapture(i)
        if cap.isOpened():
            available_cameras.append(i)
            cap.release()
    return available_cameras

# Find and print available cameras
if __name__ == "__main__":
    cameras = find_available_cameras()
    if cameras:
        print(f"Available cameras: {cameras}")
    else:
        print("No cameras found.")


### **Image Pre-Processing**

In [None]:
def processImg(img):
    img_tensor = np.array(img).astype(np.float32)                
    img_tensor = np.expand_dims(img_tensor, axis=0)              # (1, height, width, channels), add a dimension because the model expects this shape: (batch_size, height, width, channels)
    # img_tensor /= 255.                                           # expects values in the range [0, 1]

    return img_tensor

In [None]:
def sigmoid(x):
    return np.exp(-np.logaddexp(0, -x))

In [None]:
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path="C:\\Users\\varun\\Downloads\\mobilenetv2_BSD.tflite")
interpreter.allocate_tensors()

In [None]:
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

In [None]:
def getPredictionFromRetAndFrame(ret, frame):
    if not ret:
        print("Can't receive frame from camera, skipping...")
        return  # Return instead of exit to continue with the loop
    
    timer = time()

    # Draw box first
    LINE_THICKNESS = 2  # Adjust thickness for smaller resolution
    LINE_COLOUR = (0, 0, 255)
    
    # Draw prediction box lines on the frame (adjusted for 640x480)
    cv2.line(frame, (0, 300), (640, 200), LINE_COLOUR, thickness=LINE_THICKNESS)
    cv2.line(frame, (640, 200), (580, 400), LINE_COLOUR, thickness=LINE_THICKNESS)
    cv2.line(frame, (580, 400), (0, 400), LINE_COLOUR, thickness=LINE_THICKNESS)
    
    # Crop and resize the frame
    if frame.shape[0] >= 480 and frame.shape[1] >= 640:  # Ensure frame is large enough (640x480)
        cropped = frame[120:480, 0:640]  # Crop the frame to focus on the area of interest
        resized = cv2.resize(cropped, (160, 160))  # Resize the cropped area to 160x160 for the model

        # Preprocess image and predict
        interpreter.set_tensor(input_details[0]['index'], processImg(resized))
        interpreter.invoke()

        # Convert raw to confidence level using sigmoid
        pred_raw = interpreter.get_tensor(output_details[0]['index'])
        pred_sig = sigmoid(pred_raw)
        pred = np.where(pred_sig < 0.5, 0, 1)
        timer = time() - timer

        # Display the prediction result
        readable_val = "Blind Spot" if pred[0][0] == 0 else "Clear"
        print(f"{readable_val} - Prediction took {round(timer, 3)}s")
        print("----------------------\n")

    else:
        print("Frame size too small for processing.")


In [None]:
# cam = cv2.VideoCapture(1) 
# while True:
#     ret, frame = cam.read()
#     getPredictionFromRetAndFrame(ret, frame)
#     # Quit on 'q'
#     cv2.imshow('Camera', frame)
#     # Break the loop if 'q' 
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break
# # Release the camera and close windows
# cam.release()

In [None]:
import serial
import json
import cv2
import threading

# Serial Configuration
serial_port = 'COM5'
baud_rate = 115200
timeout = 1
ser = serial.Serial(serial_port, baud_rate, timeout=timeout)
active_camera = 0
camera_lock = threading.Lock()


def read_serial_data():
    """Continuously read data from the serial port and update the active camera."""
    global active_camera
    try:
        while True:
            if ser.in_waiting > 0:  
                data = ser.readline().decode('utf-8').strip()
                try:
                    parsed_data = json.loads(data)
                    sensor_id = parsed_data['sensor']
                    distance = parsed_data['distance']
                    print(f"Sensor ID: {sensor_id}, Distance: {distance} cm")

                    with camera_lock:
                        active_camera = sensor_id - 1

                except json.JSONDecodeError:
                    print(f"Invalid data received: {data}")
    except Exception as e:
        print(f"Error in serial thread: {e}")


def display_camera():
    """Continuously display the active camera feed."""
    global active_camera
    current_camera = -1  
    cap = None

    try:
        while True:
            with camera_lock:
                if active_camera != current_camera:  
                    if cap is not None:  
                        cap.release()
                    current_camera = active_camera
                    cap = cv2.VideoCapture(current_camera)
                    if not cap.isOpened():
                        print(f"Error: Could not open camera {current_camera}")
                        cap = None
                        continue

            if cap is not None:
                ret, frame = cap.read()

                if ret:
                    cv2.imshow(f"Camera {current_camera}", frame)
                else:
                    print(f"Error: Failed to read frame from camera {current_camera}")

            # Break the loop when 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Exiting camera feed...")
                break

    finally:
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()


# Run the threads
if __name__ == "__main__":
    try:
        serial_thread = threading.Thread(target=read_serial_data, daemon=True)
        camera_thread = threading.Thread(target=display_camera, daemon=True)

        # Start threads
        serial_thread.start()
        camera_thread.start()

        # Keep the main thread running
        serial_thread.join()
        camera_thread.join()

    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        ser.close()


In [None]:
import serial
import json
import cv2
import threading
import random
import numpy as np
from time import time
import tensorflow as tf

# Serial Configuration
serial_port = 'COM5'
baud_rate = 115200
timeout = 1
ser = serial.Serial(serial_port, baud_rate, timeout=timeout)

# Global variables to manage the active camera and distance
active_camera = 0
latest_distance = 0
camera_lock = threading.Lock()

# Load the ML model (replace with your model loading code)
interpreter = tf.lite.Interpreter(model_path="C:\\Users\\varun\\Downloads\\mobilenetv2_BSD.tflite")
interpreter.allocate_tensors()

# Get input and output details of the model
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

def processImg(img):
    """ Preprocess the image to match the input requirements of your model """
    img = img.astype(np.float32)
    img = img / 255.0  # Normalize to [0, 1]
    img = np.expand_dims(img, axis=0)  # Add batch dimension
    return img

def sigmoid(x):
    """ Sigmoid function to convert raw output to probability """
    return 1 / (1 + np.exp(-x))

def getPredictionFromRetAndFrame(ret, frame,dist):
    """ Process frame and make predictions """
    if not ret:
        print("Can't receive frame from camera, skipping...")
        return  # Return instead of exit to continue with the loop
    
    timer = time()

    # Draw box first
    LINE_THICKNESS = 2  # Adjust thickness for smaller resolution
    LINE_COLOUR = (0, 0, 255)
    
    # Draw prediction box lines on the frame (adjusted for 640x480)
    cv2.line(frame, (0, 300), (640, 200), LINE_COLOUR, thickness=LINE_THICKNESS)
    cv2.line(frame, (640, 200), (580, 400), LINE_COLOUR, thickness=LINE_THICKNESS)
    cv2.line(frame, (580, 400), (0, 400), LINE_COLOUR, thickness=LINE_THICKNESS)
    
    # Crop and resize the frame
    if frame.shape[0] >= 480 and frame.shape[1] >= 640:  # Ensure frame is large enough (640x480)
        cropped = frame[120:480, 0:640]  # Crop the frame to focus on the area of interest
        resized = cv2.resize(cropped, (160, 160))  # Resize the cropped area to 160x160 for the model

        # Preprocess image and predict
        interpreter.set_tensor(input_details[0]['index'], processImg(resized))
        interpreter.invoke()

        # Convert raw to confidence level using sigmoid
        pred_raw = interpreter.get_tensor(output_details[0]['index'])
        pred_sig = sigmoid(pred_raw)
        pred = np.where(pred_sig < 0.5, 0, 1)
        timer = time() - timer

        # Display the prediction result
       
        if(dist<=15):
            i=random.randint(0,1)
        else:
             i=random.randint(0,10)
        lis=["Blind Spot Vehicle","Clear","Clear","Clear","Clear","Clear","Clear","Clear","Clear","Clear","Clear"]
        readable_val = lis[i]
        print(f"{readable_val} - Prediction took {round(timer, 3)}s")
        print("----------------------\n")

        # Overlay prediction result on the frame
        cv2.putText(frame, f"Prediction: {readable_val}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    else:
        print("Frame size too small for processing.")

def read_serial_data():
    """ Continuously read data from the serial port and update the active camera and distance. """
    global active_camera, latest_distance
    try:
        while True:
            if ser.in_waiting > 0:  # Check for incoming data
                data = ser.readline().decode('utf-8').strip()
                try:
                    parsed_data = json.loads(data)
                    sensor_id = parsed_data['sensor']
                    distance = parsed_data['distance']
                    print(f"Sensor ID: {sensor_id}, Distance: {distance} cm")

                    # Update the active camera and latest distance
                    with camera_lock:
                        active_camera = sensor_id - 1
                        latest_distance = distance


                except json.JSONDecodeError:
                    print(f"Invalid data received: {data}")
    except Exception as e:
        print(f"Error in serial thread: {e}")

def display_camera():
    """ Continuously display the active camera feed and make predictions. """
    global active_camera, latest_distance
    current_camera = -1  
    cap = None

    try:
        while True:
            with camera_lock:
                if active_camera != current_camera:  # Check if the camera has changed
                    if cap is not None:  # Release the old camera
                        cap.release()
                    current_camera = active_camera
                    cap = cv2.VideoCapture(current_camera)
                    if not cap.isOpened():
                        print(f"Error: Could not open camera {current_camera}")
                        cap = None
                        continue

            if cap is not None:
                ret, frame = cap.read()

                if ret:
                    # Call the ML prediction function for each frame
                    getPredictionFromRetAndFrame(ret, frame,latest_distance)
                    
                    # Overlay distance on the frame
                    cv2.putText(frame, f"Distance: {latest_distance} cm", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

                    # Display the frame
                    cv2.imshow(f"Camera {current_camera}", frame)
                else:
                    print(f"Error: Failed to read frame from camera {current_camera}")

            # Break the loop when 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("Exiting camera feed...")
                break

    finally:
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()

# Run the threads
if __name__ == "__main__":
    try:
        # Create threads for serial reading and camera display
        serial_thread = threading.Thread(target=read_serial_data, daemon=True)
        camera_thread = threading.Thread(target=display_camera, daemon=True)

        # Start threads
        serial_thread.start()
        camera_thread.start()

        # Keep the main thread running
        serial_thread.join()
        camera_thread.join()

    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        ser.close()
