#### ESP32 Stereo Camera Code

In [1]:
import copy
import math

import requests

import numpy as np
import cv2
import matplotlib.pyplot as plt
import scipy
import scipy.optimize
import torch
import torchvision
import torchvision.transforms.functional as tvtf

#this is the file with auxillary functions. stereo_image_utils.py. Should be in the same
#directory as the notebook
import stereo_image_utils
from stereo_image_utils import get_detections, get_cost, draw_detections, annotate_class2 
from stereo_image_utils import get_horiz_dist_corner_tl, get_horiz_dist_corner_br, get_dist_to_centre_tl, get_dist_to_centre_br
from ultralytics import YOLO

In [2]:
URL_left = "http://192.168.87.147"
URL_right = "http://192.168.87.178"
AWB = True
cnt = 1

#focal length. Pre-calibrated in stereo_image_v6 notebook
fl = -2.4910721700855447
tantheta = 0.4442359802989689




In [3]:
COLOURS = [
    tuple(int(colour_hex.strip('#')[i:i+2], 16) for i in (0, 2, 4))
    for colour_hex in plt.rcParams['axes.prop_cycle'].by_key()['color']
]

In [4]:
model = YOLO('best.pt') 

In [5]:
#capture the images
cap_left = cv2.VideoCapture(URL_left + ":81/stream")
cap_right = cv2.VideoCapture(URL_right + ":81/stream")

In [6]:
#functions for the command handler

def set_resolution(url: str, index: int=1, verbose: bool=False):
    try:
        if verbose:
            resolutions = "10: UXGA(1600x1200)\n9: SXGA(1280x1024)\n8: XGA(1024x768)\n7: SVGA(800x600)\n6: VGA(640x480)\n5: CIF(400x296)\n4: QVGA(320x240)\n3: HQVGA(240x176)\n0: QQVGA(160x120)"
            print("available resolutions\n{}".format(resolutions))

        if index in [10, 9, 8, 7, 6, 5, 4, 3, 0]:
            requests.get(url + "/control?var=framesize&val={}".format(index))
        else:
            print("Wrong index")
    except:
        print("SET_RESOLUTION: something went wrong")

def set_quality(url: str, value: int=1, verbose: bool=False):
    try:
        if value >= 10 and value <=63:
            requests.get(url + "/control?var=quality&val={}".format(value))
    except:
        print("SET_QUALITY: something went wrong")

def set_awb(url: str, awb: int=1):
    try:
        awb = not awb
        requests.get(url + "/control?var=awb&val={}".format(1 if awb else 0))
    except:
        print("SET_QUALITY: something went wrong")
    return awb


#26 37 38

In [7]:
import math
import numpy as np
def calculate_real_area(dis,pixel_area):
    fov_degrees = 62  
    pixel_size_um = 2.2  
    camera_distance_to_ground_cm = dis
    image_width_pixels = 1600 

    # Convert pixel size to cm
    pixel_size_cm = pixel_size_um * 1e-4

    # Calculate the view width at the pothole's distance
    fov_radians = math.radians(fov_degrees)
    view_width_cm = 2 * camera_distance_to_ground_cm * math.tan(fov_radians / 2)

    # Calculate ground coverage per pixel (GCP)
    gcp_cm_per_pixel = view_width_cm / image_width_pixels

    real_area_cm2 = pixel_area * (gcp_cm_per_pixel ** 2)
    return real_area_cm2

In [8]:
import requests
WRITE_API_KEY = '1768JVHS6OOX0ET1'
base_url = 'https://api.thingspeak.com/update'
count=0

