<a href="https://colab.research.google.com/github/FarahAhmedAtef/Computer-Vision-based-Autonomous-Robotic-Pick-and-Place-System/blob/main/Refined_Script_for_computer_vision_tasks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import sys
import argparse
import glob
import time
import difflib

import cv2
import numpy as np
from ultralytics import YOLO

import matplotlib.pyplot as plt
import json

import logging
# Remove every handler already attached to the root logger; logging.basicConfig
# is a no-op when the root logger already has handlers.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Send all records (DEBUG and above) to the stream that sys.stderr refers to
# at this point. The handler keeps that stream object, so the reassignment of
# sys.stderr further down does not redirect log output.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.DEBUG,
    format='[%(asctime)s] %(name)s %(levelname)s: %(message)s'
)

# Make the "ppocr" logger verbose and let its records reach the root handler.
logging.getLogger("ppocr").setLevel(logging.DEBUG)
logging.getLogger("ppocr").propagate = True
# From here on, anything written through the name sys.stderr (for example
# print(..., file=sys.stderr)) is discarded.
# NOTE(review): presumably done to keep the console limited to the JSON on
# stdout - confirm. The devnull handle is never closed explicitly.
sys.stderr = open(os.devnull, 'w')
# Homography matrices already read from disk, keyed by file path, so the file
# is loaded once per run instead of once per detection.
_HOMOGRAPHY_CACHE = {}


def transform_point(image_point, homography_path="homography_matrix.npy"):
    """Map an image pixel coordinate through a stored homography.

    Args:
        image_point: Indexable pair ``(x, y)`` in image coordinates.
        homography_path: Path of the ``.npy`` file holding the homography
            matrix. Defaults to the file the script has always used.

    Returns:
        A 2-tuple ``(x, y)`` with the transformed coordinates, as produced by
        ``cv2.perspectiveTransform`` for a float32 input point.

    Raises:
        OSError: If the matrix file cannot be read (propagated from
            ``np.load``).

    Note:
        The matrix is cached after the first load; edits to the file made
        while the process is running are not picked up.
    """
    H = _HOMOGRAPHY_CACHE.get(homography_path)
    if H is None:
        H = np.load(homography_path)
        _HOMOGRAPHY_CACHE[homography_path] = H

    # perspectiveTransform expects an array shaped (1, N, 2); here N == 1.
    point = np.array([[[image_point[0], image_point[1]]]], dtype=np.float32)
    world_point = cv2.perspectiveTransform(point, H)
    return tuple(world_point[0][0])

from paddleocr import PaddleOCR
# One OCR engine is created at import time and shared by every call to
# find_text_center_from_roi.
# NOTE(review): use_angle_cls=True presumably enables PaddleOCR's text
# orientation classifier - confirm against the installed PaddleOCR version.
ocr = PaddleOCR(use_angle_cls=True, lang='en')  # Or 'de' for German, etc.

# Two-letter fragments accepted as evidence of a brand name when the fuzzy
# match against the full class name is too weak. Keys are lower-case class
# names.
_LOGO_FRAGMENTS = {
    'snickers': ('sn', 'ic', 'ck', 'ke', 'er', 'rs'),
    'lion': ('li', 'io', 'on', 'no'),
    'mars': ('rs', 'w'),
}


def _parse_ocr_lines(result):
    """Return ``(text, box)`` pairs from an ``ocr.ocr`` result, skipping malformed entries."""
    if result is None or len(result) == 0 or result[0] is None:
        return []
    pairs = []
    for line in result[0]:
        if not line or not isinstance(line, (list, tuple)) or len(line) < 2:
            continue
        recognised = line[1]
        has_text = (
            isinstance(recognised, (list, tuple))
            and len(recognised) > 0
            and isinstance(recognised[0], str)
        )
        pairs.append((recognised[0] if has_text else '', line[0]))
    return pairs


