## Loading datasets

In [None]:
import os

import torch
import torch.nn.functional as F
import numpy as np
import cv2

import matplotlib.pyplot as plt
import matplotlib.patches as patches

from utils.config import cfg
from utils.dataset import TrainDataset
from utils.mask import tensor_arr_dist_circle_mask, tensor_img_dist_circle_mask, tensor_img_px_circle_mask, get_tensor_img_px_circle_mask_rgb
from utils.optical_flow.spynet import spynet_optical_flow 
from utils.optical_flow.farneback import farneback_optical_flow
from utils.visualization import show_imgs, show_farneback_optical_flows, save_spynet_optical_flows
from utils.pose import get_circular_poses, tensor_to_cv2

# Dataset and Loader
cfg.merge_from_file('config/predict_friction_mass_independent-spynet-resnet18-pybullet.yaml')
print(cfg)

# os.listdir returns entries in arbitrary order, so sort before selecting an entry by
# position; otherwise the chosen sequence can differ between machines and runs.
train_dir = 'data/same_vis_same_phys/train/'
train_paths = sorted(os.path.join(train_dir, fp) for fp in os.listdir(train_dir))

# Only the second entry of the (sorted) training directory is loaded here.
dataset_test = TrainDataset(
    [train_paths[1]],
    cfg)

loader_train = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,  # we have modified data_parallel
    shuffle=False,  # we do not use this param
    drop_last=True,
    num_workers=4,
    collate_fn=(lambda x: x),  # identity collate: a batch stays the plain list of samples
    pin_memory=True)

# create loader iterator
iterator_train = iter(loader_train)

### Result Metrics

In [None]:
# get an example
# Use the built-in next(): `.next()` is a Python 2 idiom that DataLoader iterators
# in newer PyTorch releases no longer provide.
batch = next(iterator_train)
# First sample of the batch -> its 'img_data' entry -> first element along the leading axis.
output = batch[0]['img_data'][0]

In [None]:
def manhattan_distance(x, y):
    """
    Row-wise Manhattan (L1) distance between two batches of vectors.

    Args:
        x (torch.Tensor): Tensor of shape (N, D); each row is one vector.
        y (torch.Tensor): Tensor of shape (N, D); row i is compared with row i of x.

    Returns:
        torch.Tensor: Tensor of shape (N,) whose i-th entry is the sum of the absolute
        coordinate differences between x[i] and y[i].
    """
    difference = x - y
    # Reduce over dim 1 so one distance per row remains.
    return difference.abs().sum(dim=1)

# Small integer example: every coordinate of y is exactly one larger than in x,
# so each of the three rows is at distance 3.
x = torch.arange(1, 10).reshape(3, 3)
y = x + 1

print(manhattan_distance(x, y))

In [None]:
# Same distance, restricted to the first two columns of every row.
manhattan_distance(x[:, 0:2], y[:, 0:2])

In [None]:
def pixel_mse(image1, image2):
    """
    Compute the mean squared error (MSE) over all elements of two images using PyTorch.

    Integer-typed inputs (e.g. uint8 images) are promoted to the default floating point
    dtype before the subtraction, so the difference cannot wrap around and the mean is
    well defined. Floating point and complex inputs are used as they are.

    Args:
        image1 (torch.Tensor): A PyTorch tensor of shape (C, H, W) representing the first image.
        image2 (torch.Tensor): A PyTorch tensor of shape (C, H, W) representing the second image.

    Returns:
        torch.Tensor: A 0-dimensional tensor holding the mean of the squared element-wise
        differences between image1 and image2.
    """
    def _as_float(image):
        # torch.mean only accepts floating point / complex tensors, and subtracting
        # unsigned integer tensors wraps around instead of going negative.
        if image.is_floating_point() or image.is_complex():
            return image
        return image.to(torch.get_default_dtype())

    return torch.mean(torch.pow(_as_float(image1) - _as_float(image2), 2))


# Compare entries 0 and 2 of the sampled `img_data` sequence.
x, y = output[0], output[2]

print(pixel_mse(x, y))  # prints a 0-dim tensor holding the scalar MSE

In [None]:
# L1 distance between two hand-picked 2-D points, each passed as a single-row batch.
manhattan_distance(
    torch.tensor([[37.5818, 18.2550]], dtype=torch.float32),
    torch.tensor([[33.8364, 22.7585]], dtype=torch.float32),
)

### Extract integers

