<a href="https://colab.research.google.com/github/0x1beef/uap/blob/main/nb/gimbal_clouds.ipynb">
    <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
<a href="https://kaggle.com/kernels/welcome?src=https://github.com/0x1beef/uap/blob/main/nb/gimbal_clouds.ipynb">
    <img src="https://kaggle.com/static/images/open-in-kaggle.svg" alt="Open In Kaggle"   />
</a>

In [None]:
url = 'https://raw.githubusercontent.com/0x1beef/uap/main/nb'
import urllib.request
for py_file in ['utils.py','common.py','opencv_cuda_installer.py']:
    urllib.request.urlretrieve(f'{url}/{py_file}', py_file)
import utils, common
is_main = (__name__ == '__main__')

## **Download OpenCV built with CUDA support**

In [None]:
import opencv_cuda_installer as cv_cuda
utils.show_env_info()
use_opencv_cuda = %time cv_cuda.install_opencv_cuda()

## **Download the data (center of mass, horizon, video frames)**

In [None]:
utils.download_from_huggingface('logicbear/gimbal/data/object_data.parquet')

In [None]:
(object_data, metadata) = utils.from_parquet_ext('data/object_data.parquet', 'gimbal')
center_of_mass = zip(object_data['center_of_mass_x'], object_data['center_of_mass_y'])
center_of_mass = [c for c in center_of_mass]
human_horizon = object_data['human_horizon']
object_data

In [None]:
%%time
gimbal = common.gimbal_from_huggingface()

## **Define a mask that only includes the clouds**

In [None]:
import cv2
import numpy as np
import math

def get_height_above_clouds(frame):
    time_dist_pairs = [
        (0, 25), (2, 30), (4, 30), (6, 30), (8, 50), (10, 45), (12, 40), (14, 55),
        (16, 45), (18, 40), (20, 40), (22, 45), (24, 50), (28, 55), (30, 60), (35, 60)
    ]
    time = gimbal.get_frame_time(frame)
    for ((t1,d1),(t2,d2)) in zip(time_dist_pairs, time_dist_pairs[1:]):
        if t2 > time:
            return d1 + (time - t1) * (d2 - d1) / (t2 - t1)
    return 0

def unit_vector(angle):
    return np.array((math.cos(angle), math.sin(angle)))

def get_cloud_top_point_angle(frame, height_offset):
    horizon_angle = -math.radians(human_horizon[frame])
    height = get_height_above_clouds(frame) + height_offset
    com = np.array(center_of_mass[frame])
    cloud_top_point = com + unit_vector(horizon_angle + math.pi / 2) * height
    return (cloud_top_point, horizon_angle)

def get_cloud_top_border_intersections(frame, height_offset, frame_width):
    (cloud_top_point, horizon_angle) = get_cloud_top_point_angle(frame, height_offset)
    cloud_heading = unit_vector(horizon_angle)
    # (x,y) = cloud_top_point + cloud_heading * t
    t = lambda x : (x - cloud_top_point[0]) / cloud_heading[0]
    y = lambda x : cloud_top_point[1] + cloud_heading[1] * t(x)
    return (y(0),y(frame_width-1))

flow_masks = [
    (0,   373, 115, 427),
    (194, 412, 243, 427),
    (317, 393, 391, 427),
    (369, 256, 426, 323),
    (34,  194, 69,  207),
    (384, 346, 426, 359),
    (413, 318, 426, 386)
]
def get_full_flow_mask(frame, frame_shape, erode_size):
    (h,w) = frame_shape
    img_mask = np.full(frame_shape, 255, np.uint8)
    # exclude the overlay
    for (j0,i0,j1,i1) in flow_masks:
        img_mask[i0:i1,j0:j1] = 0
    # exclude the border
    cv2.rectangle(img_mask, (0,0), (w-1,h-1), color=0, thickness=1)
    # exclude everything above the cloud line
    (y0,y1) = get_cloud_top_border_intersections(frame, erode_size / 2, w)
    points = np.array([[0, 0], [w-1, 0], [w-1, int(y1)], [0, int(y0)]])
    cv2.fillPoly(img_mask, pts=[points], color=0)
    # erode to account for the pyrlk block size
    erode_kernel = np.ones((erode_size, erode_size), np.uint8) 
    img_mask = cv2.erode(img_mask, erode_kernel)
    return img_mask

In [None]:
import matplotlib.pyplot as plt
def show(*imgs):
    for img in imgs:
        plt.imshow(img, cmap='gray', vmin=0, vmax=255)
        plt.show()

