In [1]:
import sys, json
from itertools import groupby
from datetime import datetime
sys.path.append(r"../")

from PIL import Image, ImageDraw
from pathlib import Path
import numpy as np
from tqdm import tqdm
from sklearn.metrics import mean_squared_error

from detectron2.data import MetadataCatalog
import cameratransform as ct

import ct_assist as cta
from ct_assist import reference_detection as rd
from ct_assist import transform as ctf
from ct_assist.utils import accuracy

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Root of the dataset, relative to this notebook; every other path derives from it
data_dir = r"../../dataset2"
anno_dir = data_dir + "/annotations"
# Images are looked up inside the dataset root instead of a machine-specific absolute path
im_dir = Path(data_dir) / "images"

In [3]:
# Simulation properties are read from the annotations directory (anno_dir)
with open(anno_dir + "/simulation_properties.json") as f:
    sims = json.load(f)
sim = sims[0]  # Only one simulation is on file
sim.keys()

dict_keys(['resolution_max', 'particle_number', 'particle_radius', 'particle_max', 'particle_min', 'particle_band_width', 'time_scale', 'use_adaptive_timesteps', 'flip_ratio', 'viscosity_base', 'viscosity_exponent', 'mesh_scale', 'mesh_particle_radius', 'mesh_concave_lower', 'mesh_concave_upper', 'mesh_smoothen_neg', 'mesh_smoothen_pos', 'cache_directory', 'cache_type', 'cache_frame_end', 'execution_time_data', 'successful_simulation_data', 'execution_time_mesh', 'successful_simulation_mesh', 'fluid_volume_cm3_per_frame', 'area_cm2_per_frame', 'domain_longest_side', 'domain_volume', 'id'])

In [4]:
# Image records (the annotation JSON, not the image files) from the annotations directory
with open(anno_dir + "/images.json") as f:
    imgs = json.load(f)

In [5]:
def load():
    """Build the evaluation datasets from the annotated images.

    Reads ``images.json`` and ``instance_segmentation.json`` from ``anno_dir``,
    runs the reference detector on every annotated image under ``im_dir`` and
    collects inputs and ground truth for the accuracy tests. Images for which
    no usable reference object is detected are skipped entirely.

    Relies on the notebook-level names ``anno_dir``, ``im_dir`` and ``sim``.

    Returns:
        tuple: ``(X, Y_test, X_area, X_cprops, Y_area)`` where

        * ``X`` -- list of kwargs dicts (image, references, heights, STDs,
          meta data) for the camera-properties test.
        * ``Y_test`` -- ``[roll_deg, tilt_deg, heading_deg, elevation]`` for
          every entry of ``X``.
        * ``X_area`` -- shallow copies of the ``X`` entries that contain a
          fluid spill, extended with ``"image_coords"`` (one array of (x, y)
          rows per polygon).
        * ``X_cprops`` -- ``(camera properties, meta data, image coords, image)``
          tuples for the area test with known camera properties.
        * ``Y_area`` -- fluid area in m^2 for every entry of ``X_area``.
    """
    with open(anno_dir + "/images.json") as f:
        img_lst = json.load(f)

    with open(anno_dir + "/instance_segmentation.json") as f:
        # groupby only merges consecutive items, hence the sort on image_id
        anno_lst = sorted(json.load(f), key=lambda x: x["image_id"])

    predictor, cfg = rd.load_model(return_cfg=True, threshold=0.7)

    X = []  # For camera properties test
    Y_test = []  # For camera properties test
    X_area = []  # For area test
    X_cprops = []  # For testing area with known c-props
    Y_area = []  # For area test(s)

    # Class names of the dataset the model was trained on
    classes = MetadataCatalog.get(cfg.DATASETS.TRAIN[0]).get("thing_classes")

    # Group annotations by image_id
    for k, g in groupby(tqdm(anno_lst), lambda x: x["image_id"]):
        # NOTE(review): image_id is used as a position in images.json -- confirm
        # that the records are stored in image_id order.
        img_info = img_lst[k]

        # Open image with Pillow and cast to np.array, keeping the first three channels
        img = Image.open(im_dir / img_info.get("file_name"))
        arr = np.asarray(img)[..., :3]

        # Extract head-feet pairs from image
        pred = predictor(arr)
        inst_dict = rd.instances_to_dict(pred, classes)
        ref = rd.extract_reference(inst_dict, always_warn=False)
        # If no references can be found, skip this entry
        if len(ref) == 0 or len(ref[0][0]) == 0:
            continue

        # Append kwargs to X
        X.append({"img": img,
                  "reference": [],
                  "height": [],
                  "STD": [],
                  "meta_data": {"focal_length": 50,
                                "image_size": (1920, 1080),
                                "sensor_size": (36, 24)},
                  "multi": True})

        # Append roll_deg, tilt_deg, heading_deg, elevation to Y_test
        Y_test.append([img_info.get(key) for key in ["roll_deg", "tilt_deg", "heading_deg", "elevation"]])
        # Add found reference objects with height and STD to X
        for r, h, s in ref:
            X[-1]["reference"].append(r)
            X[-1]["height"].append(h)
            X[-1]["STD"].append(s)

        # Only sim_id 0.0 has fluid area saved, so only store those for test
        if img_info.get("simulation_id") != 0.0:
            continue

        # Collect the polygons of *every* fluid spill annotation (category 0) of
        # this image; keeping only the last annotation would drop spill area
        # while Y_area holds the total area of the frame.
        segs = [poly
                for anno in g if anno["category_id"] == 0
                for poly in anno["segmentation"]]
        if not segs:
            continue

        # Simulation frame
        frame = img_info.get("frame")
        # Shallow-copy the most recent record from X so "image_coords" is only added to the copy
        X_area.append(X[-1].copy())
        # Flat [x0, y0, x1, y1, ...] polygons -> arrays of (x, y) rows
        X_area[-1]["image_coords"] = [np.array(list(zip(poly[::2], poly[1::2]))) for poly in segs]
        # Area is stored in cm^2 per frame; convert to m^2
        Y_area.append(sim["area_cm2_per_frame"][frame] / 10_000)
        # Add kwargs to X_cprops
        X_cprops.append((Y_test[-1], X[-1]["meta_data"], X_area[-1]["image_coords"], img))

    return X, Y_test, X_area, X_cprops, Y_area