In [None]:
if __name__ == '__main__':
    set_resolution(URL_left, index=7)
    set_resolution(URL_right, index=7)

    while True:
        if cap_left.isOpened():
            ret_l, frame_l = cap_left.read()
            if ret_l:
                cv2.imshow("left_eye", frame_l) 
            else:
                cap_left.release()
                cap_left = cv2.VideoCapture(URL_left + ":81/stream")

        if cap_right.isOpened():
            ret_r, frame_r = cap_right.read()

            if ret_r:
                cv2.imshow("right_eye", frame_r) 
            else:
                cap_right.release()
                cap_right = cv2.VideoCapture(URL_right + ":81/stream")
        
        if ret_r and ret_l :
            #do stereo matching
            
            imgs = [frame_l,frame_r]
            #cv2.imwrite('frame_l.jpg',frame_l)
            #cv2.imwrite('frame_r.jpg',frame_r)
            
            if cnt == 0:
                cnt = 1
                
                det= get_detections(model,imgs)
                if not det:
                    print('no detection')
#                 if (len(det[1])==len(det[0])):
#                     det[1] = det[1][:-1]
                else:
                    sz1 = frame_r.shape[1]
                    centre = sz1/2
                    print(det)
                    cost = get_cost(det, lbls = 'pothole',sz1 = centre)
                    tracks = scipy.optimize.linear_sum_assignment(cost)
    
                    dists_tl =  get_horiz_dist_corner_tl(det)
                    dists_br =  get_horiz_dist_corner_br(det)
    
                    final_dists = []
                    dctl = get_dist_to_centre_tl(det[0],cntr = centre)
                    dcbr = get_dist_to_centre_br(det[0], cntr = centre)
    
                    for i, j in zip(*tracks):
                        if dctl[i] < dcbr[i]:
                            final_dists.append(dists_tl[i][j])
    
                        else:
                            final_dists.append(dists_br[i][j])
                    
                    #final distances as list
                    fd = [i for i in final_dists]
                    #find distance away
                    dists_away = (9.1/2)*sz1*(1/tantheta)/np.array((fd))+fl
                    for i in range(len(dists_away)):
                        print(f'pothole is {dists_away[i]:.1f}cm away')
                    t1 = [list(tracks[1]), list(tracks[0])]
                    frames_ret=[]
                    for i, imgi in enumerate(imgs):
                        results = model(imgi)
                        img=results[0].plot()
                        frames_ret.append(img)
                    cv2.imshow("left_eye", cv2.cvtColor(frames_ret[0],cv2.COLOR_RGB2BGR))
                    cv2.imshow("right_eye", cv2.cvtColor(frames_ret[1],cv2.COLOR_RGB2BGR))
                    count=count+1
                    while True:
                        key1 = cv2.waitKey(1)
                        if key1 == ord('p'):
                            cv2.imwrite(f'frame_l{count}.jpg',frame_l)
                            cv2.imwrite(f'frame_r{count}.jpg',frame_r)
                            break
    #                 key1 = cv2.waitKey(1)
        
        key = cv2.waitKey(1)
        if key == ord('s'):
            mask = (results[0].masks.data[0].cpu().numpy() * 255).astype('uint8')
            pothole_mask = mask> 128  
            pothole_pixel_area = np.sum(pothole_mask)
            pothole_area_cm2 = calculate_real_area(dists_away,pothole_pixel_area)
            '''data = {
            'api_key': WRITE_API_KEY,
            'field3': dists_away,
            'field4': pothole_area_cm2,}
            response = requests.get(base_url, params=data)
            if response.status_code == 200:
                print('Data successfully written to ThingSpeak.')'''
            print(f'distance:{dists_away}')
            print(f'area:{pothole_area_cm2}')

        if key == ord('r'):
            idx = int(input("Select resolution index: "))
            set_resolution(URL_left, index=idx, verbose=True)
            set_resolution(URL_right, index=idx, verbose=True)

        elif key == ord('q'):
            val = int(input("Set quality (10 - 63): "))
            set_quality(URL_left, value=val)
            set_quality(URL_right, value=val)

        elif key == ord('a'):
            AWB = set_awb(URL_left, AWB)
            AWB = set_awb(URL_right, AWB)
            
        elif key == ord('p'): #3d
            cnt = 0

        elif key == 27:
            break

    cv2.destroyAllWindows()
    cap_left.release()
    cap_right.release()

[(234, 169, 299, 200), (70, 148, 135, 182)]
pothole is 27.5cm away
distance:[     27.486]
area:[       1.23]
