In [2]:
import cv2
import numpy as np
from importlib import reload
import matplotlib.pyplot as plt
from IPython.display import Video

import torch
from torchvision import transforms
from torchvision.io import read_video, read_video_timestamps

import kornia as K
import kornia.feature as KF
from kornia_moons.feature import *
from kornia.contrib import ImageStitcher
from kornia.geometry.transform import warp_perspective, get_perspective_transform

import utils
def load_torch_image(fname):
    img = K.image_to_tensor(cv2.imread(fname), False).float() /255.
    img = K.color.bgr_to_rgb(img)
    return img

In [3]:
fname = "../deep-stabilization/dvs/video/s_114_outdoor_running_trail_daytime/ControlCam_20200930_104820.mp4"

In [4]:
video_frames, audio_frames, meta = read_video(fname, end_pts=100, pts_unit="sec")
print(meta)
print("video size: ", video_frames.shape)
print("audio size: ", audio_frames.shape)

{'video_fps': 30.020507836629136, 'audio_fps': 48000}
video size:  torch.Size([486, 1080, 1920, 3])
audio size:  torch.Size([2, 775168])


In [None]:
# utils.show_frames(video_frames[:100:10], 2, 5, (30,16))

In [None]:
img1 = video_frames[0:1].permute(0,3,1,2).float() / 255
img2 = video_frames[100:101].permute(0,3,1,2).float() / 255

print(img1.shape)

feature1 = transforms.CenterCrop((270*3,480*3))(img1)
feature2 = transforms.CenterCrop((270*3,480*3))(img2)

feature1 = torch.cat(transforms.FiveCrop(256)(feature1))
feature2 = torch.cat(transforms.FiveCrop(256)(feature2))

print(feature1.shape)

# K.color.rgb_to_grayscale(img1).shape
utils.show_frame(feature1[3].permute(1,2,0))

In [None]:
matcher2 = KF.LocalFeatureMatcher(
    KF.SIFTFeature(2000, device="cuda"),
    KF.DescriptorMatcher('smnn', 0.9)
    )

input_dict = {"image0": K.color.rgb_to_grayscale(feature1).cuda(), # LofTR works on grayscale images only 
              "image1": K.color.rgb_to_grayscale(feature2).cuda()}

with torch.no_grad():
    correspondences = matcher2(input_dict)
    del input_dict["image0"], input_dict["image1"]
    
for k,v in correspondences.items():
    print (k)

print(len(correspondences["keypoints0"]))

In [None]:
# for x in range(5):
#     idx = torch.topk(correspondences["confidence"][correspondences["batch_indexes"]==x], 100).indices
#     print((correspondences["keypoints0"][correspondences["batch_indexes"]==x][idx] - correspondences["keypoints1"][correspondences["batch_indexes"]==x][idx]).mean(dim=0))
# print("\n\n\n")
# for x in range(5):
#     idx = torch.topk(correspondences["confidence"][correspondences["batch_indexes"]==x], 150).indices
#     print((correspondences["keypoints0"][correspondences["batch_indexes"]==x][idx] - correspondences["keypoints1"][correspondences["batch_indexes"]==x][idx]).mean(dim=0))
# print("\n\n\n")
tmp = []
for x in range(5):
    tmp.append((correspondences["keypoints0"][correspondences["batch_indexes"]==x] - correspondences["keypoints1"][correspondences["batch_indexes"]==x]).median(dim=0)[0])
    print(tmp[-1])

In [None]:
src = torch.Tensor([
    [135*1+128, 240*1+128],# 左上
    [135*1+128, 240*7-128],# 右上
    [135*7-128, 240*1+128],# 左下
    [135*7-128, 240*7-128] # 右下
]).cuda()

dst = torch.vstack(tmp[:4]) + src

In [None]:
img1[0].permute(1,2,0).shape

In [None]:
res = cv2.warpAffine(img1[0].permute(1,2,0).numpy(), H[:2], (1080, 1920))

In [None]:
utils.show_frame(torch.from_numpy(res))

In [None]:
H, inliers = cv2.findFundamentalMat(mkpts0, mkpts1, cv2.USAC_MAGSAC, 0.5, 0.999, 100000)

In [None]:
print(src)
print(dst)
b = get_perspective_transform(src.unsqueeze(0), dst.unsqueeze(0))

out = warp_perspective(img1.cuda(), b, (1080,1920)).cpu()
outt = torch.where(out == 0.0, img2, out)
utils.show_frame(outt[0].permute(1,2,0))

