In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import cv2
import matplotlib.cm as cm
from IPython. display import clear_output
import time
from mycolorpy import colorlist as mcp
import PIL
from Superstuff.matching import Matching

In [None]:
# Run the matcher on the left/right grayscale tensors.
# NOTE(review): `matcher`, `grayL_t`, `grayR_t`, `grayL` and `grayR` are not defined
# in the visible part of this notebook, and `make_matching_plot_fast` is defined in a
# later cell -- this cell presumably relies on earlier kernel state; confirm that it
# survives "Restart Kernel -> Run All".
pred = matcher({'image0': grayL_t, 'image1': grayR_t})

# Pull the first batch element of each output out as a NumPy array.
kp1 = pred['keypoints0'][0].cpu().numpy()
kp2 = pred['keypoints1'][0].cpu().numpy()
matches = pred['matches0'][0].cpu().numpy()
confidence = pred['matching_scores0'][0].detach().cpu().numpy()

# An entry of -1 in `matches` marks "no match"; keep the matched keypoints only.
valid = matches > -1
pts1 = kp1[valid]
pts2 = kp2[matches[valid]]

# Colour each match by its confidence and build the overlay text.
color = cm.jet(confidence[valid])
text = [
    'SuperGlue',
    f'Keypoints: {len(kp1)}:{len(kp2)}',
    f'Matches: {len(pts1)}',
]

# Draw keypoints and matches between the two images.
out = make_matching_plot_fast(
    grayL, grayR, kp1, kp2, pts1, pts2, color, text,
    path=None, show_keypoints=True)

## Functions:

In [None]:
def make_matching_plot_fast(image0, image1, kpts0, kpts1, mkpts0,
                            mkpts1, color, text, path=None,
                            show_keypoints=False, margin=10,
                            opencv_display=False, opencv_title='',
                            small_text=None):
    """Draw two grayscale images side by side with their keypoints and matches.

    Args:
        image0, image1: 2-D (grayscale) image arrays; they are pasted left and
            right on a white canvas, separated by `margin` pixels.
        kpts0, kpts1: (N, 2) arrays of (x, y) keypoints for each image.
        mkpts0, mkpts1: (M, 2) arrays of matched (x, y) points; row i of each
            array forms one match.
        color: (M, >=3) array of RGB(A) values in [0, 1], one row per match.
        text: iterable of strings drawn in large type at the top left.
        path: if not None, the canvas is written there with `cv2.imwrite`.
        show_keypoints: also draw every keypoint as a small white/black dot.
        margin: horizontal gap in pixels between the two images.
        opencv_display: also show the canvas in an OpenCV window.
        opencv_title: window title / matplotlib title.
        small_text: optional sequence of strings drawn in small type at the
            bottom left (None means no small text).

    Returns:
        The (H, W, 3) uint8 canvas. Match colours are written in reversed
        (BGR) channel order, which is what the OpenCV output calls expect.
    """
    # Avoid a shared mutable default argument.
    small_text = [] if small_text is None else small_text

    H0, W0 = image0.shape
    H1, W1 = image1.shape
    H, W = max(H0, H1), W0 + W1 + margin
    x_shift = W0 + margin  # horizontal offset of image1 inside the canvas

    # White canvas, both images pasted in, then replicated to three channels.
    out = 255*np.ones((H, W), np.uint8)
    out[:H0, :W0] = image0
    out[:H1, x_shift:] = image1
    out = np.stack([out]*3, -1)

    if show_keypoints:
        kpts0, kpts1 = np.round(kpts0).astype(int), np.round(kpts1).astype(int)
        white = (255, 255, 255)
        black = (0, 0, 0)
        # Each keypoint is a white dot with a black rim so it shows on any background.
        for x, y in kpts0:
            cv2.circle(out, (x, y), 2, black, -1, lineType=cv2.LINE_AA)
            cv2.circle(out, (x, y), 1, white, -1, lineType=cv2.LINE_AA)
        for x, y in kpts1:
            cv2.circle(out, (x + x_shift, y), 2, black, -1,
                       lineType=cv2.LINE_AA)
            cv2.circle(out, (x + x_shift, y), 1, white, -1,
                       lineType=cv2.LINE_AA)

    mkpts0, mkpts1 = np.round(mkpts0).astype(int), np.round(mkpts1).astype(int)
    # Scale the colours to 0-255 and reverse the channel order (RGB -> BGR).
    color = (np.array(color[:, :3])*255).astype(int)[:, ::-1]
    for (x0, y0), (x1, y1), c in zip(mkpts0, mkpts1, color):
        c = c.tolist()
        cv2.line(out, (x0, y0), (x1 + x_shift, y1),
                 color=c, thickness=1, lineType=cv2.LINE_AA)
        # display line end-points as circles
        cv2.circle(out, (x0, y0), 2, c, -1, lineType=cv2.LINE_AA)
        cv2.circle(out, (x1 + x_shift, y1), 2, c, -1,
                   lineType=cv2.LINE_AA)

    # Scale factor for consistent visualization across scales.
    sc = min(H / 640., 2.0)

    txt_color_fg = (255, 255, 255)
    txt_color_bg = (0, 0, 0)

    def _put_outlined(label, origin, font_scale):
        # Thick dark pass first, thin light pass on top -> outlined text.
        cv2.putText(out, label, origin, cv2.FONT_HERSHEY_DUPLEX,
                    font_scale, txt_color_bg, 2, cv2.LINE_AA)
        cv2.putText(out, label, origin, cv2.FONT_HERSHEY_DUPLEX,
                    font_scale, txt_color_fg, 1, cv2.LINE_AA)

    # Big text, stacked downwards from the top-left corner.
    Ht = int(30 * sc)  # text height
    for i, t in enumerate(text):
        _put_outlined(t, (int(8*sc), Ht*(i+1)), 1.0*sc)

    # Small text, stacked upwards from the bottom-left corner.
    Ht = int(18 * sc)  # text height
    for i, t in enumerate(reversed(small_text)):
        _put_outlined(t, (int(8*sc), int(H-Ht*(i+.6))), 0.5*sc)

    if path is not None:
        cv2.imwrite(str(path), out)

    if opencv_display:
        cv2.imshow(opencv_title, out)
        cv2.waitKey(1)

    fig, ax = plt.subplots(figsize=(20,10))
    # matplotlib expects RGB while the canvas colours are in BGR order, so flip
    # the channel axis for the preview only; the returned array is unchanged.
    ax.imshow(out[..., ::-1])
    ax.set_title(opencv_title)
    ax.axis("off")
    plt.show()

    return out