In [None]:
def extract_integers(filename):
    """
    Build one integer from the 2nd and 4th '_'-separated fields of a file name.

    Only the text before the first '.' is considered. The two fields are concatenated
    as strings and the result is parsed as an int,
    e.g. 'buffer_1_timestep_23.png' -> 123.
    """
    stem = filename.split('.')[0]
    fields = stem.split('_')
    return int(fields[1] + fields[3])

def extract_integer(filename):
    """
    Return the 2nd '_'-separated field of a file name as an int.

    Only the text before the first '.' is considered,
    e.g. 'buffer_7_timestep_3.png' -> 7.
    """
    stem, _, _ = filename.partition('.')
    return int(stem.split('_')[1])


def check_list(input_list):
    """
    Check that a nested list of 'buffer_<b>_timestep_<t>.<ext>' names is consistently ordered.

    The outer list is indexed by buffer: every name in the b-th inner list must carry
    buffer number b, and its timestep must equal its position in that inner list
    (0, 1, 2, ...).

    Args:
        input_list: List of lists of file names.

    Returns:
        bool: True if every name matches its expected buffer and timestep, False as
        soon as one does not. An empty input yields True.
    """
    for expected_buffer, names in enumerate(input_list):
        for expected_timestep, name in enumerate(names):
            fields = name.split('_')
            buffer_id = int(fields[1])
            timestep = int(fields[3].split('.')[0])

            if not (buffer_id == expected_buffer and timestep == expected_timestep):
                return False

    return True


# Timesteps per buffer; the middle buffer ends on -1 instead of 2, so the check is
# expected to reject this sample.
timesteps_per_buffer = [(0, 1, 2), (0, 1, -1), (0, 1, 2)]
sample_list = [[f'buffer_{buffer}_timestep_{timestep}.png' for timestep in timesteps]
               for buffer, timesteps in enumerate(timesteps_per_buffer)]

result = check_list(sample_list)

### Mean color

In [None]:
# get an example
# Use the built-in next(): `.next()` is a Python 2 idiom that DataLoader iterators
# in newer PyTorch releases no longer provide.
batch = next(iterator_train)
output = batch[0]['img_data'][0]

# Poses detected in the first image; columns 0..2 of the first pose are forwarded to
# the mask helper (presumably x, y and radius in pixels -- confirm in utils.pose).
p0 = get_circular_poses(output[0]).numpy()

rgb_values = get_tensor_img_px_circle_mask_rgb(output[0], p0[0,0], p0[0,1], p0[0,2])
print(len(rgb_values))
# Average over axis 0, i.e. over the collected values, leaving one mean per remaining column.
mean_rgb = rgb_values.mean(axis=0)
print(mean_rgb)
# Render the mean colour as a 1x1 image.
plt.imshow([[tuple(mean_rgb.numpy())]])

In [None]:
# MSE between two independent uniform-random tensors of shape (2, 256, 256).
pixel_mse(torch.rand(2, 256, 256), torch.rand(2, 256, 256))

### Multi-Object Tracking

In [None]:
# get an example
# Use the built-in next(): `.next()` is a Python 2 idiom that DataLoader iterators
# in newer PyTorch releases no longer provide.
batch = next(iterator_train)
output = batch[0]['img_data'][0]

# Initial keypoints taken from the first image; columns 0 and 1 are used as x / y below,
# column 2 is passed on as the mask radius.
p0= get_circular_poses(output[0]).numpy()

# Create some random colors
color = np.random.randint(0, 255, (100, 3))

for i in range(len(output)-1):
    opt_flow_1 = spynet_optical_flow(output[i], output[i+1])
    opt_flow_2 = farneback_optical_flow(output[i], output[i+1])
    # How far the two optical flow estimates disagree for this image pair.
    print(pixel_mse(opt_flow_1, opt_flow_2).detach().numpy())
    p1 = []
    for keypoint in p0:
        x, y = keypoint[0], keypoint[1]
        # Sample BOTH flow components at the keypoint's current pixel before moving it;
        # updating x first and then looking up the y-displacement at the shifted
        # column would read the flow of a different pixel.
        row, col = int(y), int(x)
        dx = opt_flow_2[0, row, col].detach().numpy()
        dy = opt_flow_2[1, row, col].detach().numpy()
        x = x + dx
        y = y + dy
        p1.append([x, y, keypoint[2]])
        print(x,y,keypoint[2])
        show_imgs([tensor_img_px_circle_mask(output[i+1], x, y, keypoint[2]+10)])

    # The advected keypoints become the starting points for the next image pair.
    p0 = p1

