# DPT-DINOv2

In [None]:
from transformers import AutoImageProcessor, AutoModelForDepthEstimation, DPTForDepthEstimation
import torch
import numpy as np
from PIL import Image
import rawpy
from IPython.display import display, HTML
import tifffile as tiff
import matplotlib.pyplot as plt
from scipy.stats import linregress
import cv2
from sklearn.preprocessing import normalize
import os
from skimage.transform import resize
import random
from sklearn.cluster import KMeans, DBSCAN

def display_image_small(im, width=500):
    w, h = im.size
    height = int(h * (width / w))
    image = im.resize((width, height))
    display(image)

def open_image(path, depth_path, result_ground_truth_image_path,img_dim, save_ground_truth=False, display=False):
    depth_img = tiff.imread(depth_path)
    depth_img = resize(depth_img, (img_dim[1], img_dim[0]), order=1, mode='constant', anti_aliasing=False)
    formatted = (depth_img * 255 / np.max(depth_img)).astype("uint8")
    colored_depth = cv2.applyColorMap(formatted, cv2.COLORMAP_INFERNO)[:, :, ::-1]
    depth = Image.fromarray(colored_depth)
    if save_ground_truth:
        # save the ground truth image
        depth.save(result_ground_truth_image_path, format='png')
    # Display the shape of the image array
    with rawpy.imread(path) as raw_image:
        rgb = raw_image.postprocess()
        image = Image.fromarray(rgb)
        # print(f"Original image size: {w}x{h}")
        # print(f"Resized image size: {width}x{height}")
        image = image.resize(img_dim)
        if display:
            print("raw image and ground truth")
            display_image_small(image)
            
            display_image_small(depth)
    return image, depth_img

def preprocess_image(image_processor, image):
    # prepare image for the model
    inputs = image_processor(images=image, return_tensors="pt")
    return inputs

def predict_depth(model, inputs):
    #forward pass
    with torch.no_grad():
        outputs = model(**inputs)
        predicted_depth = outputs.predicted_depth
    return predicted_depth

def post_process_depth(depth, image, predicted_depth_path, display=False):
    # interpolate to original size
    prediction = torch.nn.functional.interpolate(
        depth.unsqueeze(1),
        size=image.size[::-1],
        mode="bicubic",
        align_corners=False,
    )

    # visualize the prediction
    output = prediction.squeeze().cpu().numpy()
    formatted = (output * 255 / np.max(output)).astype("uint8")
    colored_depth = cv2.applyColorMap(formatted, cv2.COLORMAP_INFERNO)[:, :, ::-1]
    depth = Image.fromarray(colored_depth)
    depth.save(predicted_depth_path, format='png')
    if display:
        display_image_small(depth)
    return output, colored_depth, depth


def plot_one_over_z_vs_d(actual_depth, model_output, save_folder, img_name):
    # Remove zero values
    z = actual_depth.flatten()
    d = model_output.flatten()

    non_zero = np.where(z > 0.33)
    z = 1/z[non_zero]
    d = d[non_zero]

    plt.figure()
    # Scatter plot
    plt.scatter(z, d, s=1)
    plt.xlabel("1/z")
    plt.ylabel("d")
   
    # Linear regression
    result = linregress(z, d)
    
    # Line of best fit
    fit_x = np.linspace(np.min(z), np.max(z), 100)
    fit_y = result.slope * fit_x + result.intercept
    # new figure
  
    plt.plot(fit_x, fit_y, '-r', label='Line of best fit, r = {:.3f}'.format(result.rvalue))
    
    # Display R-squared value
    plt.legend()
    plt.title(f"1/z vs d for {img_name}")
    # plt.show( )
    fig = plt.gcf()
    plt.set_size_inches(6, 4)
    plt.savefig(save_folder, dpi=100, bbox_inches='tight')
    plt.close()
    return result.rvalue

