In [1]:
import os
import shutil
import tarfile
import tempfile
import urllib.request

def _extract_archive_safely(tar, dest_dir):
    """Extract ``tar`` into ``dest_dir``, refusing members that would escape it.

    Raises:
        ValueError: if a member's path, or the target of a link member,
            resolves to a location outside ``dest_dir``.
    """
    dest_root = os.path.realpath(dest_dir)

    def _inside_dest(path):
        return os.path.commonpath([dest_root, os.path.realpath(path)]) == dest_root

    # Validate every member up front so nothing is written if the archive is hostile.
    for member in tar.getmembers():
        member_path = os.path.join(dest_root, member.name)
        if not _inside_dest(member_path):
            raise ValueError(f"Refusing to extract unsafe path from archive: {member.name!r}")
        if member.issym() or member.islnk():
            # Symlink targets are relative to the link's own folder, hard-link
            # targets are relative to the archive root.
            link_base = os.path.dirname(member_path) if member.issym() else dest_root
            if not _inside_dest(os.path.join(link_base, member.linkname)):
                raise ValueError(f"Refusing to extract unsafe link from archive: {member.name!r}")

    if hasattr(tarfile, "data_filter"):
        # Interpreters that ship extraction filters get the stricter built-in checks too.
        tar.extractall(path=dest_dir, filter="data")
    else:
        tar.extractall(path=dest_dir)


def download_and_extract_model(model_url, model_dir):
    """Download a gzip-compressed tar archive and unpack it into ``model_dir``.

    Does nothing when ``model_dir`` already exists, so re-running the cell does
    not download again. The archive is fetched and unpacked inside a temporary
    staging folder next to ``model_dir`` and only moved into place once
    extraction has fully succeeded; a failed or interrupted run therefore never
    leaves a half-filled ``model_dir`` that later runs would mistake for a
    finished download, and no ``model.tar.gz`` is left in the working directory.

    NOTE(review): members keep their archive-relative paths, so an archive that
    wraps its content in a top-level folder ends up as ``model_dir/<folder>/...``.
    Confirm this matches the sub-path callers join onto ``model_dir``.

    Args:
        model_url: URL of the ``.tar.gz`` archive to fetch.
        model_dir: Directory that will hold the extracted files.

    Raises:
        ValueError: if the archive contains a member that would be written
            outside the extraction directory.
        Exception: download and tar errors from ``urllib`` / ``tarfile`` are
            propagated unchanged.
    """
    if os.path.exists(model_dir):
        return

    parent_dir = os.path.dirname(os.path.abspath(model_dir))
    os.makedirs(parent_dir, exist_ok=True)
    # Staging lives beside the final location so the last move is a plain rename.
    staging_dir = tempfile.mkdtemp(prefix=".model_download_", dir=parent_dir)
    try:
        archive_path = os.path.join(staging_dir, "model.tar.gz")
        extract_dir = os.path.join(staging_dir, "extracted")
        os.mkdir(extract_dir)

        urllib.request.urlretrieve(model_url, archive_path)
        with tarfile.open(archive_path, 'r:gz') as tar:
            _extract_archive_safely(tar, extract_dir)

        os.replace(extract_dir, model_dir)
    finally:
        # Removes the downloaded archive and anything left from a failed extraction.
        shutil.rmtree(staging_dir, ignore_errors=True)

# Folder the archive is unpacked into; the download step is skipped once it exists.
MODEL_DIR = 'ssd_mobilenet_v2_coco_2018_03_29'
# Archive to fetch. NOTE(review): this is plain HTTP; switch to HTTPS if the host offers it.
MODEL_URL = 'http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v2_coco_2018_03_29.tar.gz'
download_and_extract_model(model_url=MODEL_URL, model_dir=MODEL_DIR)


In [2]:
import tensorflow as tf

# Load the pre-trained model from the 'saved_model' folder inside MODEL_DIR.
# Relies on `os` and MODEL_DIR from the download cell, so that cell must run first.
model = tf.saved_model.load(os.path.join(MODEL_DIR, 'saved_model'))