In [None]:
def test_flow_mask():
    erosion = 19
    frame = 0
    img_mask = get_full_flow_mask(frame, gimbal.get_frame(frame).shape, erosion)
    show(img_mask)
    
if is_main: test_flow_mask()

## **Test and run the cloud motion tracking algorithm**

In [None]:
from numba import njit
from dataclasses import dataclass

num_frames = gimbal.get_frame_count()

params_dict = {
    'frame_diff_from': 6,
    'frame_diff_to': 10,
    'erosion': 19,
    'min_mag': 1,
    'gftt': {
        'quality_level': 0.001,
        'block_size': 5,
        'min_dist': 0,
    },
    'pyrlk': {
        'block_size': 31,
        'max_level': 5,
        'iters': 100
    },
    'affine': {
        'threshold': 3,
        'max_iters': 2000,
        'confidence': 0.9999,
        'refine_iters': 10
    }
}

Params = utils.dict_to_obj(params_dict).__class__

def get_frame_diff(frame, par:Params):
    return int(par.frame_diff_from + frame * (par.frame_diff_to + 1 - par.frame_diff_from) / num_frames)

def get_flow_roi(img):
    return img[136:]
def get_flow_roi_ofs():
    return (136, 0)

class CudaOpticalFlow:
    def init_mats(self, *mats):
        for mat in mats:
            setattr(self, mat, cv2.cuda.GpuMat())
    
    def __init__(self, par:Params):
        print(f'optical flow running on device {cv2.cuda.getDevice()}')
        self.stream = cv2.cuda.Stream()
        self.init_mats('gpu_img', 'gpu_mask', 'gpu_corners')
        pg = par.gftt
        self.cuda_gftt = cv2.cuda.createGoodFeaturesToTrackDetector(
            cv2.CV_8UC1, 0, pg.quality_level, pg.min_dist, pg.block_size, False)
        self.init_mats('gpu_img_next', 'gpu_corners_next')
        self.init_mats('gpu_status', 'gpu_error')
        pp = par.pyrlk
        self.cuda_pyrlk = cv2.cuda.SparsePyrLKOpticalFlow.create(
            (pp.block_size, pp.block_size), pp.max_level, pp.iters)
    
    def run(self, img, img_next, img_mask, par):
        self.gpu_img.upload(img, self.stream)
        self.gpu_img_next.upload(img_next, self.stream)
        self.gpu_mask.upload(img_mask, self.stream)
        self.gpu_corners = self.cuda_gftt.detect(self.gpu_img, self.gpu_corners, 
            self.gpu_mask, self.stream)
        (self.gpu_corners_next, self.gpu_status, self.gpu_error) = self.cuda_pyrlk.calc(
            self.gpu_img, self.gpu_img_next, self.gpu_corners,
            self.gpu_corners_next, self.gpu_status, self.gpu_error, self.stream)
        corners = self.gpu_corners.download(self.stream)[0]
        corners_next = self.gpu_corners_next.download(self.stream)[0]
        status = self.gpu_status.download(self.stream)[0]
        self.stream.waitForCompletion()
        return (corners, corners_next, status)

class OpticalFlow:
    def __init__(self, par:Params):
        pp = par.pyrlk
        self.pyrlk = cv2.SparsePyrLKOpticalFlow.create(
            (pp.block_size, pp.block_size), pp.max_level)
    
    def run(self, img, img_next, img_mask, par: Params):
        pg = par.gftt
        corners = cv2.goodFeaturesToTrack(img, 0, pg.quality_level,
            pg.min_dist, None, img_mask, pg.block_size, False)
        (corners_next, status, error) = self.pyrlk.calc(
            img, img_next, corners, None)
        corners = np.reshape(corners, (len(corners), 2))
        corners_next = np.reshape(corners_next, (len(corners_next), 2))
        return (corners, corners_next, status)
    
def make_flow_algorithm(par:Params):
    if use_opencv_cuda:
        return CudaOpticalFlow(par)
    else:
        return OpticalFlow(par)

@njit
def filter_map_flow_results_jit(c1, c2, status, img_mask, ofs, min_mag, c1out, c2out):
    (h,w) = img_mask.shape
    (ofs_y,ofs_x) = ofs
    def filter_map_status(i,j):
        if status[i] == 0:
            return (1,j)
        (x1,y1) = c1[i]
        (x2,y2) = c2[i]
        if not (x2 >= 0 and y2 >= 0 and x2 < w and y2 < h):
            return (2,j)
        if img_mask[int(y2), int(x2)] != 255:
            return (3,j)
        (dx,dy) = (x2-x1,y2-y1)
        mag = math.sqrt(dx*dx+dy*dy)
        if mag < min_mag:
            return (4,j)
        c1out[j] = (x1+ofs_x, y1+ofs_y)
        c2out[j] = (x2+ofs_x, y2+ofs_y)
        return (0,j+1)
    j = 0
    for i in range(len(c1)):
        (status[i],j) = filter_map_status(i,j)
    return j

