<a href="https://colab.research.google.com/github/AbhiJ2706/mvs_vision_demo/blob/main/mvs_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import cv2
import math
import datetime
import time as timer
from adafruit_servokit import ServoKit as sk
import threading
from threading import Thread
from queue import Queue
from collections import deque

time = datetime.datetime.now()
print("------------------------------------------------------")
hour = time.hour
print(hour)
hour *= math.pi
mod = math.sin((1/24)*hour)
print(mod)
realUpper = mod + 1
print(realUpper)

default_yaw_angle = 90
default_throttle_angle = 0


class processingThread(threading.Thread):

    def __init__(self, queue, resultQueue, args=(), kwargs=None):
        threading.Thread.__init__(self, args=(), kwargs=None)
        self.queue = queue 
        self.resultQueue = resultQueue
        self.avg = [0 for i in range(0, 8)]
        self.lower_color = np.array([0, 69, 113])
        self.upper_color = np.array([175, 226, 255])
        self.diagonalView = math.radians(60.0)
        self.horizontalAspect = 16
        self.verticalAspect = 9
        self.diagonalAspect = math.hypot(self.horizontalAspect, self.verticalAspect)
        self.horizontalView = math.atan(math.tan(self.diagonalView / 2) * (self.horizontalAspect / self.diagonalAspect)) * 2
        self.verticalView = math.atan(math.tan(self.diagonalView / 2) * (self.verticalAspect / self.diagonalAspect)) * 2
        self.H_FOCAL_LENGTH = 640 / (2 * math.tan((self.horizontalView / 2)))
        self.V_FOCAL_LENGTH = 480 / (2 * math.tan((self.verticalView / 2)))
        self.processing = False

    def run(self):
        while 1:
            startTime = timer.time()
            val = self.queue.get()
            img = self.threshold_video(val)
            frame, distance, yaw = self.find_targets(val, img)
            self.resultQueue.put((frame, distance, yaw, (timer.time() - startTime)))
            

    def threshold_range(self, im, lo, hi):
        unused, t1 = cv2.threshold(im, lo, 255, type=cv2.THRESH_BINARY)
        unused, t2 = cv2.threshold(im, hi, 255, type=cv2.THRESH_BINARY_INV)
        return cv2.bitwise_and(t1, t2)

    def threshold_video(self, blur):
        hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        h = self.threshold_range(h, self.lower_color[0], self.upper_color[0])
        s = self.threshold_range(s, self.lower_color[1], self.upper_color[1])
        v = self.threshold_range(v, self.lower_color[2], self.upper_color[2])
        combined_mask = cv2.bitwise_and(h, cv2.bitwise_and(s, v))
        return combined_mask

    def checkContours(self, cntW, cntH, image_width):
        if cntH != 0:
            return cntW > (image_width / 7) and round(cntW/cntH) == 1
        else:
            return False

    def calculateDist(self, cntHeight, image_height):
        for cnt in  self.avg:
            if cnt == 0:
                cnt = cntHeight

        del  self.avg[len( self.avg) - 1]
        self.avg.insert(0, cntHeight)
        PIX_HEIGHT = 0
        for cnt in  self.avg:
            PIX_HEIGHT += cnt

        PIX_HEIGHT = PIX_HEIGHT/len( self.avg)
        TARGET_HEIGHT = 0.45
        VIEWANGLE = math.atan((TARGET_HEIGHT * image_height) / (2 * 90 * 4))
        distance = ((TARGET_HEIGHT * image_height) / (2 * PIX_HEIGHT * math.tan(VIEWANGLE)))
        return distance

    def calculateYaw(self, pixelX, centerX):
        yaw = math.degrees(math.atan((pixelX - centerX) /  self.H_FOCAL_LENGTH))
        return round(yaw*100) / 100

    def find_targets(self, frame, mask):
        _, contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_TC89_KCOS)
        contours = sorted(contours, key=lambda x: cv2.contourArea(x), reverse=True)
        screenHeight, screenWidth, _ = frame.shape
        centerX = (screenWidth / 2) - .5
        centerY = (screenHeight / 2) - .5
        image = frame.copy()
        dist = 0
        angle = 0
        if len(contours) != 0:
            for cnt in contours:
                pts, dim, a = cv2.minAreaRect(cnt)
                x = pts[0]
                y = pts[1]
                cntHeight = 0
                cntWidth = 0
                if dim[0] < dim[1]:
                    cntHeight = dim[0]
                    cntWidth = dim[1]
                else:
                    cntHeight = dim[1]
                    cntWidth = dim[0]
                if self.checkContours(cntWidth, cntHeight, screenWidth):
                    M = cv2.moments(cnt)
                    if M["m00"] != 0:
                        cx = int(M["m10"] / M["m00"])
                        cy = int(M["m01"] / M["m00"])
                        theCX = cx
                        theCY = cy
                    else:
                        cx, cy = 0, 0
                    distance = self.calculateDist(cntHeight, screenHeight)
                    dist = distance
                    yaw = self.calculateYaw(centerX, cx)
                    angle = yaw
                    rect = cv2.minAreaRect(cnt)
                    box = cv2.boxPoints(rect)
                    box = np.int0(box)
                    cv2.drawContours(image, [box], 0, (23, 184, 80), 3)
                    cv2.line(image, (cx, screenHeight), (cx, 0), (255, 255, 255))
                    cv2.circle(image, (cx, cy), 6, (255, 255, 255))
                    cv2.drawContours(image, [cnt], 0, (23, 184, 80), 1)
                    cv2.putText(image, "Distance: " + str(distance), (40, 180), cv2.FONT_HERSHEY_COMPLEX, .5,(0, 0, 0))
                    cv2.putText(image, "Yaw: " + str(yaw), (40, 140), cv2.FONT_HERSHEY_COMPLEX, .5,(0, 0, 0))
        return image, dist, angle

