In [None]:
%%capture
!pip install gtts ultralytics supervision pydub googletrans==4.0.0-rc1

In [None]:
# import dependencies
from IPython.display import display, Javascript, Image, Audio
from google.colab.output import eval_js
from google.colab.patches import cv2_imshow
from base64 import b64decode, b64encode
import cv2, PIL, io, os, html, time
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import torch
from gtts import gTTS
from ultralytics import YOLO
from googletrans import Translator
from supervision import Detections, LabelAnnotator, BoundingBoxAnnotator
import soundfile as sf
import librosa

In [None]:
# function to convert the JavaScript object into an OpenCV image
def js_to_image(js_reply):
  """
  Turn a base64 data URL received from the browser into an OpenCV image.

  Params:
          js_reply: data URL string ("<header>,<base64 payload>") holding the webcam image
  Returns:
          img: OpenCV BGR image as produced by cv2.imdecode
  """
  # everything after the first comma is the base64 payload
  encoded_payload = js_reply.split(',')[1]
  # view the decoded bytes as a flat uint8 buffer for OpenCV
  pixel_buffer = np.frombuffer(b64decode(encoded_payload), dtype=np.uint8)
  # decode the compressed buffer into a 3-channel colour image
  return cv2.imdecode(pixel_buffer, flags=cv2.IMREAD_COLOR)

# function to convert an OpenCV rectangle bounding-box image into a base64 byte string to be overlaid on the video stream
def bbox_to_bytes(bbox_array):
  """
  Encode an overlay image as a PNG data URL for the browser.

  Params:
          bbox_array: Numpy array (pixels) containing rectangle to overlay on video stream.
  Returns:
        bytes: 'data:image/png;base64,...' string holding the PNG-encoded image
  """
  png_buffer = io.BytesIO()
  # interpret the array as RGB and serialise it as PNG into memory
  PIL.Image.fromarray(bbox_array, 'RGB').save(png_buffer, format='png')
  # base64-encode the PNG bytes and prepend the data-URL header
  encoded_png = b64encode(png_buffer.getvalue()).decode('utf-8')
  return 'data:image/png;base64,' + encoded_png

