In [39]:

import pandas as pd
import numpy as np
import os

import cv2

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

from tqdm import tqdm
import matplotlib.pyplot as plt


In [40]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Preprocessing_functions

In [41]:
import cv2
import numpy as np
import torch

import cv2
import numpy as np

def overlay_mask_on_frame(original_frame, mask, alpha=0.5):
    if len(mask.shape) == 2:
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)  
    if len(original_frame.shape) == 2:
        original_frame = cv2.cvtColor(original_frame, cv2.COLOR_GRAY2BGR)  

    mask_resized = cv2.resize(mask, (original_frame.shape[1], original_frame.shape[0]))

    overlayed_image = cv2.addWeighted(original_frame, 1 - alpha, mask_resized, alpha, 0)

    return overlayed_image


def apply_clahe(gray_frame):
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return clahe.apply(gray_frame)


def preprocess(frame, size=(224, 224)):
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(frame_gray, size)
    enhanced = apply_clahe(resized)
    tensor = torch.from_numpy(enhanced).float().unsqueeze(0) / 255.0
    return tensor.unsqueeze(0)



# ****Segmenetation

In [42]:

class ConvBlock(nn.Module):

    def __init__(self, in_channels, out_channels):
        super(ConvBlock, self).__init__()

        # number of input channels is a number of filters in the previous layer
        # number of output channels is a number of filters in the current layer
        # "same" convolutions
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.conv(x)
        return x


class UpConv(nn.Module):

    def __init__(self, in_channels, out_channels):
        super(UpConv, self).__init__()

        self.up = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.up(x)
        return x


