# Libraries

In [32]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
import subprocess
from pathlib import Path
import platform
import torch
import torch.nn.functional as F
from torchvision.transforms import Compose
from pathlib import Path
import openvino as ov
from ipywidgets import widgets

# Setup - for Depth Anything Library

In [33]:
import sys

# Setup for the Depth Anything library: make sure a checkout exists at
# `repo_path`, make it the working directory, and install the OpenVINO
# tooling used further down.
repo_path = r"D:\project\ISRO_final\Test tube detection\Depth-Anything"

repo_dir = Path(repo_path)

if not repo_dir.exists():
    # Clone straight into repo_dir. Without an explicit target, git creates
    # "./Depth-Anything" under the current working directory, and the chdir
    # below then fails because repo_dir still does not exist.
    repo_dir.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(
        ["git", "clone", "https://github.com/LiheYoung/Depth-Anything", str(repo_dir)],
        check=True,
    )
os.chdir(repo_dir)

# Invoke pip through the running interpreter so the packages land in the same
# environment as this kernel, not whichever `pip` happens to be first on PATH.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "-q", "openvino>=2023.3.0", "datasets>=2.14.6", "nncf"],
    check=True,
)



# Patch the vendored DINOv2 attention module so that XFORMERS_AVAILABLE is
# False. The untouched file is kept next to it as "orig_attention.py"; the
# presence of that backup is what marks the patch as already applied.
atten_path = r"D:\project\ISRO_final\Test tube detection\Depth-Anything\torchhub\facebookresearch_dinov2_main\dinov2\layers\attention.py"
attention_file_path = Path(atten_path)
orig_attention_path = attention_file_path.with_name("orig_" + attention_file_path.name)

if not orig_attention_path.exists():
    # Move the original aside first, then regenerate attention.py from it.
    attention_file_path.rename(orig_attention_path)

    data = orig_attention_path.read_text()
    data = data.replace("XFORMERS_AVAILABLE = True", "XFORMERS_AVAILABLE = False")
    attention_file_path.write_text(data)

# Histogram Equalization

In [34]:
import cv2