# Get the inference function: the signature stored under the 'serving_default' key.
# detect_objects() receives it as its `infer` argument.
infer = model.signatures['serving_default']


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


In [3]:
import numpy as np
import cv2
import pyttsx3
from threading import Thread

# Initialize the text-to-speech engine once at module level;
# navigate_and_display_message() uses this shared instance to speak each message.
engine = pyttsx3.init()

# Function to perform object detection on a frame
def detect_objects(frame, infer):
    """Run the detector on a single frame and return its raw predictions.

    The frame is resized to 300x300, converted from BGR to RGB, turned into a
    tensor with one extra leading axis and handed to ``infer``.

    Args:
        frame: Image accepted by ``cv2.resize`` (BGR channel order is assumed
            by the color conversion).
        infer: Callable that takes the input tensor and returns a mapping with
            the keys ``detection_boxes``, ``detection_scores`` and
            ``detection_classes``.

    Returns:
        Tuple ``(boxes, scores, classes)`` of NumPy arrays taken from the first
        entry of each output; ``classes`` is cast to ``np.int64``.
    """
    # Shrink first so inference has less to process, then swap the channel order.
    rgb_small = cv2.cvtColor(cv2.resize(frame, (300, 300)), cv2.COLOR_BGR2RGB)
    # Prepend an axis of size 1 before calling the model.
    model_input = tf.convert_to_tensor(rgb_small)[tf.newaxis, ...]
    outputs = infer(model_input)

    def first_entry(key):
        # Each output carries a leading axis; only its first entry is used.
        return outputs[key][0].numpy()

    return (
        first_entry('detection_boxes'),
        first_entry('detection_scores'),
        first_entry('detection_classes').astype(np.int64),
    )

# Function to visualize detected objects
def visualize_detection(frame, boxes, scores, classes, threshold=0.5):
    """Outline every detection scoring above ``threshold`` on ``frame``.

    Box entries are read as ``(ymin, xmin, ymax, xmax)`` fractions and scaled
    by the frame's height and width to get pixel corners. Rectangles are drawn
    directly onto ``frame`` in green with a thickness of 2.

    Args:
        frame: Image array; modified in place.
        boxes: Sequence of 4-value boxes, indexed in step with ``scores``.
        scores: Sequence of confidence scores.
        classes: Accepted for interface compatibility; not used here.
        threshold: A detection is drawn only if its score is strictly greater.

    Returns:
        The same ``frame`` object that was passed in.
    """
    outline_color = (0, 255, 0)
    for idx, score in enumerate(scores):
        if not score > threshold:
            continue
        height, width = frame.shape[0], frame.shape[1]
        ymin, xmin, ymax, xmax = boxes[idx]
        top_left = (int(xmin * width), int(ymin * height))
        bottom_right = (int(xmax * width), int(ymax * height))
        cv2.rectangle(frame, top_left, bottom_right, outline_color, 2)
    return frame