def _matches_logo_fragment(text, classname):
    """Return True when *text* contains one of the fragments registered for *classname*."""
    lowered = text.lower()
    name = classname.lower()
    # For 'lion', any text containing an 's' is rejected outright.
    if name == 'lion' and 's' in lowered:
        return False
    return any(fragment in lowered for fragment in _LOGO_FRAGMENTS.get(name, ()))


def _outline_and_center(frame, box, x_offset, y_offset):
    """Draw *box* (ROI coordinates) on *frame* and return its centre in frame coordinates."""
    polygon = np.array(box, dtype=np.int32)
    polygon[:, 0] += x_offset
    polygon[:, 1] += y_offset
    cv2.polylines(frame, [polygon], True, (0, 255, 0), 2)
    return (int(np.mean(polygon[:, 0])), int(np.mean(polygon[:, 1])))


def find_text_center_from_roi(frame, bbox_coords, classname):
    """Locate the OCR text that best matches *classname* inside a detection box.

    The crop of *frame* given by *bbox_coords* is passed to the module-level
    ``ocr`` engine. Recognised lines are first ranked by
    ``difflib.SequenceMatcher`` similarity to *classname* (case-insensitive);
    the best line is accepted when its ratio exceeds 0.4. Otherwise the last
    line containing one of the fragments in ``_LOGO_FRAGMENTS`` for this class
    is used.

    Side effect: the polygon of the chosen text line is drawn on *frame*.

    Args:
        frame: Image array, sliced as ``frame[y1:y2, x1:x2]``.
        bbox_coords: ``(x1, y1, x2, y2)`` integer box in frame coordinates.
        classname: Detected class name; compared case-insensitively.

    Returns:
        ``(center_x, center_y)`` as ints in frame coordinates, or ``None``
        when no suitable text is found or OCR fails.
    """
    x1, y1, x2, y2 = bbox_coords
    # A negative start would make the slice count from the far edge.
    x1, y1 = max(0, x1), max(0, y1)
    roi = frame[y1:y2, x1:x2]
    if roi.size == 0:
        return None

    try:
        lines = _parse_ocr_lines(ocr.ocr(roi, cls=True))

        # Stage 1: fuzzy match against the whole class name.
        target = classname.lower()
        best_box, best_score = None, 0
        for text, box in lines:
            score = difflib.SequenceMatcher(None, text.lower(), target).ratio()
            if score > best_score:
                best_score, best_box = score, box

        # Stage 2: fragment match; the last matching line wins.
        if best_box is None or best_score <= 0.4:
            best_box = None
            for text, box in lines:
                if _matches_logo_fragment(text, classname):
                    best_box = box

        if best_box is None:
            return None
        return _outline_and_center(frame, best_box, x1, y1)

    except Exception:
        # OCR is best-effort: report the failure and let the caller fall back
        # to the detection-box centre.
        logging.getLogger(__name__).warning(
            "OCR lookup failed for class %r", classname, exc_info=True
        )
        return None