In [None]:
out = warp_perspective(img1.cuda(), torch.from_numpy(H).cuda().unsqueeze(0).float(), (1080,1920)).cpu()
outtt = torch.where(out == 0.0, img2, out)
utils.show_frame(outtt[0].permute(1,2,0))

In [None]:
th = torch.quantile(correspondences["confidence"], 0.0)
idx = correspondences["confidence"] > th
print(idx.sum())

In [None]:
mkpts0 = correspondences['keypoints0'][idx].cpu().numpy()
mkpts1 = correspondences['keypoints1'][idx].cpu().numpy()
H, inliers = cv2.findFundamentalMat(mkpts0, mkpts1, cv2.USAC_MAGSAC, 0.5, 0.999, 100000)
inliers = inliers > 0

In [None]:
draw_LAF_matches(
    KF.laf_from_center_scale_ori(torch.from_numpy(mkpts0).view(1,-1, 2),
                                torch.ones(mkpts0.shape[0]).view(1,-1, 1, 1),
                                torch.ones(mkpts0.shape[0]).view(1,-1, 1)),

    KF.laf_from_center_scale_ori(torch.from_numpy(mkpts1).view(1,-1, 2),
                                torch.ones(mkpts1.shape[0]).view(1,-1, 1, 1),
                                torch.ones(mkpts1.shape[0]).view(1,-1, 1)),
    torch.arange(mkpts0.shape[0]).view(-1,1).repeat(1,2),
    K.tensor_to_image(img1),
    K.tensor_to_image(img2),
    inliers,
    draw_dict={'inlier_color': (0.2, 1, 0.2),
               'tentative_color': None, 
               'feature_color': (0.2, 0.5, 1), 'vertical': False})

In [None]:
from kornia.geometry.transform import get_perspective_transform, warp_perspective
idx = torch.topk(correspondences["confidence"], 12).indices
# idx = torch.randperm(20)
src = correspondences["keypoints0"][idx[:4]].unsqueeze(0)
dst = correspondences["keypoints1"][idx[:4]].unsqueeze(0)
a = get_perspective_transform(src, dst)
src = correspondences["keypoints0"][idx[2:6]].unsqueeze(0)
dst = correspondences["keypoints1"][idx[2:6]].unsqueeze(0)
b = get_perspective_transform(src, dst)