#based on https://stackoverflow.com/questions/58422690/filtering-reducing-a-numpy-array
def filter_map_flow_results(c1, c2, status, img_mask, min_mag):
    ofs = get_flow_roi_ofs()
    c1out = np.empty_like(c1)
    c2out = np.empty_like(c2)
    j = filter_map_flow_results_jit(c1, c2, status, img_mask, ofs, min_mag, c1out, c2out)
    c1out.resize((j,2), refcheck=False)
    c2out.resize((j,2), refcheck=False)
    return (c1out, c2out, status)
    
def get_optical_flow(frame, frame_diff, par:Params, flow_algorithm):
    next_frame = frame + frame_diff
    if next_frame >= num_frames:
        return ([],[],[])
    (img, img_next) = [gimbal.get_frame(f) for f in [frame, next_frame]]
    img_mask = get_full_flow_mask(frame, img.shape, par.erosion)
    (img, img_next, img_mask) = [get_flow_roi(i) for i in [img, img_next, img_mask]]
    (corners, corners_next, status) = flow_algorithm.run(img, img_next, img_mask, par)
    return filter_map_flow_results(corners, corners_next, status, img_mask, par.min_mag)

@njit
def get_inlier_mean(corners, inliers):
    (sum_x, sum_y, num_inliers) = (0.0, 0.0, 0)
    # todo: does this need to be more numerically stable ?
    for ((cx, cy), inlier) in zip(corners, inliers):
        if inlier:
            sum_x += float(cx); sum_y += float(cy)
            num_inliers += 1
    return np.array([sum_x / num_inliers, sum_y / num_inliers])

@dataclass
class CloudMotionData:
    frame: int; frame_diff: int
    magnitude: float; angle: float; rotation: float; transform: np.ndarray
    corners: np.ndarray; corners_next: np.ndarray; status: np.ndarray; inliers: np.ndarray
    c_inl_mean: float; cn_inl_mean: float

def get_cloud_motion(frame, frame_diff, corners, corners_next, status, par: Params):
    pa = par.affine
    (transform, inliers) = cv2.estimateAffinePartial2D(corners, corners_next, None,
        cv2.RANSAC, pa.threshold, pa.max_iters, pa.confidence, pa.refine_iters)
    rotation = math.degrees(math.atan2(transform[1, 0], transform[0, 0]))
    inliers = inliers.ravel() # from [[0],[1],...] to [0,1,...]
    c_inl_mean = get_inlier_mean(corners, inliers)
    cn_inl_mean = get_inlier_mean(corners_next, inliers)
    mean = cn_inl_mean - c_inl_mean
    magnitude = np.linalg.norm(mean) / frame_diff
    angle = math.degrees(math.atan2(mean[1], mean[0]))
    return CloudMotionData(frame, frame_diff,
        magnitude, angle, rotation, transform,
        corners, corners_next, status, inliers,
        c_inl_mean, cn_inl_mean)

def run_cloud_tracking_for(frames):
    par = Params()
    flow_algorithm = make_flow_algorithm(par)
    motions = []
    for frame in frames:
        frame_diff = get_frame_diff(frame, par)
        (corners, corners_next, status) = get_optical_flow(frame, frame_diff, par, flow_algorithm)
        if len(corners) != 0:
            motions.append(get_cloud_motion(frame, frame_diff, corners, corners_next, status, par))
    return motions

def get_cloud_frame_range():
    if not use_opencv_cuda: # only do the last 1/10 the of the frames on the CPU
        return range(num_frames - int(num_frames/10), num_frames)
    return range(0, num_frames)

def run_cloud_tracking():
    return run_cloud_tracking_for(get_cloud_frame_range())

def run_cloud_tracking_parallel():
    return utils.run_jobs_in_parallel(work_func = run_cloud_tracking_for, 
        jobs = get_cloud_frame_range(), workers = 4, cv_cuda = True)

In [None]:
def show_flow(img, corners, corners_next, inliers):
    img_rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
    for ((c1x,c1y), (c2x,c2y), inlier) in zip(corners, corners_next, inliers):
        color = (0,255,0) if inlier else (255, 0, 0)
        cv2.line(img_rgb, (int(c1x),int(c1y)), (int(c2x),int(c2y)), color, 1)
    plt.imshow(img_rgb)
    