## Metres to pixel based masking

In [None]:
# px_per_m calculation
# 55 degrees converted to radians (presumably the camera's field of view -- confirm).
q = 55.0 * np.pi / 180
half_angle_tan = np.tan(q/2)
# 256 is presumably half the image extent in pixels -- confirm against the renderer setup.
focal_length = 256.0 / half_angle_tan
px_per_m = focal_length / 2
m_per_px = 1 / px_per_m

In [None]:
# backtracking 256 px_per_m to q
# Inverse of the px_per_m calculation: start from px_per_m = 256 (focal length = 2 * 256)
# and recover the corresponding angle q, this time expressed in degrees.
focal_length = 2 * 256
half_angle = np.arctan(256.0 / focal_length)
q = half_angle * 2 * 180 / np.pi

## Testing images and optical flow

In [None]:
# Use the built-in next(): `.next()` is a Python 2 idiom that DataLoader iterators
# in newer PyTorch releases no longer provide.
# `output` keeps the whole batch here because the following cells also read 'state_label'.
output = next(iterator_train)
frames = output[0]['img_data'][0]
# Show every image of the first sequence, one figure per image.
for i in range(len(frames)):
    show_imgs([frames[i]])

In [None]:
# First image of the first sequence and the state labels that belong to it.
first_frame = output[0]['img_data'][0, 0]
first_frame_states = output[0]['state_label'][0][0]

# Columns 0, 1 and -2 of each state row are used as circle centre x, centre y and radius.
circle_params = first_frame_states[:, [0,1,-2]].detach().numpy()

for cx, cy, cr in circle_params:
    # The radius is enlarged by 0.05 so the mask leaves a small margin around the object.
    show_imgs([tensor_img_dist_circle_mask(first_frame, cx, cy, cr + 0.05)])

In [None]:
opt_flow_masks, cxs, cys, crs = [], [], [], []

frames = output[0]['img_data'][0]
states = output[0]['state_label'][0]

# Two consecutive image pairs, three objects per image.
for i in range(2):
    opt_flow = farneback_optical_flow(frames[i], frames[i+1])
    # Keep the unmasked flow first, followed by one masked copy per object.
    opt_flow_masks.append(opt_flow)

    for j in range(3):
        # Columns 0, 1 and -2 of a state row: circle centre x, centre y and radius.
        cx, cy, cr = (states[i][j, column] for column in (0, 1, -2))

        cxs.append(cx)
        cys.append(cy)
        crs.append(cr)
        opt_flow_masks.append(tensor_arr_dist_circle_mask(opt_flow, cx, cy, cr, 0.1))

for flow in opt_flow_masks:
    show_farneback_optical_flows([flow])

In [None]:
opt_flow_masks = []
cxs = []
cys = []
crs = []

# Same procedure as the Farneback cell, but with SPyNet as the optical flow estimator.
for i in range(2):
    opt_flow = spynet_optical_flow(output[0]['img_data'][0][i], output[0]['img_data'][0][i+1])
    # Keep the unmasked flow first, followed by one masked copy per object.
    opt_flow_masks.append(opt_flow)

    for j in range(3):
        cx = output[0]['state_label'][0][i][j, 0]
        cy = output[0]['state_label'][0][i][j, 1]
        # The radius is read from column -2, matching every other cell in this notebook;
        # this cell previously read column -1.
        # NOTE(review): confirm against the state_label layout that -2 is the radius.
        cr = output[0]['state_label'][0][i][j, -2]

        cxs.append(cx)
        cys.append(cy)
        crs.append(cr)
        opt_flow_masks.append(tensor_arr_dist_circle_mask(opt_flow, cx, cy, cr, 0.1))

#save_spynet_optical_flows(opt_flow_masks)

In [None]:
input_data = output[0]['img_data']
output_data = output[0]['state_label']

BA, BU, C, H, W = input_data.shape
_, _, num_balls, num_features = output_data.shape
processed_output_data = output_data

# Checked before allocating: the cell is only meant to run on sequences of three images.
assert BU == 3