def plot_residuals(actual_depth, model_output, img_name, save_folder):
    # Remove zero values
    z = actual_depth.flatten()
    d = model_output.flatten()
    # print(z.shape)
    non_zero_mask = z > 0.33
    # non_zero = np.where(z > 0.33)
    z = 1/z[non_zero_mask]
    d = d[non_zero_mask]

    # Linear regression
    result = linregress(z, d)

    # Calculate residuals
    residuals = d - result.slope * z - result.intercept

    abs_residuals = np.abs(residuals)
    threshold = 3.5 * np.std(abs_residuals)

    # Identify the outliers
    outliers_mask = abs_residuals > threshold
    outliers = np.column_stack((z[outliers_mask], residuals[outliers_mask]))
    combined_mask = combine_masks([non_zero_mask, outliers_mask])
    # print(combined_mask.shape)
    n_clusters = 4
    # Apply k-means clustering with k=3 to the outliers
    # kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(outliers)
    dbscan = DBSCAN(eps=0.6, min_samples=7).fit(outliers)
    
    # # Scatter plot of residuals
    # plt.figure()
    # plt.scatter(z[outliers_mask], residuals[outliers_mask], s=1)
    # plt.xlabel("1/z")
    # plt.ylabel("Residuals")
    # plt.title(f"Residuals for {img_name}")
    # plt.show()

    # colors = ['red', 'green', 'yellow', 'purple']
    # Plot the outliers
    plt.figure()
    # plt.scatter(z, residuals, color='blue', label='Data points', s=0.5) 
    colors = ['red', 'yellow',  'purple', 'black']
    # colors = [plt.cm.rainbow(i/float(len(set(dbscan.labels_)))) for i in set(dbscan.labels_)]
    print(set(dbscan.labels_))
    clusters_masks = []
    for i in set(dbscan.labels_):
        cluster_mask = dbscan.labels_ == i
        clusters_masks.append(combine_masks([combined_mask, cluster_mask]).reshape(actual_depth.shape))
        plt.scatter(outliers[cluster_mask, 0], outliers[cluster_mask, 1], color=colors[i], label=f'Cluster {i}', s=0.5)
 
    # plt.scatter(z[outliers_mask], residuals[outliers_mask], color='red', label='Outliers', s=0.5)
    plt.xlabel('1/z')
    plt.ylabel('Residuals')
    plt.legend()
    plt.title(f'Outliers for {img_name}')
    # plt.show()
    fig = plt.gcf()
    plt.set_size_inches(6, 4)
    plt.savefig(save_folder, dpi=100, bbox_inches='tight')
    plt.close()
    return clusters_masks
    

def combine_masks(masks)-> np.ndarray:
    m = masks[0].copy()
    for m2 in masks[1:]:
        # Combine masks
        j = 0
        for i in range(len(m)):
            if m[i] == True:
                m[i] = m[i] and m2[j]
                j+=1
    return m

def underwater_depth_model_analysis(model_path, image_name, dataset_name, raw_image_path, actual_depth_path, results_path, img_dim=(664,443), save_ground_truth=False, display=False):
    # initialise model
    plot_path = f'{results_path}/plot/{image_name}.PNG'
    result_ground_truth_image_path = f'Datasets/{dataset_name}/depth_png/{image_name}.png'
    predicted_depth_path = f'{results_path}/predicted_depth/{image_name}.png'
    outlier_cluster_path = f'{results_path}/outlier_cluster/{image_name}.PNG'
    outlier_image_path = f'{results_path}/outlier_image/{image_name}.PNG'
    
    image_processor = AutoImageProcessor.from_pretrained(model_path)
    model = AutoModelForDepthEstimation.from_pretrained(model_path)
    raw_image, actual_depth = open_image(path=raw_image_path, depth_path=actual_depth_path, 
                                         result_ground_truth_image_path=result_ground_truth_image_path, 
                                         img_dim=img_dim, save_ground_truth=save_ground_truth, display=display)
    inputs = preprocess_image(image_processor=image_processor, image=raw_image)
    predicted_depth = predict_depth(model=model, inputs=inputs)
    model_output, formatted, depth_im = post_process_depth(depth=predicted_depth, image=raw_image, predicted_depth_path=predicted_depth_path, display=display)
    rvalue = plot_one_over_z_vs_d(actual_depth, model_output, plot_path, image_name)
    outlier_cluster_masks = plot_residuals(actual_depth, model_output, image_name)
    display_outliers_on_image(np.array(raw_image), outlier_cluster_masks, image_name, outlier_image_path)
    return rvalue

def display_outliers_on_image(image, outlier_cluster_masks, img_name, save_folder):
    # change the colours of the pixels in the image
    # red, yellow, pink, black
    colors = [(255, 0, 0), (255, 255, 0), (254,1,154), (0, 0, 0)]
    for i, mask in enumerate(outlier_cluster_masks):
        image[mask] = colors[i]
    img = Image.fromarray(image)
    img.save(save_folder, format='png')




img_name = "T_S03047"
raw = f"D1/Raw/{img_name}.ARW"
actual_depth_path = f"D1/depth/depth{img_name}.tif"


## Random images analysis

In [None]:
random_images = {'D1': ['T_S03376.ARW',
  'T_S03386.ARW',
  'T_S03305.ARW',
  'T_S03515.ARW',
  'T_S03175.ARW',
  'T_S03116.ARW',
  'T_S03338.ARW',
  'T_S03265.ARW',
  'T_S03330.ARW',
  'T_S03291.ARW'],
 'D3': ['T_S04900.ARW',
  'T_S04857.ARW',
  'T_S04871.ARW',
  'T_S04870.ARW',
  'T_S04923.ARW',
  'T_S04874.ARW',
  'T_S04866.ARW',
  'T_S04904.ARW',
  'T_S04876.ARW',
  'T_S04890.ARW'],
 'D5': ['LFT_3384.NEF',
  'LFT_3381.NEF',
  'LFT_3396.NEF',
  'LFT_3392.NEF',
  'LFT_3375.NEF',
  'LFT_3414.NEF',
  'LFT_3388.NEF',
  'LFT_3385.NEF',
  'LFT_3380.NEF',
  'LFT_3412.NEF']}