class AttentionBlock(nn.Module):
    """Attention block with learnable parameters"""

    def __init__(self, F_g, F_l, n_coefficients):
        """
        :param F_g: number of feature maps (channels) in previous layer
        :param F_l: number of feature maps in corresponding encoder layer, transferred via skip connection
        :param n_coefficients: number of learnable multi-dimensional attention coefficients
        """
        super(AttentionBlock, self).__init__()

        self.W_gate = nn.Sequential(
            nn.Conv2d(F_g, n_coefficients, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(n_coefficients)
        )

        self.W_x = nn.Sequential(
            nn.Conv2d(F_l, n_coefficients, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(n_coefficients)
        )

        self.psi = nn.Sequential(
            nn.Conv2d(n_coefficients, 1, kernel_size=1, stride=1, padding=0, bias=True),
            nn.BatchNorm2d(1),
            nn.Sigmoid()
        )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, gate, skip_connection):
        """
        :param gate: gating signal from previous layer
        :param skip_connection: activation from corresponding encoder layer
        :return: output activations
        """
        g1 = self.W_gate(gate)
        x1 = self.W_x(skip_connection)
        psi = self.relu(g1 + x1)
        psi = self.psi(psi)
        out = skip_connection * psi
        return out


class AttentionUNet(nn.Module):

    def __init__(self, img_ch=1, output_ch=1):  # Changed img_ch to 1
        super(AttentionUNet, self).__init__()

        self.MaxPool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.Conv1 = ConvBlock(img_ch, 64)
        self.Conv2 = ConvBlock(64, 128)
        self.Conv3 = ConvBlock(128, 256)
        self.Conv4 = ConvBlock(256, 512)
        self.Conv5 = ConvBlock(512, 1024)

        self.Up5 = UpConv(1024, 512)
        self.Att5 = AttentionBlock(F_g=512, F_l=512, n_coefficients=256)
        self.UpConv5 = ConvBlock(1024, 512)

        self.Up4 = UpConv(512, 256)
        self.Att4 = AttentionBlock(F_g=256, F_l=256, n_coefficients=128)
        self.UpConv4 = ConvBlock(512, 256)

        self.Up3 = UpConv(256, 128)
        self.Att3 = AttentionBlock(F_g=128, F_l=128, n_coefficients=64)
        self.UpConv3 = ConvBlock(256, 128)

        self.Up2 = UpConv(128, 64)
        self.Att2 = AttentionBlock(F_g=64, F_l=64, n_coefficients=32)
        self.UpConv2 = ConvBlock(128, 64)

        self.Conv = nn.Conv2d(64, output_ch, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        e1 = self.Conv1(x)

        e2 = self.MaxPool(e1)
        e2 = self.Conv2(e2)

        e3 = self.MaxPool(e2)
        e3 = self.Conv3(e3)

        e4 = self.MaxPool(e3)
        e4 = self.Conv4(e4)

        e5 = self.MaxPool(e4)
        e5 = self.Conv5(e5)

        d5 = self.Up5(e5)
        s4 = self.Att5(gate=d5, skip_connection=e4)
        d5 = torch.cat((s4, d5), dim=1)
        d5 = self.UpConv5(d5)

        d4 = self.Up4(d5)
        s3 = self.Att4(gate=d4, skip_connection=e3)
        d4 = torch.cat((s3, d4), dim=1)
        d4 = self.UpConv4(d4)

        d3 = self.Up3(d4)
        s2 = self.Att3(gate=d3, skip_connection=e2)
        d3 = torch.cat((s2, d3), dim=1)
        d3 = self.UpConv3(d3)

        d2 = self.Up2(d3)
        s1 = self.Att2(gate=d2, skip_connection=e1)
        d2 = torch.cat((s1, d2), dim=1)
        d2 = self.UpConv2(d2)

        out = self.Conv(d2)
        return out


In [43]:

# Import the model class (ensure the model code is in scope or imported)
segmentation_model = AttentionUNet(img_ch=1, output_ch=1)



weights_path = '/kaggle/input/seg2/other/default/1/checkpoint_epoch_30.pth'  # Update with correct path

state_dict = torch.load(weights_path, map_location=device)

segmentation_model.load_state_dict(state_dict)
segmentation_model = segmentation_model.to(device)

# Set to evaluation mode
segmentation_model.eval()

print("Segmentation model weights loaded successfully.")



Segmentation model weights loaded successfully.


  state_dict = torch.load(weights_path, map_location=device)


In [44]:

def extract_mask_matrix_from_video(video_path, model, device, frame_size=(224, 224)):
    cap = cv2.VideoCapture(video_path)
    mask_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_clahe = apply_clahe(frame)

        input_tensor = preprocess_frame_for_model(frame_clahe, size=frame_size).to(device)
        with torch.no_grad():
            pred_mask = model(input_tensor)  # Shape: [1, 1, H, W]
        binary_mask = (pred_mask.squeeze().cpu().numpy() > 0.5).astype(np.uint8)  # Shape: [H, W]
        binary_mask=overlay_mask_on_frame(frame, binary_mask)
        mask_list.append(binary_mask)

    cap.release()
    mask_matrix = np.stack(mask_list, axis=0)  
    return mask_matrix


# Feature extraction Helper functions

In [45]:
def traverse_left(colour, mask):
    coord = []
    for i in range(mask.shape[0]):
        for j in range(mask.shape[1]):
            if np.array_equal(mask[i][j], colour):
                coord.append([i, j])
                break
    return coord

def traverse_right(colour, mask):
    coord = []
    for i in range(mask.shape[0]):
        for j in range(mask.shape[1]-1, -1, -1):
            if np.array_equal(mask[i][j], colour):
                coord.append([i, j])
                break
    return coord

def traverse_up(colour, mask):
    coord = []
    for j in range(mask.shape[1]):
        for i in range(mask.shape[0]):
            if np.array_equal(mask[i][j], colour):
                coord.append([i, j])
                break
    return coord


In [46]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

def color_segments_with_skip(coords_sorted, color_mask, colors):
    n_segments = len(colors)
    total_length = 0
    distances = [0]

    for i in range(1, len(coords_sorted)):
        prev = coords_sorted[i - 1]
        curr = coords_sorted[i]
        total_length += np.linalg.norm(curr - prev)
        distances.append(total_length)

    skip_len = 1.5 * total_length / 7
    usable_len = total_length - skip_len
    segment_len = usable_len / n_segments

    start_idx = 0
    while start_idx < len(distances) and distances[start_idx] < skip_len:
        start_idx += 1

    segments = [[] for _ in range(n_segments)]
    current_length = distances[start_idx]
    i = start_idx
    while i < len(distances) and current_length < total_length:
        segment_index = int((current_length - skip_len) // segment_len)
        if segment_index < n_segments:
            segments[segment_index].append(coords_sorted[i])
        current_length = distances[i]
        i += 1

    for seg_id, segment in enumerate(segments):
        for x, y in segment:
            color_mask[y, x] = colors[seg_id]

def get_segment_midpoints(color_mask, segment_colors):
    midpoints = []

    for color in segment_colors:
        color_arr = np.array(color, dtype=np.uint8)
        color_pixels = np.all(color_mask == color_arr, axis=-1)

        ys, xs = np.where(color_pixels)
        if len(xs) == 0 or len(ys) == 0:
            midpoints.append(None)
            continue

        x_mean = int(np.mean(xs))
        y_mean = int(np.mean(ys))
        midpoints.append((x_mean, y_mean))

    return midpoints

def get_segment_areas(color_mask, segment_colors):
    areas = []

    for color in segment_colors:
        color_arr = np.array(color, dtype=np.uint8)
        color_pixels = np.all(color_mask == color_arr, axis=-1)

        ys, xs = np.where(color_pixels)
        area = len(xs)  # Area is the number of pixels of this color
        areas.append(area)

    return areas


def extract_colored_inner_lining(color_mask):
    left_colors = [
        [255, 0, 0],     
        [0, 255, 0],     # Green
        [0, 0, 255],     # Red
    ]
    right_colors = [
        [255, 255, 0],  
        [255, 0, 255],   
        [0, 255, 255],  
    ]

    inner_lining_mask = np.zeros_like(color_mask)

    for i in left_colors:
        coord=traverse_right(i, color_mask)
        for x, y in coord:
            inner_lining_mask[x, y] = i

    for i in right_colors:
        coord=traverse_left(i, color_mask)
        for x, y in coord:
            inner_lining_mask[x, y] = i
    return inner_lining_mask

def color_u_mask(mask):
    ys, xs = np.where(mask == 255)
    coords = np.column_stack((xs, ys))

    topmost_index = np.argmin(ys)
    x_center = xs[topmost_index]

    left_arm = coords[coords[:, 0] < x_center]
    right_arm = coords[coords[:, 0] > x_center]

    left_sorted = left_arm[np.argsort(left_arm[:, 1])]
    right_sorted = right_arm[np.argsort(right_arm[:, 1])]

    left_colors = [
        [255, 0, 0],     # Blue
        [0, 255, 0],     # Green
        [0, 0, 255],     # Red
    ]
    right_colors = [
        [255, 255, 0],   # Cyan
        [255, 0, 255],   # Magenta
        [0, 255, 255],   # Yellow
    ]

    color_mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

    color_segments_with_skip(left_sorted, color_mask, left_colors)
    color_segments_with_skip(right_sorted, color_mask, right_colors)


    
   
    return color_mask



# Feature Extraction

In [47]:
def get_segment_points(color_mask, segment_colors):
    segment_points = []

    for color in segment_colors:
        print(color)
        for i in range(color_mask.shape[0]):
            for j in range(color_mask.shape[1]):
                if np.array_equal(color_mask[i][j], color):
                    segment_points.append([i, j])
                    
        
    return segment_points

def Feature_extraction(video_path):
    segment_colors = [
        [255, 0, 0],     # Blue    - Left segment 1
        [0, 255, 0],     # Green   - Left segment 2
        [0, 0, 255],     # Red     - Left segment 3
        [255, 255, 0],   # Cyan    - Right segment 1
        [255, 0, 255],   # Magenta - Right segment 2
        [0, 255, 255],   # Yellow  - Right segment 3
    ]


    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")

    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)

    cap.release()

    if not frames:
        raise ValueError("No frames were read from the video.")

    video_matrix = np.stack(frames, axis=0)
    features_area=[]
    features_boundary=[]
    features_center=[]

    area_ref=[]
    for frame in video_matrix:
        preprocessed = preprocess(frame)  
        mask = segmentation_model(preprocessed) 
        plot_image(mask)
        # mask_np = mask.squeeze().numpy().astype(np.uint8)  
        mask_np = mask.squeeze().detach().cpu().numpy().astype(np.uint8)

    
        color_mask = color_u_mask(mask_np)  
    
        centers = get_segment_points(color_mask, segment_colors)
        print(centers)
        break
        # areas = get_segment_pixels(color_mask, segment_colors)
        # boundary = get_inner_boundary(color_mask, segment_colors)
    
        # features_center.append(centers)
        # features_area.append(areas)
        # features_boundary.append(boundary)

    print(video_matrix.shape)
    return video_matrix


In [48]:
# video_path = "/kaggle/input/medimgproject/HMC-QU Dataset-Kaggle/HMC-QU/A4C/ES0001 _4CH_1.avi"
# video_matrix = Feature_extraction(video_path)

# print("Video shape:", video_matrix.shape)  # (num_frames, height, width, 3)

In [49]:
import matplotlib.pyplot as plt

def plot_image(image):
    # If the image is in grayscale (single channel), it will be a 2D array
    if len(image.shape) == 2:
        plt.imshow(image, cmap='gray')
    else:
        plt.imshow(image)  # Color image (3 channels)

    plt.axis('off')  # Turn off axis labels
    plt.show()


In [50]:
import cv2
import numpy as np
import torch
import matplotlib.pyplot as plt

def plot_image(image):
    # Ensure that the image is a NumPy array
    # if isinstance(image, torch.Tensor):
    #     image = image.detach().cpu().numpy()
    if frame.ndim == 3 and frame.shape[2] == 1:
        frame = frame.squeeze(axis=-1)  # Remove the channel dimension if it's just a single channel (grayscale)
    plot_image(frame)  # Plot the mask to visualize

    # If the image is in grayscale (single channel), it will be a 2D array
    if len(image.shape) == 2:
        plt.imshow(image, cmap='gray')
    else:
        plt.imshow(image)  # Color image (3 channels)

    plt.axis('off')  # Turn off axis labels
    plt.show()

def get_segment_points(color_mask, segment_colors):
    segment_points = []

    for color in segment_colors:
        print(f"Checking for color: {color}")
        color = np.array(color, dtype=np.uint8)
        for i in range(color_mask.shape[0]):
            for j in range(color_mask.shape[1]):
                if np.array_equal(color_mask[i][j], color):  # Check if the pixel matches the segment color
                    segment_points.append([i, j])
                    
    return segment_points
def apply_clahe(frame):
    """Apply CLAHE to enhance contrast on a BGR frame."""
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)

    merged = cv2.merge((cl, a, b))
    enhanced = cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)
    return enhanced