X, Y_test, X_area, X_cprops, Y_area = load()

	nonzero()
Consider using one of the following signatures instead:
	nonzero(*, bool as_tuple) (Triggered internally at  ..\torch\csrc\utils\python_arg_parser.cpp:766.)
  filter_inds = filter_mask.nonzero()
 22%|█████████████████▏                                                             | 498/2289 [00:36<02:16, 13.16it/s]

KeyboardInterrupt: 

In [None]:
# run_n binds its defaults when the ``def`` statement executes, so the shallow
# working copies must exist before the definition; on a fresh kernel the
# definition would otherwise raise NameError on X_c / Y_c.
X_c = X.copy()
Y_c = Y_test.copy()


def _timestamped(message):
    """Print ``message`` prefixed with the current wall-clock time (HH:MM:SS)."""
    print(datetime.now().strftime("%H:%M:%S"), message)


def _rmse(y_true, y_pred):
    """Root-mean-squared error.

    Uses ``sqrt(MSE)`` rather than ``mean_squared_error(..., squared=False)``,
    because the ``squared`` keyword is deprecated and removed in recent
    scikit-learn releases.
    """
    return np.sqrt(mean_squared_error(y_true, y_pred))


def run_n(X=X_c, Y=Y_c, X_area=X_area, Y_area=Y_area, n=3):
    """Run both accuracy tests ``n`` times and report per-run and averaged scores.

    The predictions of the ``n`` runs are averaged per image first; the
    "averaged" RMSE values are then computed from those mean predictions.

    Args:
        X: inputs for ``accuracy.camera_properties``.
        Y: ground-truth camera properties, four values per entry of ``X``.
        X_area: inputs for ``accuracy.area``.
        Y_area: ground-truth areas, one per entry of ``X_area``.
        n: number of repetitions; must be at least 1.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1")

    _timestamped(":: Testing camera properties...")
    score1 = [accuracy.camera_properties(X, Y) for _ in range(n)]
    # Element [1] of every run holds the predictions; average them over the runs
    arrs = np.mean([np.asarray(run[1]) for run in score1], axis=0)
    Y = np.asarray(Y)
    # One RMSE per camera property (the four columns of Y)
    avg_prop_rmse = tuple(_rmse(Y[:, col], arrs[:, col]) for col in range(4))
    _timestamped(":: Finished testing camera properties.")

    _timestamped(":: Testing Fluid area...")
    score2 = [accuracy.area(X_area, Y_area) for _ in range(n)]
    # Element [1] of every run holds the predicted areas; average them over the runs
    areas = np.mean([np.asarray(run[1]) for run in score2], axis=0)
    avg_area_rmse = _rmse(Y_area, areas)
    _timestamped(":: Finished testing fluid area.")

    print("Camera properties:", *[s[0] for s in score1], sep="\n")
    print("Camera properties averaged:", avg_prop_rmse)
    print()
    print("Spill area:", *[s[0] for s in score2], sep="\n")
    print("Spill area averaged:", avg_area_rmse)

run_n(n=10)

In [None]:
# Shallow working copies of the loaded inputs and ground truth for the tests
X_c, Y_c = X.copy(), Y_test.copy()

In [None]:
# Filled by make_cam, one entry per call, for inspection after the run.
# NOTE: this rebinds `imgs`, which earlier held the records loaded from images.json.
imgs = []
_points = []


# Test area with known camera properties
def make_cam(roll_deg, tilt_deg, heading_deg, elevation, focal_length, image_size, sensor_size, img_cords, img):
    """Compute the total polygon area of one image from known camera properties.

    Builds a rectilinear-projection camera from the given intrinsics and
    orientation, passes every polygon in ``img_cords`` through
    ``cam.spaceFromImage`` and sums the areas that ``accuracy.calc_area``
    returns for the first two coordinate columns of each result.

    Side effects: appends ``img`` to the notebook-level ``imgs`` list and the
    list of projected polygons to ``_points``.

    Args:
        roll_deg, tilt_deg, heading_deg, elevation: camera orientation.
        focal_length, image_size, sensor_size: camera intrinsics.
        img_cords: iterable of polygons in image coordinates.
        img: the image the polygons belong to (only stored, not read).

    Returns:
        Sum of the areas of all projected polygons.
    """
    proj = ct.RectilinearProjection(focallength_mm=focal_length,
                                    sensor=sensor_size,
                                    image=image_size)
    orientation = ct.SpatialOrientation(elevation_m=elevation,
                                        tilt_deg=tilt_deg,
                                        heading_deg=heading_deg,
                                        roll_deg=roll_deg)
    # initialize the camera
    cam = ct.Camera(projection=proj, orientation=orientation)

    # Materialise as a list: a generator would be exhausted by the sum below,
    # leaving only spent iterators in _points.
    points = [cam.spaceFromImage(points=x) for x in img_cords]
    imgs.append(img)
    _points.append(points)
    return sum(accuracy.calc_area(poly[:, :2]) for poly in points)


areas = [make_cam(*ext, **inte, img_cords=imp, img=img) for ext, inte, imp, img in tqdm(X_cprops)]
print(areas)
# RMSE as sqrt(MSE); the `squared=False` keyword is gone in recent scikit-learn
np.sqrt(mean_squared_error(Y_area, areas))

In [None]:
# Single camera-properties evaluation on the working copies.
# ypred / ytue are reused by the plotting cell further down.
score, ypred, ytue = accuracy.camera_properties(X_c, Y_c)
score

In [None]:
# Single spill-area evaluation; pred_area is reused by the plotting cell further down.
score2, pred_area = accuracy.area(X_area, Y_area)
score2

In [None]:
fig, ax = plt.subplots(5, figsize=(15, 15))

# Camera properties as (column in ypred / ytue, title); column 2 (heading) is not plotted
sample_idx = range(ytue.shape[0])
camera_panels = [(0, "Roll degree"), (1, "Tilt degree"), (3, "Elevation (m)")]
for axis, (col, title) in zip(ax, camera_panels):
    axis.scatter(sample_idx, ytue[..., col], label="True")
    axis.scatter(sample_idx, ypred[..., col], label="Predicted")
    axis.set_title(title)

# Spill area, once log-scaled and once on a linear axis
area_idx = range(len(X_area))
area_panels = [(ax[3], "Area (m$^{2}$) (log-scaled)", True),
               (ax[4], "Area (m$^{2}$)", False)]
for axis, title, log_scaled in area_panels:
    if log_scaled:
        axis.set_yscale('log')
    axis.scatter(area_idx, Y_area, label="True")
    axis.scatter(area_idx, pred_area, label="Predicted")
    axis.set_title(title)

# plt.legend() only annotates the current (last) axes; give every panel its own legend
for axis in ax:
    axis.legend()

# Keep titles and tick labels of neighbouring panels from overlapping
fig.tight_layout()
plt.show()