# Function to navigate based on detected obstacles and display message
def navigate_and_display_message(frame, boxes, scores, threshold=0.5):
    """Choose a navigation hint from the detections, draw it on ``frame`` and speak it.

    Only the first detection whose score is strictly greater than ``threshold``
    is considered. Its side is decided from the horizontal center of its box,
    ``(xmin + xmax) / 2``, where ``xmin`` is box index 1 and ``xmax`` is box
    index 3, both treated as fractions of the frame width:

    * center below 0.33  -> obstacle on the left
    * center above 0.66  -> obstacle on the right
    * otherwise          -> obstacle in the center

    Using the center rather than the left edge alone keeps a wide box (for
    example one that spans the whole frame) from being reported as "on the
    left" merely because it starts there.

    Args:
        frame: Image the message is written onto (modified in place).
        boxes: Sequence of ``(ymin, xmin, ymax, xmax)`` boxes, indexed in step
            with ``scores``.
        scores: Sequence of confidence scores.
        threshold: Minimum score (exclusive) for a detection to count.

    Returns:
        The same ``frame`` object, with the message drawn on it.
    """
    # The frame width is split into three equal zones.
    left_zone_end = 0.33
    right_zone_start = 0.66

    message = "Path is clear! Move forward."

    for i in range(len(scores)):
        if scores[i] > threshold:
            box = boxes[i]
            center_x = (box[1] + box[3]) / 2.0
            if center_x < left_zone_end:
                message = "Obstacle detected on the left! Please move right."
            elif center_x > right_zone_start:
                message = "Obstacle detected on the right! Please move left."
            else:
                message = "Obstacle detected in the center! Please stop or change direction."
            # Only the first qualifying detection drives the hint.
            break

    # Display the message on the frame
    cv2.putText(frame, message, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

    # Speak the message on every call using the module-level engine.
    # NOTE(review): runAndWait() is expected to block until speech finishes,
    # which paces the caller's frame loop; confirm that is intended.
    engine.say(message)
    engine.runAndWait()

    return frame


In [None]:
from IPython.display import display
import ipywidgets as widgets
import time

class VideoStreamWidget:
    """Keeps the most recent camera frame available via a background reader thread.

    ``self.capture`` is the ``cv2.VideoCapture`` for ``src``; ``self.status`` and
    ``self.frame`` always hold the result of the latest ``read()`` call, so
    ``self.frame`` is ``None`` whenever the last read produced no image.
    """

    def __init__(self, src=0):
        """Open capture source ``src``, read one frame and start the reader thread."""
        self.capture = cv2.VideoCapture(src)
        self.status, self.frame = self.capture.read()
        # Cleared by stop() so the reader thread can exit instead of looping forever.
        self._running = True
        self.thread = Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()

    def update(self):
        """Reader-thread loop: refresh ``status``/``frame`` until ``stop()`` is called."""
        while self._running:
            if self.capture.isOpened():
                self.status, self.frame = self.capture.read()
            # Sleep on every pass, including when the capture is closed, so the
            # loop never spins at full speed after the capture has been released.
            time.sleep(0.01)

    def stop(self):
        """Ask the reader thread to finish, wait briefly for it, then release the capture."""
        self._running = False
        if self.thread.is_alive():
            self.thread.join(timeout=1.0)
        self.capture.release()

    def show_frame(self):
        """Process the latest frame and return it as JPEG bytes.

        Runs ``detect_objects``, ``visualize_detection`` and
        ``navigate_and_display_message`` on a copy of the current frame.

        Returns:
            The encoded JPEG as ``bytes``, or ``None`` when no frame is
            available or encoding fails.
        """
        # Take one snapshot of the attribute: the reader thread may replace it
        # at any moment, and it is None when the camera delivered nothing.
        latest = self.frame
        if latest is None:
            return None

        frame = latest.copy()
        boxes, scores, classes = detect_objects(frame, infer)
        frame_with_detections = visualize_detection(frame, boxes, scores, classes)
        frame_with_message = navigate_and_display_message(frame_with_detections, boxes, scores)
        encoded_ok, jpeg = cv2.imencode('.jpg', frame_with_message)
        if not encoded_ok:
            return None
        return jpeg.tobytes()

# Create an image widget for display; the loop below keeps replacing its
# `value` with the newest JPEG bytes.
image_widget = widgets.Image(format='jpeg')
display(image_widget)

# Opens the default capture source (src=0) and starts the background reader thread.
video_stream_widget = VideoStreamWidget()

# Runs until a KeyboardInterrupt arrives (e.g. the kernel is interrupted).
try:
    while True:
        # show_frame() runs detection, draws the boxes and message, and speaks the message.
        frame_bytes = video_stream_widget.show_frame()
        # Keep the previous picture when no frame could be produced.
        if frame_bytes:
            image_widget.value = frame_bytes
        time.sleep(0.01)  # Adjust the sleep time to control the refresh rate
except KeyboardInterrupt:
    # An interrupt is the expected way to leave the loop, so it is not an error.
    pass
finally:
    # Release the capture on the way out, whether the loop ended by interrupt or error.
    video_stream_widget.capture.release()


Image(value=b'', format='jpeg')