def compute_angle_from_frame(frame, center_x, center_y, w, h):
    """Estimate the orientation of the object inside a detection box.

    The box is cropped from *frame*, the foreground is segmented with
    ``cv2.grabCut`` (initialised from a rectangle inset 5 px on each side),
    and ``cv2.minAreaRect`` is fitted to the largest external contour.

    Args:
        frame: Image array; ``frame.shape[0]`` is height, ``frame.shape[1]``
            is width. Passed to ``cv2.cvtColor(..., COLOR_BGR2GRAY)`` after
            masking, so three channels are expected.
        center_x: X coordinate of the box centre.
        center_y: Y coordinate of the box centre.
        w: Box width.
        h: Box height.

    Returns:
        ``(angle, is_longest)``. ``angle`` is the ``minAreaRect`` angle with
        negative values shifted by +90. ``is_longest`` is True when the side
        selected by the angle rule (``rect_w`` if the raw angle is below -45,
        else ``rect_h``) is the longer side of the fitted rectangle.
        ``(None, None)`` is returned whenever the crop is empty or too small,
        segmentation fails, or no contour is found, so callers can always
        unpack two values.
    """
    tl_x = max(0, int(center_x - w / 2))
    tl_y = max(0, int(center_y - h / 2))
    br_x = min(tl_x + w, frame.shape[1])
    br_y = min(tl_y + h, frame.shape[0])

    if tl_x >= br_x or tl_y >= br_y:
        return None, None

    roi = frame[tl_y:br_y, tl_x:br_x]
    roi_h, roi_w = roi.shape[:2]

    # The GrabCut rectangle is inset by 5 px per side, so it only has a
    # positive size when the crop is larger than 10 px in both directions.
    if roi_w <= 10 or roi_h <= 10:
        return None, None

    mask = np.zeros((roi_h, roi_w), np.uint8)
    bgd_model = np.zeros((1, 65), np.float64)
    fgd_model = np.zeros((1, 65), np.float64)
    inset_rect = (5, 5, roi_w - 10, roi_h - 10)

    try:
        cv2.grabCut(roi, mask, inset_rect, bgd_model, fgd_model, 5, cv2.GC_INIT_WITH_RECT)
    except Exception:
        # Segmentation is best-effort; report "no angle" rather than abort.
        return None, None

    # Keep definite and probable foreground pixels only.
    foreground = np.where((mask == cv2.GC_FGD) | (mask == cv2.GC_PR_FGD), 1, 0).astype('uint8')
    roi_seg = roi * foreground[:, :, np.newaxis]

    gray_seg = cv2.cvtColor(roi_seg, cv2.COLOR_BGR2GRAY)
    _, thresh_seg = cv2.threshold(gray_seg, 1, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(thresh_seg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None, None

    largest_contour = max(contours, key=cv2.contourArea)
    (_box_x, _box_y), (rect_w, rect_h), angle = cv2.minAreaRect(largest_contour)

    # NOTE(review): the -45 / negative-angle handling presumably targets the
    # older OpenCV minAreaRect convention (angles in [-90, 0)) - confirm
    # against the OpenCV version in use.
    side_length = rect_w if angle < -45 else rect_h
    if angle < 0:
        angle += 90
    islongest = (side_length == max(rect_w, rect_h))

    return angle, islongest



# Command-line interface. --model and --source are mandatory; the remaining
# options are optional.
parser = argparse.ArgumentParser()
parser.add_argument('--model', help='Path to YOLO model file (example: "runs/detect/train/weights/best.pt")',
                    required=True)
parser.add_argument('--source', help='Image source, can be image file ("test.jpg"), \
                    image folder ("test_dir"), video file ("testvid.mp4"), index of USB camera ("usb0"), or index of Picamera ("picamera0")',
                    required=True)
# type=float so a user-supplied value has the same type as the 0.5 default
# (without it, "--thresh 0.4" would arrive as the string "0.4").
parser.add_argument('--thresh', help='Minimum confidence threshold for displaying detected objects (example: "0.4")',
                    type=float,
                    default=0.5)
parser.add_argument('--resolution', help='Resolution in WxH to display inference results at (example: "640x480"), \
                    otherwise, match source resolution',
                    default=None)
parser.add_argument('--record', help='Record results from video or webcam and save it as "demo1.avi". Must specify --resolution argument to record.',
                    action='store_true')

args = parser.parse_args()


# Plain module-level names used by the rest of the script.
model_path = args.model
img_source = args.source
min_thresh = args.thresh
user_res = args.resolution
record = args.record

# Load the detector only when the weights file exists; otherwise report the
# problem and stop.
if os.path.exists(model_path):
    model = YOLO(model_path, task='detect')
    # Lookup from class index to class name, used when reporting detections.
    labels = model.names
else:
    print('ERROR: Model path is invalid or model was not found. Make sure the model filename was entered correctly.' , file=sys.stderr)
    sys.exit(0)

# Recognised file extensions. The lists are kept as-is because the folder
# scan further down also reads img_ext_list.
img_ext_list = ['.jpg','.JPG','.jpeg','.JPEG','.png','.PNG','.bmp','.BMP']
vid_ext_list = ['.avi','.mov','.mp4','.mkv','.wmv']

# Classify --source as folder / image / video / usb / picamera.
if os.path.isdir(img_source):
    source_type = 'folder'
elif os.path.isfile(img_source):
    _, ext = os.path.splitext(img_source)
    # Compare case-insensitively so e.g. ".MP4" or ".Jpg" are accepted too.
    ext_key = ext.lower()
    if ext_key in {known.lower() for known in img_ext_list}:
        source_type = 'image'
    elif ext_key in {known.lower() for known in vid_ext_list}:
        source_type = 'video'
    else:
        print(f'File extension {ext} is not supported.', file=sys.stderr)
        sys.exit(0)
elif img_source.startswith(('usb', 'picamera')):
    # The camera index is whatever follows the prefix, e.g. "usb0" -> 0.
    source_type = 'usb' if img_source.startswith('usb') else 'picamera'
    try:
        camera_idx = int(img_source[len(source_type):])
    except ValueError:
        print(f'Input {img_source} is invalid. Please try again.' , file=sys.stderr)
        sys.exit(0)
    if source_type == 'usb':
        usb_idx = camera_idx
    else:
        picam_idx = camera_idx
else:
    print(f'Input {img_source} is invalid. Please try again.' , file=sys.stderr)
    sys.exit(0)

# A "WxH" string from --resolution switches resizing on and supplies the
# target width and height.
resize = bool(user_res)
if resize:
    res_fields = user_res.split('x')
    resW, resH = int(res_fields[0]), int(res_fields[1])

# Recording is only set up for video / USB sources with an explicit resolution.
if record:
    if source_type != 'video' and source_type != 'usb':
        print('Recording only works for video and camera sources. Please try again.', file=sys.stderr)
        sys.exit(0)
    if not user_res:
        print('Please specify resolution to record video at.', file=sys.stderr)
        sys.exit(0)

    record_name = 'demo1.avi'
    record_fps = 30
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    recorder = cv2.VideoWriter(record_name, fourcc, record_fps, (resW, resH))

# Prepare the frame source: a list of image paths, an OpenCV capture, or a
# Picamera2 instance.
if source_type == 'image':
    imgs_list = [img_source]
elif source_type == 'folder':
    # Sorted so the processing order does not depend on directory listing order.
    imgs_list = sorted(
        path for path in glob.glob(img_source + '/*')
        if os.path.splitext(path)[1] in img_ext_list
    )
elif source_type == 'video' or source_type == 'usb':

    cap_arg = img_source if source_type == 'video' else usb_idx
    cap = cv2.VideoCapture(cap_arg)

    if user_res:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, resW)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, resH)