def hist_equal(image):
    """Apply CLAHE to the lightness channel of a BGR image.

    The image is converted to LAB, only the L channel is equalized
    (clip limit 3.5, 8x8 tiles), and the result is converted back to BGR.

    Args:
        image: Image in BGR channel order.

    Returns:
        The contrast-enhanced image in BGR channel order.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2LAB))

    equalizer = cv2.createCLAHE(clipLimit=3.5, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge((equalizer.apply(lightness), chan_a, chan_b))

    return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)

# HSV based detection 

In [35]:
def hsv_detect(image):
    """Detect red regions in a BGR image using HSV thresholds.

    Red sits at both ends of OpenCV's 8-bit hue axis (0-179), so two hue
    bands are thresholded and OR-ed together. Thresholding only the low
    band misses reds whose hue falls just below the wrap-around point.

    Args:
        image: Image in BGR channel order.

    Returns:
        A tuple ``(result, mask)``:
        ``result`` is the RGB version of ``image`` with every pixel outside
        the mask set to zero; ``mask`` is the single-channel 0/255 mask after
        dilation and median filtering.
    """
    img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Convert to HSV color space
    hsv_img = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2HSV)

    # Low-hue red band
    low_red = np.array([0, 150, 150])
    high_red = np.array([10, 255, 255])

    # High-hue red band (same saturation/value limits, other end of the axis)
    low_red_wrap = np.array([170, 150, 150])
    high_red_wrap = np.array([179, 255, 255])

    # A pixel is red if it falls in either band
    mask = cv2.bitwise_or(
        cv2.inRange(hsv_img, low_red, high_red),
        cv2.inRange(hsv_img, low_red_wrap, high_red_wrap),
    )

    # Close small gaps, then remove speckle noise from the mask
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (4, 4))
    dilation = cv2.dilate(mask, kernel)
    mask = cv2.medianBlur(dilation, 9)

    # Keep only the masked pixels of the (RGB) image
    result = cv2.bitwise_and(img_rgb, img_rgb, mask=mask)

    return result, mask

# Frame Stabilizer

In [36]:
class _FrameTracker:
    """Track corner features across frames and accumulate the camera motion."""

    def __init__(self):
        self.tracked_features = np.empty((0, 2), dtype=np.float32)
        self.prev_gray = None
        self.fresh_start = True
        # Accumulated 2x3 partial-affine motion, stored as 3x3 so that
        # successive frame-to-frame estimates can be chained by matrix product.
        self.rigid_transform = np.eye(3, dtype=np.float32)

    def reset(self):
        """Forget all tracking state; the next frame starts a new reference."""
        self.rigid_transform = np.eye(3, dtype=np.float32)
        self.tracked_features = np.empty((0, 2), dtype=np.float32)
        self.prev_gray = None
        self.fresh_start = True

    def process_image(self, img):
        """Update the accumulated transform with the motion seen in ``img``."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Optical flow needs both frames to have the same size.
        if self.prev_gray is not None and self.prev_gray.shape != gray.shape:
            self.reset()

        if self.fresh_start:
            corners = cv2.goodFeaturesToTrack(gray, 300, 0.01, 10)
            if corners is not None:
                self.tracked_features = corners.reshape(-1, 2).astype(np.float32)
            self.fresh_start = False

        if self.prev_gray is not None:
            if len(self.tracked_features) < 2:
                # Too few points to follow or to fit a transform: start over
                # on the next frame instead of calling OpenCV with no points.
                self.reset()
                return

            prev_pts = self.tracked_features.reshape(-1, 1, 2)
            next_pts, status, _ = cv2.calcOpticalFlowPyrLK(
                self.prev_gray, gray, prev_pts, None, winSize=(10, 10)
            )
            found = status.reshape(-1).astype(bool)

            # Losing more than 20% of the points means the track is unreliable.
            lost = found.size - np.count_nonzero(found)
            if lost > 0.2 * found.size:
                self.reset()
                return

            # Fit the frame-to-frame motion on successfully tracked points only.
            step, _inliers = cv2.estimateAffinePartial2D(prev_pts[found], next_pts[found])
            if step is not None:
                step = np.vstack([step, [0, 0, 1]])
                self.rigid_transform = np.dot(step, self.rigid_transform)

            self.tracked_features = next_pts[found].reshape(-1, 2)

        self.prev_gray = gray.copy()

    def stabilize_frame(self, frame):
        """Return ``frame`` warped by the inverse of the accumulated motion."""
        self.process_image(frame)

        inv_trans = np.linalg.inv(self.rigid_transform)[:2]
        return cv2.warpAffine(frame, inv_trans, (frame.shape[1], frame.shape[0]))


def stabilize_frame(frame):
    """Stabilize one frame of a video stream.

    Stabilization compares each frame with the previous one, so the tracker
    must outlive a single call. It is created on first use and kept on the
    function object; building a new tracker per call would leave it with no
    previous frame and make this function a no-op. Set
    ``stabilize_frame._tracker = None`` to start over, e.g. for a new video.

    Args:
        frame: Image in BGR channel order.

    Returns:
        The frame warped to cancel the motion accumulated since tracking
        last (re)started; same size as the input.
    """
    tracker = getattr(stabilize_frame, "_tracker", None)
    if tracker is None:
        tracker = _FrameTracker()
        stabilize_frame._tracker = tracker
    return tracker.stabilize_frame(frame)

# Moment Based Orientation 

In [37]:
def calculate_orientation(frame_or_mask):
    """Compute area, centroid and orientation from image moments.

    Args:
        frame_or_mask: Either a 3-dimensional array, which is converted to
            grayscale with ``cv2.COLOR_BGR2GRAY``, or an array of any other
            rank, which is used as is.

    Returns:
        A tuple ``(area, (centroid_x, centroid_y), orientation_deg)`` where
        ``area`` is the zeroth moment ``m00``, the centroid is truncated to
        integers (``(0, 0)`` when the area is zero), and the orientation is
        ``0.5 * atan2(2 * mu11, mu20 - mu02)`` expressed in degrees.
    """
    if len(frame_or_mask.shape) == 3:
        gray = cv2.cvtColor(frame_or_mask, cv2.COLOR_BGR2GRAY)
    else:
        gray = frame_or_mask

    m = cv2.moments(gray)
    area = m['m00']

    # The centroid is undefined for an empty image; fall back to the origin.
    if area != 0:
        centroid = (int(m['m10'] / area), int(m['m01'] / area))
    else:
        centroid = (0, 0)

    # Orientation from the second-order central moments.
    orientation_deg = np.degrees(0.5 * np.arctan2(2 * m['mu11'], m['mu20'] - m['mu02']))

    return area, centroid, orientation_deg