def test_flow():
    par = Params()
    flow = make_flow_algorithm(par)
    frame = 980 #1000 # 5
    frame_diff = get_frame_diff(frame, par)
    (corners, corners_next, status) = get_optical_flow(frame, frame_diff, par, flow)
    print(len(corners), len(corners_next))
    m = get_cloud_motion(frame, frame_diff, corners, corners_next, status, par)
    print(m.magnitude, m.angle, m.rotation)
    show_flow(gimbal.get_frame(frame), corners, corners_next, m.inliers)
    
if is_main: test_flow()

In [None]:
if is_main:
    if use_opencv_cuda:
        cloud_motions = %time run_cloud_tracking_parallel()
    else: # the CPU version already uses multiple threads
        cloud_motions = %time run_cloud_tracking()
    print(len(cloud_motions))

## **Plot the data and save it all to a parquet file**

In [None]:
if is_main:
    import pandas as pd
    print(pd.__version__)
    df_motion = pd.DataFrame.from_records([m.__dict__ for m in cloud_motions], index = ["frame"])
    df_motion = df_motion.sort_index() # the parallel results are out of order
    df_motion = common.gimbal_fix_wh_to_bh(df_motion, ['magnitude', 'angle', 'rotation'])
    print(df_motion.dtypes)
    df_motion

In [None]:
def plot_motion(*fields):
    plot_range = slice(df_motion.first_valid_index(), df_motion.last_valid_index())
    for field in fields:
        df_motion[field][plot_range].plot(legend = True)
        plt.show()

if is_main:
    plot_motion('magnitude','rotation')
    plt.ylim(bottom=13, top=50)
    plot_motion('angle')

In [None]:
if is_main:
    inlier_count = df_motion.inliers.map(lambda i : np.sum(i))
    inlier_count[372-10:372] = np.nan
    inlier_count.interpolate().plot(label = 'inlier count', legend = True)
    plt.show()

In [None]:
def test_flatten_ndarrays():
    from IPython.display import display
    (df_new, nd_shapes) = %time utils.flatten_ndarrays(df_motion)
    df_old = %time utils.restore_ndarrays(df_new, nd_shapes)
    display(df_new)
    print(nd_shapes)
    assert(df_motion.equals(df_old) == True)
if is_main: test_flatten_ndarrays()

In [None]:
if is_main:
    metadata = {
        'params': params_dict
    }
    utils.to_parquet_ext(df_motion, 'cloud_motion.parquet', 'clouds', metadata)
    !du -hs cloud_motion.parquet

In [None]:
if is_main: utils.upload_to_huggingface('cloud_motion.parquet', 'logicbear/gimbal_clouds/data')

## **Save the corner means for a range of frame_diffs**

In [None]:
def run_cloud_tracking_means_for(frames):
    par = Params()
    flow_algorithm = make_flow_algorithm(par)
    motions = []
    for frame in frames:
        fd_from = int(2 + frame * (4 + 1 - 2) / num_frames)
        fd_to = fd_from + 10
        for frame_diff in range(fd_from, fd_to + 1):
            (corners, corners_next, status) = get_optical_flow(frame, frame_diff, par, flow_algorithm)
            if len(corners) != 0:
                motion = get_cloud_motion(frame, frame_diff, corners, corners_next, status, par)
                for attr in ['corners','corners_next','status','inliers']:
                    setattr(motion, attr, None)
                motions.append(motion)
    return motions

if is_main:
    ffds = [(m.frame, m.frame_diff) for m in run_cloud_tracking_means_for([0,1015,1025])]
    print(ffds)

def run_cloud_tracking_means():
    cloud_means = utils.run_jobs_in_parallel(work_func = run_cloud_tracking_means_for, 
        jobs = get_cloud_frame_range(), workers = 4, cv_cuda = True)
    df_means = pd.DataFrame.from_records([m.__dict__ for m in cloud_means],
        index = ['frame','frame_diff'],
        exclude = ['corners','corners_next','status','inliers'])
    print(df_means.dtypes)
    from IPython.display import display
    display(df_means)
    metadata = {
        'params': params_dict
    }
    utils.to_parquet_ext(df_means, 'cloud_means.parquet', 'clouds', metadata)
    !du -hs cloud_means.parquet
    utils.upload_to_huggingface('cloud_means.parquet', 'logicbear/gimbal_cloud_means/data')
    
if is_main:
    %time run_cloud_tracking_means()
    pass