https://github.com/tensorflow/examples/blob/master/lite/examples/object_detection/raspberry_pi

In [None]:
!pip install protobuf==3.20.1 

In [None]:
!pip install tflite-support==0.4.0 

In [None]:
# Local path the model download is written to (interpolated as {FILE} in the curl command).
FILE='./efficientdet_lite0.tflite'  

In [None]:
!curl \
    -L 'https://tfhub.dev/tensorflow/lite-model/efficientdet/lite0/detection/metadata/1?lite-format=tflite' -o {FILE}

In [None]:
# -y answers apt's confirmation prompt, which cannot be typed into a notebook cell.
!sudo apt-get install -y libportaudio2
!pip install sounddevice

In [None]:
!pip install pushbullet.py==0.9.1
!pip install pywebio 

In [None]:
from pushbullet import PushBullet
from pywebio.input import *
from pywebio.output import *
from pywebio.session import *
import time 

In [None]:
# Import the following modules
import requests
import json

# Function to send Push Notification


def pushbullet_noti(title, body):
    """Send a Pushbullet push of type "note".

    The access token is taken from TOKEN below when it is filled in, otherwise
    from the PUSHBULLET_TOKEN environment variable, so the secret does not have
    to be saved inside the notebook.

    Args:
        title: Title of the note.
        body: Body text of the note.

    Raises:
        ValueError: If no access token is configured.
        Exception: ('Error', status_code) if the API does not answer HTTP 200.
    """
    import os

    TOKEN = ''  # Pass your Access Token here (or set PUSHBULLET_TOKEN)
    token = TOKEN or os.environ.get('PUSHBULLET_TOKEN', '')
    if not token:
        # Fail before any network traffic instead of waiting for a rejected request.
        raise ValueError(
            'No Pushbullet access token: set TOKEN or the PUSHBULLET_TOKEN '
            'environment variable')

    # Make a dictionary that includes the title and body
    msg = {"type": "note", "title": title, "body": body}
    # Send a POST request; the timeout keeps a stalled connection from
    # blocking the caller forever.
    resp = requests.post('https://api.pushbullet.com/v2/pushes',
                         data=json.dumps(msg),
                         headers={'Authorization': 'Bearer ' + token,
                                  'Content-Type': 'application/json'},
                         timeout=10)
    if resp.status_code != 200:
        raise Exception('Error', resp.status_code)
    print('Message sent')


In [None]:
import base64
import html
import io
import time

from IPython.display import display, Javascript
from google.colab.output import eval_js
import numpy as np
from PIL import Image
import cv2
import matplotlib.pyplot as plt