# Depth Anything

In [38]:
from depth_anything.dpt import DepthAnything
from depth_anything.util.transform import Resize, NormalizeImage, PrepareForNet

def initialize_model(encoder='vits'):
    """Load a pretrained Depth Anything model.

    Args:
        encoder: Encoder suffix used to build the checkpoint id
            ``depth_anything_{encoder}14``. The upstream project publishes
            ``'vits'``, ``'vitb'`` and ``'vitl'``. The default is ``'vits'``
            because a bare ``'vit'`` yields an id that matches none of those.

    Returns:
        The model returned by
        ``DepthAnything.from_pretrained('LiheYoung/depth_anything_{encoder}14')``.
    """
    model_id = f'depth_anything_{encoder}14'
    depth_anything = DepthAnything.from_pretrained(f'LiheYoung/{model_id}')
    return depth_anything

In [39]:
def compile_model(model_id, depth_anything, example_input, device="AUTO"):
    """Convert the model to OpenVINO IR if needed and compile it for a device.

    The IR is stored as ``{model_id}.xml`` relative to the current working
    directory and is reused when it already exists, in which case
    ``depth_anything`` and ``example_input`` are not touched.

    Args:
        model_id: Base name of the IR file.
        depth_anything: Model handed to ``ov.convert_model`` on first use.
        example_input: Example input handed to ``ov.convert_model``.
        device: OpenVINO device name. Defaults to ``"AUTO"``, the value this
            function always used before the parameter was added.

    Returns:
        The compiled OpenVINO model.
    """
    ir_path = Path(f'{model_id}.xml')

    if not ir_path.exists():
        # The conversion fixes the input to a static 1x3x518x518 shape.
        ov_model = ov.convert_model(depth_anything, example_input=example_input, input=[1, 3, 518, 518])
        ov.save_model(ov_model, ir_path)

    core = ov.Core()
    return core.compile_model(ir_path, device)

In [40]:
def get_transform():
    """Build the preprocessing pipeline applied to frames before inference.

    Returns:
        A ``Compose`` of three steps, applied in order: ``Resize`` configured
        for 518x518 with sizes kept a multiple of 14 and cubic interpolation,
        ``NormalizeImage`` with fixed per-channel mean and std, and
        ``PrepareForNet``.
    """
    resize_step = Resize(
        width=518,
        height=518,
        resize_target=False,
        ensure_multiple_of=14,
        resize_method='lower_bound',
        image_interpolation_method=cv2.INTER_CUBIC,
    )
    normalize_step = NormalizeImage(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    pipeline = [resize_step, normalize_step, PrepareForNet()]
    return Compose(pipeline)

In [41]:
def normalize_minmax(data):
    """Rescale the values in ``data`` linearly to the range 0..1.

    Args:
        data: Array-like object exposing ``min()`` and ``max()``.

    Returns:
        ``(data - min) / (max - min)``. When every element has the same value
        the range is zero; an all-zero array of the same shape is returned in
        that case instead of dividing by zero and producing NaNs.
    """
    lo = data.min()
    span = data.max() - lo
    if span == 0:
        # Multiplying by 0.0 gives zeros with the same shape and the floating
        # dtype the regular division would have produced.
        return (data - lo) * 0.0
    return (data - lo) / span

In [42]:
def convert_result_to_image(result, colormap="inferno"):
    """Convert a floating-point network result into a color image.

    Args:
        result: A single network result in 1,H,W shape.
        colormap: OpenCV colormap, either a name such as ``"inferno"`` or
            ``"viridis"`` (resolved to ``cv2.COLORMAP_<NAME>``) or one of the
            ``cv2.COLORMAP_*`` integer constants. The default is
            ``"inferno"``, the map this function has always rendered, so
            calls that omit the argument produce the same image as before.

    Returns:
        A tuple ``(depth_map, result)``: ``depth_map`` is the colorized H,W,3
        image in RGB channel order, and ``result`` is the min-max normalized
        input scaled to 0-255 as a ``uint8`` H,W array.

    Raises:
        ValueError: If ``colormap`` is a name OpenCV does not provide.
    """
    if isinstance(colormap, str):
        try:
            colormap_id = getattr(cv2, f"COLORMAP_{colormap.upper()}")
        except AttributeError:
            raise ValueError(f"Unknown OpenCV colormap: {colormap!r}") from None
    else:
        colormap_id = colormap

    result = result.squeeze(0)
    result = normalize_minmax(result)
    result = result * 255
    result = result.astype(np.uint8)
    # applyColorMap yields BGR; reversing the last axis makes it RGB.
    depth_map = cv2.applyColorMap(result, colormap_id)[:, :, ::-1]
    return depth_map, result

In [43]:
def to_rgb(image_data):
    """Return ``image_data`` with its channel order changed from BGR to RGB."""
    rgb_image = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB)
    return rgb_image