def preprocess_frame_for_model(frame, size=(224, 224)):
    """Resize and convert a frame to grayscale for model input."""
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(frame_gray, size)  # Shape: [H, W]
    tensor = torch.from_numpy(resized).float().unsqueeze(0) / 255.0  # Shape: [1, H, W]
    return tensor.unsqueeze(0)  # Shape: [1, 1, H, W]

# def preprocess(frame, size=(224, 224)):
#     # Preprocessing the frame (grayscale and resizing)
#     frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#     resized = cv2.resize(frame_gray, size)
#     return resized

def segmentation_model(preprocessed_frame):
    # Dummy segmentation model, replace with your actual model
    # Assuming the model returns a 3D tensor (height, width, channels)
    return torch.rand((preprocessed_frame.shape[0], preprocessed_frame.shape[1], 3))

def extract_mask_matrix_from_video(video_path, model, device, frame_size=(224, 224)):
    cap = cv2.VideoCapture(video_path)
    mask_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_clahe = apply_clahe(frame)

        input_tensor = preprocess_frame_for_model(frame_clahe, size=frame_size).to(device)
        print(input_tensor.shape)
        with torch.no_grad():
            pred_mask = model(input_tensor)  # Shape: [1, 1, H, W]
            
        print(pred_mask.shape)

        binary_mask = (pred_mask.squeeze().cpu().numpy() > 0.5).astype(np.uint8)  # Shape: [H, W]
        # binary_mask=overlay_mask_on_frame(frame, binary_mask)
        mask_list.append(binary_mask)

    cap.release()

    # Stack all binary masks into a 3D numpy array (frames, height, width)
    mask_matrix = np.stack(mask_list, axis=0)  # Shape: [num_frames, H, W]
    return mask_matrix