'''def motor_logic(distance, yaw):
    try:
        kit.servo[8].angle = 90 + yaw*10 #steering
        print(kit.servo[8].angle)
        kit.servo[9].angle = default_throttle_angle if distance < 1.5 else 180
        print(kit.servo[9].angle)
    except:
        print("error")'''


def adjust_gamma(image, gamma):
        invGamma = 1.0 / gamma
        table = np.array([((i / 255.0) ** invGamma) * 255
                for i in np.arange(0, 256)]).astype("uint8")
        return cv2.LUT(image, table)
        


cap = cv2.VideoCapture(0)
'''kit = sk(channels=16)

kit.servo[8].angle = default_yaw_angle
kit.servo[9].angle = default_throttle_angle'''

input_queues = [Queue(), Queue(), Queue()]
output_queues = [Queue(), Queue(), Queue()]

cameraThreads = [processingThread(input_queues[0], output_queues[0]), processingThread(input_queues[1], output_queues[1]), processingThread(input_queues[2], output_queues[2])]

for t in cameraThreads:
    t.start()

rotation = 0
counterRotation = 0

startGate = False

while True:
    ret, frame = cap.read()
    if ret:
        frame = adjust_gamma(frame, realUpper)
        img = frame.copy()
        cameraThreads[rotation].queue.put(img)
        prevRotation = None
        if rotation == 2:
            prevRotation = 0
            if not startGate:
                startGate  = True
        elif rotation == 1:
            prevRotation = 2
        elif rotation == 0:
            prevRotation = 1

        if startGate:
            while not cameraThreads[prevRotation].resultQueue.qsize():
                print("waiting for queue", prevRotation)
                if cameraThreads[prevRotation].resultQueue.qsize():
                    break
        q_len = cameraThreads[prevRotation].resultQueue.qsize()
        if q_len:
            img = cameraThreads[prevRotation].resultQueue.get()
            #print("read output frame from", prevRotation, ", time taken is", img[3])
            cv2.imshow("img", img[0])
        else:
            pass
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        rotation = rotation + 1 if rotation < 2 else 0
    else:
        print("empty frame- pass", timer.time())
        pass

cap.release()
cv2.destroyAllWindows()