elif source_type == 'picamera':
    # The camera configuration below needs resW/resH, which only exist when
    # --resolution was given.
    if not user_res:
        print('Please specify resolution to use the Picamera source.', file=sys.stderr)
        sys.exit(0)
    from picamera2 import Picamera2
    cap = Picamera2()
    cap.configure(cap.create_video_configuration(main={"format": 'RGB888', "size": (resW, resH)}, controls={"FrameRate": 50}))
    cap.set_controls({"ExposureTime": 5000})
    cap.start()

# Ten colour triples. NOTE(review): not referenced anywhere else in the
# visible part of this file - presumably left over from a drawing step.
bbox_colors = [(164,120,87), (68,148,228), (93,97,209), (178,182,133), (88,159,106),
              (96,202,231), (159,124,168), (169,162,241), (98,118,150), (172,176,184)]

# Value printed as "Average pipeline FPS" when the script finishes.
avg_frame_rate = 0
# NOTE(review): frame_rate_buffer, fps_avg_len and frame_count are not read or
# updated in the visible part of this file.
frame_rate_buffer = []
fps_avg_len = 200
# Index of the next image to read for image/folder sources; the main loop also
# treats the value 1 as "first frame has been processed".
img_count = 0
frame_count=0
# Confidence cut-off for reported detections. float() also accepts the string
# form argparse yields when --thresh is declared without a numeric type.
conf_threshold = float(min_thresh)

