In [1]:
import argparse
import sys
import time
import serial
import cv2
import mediapipe as mp

from mediapipe.tasks import python
from mediapipe.tasks.python import vision

from utils import visualize
from picamera2 import Picamera2

# Global variables to calculate FPS
COUNTER, FPS = 0, 0
START_TIME = time.time()
picam2 = Picamera2()
picam2.preview_configuration.main.size = (640,480)
picam2.preview_configuration.main.format = "RGB888"
picam2.preview_configuration.align()
picam2.configure("preview")
picam2.start()

ser = serial.Serial('/dev/ttyACM0', 115200, timeout=None)
time.sleep(3)  # Wait for Arduino's serial to restart
ser.reset_input_buffer()  # Clear the buffer for any previous data
print("Serial connection established.")

# Functions to make:
# Send and recieve data to and from arduino
# go down and grasp (takes one argument that tells whether it is cardboard (z = 70), plastic (z = 25), paper (z = 12))
# go to trash bin (takes one argument that tells whether it is cardboard (z = 100), plastic (z = 50), paper (z = 50))
                    # Cardboard - theta1 = -45, theta2 = -45
                    # plastic - theta1 = -88, theta2 = -45
                    # paper - theta1 = 130, theta2 = 0
# go back home (No arguments, all zero except z = 120 for detection)
# grasp release object (takes two argument that tells whether it is cardboard, plastic, paper and if it is grasp / release) - Done in arduino
# All those functions return data
def Send_data_recieve(data):
    ser.write(data.encode('utf-8'))
    message = ser.readline().decode('utf-8').rstrip()
    return message

def go_down_grasp(Trash_type):
    if Trash_type == 'cardboard':
        data = '1,0,0,0,0,70,103,200,100' 
        return data
    elif Trash_type == 'plastic':
        data = '1,0,0,0,0,25,105,200,100' 
        return data
    elif Trash_type == 'paper':
        data = '1,0,0,0,0,12,101,200,100'
        return data
    
def go_to_trash_bin(Trash_type):
    if Trash_type == 'cardboard':
        data = '1,0,-45,-45,0,100,104,200,100' 
        return data
    elif Trash_type == 'plastic':
        data = '1,0,-88,-45,0,50,106,200,100' 
        return data
    elif Trash_type == 'paper':
        data = '1,0,130,0,0,50,102,200,100'
        return data
    
def home():
    data = '1,0,0,0,0,120,7,200,100' # 3 for neither grasping nor releasing
    return data

def run(model: str, max_results: int, score_threshold: float, 
        camera_id: int, width: int, height: int) -> None:
    """Continuously run inference on images acquired from the camera.

    Args:
        model: Name of the TFLite object detection model.
        max_results: Max number of detection results.
        score_threshold: The score threshold of detection results.
        camera_id: The camera id to be passed to OpenCV.
        width: The width of the frame captured from the camera.
        height: The height of the frame captured from the camera.
    """

    # Visualization parameters
    row_size = 50  # pixels
    left_margin = 24  # pixels
    text_color = (0, 0, 0)  # black
    font_size = 1
    font_thickness = 1
    fps_avg_frame_count = 10

    detection_frame = None
    detection_result_list = []

    def save_result(result: vision.ObjectDetectorResult, unused_output_image: mp.Image, timestamp_ms: int):
        global FPS, COUNTER, START_TIME

        # Calculate the FPS
        if COUNTER % fps_avg_frame_count == 0:
            FPS = fps_avg_frame_count / (time.time() - START_TIME)
            START_TIME = time.time()

        detection_result_list.append(result)
        COUNTER += 1

    # Initialize the object detection model
    base_options = python.BaseOptions(model_asset_path=model)
    options = vision.ObjectDetectorOptions(base_options=base_options,
                                           running_mode=vision.RunningMode.LIVE_STREAM,
                                           max_results=max_results, score_threshold=score_threshold,
                                           result_callback=save_result)
    detector = vision.ObjectDetector.create_from_options(options)

    if ser.in_waiting > 0: # Check if the robot reached home to start detection
            response = ser.readline().decode('utf-8').rstrip()
            if response == 'Homing Done Successfully':
                # Continuously capture images from the camera and run inference
                while True:
                    im = picam2.capture_array()  
                    image = cv2.resize(im, (640, 480))
                    image = cv2.flip(image, -1)

                    # Convert the image from BGR to RGB as required by the TFLite model.
                    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_image)

                    # Run object detection using the model.
                    detector.detect_async(mp_image, time.time_ns() // 1_000_000)

                    # Show the FPS
                    fps_text = 'FPS = {:.1f}'.format(FPS)
                    text_location = (left_margin, row_size)
                    current_frame = image
                    cv2.putText(current_frame, fps_text, text_location, cv2.FONT_HERSHEY_DUPLEX,
                                font_size, text_color, font_thickness, cv2.LINE_AA)

                    if detection_result_list:
                        current_frame = visualize(current_frame, detection_result_list[0])
                        for Detect in detection_result_list.detections:
                            if Detect.categories[0].score >= 0.75: #Only take action if the detection is high enough
                                if Detect.categories[0].category_name == 'plastic':
                                    # Function to go down and grasp
                                    print(Send_data_recieve(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go to plastic pin and release
                                    print(go_to_trash_bin(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go back home
                                    print(home())
                                    # Wait for serial input

                                elif Detect.categories[0].category_name == 'cardboard':
                                    # Function to go down and grasp
                                    print(Send_data_recieve(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go to cardboard pin and release
                                    print(go_to_trash_bin(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go back home
                                    print(home())
                                    # Wait for serial input
                                elif Detect.categories[0].category_name == 'paper':
                                    # Function to go down and grasp
                                    print(Send_data_recieve(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go to paper pin and release
                                    print(go_to_trash_bin(go_down_grasp(Detect.categories[0].category_name)))
                                    # Wait for serial input
                                    # Function to go back home
                                    print(home())
                                    # Wait for serial input
                        detection_frame = current_frame
                        detection_result_list.clear()

                    if detection_frame is not None:
                        cv2.imshow('object_detection', detection_frame)

                    # Introduce a delay to control the frame rate to 2 fps
                    time.sleep(0.5)  # Sleep for 500 milliseconds to achieve 2 fps

                    # Stop the program if the ESC key is pressed.
                    if cv2.waitKey(1) == 27:
                        break

    detector.close()
    cv2.destroyAllWindows()

def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--model',
        help='Path of the object detection model.',
        required=False,
        default='best.tflite')
    parser.add_argument(
        '--maxResults',
        help='Max number of detection results.',
        required=False,
        default=5)
    parser.add_argument(
        '--scoreThreshold',
        help='The score threshold of detection results.',
        required=False,
        type=float,
        default=0.25)
    parser.add_argument(
        '--cameraId', help='Id of camera.', required=False, type=int, default=0)
    parser.add_argument(
        '--frameWidth',
        help='Width of frame to capture from camera.',
        required=False,
        type=int,
        default=640)
    parser.add_argument(
        '--frameHeight',
        help='Height of frame to capture from camera.',
        required=False,
        type=int,
        default=480)
    args = parser.parse_args()

    run(args.model, int(args.maxResults),
        args.scoreThreshold, int(args.cameraId), args.frameWidth, args.frameHeight)

if __name__ == '__main__':
    main()


ModuleNotFoundError: No module named 'serial'