def start_input():
  """Inject the webcam helper JavaScript into the notebook output.

  The script defines createDom(), onAnimationFrame(), removeDom() and
  takePhoto(label, imgData). Nothing is shown and the camera is not requested
  until takePhoto() is first called, which builds the preview via createDom().

  takePhoto() resolves to '' once the user has clicked the video, the overlay
  image or the instruction text (the DOM is removed at that point); otherwise
  it resolves to an object with timing fields and an 'img' entry holding a
  512x512 JPEG data URL of the current video frame.
  """
  # The JavaScript below is a runtime string and is executed in the browser.
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;
    
    var pendingResolve = null;
    var shutdown = false;
    
    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }
    
    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 512, 512);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }
    
    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);
      
      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);
           
      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);
      
      const instruction = document.createElement('div');
      instruction.innerHTML = 
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };
      
      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 512; //video.videoWidth;
      captureCanvas.height = 512; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);
      
      return stream;
    }
    async function takePhoto(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();
      
      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }
            
      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }
      
      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;
      
      return {'create': preShow - preCreate, 
              'show': preCapture - preShow, 
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  # Rendering the Javascript object makes the functions above available to eval_js.
  display(js)
  
def take_photo(label, img_data):
  """Call the browser-side takePhoto(label, imgData) and return its result.

  Both arguments are encoded as JSON string literals before being placed in
  the JavaScript call, so quotes, backslashes or newlines in them cannot
  break (or inject into) the evaluated expression.

  Args:
    label: Status text for the preview ('' leaves it unchanged).
    img_data: Image URL for the overlay ('' leaves it unchanged).

  Returns:
    Whatever eval_js returns for the takePhoto(...) call.
  """
  import json

  call = 'takePhoto({}, {})'.format(json.dumps(label), json.dumps(img_data))
  return eval_js(call)

In [None]:
def js_reply_to_image(js_reply):
    """
    Decode the frame carried in a takePhoto() reply.

    input:
          js_reply: mapping whose 'img' entry is a data URL of the form
                    "<header>,<base64-encoded image>"

    output:
          image_array: the decoded image as a NumPy array
    """
    # Item [1] of the comma split is the base64 payload after the header.
    encoded_payload = js_reply['img'].split(',')[1]
    image_buffer = io.BytesIO(base64.b64decode(encoded_payload))
    return np.array(Image.open(image_buffer))

In [None]:
import cv2
import numpy as np
import argparse
import sys
import time

import cv2
from tflite_support.task import core
from tflite_support.task import processor
from tflite_support.task import vision

In [None]:
_margin = 10 
_row=10
_font_size=1
_font_thickness =1
_text_color=(0,0,255)

In [None]:
def visualize(image: np.ndarray, detection_result: "processor.DetectionResult"):
  '''
    Draws a bounding box and a "<class> <score>%" label for every detection.

    The drawing is done on `image` itself, which is also returned.

    Inputs - input image, detection result whose detections are drawn
    Output - the same image with boxes and labels drawn on it
  '''
  for detection in detection_result.detections:
    # Draw the bounding box
    bbox = detection.bounding_box
    start_point = bbox.origin_x, bbox.origin_y
    end_point = bbox.origin_x + bbox.width, bbox.origin_y + bbox.height
    cv2.rectangle(image, start_point, end_point, _text_color, 3)

    # A detection without any class still gets its box, just no label.
    if not detection.classes:
      continue

    # Print the label. The score is formatted as a whole percentage so that
    # float noise (e.g. 0.57 * 100 == 56.99999999999999) never shows up.
    category = detection.classes[0]
    result_text = "{} {:.0%}".format(category.class_name, category.score)
    text_location = (_margin + bbox.origin_x, _margin + _row + bbox.origin_y)
    cv2.putText(image, result_text, text_location, cv2.FONT_HERSHEY_PLAIN,
                _font_size, _text_color, _font_thickness)

  return image


In [None]:
from datetime import datetime 
import time

In [None]:
def run(model: str, num_threads: int,
        enable_edgetpu: bool):
  '''
  Continuously run object detection on frames captured from the browser webcam.

  Every frame is mirrored, appended to 'output.mp4' and passed to the detector.
  When a "bird" is detected a Pushbullet alert is sent, at most once per
  cooldown period. The loop ends when the user stops the demo in the browser.

  Inputs-
    model- path of the tflite object detection model
    num_threads- number of CPU threads used to run the model
    enable_edgetpu- whether or not to enable the Edge TPU
  '''
  alert_cooldown_s = 10  # minimum number of seconds between two alerts

  # Initialize the object detection model
  base_options = core.BaseOptions(
        file_name=model, use_coral=enable_edgetpu, num_threads=num_threads)
  detection_options = processor.DetectionOptions(
        max_results=3, score_threshold=0.3)
  options = vision.ObjectDetectorOptions(
        base_options=base_options, detection_options=detection_options)
  detector = vision.ObjectDetector.create_from_options(options)

  start_input()
  label_html = 'Capturing...'
  img_data = ''
  last_alert_time = None  # time.time() of the most recent alert, if any

  # 'mp4v' is the fourcc that matches the .mp4 container.
  result = cv2.VideoWriter('output.mp4',
                           cv2.VideoWriter_fourcc(*'mp4v'),
                           1, (512, 512))
  try:
    while True:
      js_reply = take_photo(label_html, img_data)
      # '' means the demo was stopped; an empty 'img' means it was stopped
      # while a capture was pending. Neither carries a frame to decode.
      if not js_reply or not js_reply.get('img'):
        break

      # The decoded frame is RGB (it comes from PIL); mirror it horizontally.
      image = cv2.flip(js_reply_to_image(js_reply), 1)

      # VideoWriter expects BGR channel order, so only the saved copy is swapped.
      result.write(cv2.cvtColor(image, cv2.COLOR_RGB2BGR))

      # The detector is given the RGB frame as-is.
      input_tensor = vision.TensorImage.create_from_array(image)
      detection_result = detector.detect(input_tensor)

      bird_seen = any(
          detect.classes and detect.classes[0].class_name == "bird"
          for detect in detection_result.detections)
      now = time.time()
      cooled_down = (last_alert_time is None
                     or now - last_alert_time > alert_cooldown_s)
      # One alert per frame at most, and never two within the cooldown.
      if bird_seen and cooled_down:
        print("Bird identified")
        pushbullet_noti("Alert", "A bird detected")
        last_alert_time = now

      # NOTE(review): the annotated frame is not displayed or saved anywhere yet.
      image = visualize(image, detection_result)
  finally:
    # Finalize the video file even if detection or the notification raised.
    result.release()
    
  

In [None]:
# Demo settings: model file, CPU thread count, and Edge TPU switch.
model = 'efficientdet_lite0.tflite'
num_threads = 1
enable_edgetpu = False

run(model=model, num_threads=num_threads, enable_edgetpu=enable_edgetpu)