# Per ball: 3 channels for each of the BU masked images, followed by 2 channels for
# each of the BU - 1 masked optical flows between consecutive images.
C_Final = BU * 3 + (BU - 1) * 2
input_processed = torch.zeros(BA, num_balls, C_Final, H, W)
for i in range(BA):
    for j in range(BU):
        img_orig = input_data[i, j, ...]
        # The flow between image j and j+1 does not depend on the ball, so compute it once
        # per image pair instead of once per ball. The last image has no successor.
        # NOTE(review): assumes tensor_arr_dist_circle_mask does not modify its input in
        # place -- the optical flow test cells above already reuse one flow for several balls.
        opt_flow = None
        if j != BU-1:
            opt_flow = farneback_optical_flow(input_data[i][j], input_data[i][j+1])
        for k in range(num_balls):
            # Columns 0, 1 and -2 of the state row: circle centre x, centre y and radius.
            cx, cy, cr = output_data[i, j, k, [0, 1, -2]]
            img_masked = tensor_img_dist_circle_mask(img_orig, cx, cy, cr+0.05)
            input_processed[i, k, j * 3:(j + 1) * 3, ...] = img_masked
            if opt_flow is not None:
                opt_flow_masked = tensor_arr_dist_circle_mask(opt_flow, cx, cy, cr, 0.1)
                input_processed[i, k, BU*3 + j*2:BU*3 + (j+1)*2, ...] = opt_flow_masked
# Merge the batch and ball axes: (BA, num_balls, C_Final, H, W) -> (BA * num_balls, C_Final, H, W).
input_processed = torch.flatten(input_processed, end_dim=1)

In [None]:
#input_processed = input_processed.view(-1, num_balls, C_Final, H, W)
#for i in range(BA):
   #print(f'batch {i}')
   #for k in range(num_balls):
       #print(f'ball {k}')
       #for j in range(BU):
           #print(f'buffer {j}')
           #masked_img = input_processed[i,k, j * 3:(j + 1) * 3, ...]
           #show_imgs([masked_img])
           #if j != BU-1:
           #    opt_flow_masked = input_processed[i + k, BU*3 + j*2:BU*3 + (j+1)*2]
           #    show_farneback_optical_flows([opt_flow_masked])

### Labelling function

In [None]:
# Every one of the 1000 samples is labelled with class 0.
label = torch.zeros(1000, dtype=torch.long)

# Random scores for 9 classes stand in for a model output.
output = torch.rand(1000, 9)

pred_log_prob = F.log_softmax(output, dim=1)
preds = output.argmax(dim=1)

# Labels below zero would be excluded from the accuracy; here all of them are valid.
valid = (label >= 0).long()
hits = (preds == label).long()
acc_sum = (valid * hits).sum()
valid_sum = valid.sum()
# The small epsilon guards against division by zero when no label is valid.
acc = acc_sum.float() / (valid_sum.float() + 1e-10)
#print(label)
#print(preds)
print(acc)

### Dataset indexing function

In [None]:
def idx2idxs(idx, arr):
    """
    Convert a flat element index into a (row, column) pair for a list of sequences.

    Elements are counted row by row: first all entries of arr[0], then arr[1], and so on.

    Args:
        idx (int): Flat index into the concatenation of all rows.
        arr: Sequence of sized rows (lists, arrays, ...). Rows may differ in length.

    Returns:
        tuple: (row, column) such that arr[row][column] is the idx-th element.

    Raises:
        IndexError: If arr is empty, or if the rows differ in length and idx is not
            smaller than the total number of elements.

    Note:
        When all rows have the same length, an out-of-range idx is not rejected but
        wraps around modulo the total number of elements.
    """
    if len(arr) == 0:
        raise IndexError('cannot index into an empty collection')

    row_lengths = [len(row) for row in arr]

    if all(length == row_lengths[0] for length in row_lengths):
        # Rectangular layout: plain row-major arithmetic.
        n_rows, n_cols = len(row_lengths), row_lengths[0]
        row, col = divmod(idx, n_cols)
        return (row % n_rows, col)

    # Ragged layout: walk the rows until the running element count passes idx.
    offset = 0
    for row, length in enumerate(row_lengths):
        if idx < offset + length:
            return (row, idx - offset)
        offset += length

    # No row contains idx; without this the function would fail on an unbound local.
    raise IndexError(f'flat index {idx} is out of range for {offset} elements')

In [None]:
# Ragged example: 8 rows of 125 values followed by 5 rows of 200 values, holding the
# numbers 0..1999 in order, so the element found at flat index idx should equal idx.
idx = 1999
A = np.arange(1000).reshape(8, -1)
B = np.arange(1000, 2000).reshape(5, -1)
C = [*A, *B]
i, j = idx2idxs(idx, C)
print(i,j)
print(C[i][j])