# Main loop: grab a frame, detect objects, refine each detection with OCR and
# GrabCut, and print one JSON array per frame on stdout.
while True:

    t_start = time.perf_counter()

    if source_type == 'image' or source_type == 'folder':
        if img_count >= len(imgs_list):
            print('All images have been processed. Exiting program.', file=sys.stderr)
            sys.exit(0)
        img_filename = imgs_list[img_count]
        frame = cv2.imread(img_filename)
        img_count = img_count + 1
        # cv2.imread returns None instead of raising on an unreadable file.
        if frame is None:
            print(f'Unable to read image {img_filename}. Exiting program.', file=sys.stderr)
            break

    elif source_type == 'video':
        ret, frame = cap.read()
        if not ret:
            print('Reached end of the video file. Exiting program.', file=sys.stderr)
            break

    elif source_type == 'usb':
        ret, frame = cap.read()
        if (frame is None) or (not ret):
            print('Unable to read frames from the camera. This indicates the camera is disconnected or not working. Exiting program.', file=sys.stderr)
            break

    elif source_type == 'picamera':
        frame = cap.capture_array()
        if frame is None:
            print('Unable to read frames from the Picamera. This indicates the camera is disconnected or not working. Exiting program.', file=sys.stderr)
            break
        # Marks the first frame as processed so the loop exits after it.
        img_count=1

    if resize:
        frame = cv2.resize(frame, (resW, resH))

    # Inference runs on every frame, whether or not it was resized.
    results = model(frame, verbose=False)
    detections = results[0].boxes

    detection_results = []
    for i in range(len(detections)):
        detection = detections[i]

        # Skip low-confidence detections before the expensive OCR / GrabCut
        # steps; they would be dropped from the output anyway.
        conf = detection.conf.item()
        if not conf > conf_threshold:
            continue

        xyxy = detection.xyxy.cpu().numpy().squeeze()
        xmin, ymin, xmax, ymax = xyxy.astype(int)

        w = xmax - xmin
        h = ymax - ymin
        cx = (xmin + xmax) // 2
        cy = (ymin + ymax) // 2
        classname = labels[int(detection.cls.item())]

        # Only the first two returned values are used.
        angle, is_longest = compute_angle_from_frame(frame, cx, cy, w, h)[:2]

        # Prefer the centre of the recognised logo text over the box centre.
        ocr_center = find_text_center_from_roi(frame, (xmin, ymin, xmax, ymax), classname)
        if ocr_center:
            cx, cy = ocr_center
            logo_status = 'Logo recognition : success'
        else:
            logo_status = 'Logo recognition : fail'

        # Convert the image-space centre through the stored homography.
        cx, cy = transform_point((cx, cy))

        detection_results.append({
            "class": str(classname),
            "center_x": int(round(cx)),
            "center_y": int(round(cy)),
            "rotation": bool(is_longest),
            "STR": str(logo_status),
            "angle_deg": int(round(angle)) if angle is not None else None
        })

    print(json.dumps(detection_results))

    if img_count == 1:
        print("First frame processed. Exiting script.", file=sys.stderr)
        sys.exit(0)


# Reached when the loop ends via break: report the FPS figure and release
# whatever capture / recording resources were opened.
print(f'Average pipeline FPS: {avg_frame_rate:.2f}', file=sys.stderr)
if source_type in ('video', 'usb'):
    cap.release()
elif source_type == 'picamera':
    cap.stop()
if record:
    recorder.release()
cv2.destroyAllWindows()


# END