In [44]:
def depth_anything_model():
    """Return the compiled OpenVINO Depth Anything model (``vits`` encoder).

    Loading the pretrained model and compiling it is expensive, so the
    compiled model is built on the first call and kept on the function
    object; later calls return that same instance. Set
    ``depth_anything_model._compiled = None`` to force a rebuild.

    Returns:
        The compiled model produced by ``compile_model``.
    """
    cached = getattr(depth_anything_model, "_compiled", None)
    if cached is not None:
        return cached

    encoder = 'vits'
    model_id = f'depth_anything_{encoder}14'
    depth_anything = initialize_model(encoder)
    # Dummy input that only drives the conversion when no IR file exists yet.
    example_input = np.random.rand(1, 3, 518, 518).astype(np.float32)
    compiled_model = compile_model(model_id, depth_anything, example_input)

    depth_anything_model._compiled = compiled_model
    return compiled_model

# Main Code

In [46]:
async def main():
    """Run the live detection + depth pipeline on capture device 0.

    For every frame: resize, stabilize, equalize, detect red regions, estimate
    depth, mask the depth map with the red mask, and show a 2x2 mosaic
    (input | mask / masked input | masked depth). Press ``q`` to quit.

    The function is a coroutine only so that it can be started with
    ``await main()``; nothing inside it is awaited and every step runs
    synchronously.

    Raises:
        ValueError: If the capture device cannot be opened.
    """
    # Open the video file or capture device
    video_path = 0
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        raise ValueError("The video cannot be opened.")

    # Size of each tile in the mosaic
    display_h, display_w = 200, 400

    try:
        # Build the preprocessing pipeline and the model ONCE; doing this per
        # frame reloads and recompiles the network on every iteration.
        transform = get_transform()
        compiled_model = depth_anything_model()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Resize frame to the display dimensions
            frame_resized = cv2.resize(frame, (display_w, display_h))

            # Stabilize frame
            stable_frame = stabilize_frame(frame_resized)

            # Histogram equalization (input for the depth network)
            histframe = hist_equal(stable_frame)

            # Red detection runs on the stabilized frame so that the mask lines
            # up pixel-for-pixel with the depth map, which is derived from it.
            masked_img, mask = hsv_detect(stable_frame)

            # Convert results to BGR for display
            result_bgr = cv2.cvtColor(masked_img, cv2.COLOR_RGB2BGR)
            mask_bgr = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

            # Depth inference
            input_image = cv2.cvtColor(histframe, cv2.COLOR_BGR2RGB) / 255.0
            input_image = transform({'image': input_image})['image']
            input_image = np.expand_dims(input_image, 0)
            result = compiled_model([input_image])[0]

            # convert_result_to_image returns an RGB image, while cv2.imshow
            # interprets arrays as BGR: swap back so the colors are not mixed up.
            result_frame, _ = convert_result_to_image(result)
            depth_bgr = cv2.cvtColor(np.ascontiguousarray(result_frame), cv2.COLOR_RGB2BGR)

            # Bring the depth map to the tile size, then keep only red regions
            depth_bgr = cv2.resize(depth_bgr, (display_w, display_h))
            result_xyz = cv2.bitwise_and(depth_bgr, depth_bgr, mask=mask)

            # All tiles are already display_w x display_h; build the mosaic
            h_stack1 = np.hstack((frame_resized, mask_bgr))
            h_stack2 = np.hstack((result_bgr, result_xyz))
            final_display = np.vstack((h_stack1, h_stack2))

            # Display the stacked images in a single window
            cv2.imshow("Combined Display", final_display)

            # Exit on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except KeyboardInterrupt:
        print("Processing interrupted.")
    finally:
        # Release resources
        cap.release()
        cv2.destroyAllWindows()