In [None]:
def _read_rgb_frames(capture, start_loss, end_loss):
    """Read every frame from an opened capture as RGB and trim both ends."""
    frames = []
    while True:
        ret, frame = capture.read()
        if not ret or frame is None:
            break
        # OpenCV decodes to BGR; the rest of the notebook works in RGB.
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    frames = np.array(frames)
    # An explicit stop index keeps `end_loss=0` meaning "drop nothing"
    # (`frames[a:-0]` would be empty) and clamps when end_loss > len(frames).
    stop = max(len(frames) - end_loss, 0)
    return frames[start_loss:stop]


def import_stereo_video(left_name, right_name, start_loss=20, end_loss=20, display=True):
    """Load a left/right video pair into RGB frame arrays.

    Args:
        left_name, right_name: paths of the left and right video files.
        start_loss: number of frames dropped from the start of each video
            (expected to be non-negative).
        end_loss: number of frames dropped from the end of each video
            (expected to be non-negative; 0 keeps the tail).
        display: if True, show the first remaining left frame.

    Returns:
        (left_array, right_array): arrays of RGB frames, one per video.

    Raises:
        OSError: if either video file cannot be opened.
    """
    # load in video file
    left_vid = cv2.VideoCapture(left_name)
    right_vid = cv2.VideoCapture(right_name)

    try:
        for name, vid in ((left_name, left_vid), (right_name, right_vid)):
            if not vid.isOpened():
                raise OSError("Could not open video file: {}".format(name))

        # print some specs (taken from the right video)
        fps = right_vid.get(cv2.CAP_PROP_FPS)
        frame_count = right_vid.get(cv2.CAP_PROP_FRAME_COUNT)
        print("Loaded in video:\nFPS: {} \tFrame Count: {}".format(int(fps), int(frame_count)-end_loss-start_loss))

        # convert to arrays
        left_array = _read_rgb_frames(left_vid, start_loss, end_loss)
        print("Left Array Loaded  | Size: {}".format(left_array.shape))

        right_array = _read_rgb_frames(right_vid, start_loss, end_loss)
        print("Right Array Loaded | Size: {}".format(right_array.shape))
    finally:
        # Always free the decoder handles, even if reading failed.
        left_vid.release()
        right_vid.release()

    # Nothing to preview if trimming removed every frame.
    if display and len(left_array) > 0:
        plt.imshow(left_array[0])
        plt.title("First Frame:")
        plt.axis("off")
        plt.show()

    return left_array, right_array