In [None]:
# JavaScript to properly create our live video stream using our webcam as input
def video_stream():
  """
  Inject the webcam-streaming JavaScript into the notebook output.

  The displayed script defines `stream_frame(label, imgData)`, which on each call:
    * lazily builds the DOM (status label, <video>, overlay <img>, stop hint) and
      requests a camera stream with facingMode "environment";
    * updates the status label when `label` is non-empty and, when `imgData` is
      non-empty, positions the overlay image on top of the video;
    * waits for the next animation frame and resolves with an object holding the
      'create' / 'show' / 'capture' Date.now() deltas and 'img', a JPEG data URL
      (quality 0.8) of a 640x480 capture of the video.
  Clicking the video, the overlay image or the instruction text sets a shutdown
  flag; the next `stream_frame` call then removes the DOM and returns ''.
  If the flag is set while a capture is pending, 'img' is an empty string.

  Returns:
          None. The script is only displayed; no frame is captured here.
  """
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;

    var pendingResolve = null;
    var shutdown = false;

    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }

    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }

    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);

      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);

      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);

      const instruction = document.createElement('div');
      instruction.innerHTML =
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };

      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);

      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();

      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }

      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }

      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;

      return {'create': preShow - preCreate,
              'show': preCapture - preShow,
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  display(js)

def video_frame(label, bbox):
  """
  Ask the browser for the next webcam frame via the injected `stream_frame` JS function.

  Params:
          label: text/HTML to show in the status line ('' leaves it unchanged)
          bbox: data URL of an overlay image to draw over the video ('' for none)
  Returns:
          data: whatever `stream_frame` resolves to - an empty string once the user
                has stopped the demo, otherwise an object with timing fields and 'img'
  """
  import json

  # json.dumps emits a correctly escaped, double-quoted literal that is also a
  # valid JavaScript string, so quotes, backslashes or newlines in the label or
  # overlay can no longer break (or inject code into) the evaluated expression.
  call = 'stream_frame({}, {})'.format(json.dumps(str(label)), json.dumps(str(bbox)))
  data = eval_js(call)
  return data

In [None]:
class ObjectDetection:
    """
    Webcam object detector that annotates frames and announces what it sees.

    Each captured frame is run through a YOLOv8 model; confident detections are
    drawn with supervision annotators and described aloud (English, Hindi or
    Marathi) through gTTS. When a mobile phone is detected its distance is
    estimated from its pixel width using a focal length calibrated on a
    reference image.
    """

    # Class id treated as "mobile phone" ('cell phone' in the COCO label set).
    MOBILE_CLASS_ID = 67
    # Detections whose confidence is not above this value are ignored.
    CONFIDENCE_THRESHOLD = 0.75
    # Menu choice -> language code used for translation and speech.
    LANGUAGE_BY_CHOICE = {1: 'en', 2: 'hi', 3: 'mr'}

    def __init__(self, capture_index, ref_image_path='/Ref_images/person_mobile1.jpg'):
        """
        Load the model and calibrate the phone width on the reference image.

        Params:
                capture_index: stored on the instance; not otherwise used by this class
                ref_image_path: image containing a mobile phone, used to measure the
                                phone's pixel width for the distance estimate
        Raises:
                ValueError: if no mobile phone is detected in the reference image
        """
        self.capture_index = capture_index
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print("Using Device: ", self.device)
        self.model = self.load_model()
        self.CLASS_NAMES_DICT = self.model.model.names
        self.box_annotator = BoundingBoxAnnotator()
        self.label_annotator = LabelAnnotator()
        # presumably the camera-to-phone distance in the reference image - TODO confirm
        self.KNOWN_DISTANCE = 16 #INCHES
        self.MOBILE_WIDTH = 3.0 #INCHES
        # mobile width in ref frame (in pixels)
        self.mobile_width_in_rf = self._reference_mobile_width(ref_image_path)

    def _reference_mobile_width(self, ref_image_path):
        """Return the pixel width of the most confident mobile phone in the reference image."""
        # Reuse the already loaded model instead of reading the weights a second time.
        boxes = self.model.predict(ref_image_path, verbose=False)[0].boxes
        class_ids = boxes.cls.cpu().numpy().astype(int)
        phone_rows = np.flatnonzero(class_ids == self.MOBILE_CLASS_ID)
        if phone_rows.size == 0:
            raise ValueError(
                f"No mobile phone (class {self.MOBILE_CLASS_ID}) detected in "
                f"reference image {ref_image_path!r}"
            )
        # Pick the phone by class rather than by a fixed position in the result list.
        confidences = boxes.conf.cpu().numpy()
        best_row = phone_rows[np.argmax(confidences[phone_rows])]
        x1, y1, x2, y2 = boxes.xyxy.cpu().numpy()[best_row]
        return x2 - x1

    def load_model(self):
        """Load the YOLOv8-large detection weights and fuse the model's layers."""
        model = YOLO("yolov8l.pt", task='detect')
        model.fuse()
        return model

    def predict(self, frame):
        """Run the detector on one frame and return the raw prediction results."""
        results = self.model.predict(frame, verbose=False)
        return results

    def focal_length_finder (self, measured_distance, real_width, width_in_rf):
        """
        Compute the focal length from a reference measurement.

        Params:
                measured_distance: distance of the object in the reference image
                real_width: real-world width of the object (same unit as the distance)
                width_in_rf: width of the object in the reference image, in pixels
        Returns:
                focal_length: (width_in_rf * measured_distance) / real_width
        """
        focal_length = (width_in_rf * measured_distance) / real_width
        return focal_length

    # distance finder function
    def distance_finder(self, focal_length, real_object_width, width_in_frmae):
        """
        Estimate the object's distance from its width in the current frame.

        Params:
                focal_length: value returned by focal_length_finder
                real_object_width: real-world width of the object
                width_in_frmae: width of the object in the current frame, in pixels
        Returns:
                distance rounded to the nearest integer (unit of real_object_width)
        """
        distance = (real_object_width * focal_length) / width_in_frmae
        return int(round(distance,0))

    def mobile_width_in_frame(self, detections):
        """
        Return the pixel width of the first mobile phone among the detections.

        Params:
                detections: iterable yielding
                            (xyxy, mask, confidence, class_id, tracker_id, data) tuples
        Returns:
                width (x2 - x1) of the first mobile phone, or None if there is none
        """
        for xyxy, mask, confidence, class_id, tracker_id, data in detections:
            if class_id == self.MOBILE_CLASS_ID:
                x1, y1, x2, y2 = xyxy
                width = x2 - x1
                return width

    def play_audio(self, text, ch):
        """
        Speak the text in the chosen language and block for two seconds.

        Params:
                text: English sentence to announce
                ch: language choice - 1 English, 2 Hindi, anything else Marathi
        """
        language = self.LANGUAGE_BY_CHOICE.get(ch, 'mr')
        if language != 'en':
            text = Translator().translate(text, src='en', dest=language).text
        tts = gTTS(text=text, lang=language, tld="us")
        print(text)

        # gTTS writes MP3 data, so keep it in an .mp3 file and write the processed
        # audio to a separate .wav instead of overwriting the file just read.
        tts.save('temp_audio.mp3')
        audio, sr = librosa.load('temp_audio.mp3')
        # Increase the speed by 1.5x
        faster_audio = librosa.effects.time_stretch(audio, rate=1.5)
        sf.write("temp_audio.wav", faster_audio, sr)
        display(Audio('temp_audio.wav', autoplay=True))
        time.sleep(2)

    def _confident_detections(self, results):
        """Build Detections from boxes above CONFIDENCE_THRESHOLD; None if there are none."""
        boxes = results[0].boxes
        confidences = boxes.conf.cpu().numpy()
        keep = confidences > self.CONFIDENCE_THRESHOLD
        if not keep.any():
            return None
        return Detections(
            xyxy=boxes.xyxy.cpu().numpy().reshape(-1, 4)[keep],
            confidence=confidences[keep],
            class_id=boxes.cls.cpu().numpy().astype(int)[keep],
        )

    @staticmethod
    def _describe_objects(names):
        """Build the spoken sentence listing the given class names."""
        if len(names) == 1:
            return f"There is a {names[0]} ahead"
        # Use the class names directly (not re-split label strings) so names of
        # any word count are announced.
        listed = ", ".join(names[:-1])
        return f'There are {len(names)} objects: {listed}, and {names[-1]}'

    def plot_bboxes(self, results, frame, ch):
        """
        Annotate the frame with confident detections and announce them.

        Params:
                results: prediction results for the frame (as returned by predict)
                frame: image to draw on
                ch: language choice forwarded to play_audio
        Returns:
                frame: the annotated frame, or the input frame when nothing passes
                       the confidence threshold
        """
        # Extract detections if conf > CONFIDENCE_THRESHOLD
        detections = self._confident_detections(results)
        if detections is None:
            return frame

        class_ids = detections.class_id
        confidences = detections.confidence
        names = [self.CLASS_NAMES_DICT[int(class_id)] for class_id in class_ids]

        # Logic to check if mobile detected
        if np.any(class_ids == self.MOBILE_CLASS_ID):
            focal_mobile = self.focal_length_finder(self.KNOWN_DISTANCE, self.MOBILE_WIDTH, self.mobile_width_in_rf)
            width_in_frame = self.mobile_width_in_frame(detections)
            distance = self.distance_finder(focal_mobile, self.MOBILE_WIDTH, width_in_frame)
            # Format custom labels: every phone gets the distance of the first phone
            self.labels = [
                f"Mobile at Distance: {distance} inches" if class_id == self.MOBILE_CLASS_ID
                else f'{name} {confidence:0.2f}'
                for name, class_id, confidence in zip(names, class_ids, confidences)
            ]
            txt = f'Mobile is {distance} inches away'
        else:
            # Format custom labels
            self.labels = [
                f"{name} {confidence:0.2f}" for name, confidence in zip(names, confidences)
            ]
            txt = self._describe_objects(names)

        # Play audio sound
        self.play_audio(txt, ch)

        # Annotate and display frame
        frame = self.box_annotator.annotate(scene=frame, detections=detections)
        frame = self.label_annotator.annotate(scene=frame, detections=detections, labels=self.labels)
        return frame

    def __call__(self):
        """Prompt for the language, then capture, annotate and show frames until stopped."""
        print("Choose Language...\n  1. For English\n  2. For Hindi\n  3. For Marathi\n")
        while True:
            try:
                ch = int(input('Enter Here (1, 2 or 3): '))
            except ValueError:
                print("Invalid input. Please enter a valid number.")
                continue
            if ch in self.LANGUAGE_BY_CHOICE:
                break
            print("Invalid input. Please enter 1, 2, or 3.")

        # start streaming video from webcam
        video_stream()
        # label for video
        label_html = 'Capturing...'
        # initialze bounding box to empty
        bbox = ''
        while True:
            js_reply = video_frame(label_html, bbox)
            # '' means the demo was stopped; an empty 'img' means it was stopped
            # while the capture was pending - either way there is no frame to decode.
            if not js_reply or not js_reply["img"]:
                break

            # convert JS response to OpenCV Image
            frame = js_to_image(js_reply["img"])
            if frame is None:
                # the payload could not be decoded; skip it and ask for the next frame
                continue
            results = self.predict(frame)
            frame = self.plot_bboxes(results, frame, ch)
            cv2_imshow(frame)
            # NOTE(review): frames are shown with cv2_imshow, not an OpenCV window, so
            # this ESC check probably never fires - confirm before removing.
            if cv2.waitKey(5) & 0xFF == 27:
                break

# Run the live demo when this cell/script is executed directly.
if __name__ == "__main__":
    # capture_index=0 is stored on the detector; calling the instance starts the loop
    detector = ObjectDetection(capture_index=0)
    detector()

### For testing purposes only

In [None]:
# Inspect the second detection reported for the sample image:
# its class id, confidence and bounding-box width in pixels.
model = YOLO("/content/drive/MyDrive/Major project/yolov8l.pt", task='detect')
res = model.predict('/content/drive/MyDrive/Major project/mobile1.jpg', verbose=False)

second_boxes = res[0][1].boxes
class_id = second_boxes.cls.cpu().numpy().astype(int)
confidence = second_boxes.conf.cpu().numpy()

# first (only) row of the box coordinates -> corner points
x1, y1, x2, y2 = second_boxes.xyxy.cpu().numpy()[0]
width = x2 - x1

print(class_id[0], confidence[0])
print(width)

In [None]:
# Print the bounding box of every confident mobile-phone detection in the sample image.
model = YOLO("/content/drive/MyDrive/Major project/yolov8l.pt", task='detect')
res = model.predict('/content/drive/MyDrive/Major project/mobile1.jpg', verbose=False)

xyxys, confidences, class_ids = [], [], []
for result in res[0]:
    class_id = result.boxes.cls.cpu().numpy().astype(int)
    confidence = result.boxes.conf.cpu().numpy()
    # skip anything that is not above the 0.75 confidence cut-off
    if not confidence > 0.75:
        continue
    xyxy = result.boxes.xyxy.cpu().numpy()
    xyxys.append(xyxy.reshape(-1, 4))
    confidences.append(confidence)
    class_ids.append(class_id)

if xyxys:
    # Setup detections for visualization
    detections = Detections(
        xyxy=np.concatenate(xyxys),
        confidence=np.concatenate(confidences),
        class_id=np.concatenate(class_ids),
    )
    # class id 67 is the one the detector class treats as the mobile phone
    for xyxy, mask, confidence, class_id, tracker_id, data in detections:
        if class_id == 67:
            print(xyxy)