out = warp_perspective(img1.cuda(), (a+b)/2, (1080//4,1920//4)).cpu()
outt = torch.where(out < 0.0, img2, out)
utils.show_frame(outt[0].permute(1,2,0))

In [22]:
# Import numpy and OpenCV
import numpy as np
import cv2# Read input video

fname = "../deep-stabilization/dvs/video/s_114_outdoor_running_trail_daytime/ControlCam_20200930_104820.mp4"
cap = cv2.VideoCapture(fname)
 
# Get frame count
n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
 
# Get width and height of video stream
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
 
# Define the codec for output video
 
# Set up output video
fps = 30
print(w, h)

# Read first frame
_, prev = cap.read()
 
# Convert frame to grayscale
prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
# prev_gray = (prev_gray&192)|((prev_gray&32)<<1)

# Pre-define transformation-store array
transforms = np.zeros((n_frames-1, 3), np.float32)
log = []
homo = []
for i in range(n_frames-2):
    log.append([])
    # Detect feature points in previous frame
    prev_pts = cv2.goodFeaturesToTrack(prev_gray,
                                     maxCorners=400,
                                     qualityLevel=0.3,
                                     minDistance=20,
                                     blockSize=9)
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
    prev_pts = cv2.cornerSubPix( prev_gray, prev_pts, (5,5), (-1,1), criteria )
    # Read next frame
    success, curr = cap.read()
    if not success:
        break
 
    # Convert to grayscale
    curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
 
    # Calculate optical flow (i.e. track feature points)
    curr_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, prev_pts, None)
 
    # Sanity check
    assert prev_pts.shape == curr_pts.shape
 
    # Filter only valid points
    idx = np.where(status==1)[0]
    prev_pts = prev_pts[idx]
    curr_pts = curr_pts[idx]
 
    #Find transformation matrix
    retval, inliers = cv2.estimateAffine2D(prev_pts, curr_pts)
    retval = cv2.findHomography(prev_pts, curr_pts)[0]
    homo.append(cv2.findHomography(prev_pts, curr_pts)[0])
    # """
    # adding
    # """
    # cv2.find(prev_pts, curr_pts)
    # arr = np.arange(prev_pts.shape[0])
    # tests = []
    # for x in range(100):
    #     index = np.random.choice(prev_pts.shape[0], size=(4,), replace=False)
    #     tests.append(cv2.getPerspectiveTransform(prev_pts[index], curr_pts[index]))
    # test = np.stack(tests)
 
    # Extract traslation
    dx = retval[0][2]
    dy = retval[1][2]
 
    # Extract rotation angle
    da = np.arctan2(retval[1,0], retval[0,0])
    log[-1].append(len(inliers))
    log[-1].append(np.arctan2(retval[0,1], retval[1,1]))
 
    # Store transformation
    transforms[i] = [dx,dy,da]
 
    # Move to next frame
    prev_gray = curr_gray
 
    print("Frame: {:03d}/{:3d} -  Tracked points : {:3d}".format(i, n_frames, len(prev_pts)), end="\r", flush=True)
  
# Compute trajectory using cumulative sum of transformations
print("transforms: ", len(transforms))
trajectory = np.cumsum(transforms, axis=0)

1920 1080
transforms:  485  Tracked points : 128


In [126]:
intrinsic = np.array([
        [1920/1.27, 0.0, 0.5*(1920-1)], 
        [0.0, 1920/1.27, 0.5*(1080-1)], 
        [0.0, 0.0, 1.0]
        ])

In [56]:
tests = []
arr = np.arange(prev_pts.shape[0])
for x in range(100):
    index = np.random.choice(prev_pts.shape[0], size=(10,), replace=False)
    tests.append(cv2.findFundamentalMat(prev_pts[index], curr_pts[index]))

In [94]:
import kornia
p_pts = torch.from_numpy(prev_pts).permute(1,0,2)
c_pts = torch.from_numpy(curr_pts).permute(1,0,2)
pts, tran = kornia.geometry.epipolar.normalize_points(torch.cat([p_pts, c_pts], dim=1))
p_pts, c_pts = pts.narrow(1,0,128), pts.narrow(1,128,128)
fund1 = kornia.geometry.epipolar.find_fundamental(p_pts, c_pts, weights=torch.ones((1,128)))
kornia.geometry.epipolar.projections_from_fundamental(fund1)[0, :, : ,1]

tensor([[ -23.1438,   13.7808,  -45.6928,   -0.8982],
        [  13.1808, -117.8158,   -5.3662,   -0.1274],
        [ -45.4236,   -6.2476,  -99.1835,    0.4207]])

In [79]:
fund2 = cv2.findFundamentalMat(prev_pts, curr_pts)[0]
fund2 = torch.from_numpy(fund2).view(1,3,3)
kornia.geometry.epipolar.projections_from_fundamental(fund2)[0, :, : ,1]

tensor([[-7.2350e-03,  2.8201e-02, -2.5415e-01, -9.6713e-01],
        [ 2.7516e-02, -1.0726e-01,  9.6716e-01, -2.5428e-01],
        [ 1.3312e-04,  3.4640e-05, -1.1435e-01, -1.1996e-03]],
       dtype=torch.float64)

In [23]:
from scipy.signal import savgol_filter
def movingAverage(curve, window_size):
    # Define the filter
    f = np.ones(window_size)/window_size
    # Add padding to the boundaries
    curve_pad = np.lib.pad(curve, (window_size-3, 2), 'edge')
    # Apply convolution
    curve_smoothed = np.convolve(curve_pad, f, mode='valid')
    # Remove padding
    curve_smoothed = curve_smoothed
    # return smoothed curve
    return savgol_filter(curve, window_size, 3)
    # return curve_smoothed
# def movingAverage(curve, radius):
#     window_size = 2 * radius + 1
#     # Define the filter
#     f = np.ones(window_size)/window_size
#     # Add padding to the boundaries
#     curve_pad = np.lib.pad(curve, (radius, radius), 'edge')
#     # Apply convolution
#     curve_smoothed = np.convolve(curve_pad, f, mode='same')
#     # Remove padding
#     curve_smoothed = curve_smoothed[radius:-radius]
#     # return smoothed curve
#     return savgol_filter(curve, window_size, 3)
#     # return curve_smoothed

def fixBorder(frame):
    s = frame.shape
    # Scale the image 4% without moving the center
    T = cv2.getRotationMatrix2D((s[1]/2, s[0]/2), 0, 1.04)
    frame = cv2.warpAffine(frame, T, (s[1], s[0]))
    return frame

def smooth(trajectory, SMOOTHING_RADIUS=31):
    smoothed_trajectory = np.copy(trajectory)
    # Filter the x, y and angle curves
    for i in range(3):
        smoothed_trajectory[:,i] = movingAverage(trajectory[:,i], SMOOTHING_RADIUS)
 
    return smoothed_trajectory

In [24]:
fps, w, h = 30, 1920, 1080
# Calculate difference in smoothed_trajectory and trajectory
smoothed_trajectory = smooth(trajectory)
difference = smoothed_trajectory - trajectory
transforms_smooth = transforms + difference


# Reset stream to first frame
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

frames=[]
# Write n_frames-1 transformed frames
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('../video_out.mp4', fourcc, fps, (w, h))
for i in range(n_frames-2):
    # Read next frame
    success, frame = cap.read()
    if not success:
        break

    # Extract transformations from the new transformation array
    dx = transforms_smooth[i,0]
    dy = transforms_smooth[i,1]
    da = transforms_smooth[i,2]
 
    # Reconstruct transformation matrix accordingly to new values
    m = np.zeros((3,3), np.float32)
    m[0,0] = np.cos(da)
    m[0,1] = -np.sin(da)
    m[1,0] = np.sin(da)
    m[1,1] = np.cos(da)
    m[0,2] = dx
    m[1,2] = dy
    m[2] = homo[i][2]
 
    # Apply affine wrapping to the given frame
    # frame_stabilized = cv2.warpAffine(frame.astype(np.float64)/255, m, (w,h))
    # tmp = sqrtm(sqrtm(sqrtm(sqrtm(acc_homo[i]@np.linalg.inv(acc_homo[max(0,i-16)]))))).real
    # tmp = homo[i]@tmp@np.linalg.inv(acc_homo[i])
    # tmp[2] = homo[i][2]
    frame_stabilized = cv2.warpPerspective(frame.astype(np.float64)/255, m, (w,h))

    # Fix border artifacts
    # frame_stabilized = fixBorder(frame_stabilized)

    # Write the frame to the file
    frame_out = cv2.hconcat([frame.astype(np.float64)/255, frame_stabilized])

    # If the image is too big, resize it.
    if frame_out.shape[1] > 1920:
        frame_out = cv2.resize(frame_out, (frame_out.shape[1]//2, frame_out.shape[0]));
 
    frames.append(frame_out)
    out.write((frame_out*255).astype(np.uint8))

out.release()

In [19]:
import numpy.linalg as LA
from torch.autograd import Variable
def norm_quat(quat):
    norm_quat = LA.norm(quat)   
    if norm_quat > 1e-6:
        quat = quat / norm_quat   
        #     [0 norm_quat norm_quat - 1e-6]
    else:
        # print('bad len for Reciprocal')
        quat = np.array([0,0,0,1])
    return quat

def torch_norm_quat(quat, USE_CUDA = True):
    # Method 1:
    batch_size = quat.size()[0]
    quat_out = Variable(torch.zeros((batch_size, 4), requires_grad=True))
    if USE_CUDA == True:
        quat_out = quat_out.cuda()
    for i in range(batch_size):
        norm_quat = torch.norm(quat[i])   
        if norm_quat > 1e-6:        
            quat_out[i] = quat[i] / norm_quat  
            #     [0 norm_quat norm_quat - 1e-6]
        else:
            quat_out[i,:3] = quat[i,:3] * 0
            quat_out[i,3] = quat[i,3] / quat[i,3]

    # Method 2:
    # quat = quat / (torch.unsqueeze(torch.norm(quat, dim = 1), 1) + 1e-6) # check norm
    return quat_out

def QuaternionReciprocal(q):
    quat = np.array([-q[0], -q[1], -q[2], q[3]])  
    return norm_quat(quat)

def torch_QuaternionReciprocal(q,  USE_CUDA = True):
    quat = torch.cat((-q[:,0:1], -q[:,1:2], -q[:,2:3], q[:,3:]), dim = 1) 
    batch_size = quat.size()[0]

    quat = torch_norm_quat(quat)
    return quat

def QuaternionProduct(q1, q2):
    x1 = q1[0]  
    y1 = q1[1]   
    z1 = q1[2]   
    w1 = q1[3]   

    x2 = q2[0]  
    y2 = q2[1]  
    z2 = q2[2]  
    w2 = q2[3]  

    quat = np.zeros(4)
    quat[3] =  w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2  
    quat[0] =  w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2  
    quat[1] = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2  
    quat[2] = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2 

    return norm_quat(quat)

def torch_QuaternionProduct(q1, q2, USE_CUDA = True):
    x1 = q1[:,0]  
    y1 = q1[:,1]   
    z1 = q1[:,2]   
    w1 = q1[:,3]   

    x2 = q2[:,0]  
    y2 = q2[:,1]  
    z2 = q2[:,2]  
    w2 = q2[:,3]  

    batch_size = q1.size()[0]
    quat = Variable(torch.zeros((batch_size, 4), requires_grad=True))
    if USE_CUDA == True:
        quat = quat.cuda()
    
    quat[:,3] =  w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2  
    quat[:,0] =  w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2  
    quat[:,1] = w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2  
    quat[:,2] = w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2  

    quat = torch_norm_quat(quat)

    return quat

def get_data_at_timestamp(self, gyro_data, ois_data, time_stamp, quat_t_1):
    quat_t = GetGyroAtTimeStamp(gyro_data, time_stamp)
    quat_dif = QuaternionProduct(quat_t, QuaternionReciprocal(quat_t_1))  
    return quat_dif

def get_relative_quats(quat):
    num_inputs = quat.shape[0]
    quats = np.zeros((num_inputs, 4))  
    quats[0,:] = np.array([0, 0, 0, 1])
    for i in range(1, num_inputs):
        quats[i,:] = QuaternionProduct(quat[i], quats[i-1,:])   # R_t = delta R_t * R_t-1
        quats[i,:] = quats[i,:] / LA.norm(quats[i,:]) 
    return quats 

In [20]:
quats.shape

torch.Size([472, 4])

In [21]:
import torch
import numpy as np
from kornia.geometry.conversions import rotation_matrix_to_quaternion, QuaternionCoeffOrder, normalize_homography
n_homo = normalize_homography(torch.from_numpy(np.array(homo)), (1080,1920), (1080,1920))
quats = rotation_matrix_to_quaternion(n_homo, order=QuaternionCoeffOrder.WXYZ)[10:-2,[1,2,3,0]]
get_relative_quats(quats.numpy())

array([[ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         1.00000000e+00],
       [ 1.07243783e-04, -2.41898522e-04, -2.08230014e-05,
         9.99999965e-01],
       [ 4.01388428e-04, -3.33386593e-04,  5.98872714e-04,
         9.99999685e-01],
       ...,
       [-6.96774177e-01, -4.17902098e-01, -4.37305643e-01,
         3.85522187e-01],
       [-6.96078259e-01, -4.17020978e-01, -4.38120507e-01,
         3.86806132e-01],
       [-6.95171253e-01, -4.16523131e-01, -4.39439293e-01,
         3.87477119e-01]])

In [163]:
x = sqrtm(sqrtm(sqrtm(sqrtm(sqrtm(acc_homo[i]))))).real
print(x)
print(acc_homo[i])
dx = transforms_smooth[i,0]
dy = transforms_smooth[i,1]
da = transforms_smooth[i,2]

# Reconstruct transformation matrix accordingly to new values
m = np.zeros((2,3), np.float32)
m[0,0] = np.cos(da)
m[0,1] = -np.sin(da)
m[1,0] = np.sin(da)
m[1,1] = np.cos(da)
m[0,2] = dx
m[1,2] = dy
print(m)

[[ 1.33650631e+00  5.72309689e-02 -3.71291010e+02]
 [ 3.95840586e-02  1.16016210e+00 -7.96622217e+01]
 [ 1.20057797e-04  1.91982145e-05  9.03927825e-01]]
[[ 4.65115394e+02  2.79564289e+02 -6.86257487e+05]
 [ 7.55197021e+01  1.39863309e+02 -1.38874281e+05]
 [ 2.03738676e-01  1.18128286e-01 -2.95844101e+02]]
[[  0.45065314  -0.8926991  316.80075   ]
 [  0.8926991    0.45065314 177.64972   ]]


In [154]:
from torchvision.io import read_video, read_video_timestamps

In [155]:
from IPython.display import Video
Video("../video_out.mp4", width=960, height=540)

In [None]:
Video("../stable_video.avi", width=960, height=540)