In [None]:
def find_points(left_array, right_array, display=True):
    """Find matching points between the first left and first right frame.

    The first frame of each array is converted to grayscale and passed to the
    `Matching` model (SuperPoint + SuperGlue configuration below).

    Args:
        left_array, right_array: sequences of RGB frames; only element 0 of
            each is used.
        display: if True, plot the keypoints and matches.

    Returns:
        (l_frame1, r_frame1, l_pts, r_pts, l_frame1_gray, r_frame1_gray):
        the first RGB frames, the matched points in each view (row i of
        `l_pts` is matched to row i of `r_pts`), and the grayscale frames.
    """
    # set first frame for left and right
    l_frame1 = left_array[0]
    l_frame1_gray = cv2.cvtColor(l_frame1, cv2.COLOR_RGB2GRAY)

    r_frame1 = right_array[0]
    r_frame1_gray = cv2.cvtColor(r_frame1, cv2.COLOR_RGB2GRAY)

    # convert to (1, 1, H, W) tensors using the actual frame size, so frames
    # other than 480x640 work too.
    # NOTE(review): pixel values are passed in the 0-255 range; the upstream
    # SuperGlue demo scales frames to [0, 1] first -- confirm which range
    # `Superstuff.matching.Matching` expects before changing this.
    l_frame1_gray_t = torch.Tensor(l_frame1_gray)[None, None]
    r_frame1_gray_t = torch.Tensor(r_frame1_gray)[None, None]

    # use superpoint and superglue to find some points:
    config = {
            'superpoint': {
                'nms_radius': 2,
                'keypoint_threshold': 0,  # 1e-30
                'max_keypoints': -1
            },
            'superglue': {
                'weights': 'indoor',
                'sinkhorn_iterations': 20,
                'match_threshold': 0.01, # was 0.2
            }
        }

    # initializing matcher
    matcher = Matching(config).eval()#.to(device)

    # finding matching points in first frame for l and r; inference only, so
    # skip autograd bookkeeping
    with torch.no_grad():
        pred = matcher({'image0': l_frame1_gray_t, 'image1': r_frame1_gray_t})

    # get sections from output (pred)
    l_kp = pred['keypoints0'][0].cpu().numpy()
    r_kp = pred['keypoints1'][0].cpu().numpy()
    matches = pred['matches0'][0].cpu().numpy()
    confidence = pred['matching_scores0'][0].detach().cpu().numpy()

    # keep only the valid matches (-1 means unmatched) and make points from found kp
    valid = matches > -1
    l_pts = l_kp[valid]
    r_pts = r_kp[matches[valid]]

    # view matches found
    if display:
        # colour each match by confidence and build the overlay text
        color = cm.jet(confidence[valid])
        text = ['SuperGlue',
            'Keypoints: {}:{}'.format(len(l_kp), len(r_kp)),
            'Matches: {}'.format(len(l_pts))]
        make_matching_plot_fast(
                    l_frame1_gray, r_frame1_gray, l_kp, r_kp, l_pts, r_pts, color, text,
                    path=None, show_keypoints=True)

    return l_frame1, r_frame1, l_pts, r_pts, l_frame1_gray, r_frame1_gray