In [None]:
raw_image_count = 0
datasets_dir = 'Datasets'
results_dir = 'Results'
default_image_dim = (576,384)
models = {"model_dpt3_1": "Intel/dpt-swinv2-large-384", "model_depth_anything": "nielsr/depth-anything-small"}
# models = {"model_depth_anything": "nielsr/depth-anything-small"}
mean_r_values = {}
save_ground_truth = True
nsample_images = 10
# selected_raw_images = {}
selected_raw_images = random_images
for model_name, model_path in models.items():
    results_paths = {}
    for dataset_dir in os.listdir(datasets_dir):
        dataset_path = datasets_dir + '/' + dataset_dir
        dataset_name = os.path.basename(dataset_path)
        if dataset_name == 'D1':
            continue
        results_paths[dataset_name]= results_dir + '/'+ model_name + '/' + dataset_name
        if not os.path.exists(results_paths[dataset_name]):
            os.makedirs(results_paths[dataset_name]+ '/plot')
            os.makedirs(results_paths[dataset_name]+ '/predicted_depth')
        raw_images = dataset_path + '/Raw'
        depth_images = dataset_path + '/depth'
        if os.path.isdir(raw_images):
            mean_r_values[(model_name, dataset_name)] = 0
            if dataset_name not in selected_raw_images.keys():  
                all_raw_images = os.listdir(raw_images)
                selected_raw_images[dataset_name] = random.sample(all_raw_images, nsample_images)
            for raw_image in selected_raw_images[dataset_name]:
                raw_image_path = raw_images + '/' + raw_image
                raw_image_name = raw_image[:-4]
                depth_image_path = depth_images + '/depth' + raw_image_name + '.tif'

                r_value = underwater_depth_model_analysis(model_path=model_path, image_name=raw_image_name, 
                                                          dataset_name=dataset_name, raw_image_path=raw_image_path, 
                                                          actual_depth_path=depth_image_path, results_path=results_paths[dataset_name], 
                                                          img_dim=default_image_dim, save_ground_truth=save_ground_truth, display=False)
                mean_r_values[(model_name, dataset_name)] += r_value
                # print(depth_image_path)
                # underwater_depth_model_analysis(dataset_name, model_dinov2, raw_image_path, depth_image_path)
                raw_image_count += 1
                if raw_image_count % 5 == 0:
                    print(f"Processed {raw_image_count} images")
            mean_r_values[(model_name, dataset_name)] /= nsample_images
                
    save_ground_truth = False
    


In [None]:
print(mean_r_values)

## residual values analysis

In [None]:
D3_image = 'T_S04857.ARW'
D5_image = 'LFT_3396.NEF'
default_image_dim = (576,384)
image = D5_image
dataset_name = 'D5'
image_name = image[:-4]
raw_image_path = f'Datasets/{dataset_name}/Raw/{image}'
depth_image_path = f'Datasets/{dataset_name}/depth/depth{image_name}.tif'
model_path = "nielsr/depth-anything-small"

results_path = 'Results/model_depth_anything/{dataset_name}'
save_ground_truth = False

r_value = underwater_depth_model_analysis(model_path=model_path, image_name=image_name, 
                                                          dataset_name=dataset_name, raw_image_path=raw_image_path, 
                                                          actual_depth_path=depth_image_path, results_path=results_path, 
                                                          img_dim=default_image_dim, save_ground_truth=save_ground_truth, display=False)


In [None]:
D1_image = 'T_S03386.ARW'
D3_image = 'T_S04874.ARW'
D5_image = 'LFT_3414.NEF'
default_image_dim = (576,384)
image = D1_image
dataset_name = 'D1'
image_name = image[:-4]
raw_image_path = f'Datasets/{dataset_name}/Raw/{image}'
depth_image_path = f'Datasets/{dataset_name}/depth/depth{image_name}.tif'
model_path = "nielsr/depth-anything-small"
with rawpy.imread(raw_image_path) as raw_image:
        rgb = raw_image.postprocess()
        image = Image.fromarray(rgb)
        display(image)
results_path = 'Results/model_depth_anything/{dataset_name}'
save_ground_truth = False

r_value = underwater_depth_model_analysis(model_path=model_path, image_name=image_name, 
                                                          dataset_name=dataset_name, raw_image_path=raw_image_path, 
                                                          actual_depth_path=depth_image_path, results_path=results_path, 
                                                          img_dim=default_image_dim, save_ground_truth=save_ground_truth, display=False)