def Feature_extraction(video_path, model, device):
    segment_colors = [
        [255, 0, 0],     # Blue    - Left segment 1
        [0, 255, 0],     # Green   - Left segment 2
        [0, 0, 255],     # Red     - Left segment 3
        [255, 255, 0],   # Cyan    - Right segment 1
        [255, 0, 255],   # Magenta - Right segment 2
        [0, 255, 255],   # Yellow  - Right segment 3
    ]


   

    video_matrix =extract_mask_matrix_from_video(video_path, model, device)
    features_area = []
    features_boundary = []
    features_center = []

    for frame in video_matrix:
        
        
       
        plot_image(frame)  # Plot the mask to visualize
        break

        # Assuming color_u_mask is a function to map the mask to color segments
        # color_mask = mask_np  # Use this directly if it is already color-mapped
        
        # centers = get_segment_points(color_mask, segment_colors)
        # print(centers)

        # # Store extracted features
        # features_center.append(centers)
        # # features_area.append(areas)  # If you have area extraction logic
        # # features_boundary.append(boundary)  # If you have boundary extraction logic
        # break

    print(video_matrix.shape)
    return video_matrix

video_path = "/kaggle/input/medimgproject/HMC-QU Dataset-Kaggle/HMC-QU/A4C/ES0001 _4CH_1.avi"
video_matrix = Feature_extraction(video_path, segmentation_model, device)