In [None]:
def find_object_point(n, nadd, l_pts, r_pts, l_frame1, r_frame1):
    """Show matched points `n` .. `n + nadd - 1` on the left and right frame.

    Used to visually pick which matched point(s) to track. The input frames
    are not modified; drawing happens on copies.

    Args:
        n: index of the first matched point to draw.
        nadd: number of consecutive points to draw.
        l_pts, r_pts: arrays of (x, y) points for the left / right frame.
        l_frame1, r_frame1: RGB frames to draw on (copied first).

    Returns:
        (n, nadd), unchanged, so the selection can be passed on.
    """
    marker_colour = (255, 0, 0)

    # Same drawing for both views. Coordinates are truncated to plain Python
    # ints, as in KLT().
    panels = []
    for frame, pts in ((l_frame1, l_pts), (r_frame1, r_pts)):
        canvas = frame.copy()
        for pt in pts[n:n+nadd]:
            canvas = cv2.circle(canvas, (int(pt[0]), int(pt[1])), 2, marker_colour, -1)
        panels.append(canvas)

    # plot left and right
    fig, ax = plt.subplots(figsize=(20,20))
    ax.imshow(np.hstack(panels))
    ax.set_title('Left -> Right')
    ax.axis("off")
    plt.show()

    return n, nadd

In [None]:
# calculate optical flow using the Lucas-Kanade algorithm from OpenCV
def _track_view(prev_gray, next_gray, prev_pts, lk_params):
    """Track `prev_pts` into the next frame; return (new, old) points that were found."""
    empty = np.empty((0, 2), np.float32)
    if len(prev_pts) == 0:
        return empty, empty
    next_pts, status, _err = cv2.calcOpticalFlowPyrLK(prev_gray, next_gray, prev_pts, None, **lk_params)
    if next_pts is None:
        return empty, empty
    # status == 1 means the point was found in the next frame
    found = status == 1
    return next_pts[found], prev_pts[found]


def _draw_tracks(frame, mask, new_pts, old_pts, colours):
    """Add one track segment per point to `mask` and a dot to `frame`; return (image, mask)."""
    for i, (new, old) in enumerate(zip(new_pts, old_pts)):
        a, b = new.ravel()
        c, d = old.ravel()
        # Wrap around the colour table so more points than colours cannot overflow it.
        colour = colours[i % len(colours)].tolist()
        mask = cv2.line(mask, (int(a), int(b)), (int(c), int(d)), colour, 2)
        frame = cv2.circle(frame, (int(a), int(b)), 2, colour, -1)
    return cv2.add(frame, mask), mask