In [23]:
# Run the main coroutine. A bare top-level `await` like this relies on an
# async-aware shell such as IPython/Jupyter; it is not valid in a plain script.
await main()

# Trial Code

In [21]:
# Disabled trial code, kept as a bare string literal so none of it executes.
# It is an earlier variant of the pipeline in which model compilation is
# wrapped in coroutines and each image is shown in its own window.
'''async def compile_model_async(model_id, depth_anything, example_input):
    OV_DEPTH_ANYTHING_PATH = Path(f'{model_id}.xml') 
    
    if not OV_DEPTH_ANYTHING_PATH.exists():
        ov_model = ov.convert_model(depth_anything, example_input=example_input, input=[1, 3, 518, 518])
        ov.save_model(ov_model, OV_DEPTH_ANYTHING_PATH)
        
    core = ov.Core()
    
    device = widgets.Dropdown(
        options=core.available_devices + ["AUTO"],
        value="AUTO",
        description="Device:",
        disabled=False,
    )

    compiled_model = core.compile_model(OV_DEPTH_ANYTHING_PATH, device.value)
    return compiled_model

    

async def depth_anything_model():
    encoder = 'vits'
    model_id = f'depth_anything_{encoder}14'
    depth_anything = initialize_model(encoder)
    example_input = np.random.rand(1, 3, 518, 518).astype(np.float32)
    compiled_model = await compile_model_async(model_id, depth_anything, example_input)
    
    return compiled_model

    

async def main():
    # Open the video file or capture device
    video_path = 0
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        raise ValueError("The video cannot be opened.")

    transform = get_transform()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Desired dimensions for each frame
            display_h, display_w = 200, 400

            # Resize frame to the display dimensions
            frame_resized = cv2.resize(frame, (display_w, display_h))

            # Stabilize frame
            stable_frame = stabilize_frame(frame_resized)

            # Histogram Equalization
            histframe = hist_equal(stable_frame)

            # Run HSV detection asynchronously
            masked_img, mask = hsv_detect(frame_resized)
            
            # Convert results to BGR for display
            result_bgr = cv2.cvtColor(masked_img, cv2.COLOR_RGB2BGR)
            mask_bgr = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

            # Run depth model inference asynchronously
            compiled_model = await depth_anything_model()
            input_image = cv2.cvtColor(histframe, cv2.COLOR_BGR2RGB) / 255.0
            input_image = transform({'image': input_image})['image']
            input_image = np.expand_dims(input_image, 0)
            result = compiled_model([input_image])[0]
            
            # Convert network result to RGB image
            result_frame, pixel_depth = convert_result_to_image(result)

            # Resize result frame to match the display dimensions
            result_frame_resized = cv2.resize(result_frame, (display_w, display_h))

            #cv2.cvtColor(result_frame_resized, )
            result_xyz = cv2.bitwise_and(result_frame_resized, result_frame_resized, mask=mask)

            # Display the resulting frames
            cv2.imshow('Original Frame', frame_resized)
            cv2.imshow('Mask', mask_bgr)
            cv2.imshow('Masked Image', result_bgr)
            #cv2.imshow('Depth Image', result_frame_resized)
            cv2.imshow('Depth Image', result_xyz)

            # Exit on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except KeyboardInterrupt:
        print("Processing interrupted.")
    finally:
        # Release resources
        cap.release()
        cv2.destroyAllWindows()

# Run the main coroutine
await main()'''