print("Video shape:", video_matrix.shape)  # (num_frames, height, width, 3)


torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
torch.Size([1, 1, 3])
torch.Size([1, 1, 224, 224])
to

UnboundLocalError: cannot access local variable 'frame' where it is not associated with a value

In [None]:
def Feature_extraction(video_path):
    segment_colors = [
        [255, 0, 0],     # Blue    - Left segment 1
        [0, 255, 0],     # Green   - Left segment 2
        [0, 0, 255],     # Red     - Left segment 3
        [255, 255, 0],   # Cyan    - Right segment 1
        [255, 0, 255],   # Magenta - Right segment 2
        [0, 255, 255],   # Yellow  - Right segment 3
    ]

    cap = cv2.VideoCapture(video_path)
    mask_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Step 1: Enhance contrast using CLAHE
        frame_clahe = preprocess(frame)

        # Step 2: Preprocess and move to device
        input_tensor = torch.tensor(frame_clahe, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)

        # Step 3: Get prediction from segmentation model
        with torch.no_grad():
            pred_mask = segmentation_model(input_tensor)  # Shape: [1, 1, H, W]

        # Step 4: Convert to binary mask
        binary_mask = (pred_mask.squeeze().cpu().numpy() > 0.5).astype(np.uint8)  # Shape: [H, W]
        mask_list.append(binary_mask)

        # Check the dimensions and plot the image (inside the loop)
        if binary_mask.ndim == 3 and binary_mask.shape[2] == 1:
            binary_mask = binary_mask.squeeze(axis=-1)  # Remove the channel dimension if it's just a single channel (grayscale)

        plot_image(binary_mask)  # Plot the mask to visualize

    cap.release()

    video_matrix = np.stack(mask_list, axis=0)
    print(video_matrix.shape)
    return video_matrix

video_path = "/kaggle/input/medimgproject/HMC-QU Dataset-Kaggle/HMC-QU/A4C/ES0001 _4CH_1.avi"
video_matrix = Feature_extraction(video_path)

print("Video shape:", video_matrix.shape)  # (num_frames, height, width, 3)