def KLT(left_array, right_array, l_frame1, r_frame1, l_frame1_gray, r_frame1_gray, l_pts, r_pts, n, nadd=1, display=True):
    """Track selected points through both videos with pyramidal Lucas-Kanade flow.

    Points `n` .. `n + nadd - 1` of `l_pts` / `r_pts` are tracked frame by frame,
    starting from the second frame of each array.

    Args:
        left_array, right_array: arrays of RGB frames (copied before drawing).
        l_frame1, r_frame1: first RGB frame of each view.
        l_frame1_gray, r_frame1_gray: grayscale versions of those frames.
        l_pts, r_pts: arrays of (x, y) points in the first frames.
        n: index of the first point to track.
        nadd: number of consecutive points to track.
        display: if True, show the final track masks side by side.

    Returns:
        (l_saved_points, r_saved_points, l_saved_imgs, r_saved_imgs,
        l_mask_imgs, r_mask_imgs): one entry per frame for each list; points
        have shape (K, 1, 2), images are frames with the tracks overlaid, and
        masks are snapshots of the track drawing up to that frame.
    """
    # Create some random colors (100 of them)
    color = np.random.randint(0, 255, (100, 3))

    # Parameters for lucas kanade optical flow
    lk_params = dict( winSize  = (15, 15),
                      maxLevel = 2,
                      criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    # copying arrays for drawing purposes (circles are drawn onto the frames):
    left_array_c = left_array.copy()
    right_array_c = right_array.copy()

    # setting initial points; float32 is the dtype passed to the LK tracker
    l_points1 = np.asarray(l_pts[n:n+nadd], dtype=np.float32).reshape(-1, 1, 2)
    r_points1 = np.asarray(r_pts[n:n+nadd], dtype=np.float32).reshape(-1, 1, 2)

    # masks that accumulate the drawn tracks
    l_mask = np.zeros_like(l_frame1)
    r_mask = np.zeros_like(r_frame1)

    l_saved_points = [l_points1]
    r_saved_points = [r_points1]

    l_saved_imgs = [l_frame1]
    r_saved_imgs = [r_frame1]
    # The masks are drawn on in place, so store snapshots; otherwise every
    # saved entry would alias the same array and show the final tracks.
    l_mask_imgs = [l_mask.copy()]
    r_mask_imgs = [r_mask.copy()]

    # iterate through each frame, starting from the second one
    for l_frame2, r_frame2 in zip(left_array_c[1:], right_array_c[1:]):
        l_frame2_gray = cv2.cvtColor(l_frame2, cv2.COLOR_RGB2GRAY)
        r_frame2_gray = cv2.cvtColor(r_frame2, cv2.COLOR_RGB2GRAY)

        # NOTE(review): each view is filtered independently, so once a point is
        # lost in only one view, row i of the left and right points may no
        # longer refer to the same physical point.
        l_good_new, l_good_old = _track_view(l_frame1_gray, l_frame2_gray, l_points1, lk_params)
        r_good_new, r_good_old = _track_view(r_frame1_gray, r_frame2_gray, r_points1, lk_params)

        # draw the tracks
        l_img, l_mask = _draw_tracks(l_frame2, l_mask, l_good_new, l_good_old, color)
        r_img, r_mask = _draw_tracks(r_frame2, r_mask, r_good_new, r_good_old, color)

        # Now update the previous frame and previous points
        l_frame1_gray = l_frame2_gray.copy()
        l_points1 = l_good_new.reshape(-1, 1, 2)
        r_frame1_gray = r_frame2_gray.copy()
        r_points1 = r_good_new.reshape(-1, 1, 2)

        # save points
        l_saved_points.append(l_points1)
        r_saved_points.append(r_points1)

        # save imgs
        l_saved_imgs.append(l_img)
        r_saved_imgs.append(r_img)

        # save mask snapshots
        l_mask_imgs.append(l_mask.copy())
        r_mask_imgs.append(r_mask.copy())

    if display:
        fig, ax = plt.subplots(1,2,figsize=(20,20))
        ax[0].imshow(l_mask)
        ax[0].axis("off")
        ax[1].imshow(r_mask)
        ax[1].axis("off")
        plt.show()

    return l_saved_points, r_saved_points, l_saved_imgs, r_saved_imgs, l_mask_imgs, r_mask_imgs

In [None]:
def data_3D(l_saved_points, r_saved_points, display="base", disp_limit=1,
            baseline=49.9465, focal_length=641.4094545, max_depth=3000):
    """Turn tracked stereo points into a depth trajectory and plot it in 3D.

    For every frame the first tracked point of each view is used: depth is
    `baseline * focal_length / |x_left - x_right|`.

    Args:
        l_saved_points, r_saved_points: per-frame point arrays of shape
            (K, 1, 2); only point 0 of each frame is read.
        display: "base" (default view), "y", "x" or "xy" to pick the camera
            angle of the 3D plot; any other value keeps the default view.
        disp_limit: disparities (in pixels) at or below this value are treated
            as unreliable and get `max_depth` instead of a computed depth.
        baseline: stereo baseline; depth comes out in the same unit.
        focal_length: focal length in pixels.
        max_depth: depth assigned when the disparity is not above `disp_limit`.

    Returns:
        (xdata, ydata, zdata): lists with the left-image x, left-image y and
        depth of the point for every frame.
    """
    # NOTE(review): the axis label below says cm, but the original comment
    # described the 3000 cap as "3m", which would make the unit mm -- confirm
    # the unit of `baseline` before trusting the label.
    xdata = []
    ydata = []
    zdata = []

    for l_points, r_points in zip(l_saved_points, r_saved_points):
        # first tracked point of this frame in each view
        x, y = l_points[0][0][0], l_points[0][0][1]
        xdata.append(x)
        ydata.append(y)

        # horizontal disparity magnitude in pixels
        disparity = abs(x - r_points[0][0][0])

        # a tiny disparity would give a huge (or infinite) depth, so cap it
        if disparity > disp_limit:
            zdata.append((baseline*focal_length)/disparity)
        else:
            zdata.append(max_depth)

    # depth along the left-image path, on its own figure
    fig = plt.figure()
    ax = fig.add_subplot(projection='3d')
    ax.scatter3D(xdata, ydata, zdata, c=zdata, cmap='jet')
    ax.set_xlabel("x (pixels)")
    ax.set_ylabel("y (pixels)")
    ax.set_zlabel("depth (cm)")

    # (elevation, azimuth) for each named view; "base" keeps the default
    views = {
        "y": (0, 0),     # y-depth
        "x": (0, 90),    # x-depth
        "xy": (90, 90),  # y-x
    }
    if display in views:
        ax.view_init(*views[display])

    plt.show()

    return xdata, ydata, zdata

In [None]:
# save the frames with the tracked point drawn in a depth-based colour (written as a .npy array)
def save_depth_tracking_video(left_array, xdata, ydata, zdata, save_name):
    """Draw the tracked point on every left frame, coloured by depth, and save.

    The dots accumulate from frame to frame, so frame i shows the path up to i.
    The result is written to "<save_name>.npy" with `np.save`.

    Args:
        left_array: array of RGB frames (not modified).
        xdata, ydata: per-frame x / y position of the tracked point; need at
            least one entry per frame.
        zdata: per-frame depth, mapped onto the "jet" colour map.
        save_name: output path without the ".npy" extension.
    """
    # `import PIL` alone is not guaranteed to load the ImageColor submodule,
    # so import it explicitly here.
    from PIL import ImageColor

    # setting colours based on the depth data (hex strings -> RGB tuples)
    hex_colors = mcp.gen_color_normalized(cmap="jet", data_arr=zdata)
    rgb_colors = [ImageColor.getcolor(c, "RGB") for c in hex_colors]

    # mask that accumulates the dots drawn so far
    maskl = np.zeros_like(left_array[0])

    # draw on each frame and save img; cv2.add returns a new array, so the
    # input frames stay untouched and no copy of the whole video is needed
    limages = []
    for i, frame in enumerate(left_array):
        maskl = cv2.circle(maskl, (int(xdata[i]), int(ydata[i])), 2, rgb_colors[i], -1)
        limages.append(cv2.add(frame, maskl))

    # save images to npy file
    np.save("{}.npy".format(save_name), limages)

## Main:

In [None]:
# Paths of the stereo pair, relative to the notebook's working directory.
left_name = 'Stereo_Videos/everywhere_l.mp4'
right_name = 'Stereo_Videos/everywhere_r.mp4'

# Load both videos as RGB frame arrays, dropping 20 frames at each end.
left_array, right_array = import_stereo_video(
    left_name,
    right_name,
    start_loss=20,
    end_loss=20,
    display=True,
)

In [None]:
# Match points between the first left and right frames.
(
    l_frame1,
    r_frame1,
    l_pts,
    r_pts,
    l_frame1_gray,
    r_frame1_gray,
) = find_points(left_array, right_array, display=True)

In [None]:
# Preview matched point 491 (a single point) on both frames before tracking it.
n, nadd = find_object_point(
    n=491,
    nadd=1,
    l_pts=l_pts,
    r_pts=r_pts,
    l_frame1=l_frame1,
    r_frame1=r_frame1,
)

In [None]:
# Track the selected point(s) through both videos.
(
    l_saved_points,
    r_saved_points,
    l_saved_imgs,
    r_saved_imgs,
    l_mask_imgs,
    r_mask_imgs,
) = KLT(
    left_array,
    right_array,
    l_frame1,
    r_frame1,
    l_frame1_gray,
    r_frame1_gray,
    l_pts,
    r_pts,
    n,
    nadd,
    display=True,
)

In [None]:
# Convert the tracked stereo points into a per-frame (x, y, depth) trajectory.
xdata, ydata, zdata = data_3D(
    l_saved_points,
    r_saved_points,
    display="base",
    disp_limit=1,
)

In [None]:
# view same as mask but with depth cmap
fig, ax = plt.subplots()
depth_points = ax.scatter(xdata, ydata, c=zdata, cmap='jet')
# Image coordinates have y growing downwards; invert the axis so the path has
# the same orientation as the track mask.
ax.invert_yaxis()
ax.set_xlabel("x (pixels)")
ax.set_ylabel("y (pixels)")
ax.set_title("Tracked point coloured by depth")
fig.colorbar(depth_points, ax=ax, label="depth")
plt.show()

In [None]:
# Write the depth-coloured tracking frames to "everywhere-tracked.npy".
save_depth_tracking_video(
    left_array,
    xdata,
    ydata,
    zdata,
    save_name="everywhere-tracked",
)