'async def compile_model_async(model_id, depth_anything, example_input):\n    OV_DEPTH_ANYTHING_PATH = Path(f\'{model_id}.xml\') \n    \n    if not OV_DEPTH_ANYTHING_PATH.exists():\n        ov_model = ov.convert_model(depth_anything, example_input=example_input, input=[1, 3, 518, 518])\n        ov.save_model(ov_model, OV_DEPTH_ANYTHING_PATH)\n        \n    core = ov.Core()\n    \n    device = widgets.Dropdown(\n        options=core.available_devices + ["AUTO"],\n        value="AUTO",\n        description="Device:",\n        disabled=False,\n    )\n\n    compiled_model = core.compile_model(OV_DEPTH_ANYTHING_PATH, device.value)\n    return compiled_model\n\nasync def depth_anything_model():\n    encoder = \'vits\'\n    model_id = f\'depth_anything_{encoder}14\'\n    depth_anything = initialize_model(encoder)\n    example_input = np.random.rand(1, 3, 518, 518).astype(np.float32)\n    compiled_model = await compile_model_async(model_id, depth_anything, example_input)\n    \n    return comp

In [18]:
# Disabled trial variant of hsv_detect, kept as a bare string literal so it
# never executes. NOTE(review): the snippet reads `visualization_image`, which
# it never assigns, and its Otsu threshold result is not used afterwards.
'''def hsv_detect(image):

    img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Convert the image to HSV
    hsv_img = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2HSV)

    # Red Color range
    low_red = np.array([0, 150, 150])
    high_red = np.array([10, 255, 255])

    # Create a mask for the red color
    mask = cv2.inRange(hsv_img, low_red, high_red)

    # Morphological transform
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (4, 4))
    dilation = cv2.dilate(mask,kernel)

    # Binarization
    img_median_mask = cv2.medianBlur(dilation, 9)

    gray_img = cv2.cvtColor(visualization_image, cv2.COLOR_RGB2GRAY)
    otsu_value , result_img = cv2.threshold(gray_img,0,255, cv2.THRESH_TOZERO + cv2.THRESH_OTSU)

    # Apply the mask to the original image
    result = cv2.bitwise_and(img_rgb, img_rgb, mask=img_median_mask)

    return result, img_median_mask '''

'def hsv_detect(image):\n\n    img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)\n    \n    # Convert the image to HSV\n    hsv_img = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2HSV)\n\n    # Red Color range\n    low_red = np.array([0, 150, 150])\n    high_red = np.array([10, 255, 255])\n\n    # Create a mask for the red color\n    mask = cv2.inRange(hsv_img, low_red, high_red)\n\n    # Morphological transform\n    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (4, 4))\n    dilation = cv2.dilate(mask,kernel)\n\n    # Binarization\n    img_median_mask = cv2.medianBlur(dilation, 9)\n\n    gray_img = cv2.cvtColor(visualization_image, cv2.COLOR_RGB2GRAY)\n    otsu_value , result_img = cv2.threshold(gray_img,0,255, cv2.THRESH_TOZERO + cv2.THRESH_OTSU)\n\n    # Apply the mask to the original image\n    result = cv2.bitwise_and(img_rgb, img_rgb, mask=img_median_mask)\n\n    return result, img_median_mask '

In [20]:
# Disabled trial variant of hist_equal, kept as a bare string literal so it
# never executes. Unlike the live version it converts the equalized image to
# RGB before returning it.
'''
def hist_equal(image):
    
    lab_img = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab_img)
    
    clahe = cv2.createCLAHE(clipLimit=3.5, tileGridSize=(8,8))
    clahe_img = clahe.apply(l)

    clahe_upd = cv2.merge((clahe_img,a,b))
    clahe = cv2.cvtColor(clahe_upd, cv2.COLOR_LAB2BGR)

    final_img = cv2.cvtColor(clahe, cv2.COLOR_BGR2RGB)

    return final_img '''

'\ndef hist_equal(image):\n    \n    lab_img = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)\n    l, a, b = cv2.split(lab_img)\n    \n    clahe = cv2.createCLAHE(clipLimit=3.5, tileGridSize=(8,8))\n    clahe_img = clahe.apply(l)\n\n    clahe_upd = cv2.merge((clahe_img,a,b))\n    clahe = cv2.cvtColor(clahe_upd, cv2.COLOR_LAB2BGR)\n\n    final_img = cv2.cvtColor(clahe, cv2.COLOR_BGR2RGB)\n\n    return final_img '