Team Name: Pathfinder \
Team Member: Vignesh S S\
Problem Statememnt: AI-Based Computer Vision for Healthcare Hackathon\
Dataset Link: https://drive.google.com/drive/u/1/folders/1UVvAea975DxWIYFSULqmqVMKLquBOs1Y \

**Image quality analyser based on outputs of Segmentation and Landmark prediction made**



In [1]:
# Cell 1: Install all required libraries
!pip install -U torchmetrics albumentations segmentation-models-pytorch

Collecting torchmetrics
  Downloading torchmetrics-1.8.2-py3-none-any.whl.metadata (22 kB)
Collecting segmentation-models-pytorch
  Downloading segmentation_models_pytorch-0.5.0-py3-none-any.whl.metadata (17 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.15.2-py3-none-any.whl.metadata (5.7 kB)
Downloading torchmetrics-1.8.2-py3-none-any.whl (983 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m983.2/983.2 kB[0m [31m26.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading segmentation_models_pytorch-0.5.0-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.8/154.8 kB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.15.2-py3-none-any.whl (29 kB)
Installing collected packages: lightning-utilities, torchmetrics, segmentation-models-pytorch
Successfully installed lightning-utilities-0.15.2 segmentation-models-pytorch-0.5.0 torchmetrics-1.8.2


In [6]:
from google.colab import drive
import os

# Mount your Google Drive at /content/drive
drive.mount('/content/drive')

# --- Configuration ---
# IMPORTANT: Change this to the exact name of your zip file in Google Drive
ZIP_FILE_NAME = "/Hackathon_Dataset_20251025T071740Z.zip"
ZIP_PATH_ON_DRIVE = f"/content/{ZIP_FILE_NAME}"
UNZIP_DESTINATION = "/content/"  # Unzips to the fast local /content/ disk

# --- Unzip Logic ---
if os.path.exists(ZIP_PATH_ON_DRIVE):
    print(f"Found zip file at: {ZIP_PATH_ON_DRIVE}")
    print("Unzipping to local disk... This may take a minute.")
    # -q (quiet) suppresses the long list of all 200+ files
    !unzip -q "{ZIP_PATH_ON_DRIVE}" -d "{UNZIP_DESTINATION}"
    print("Unzipping complete!")

    # Verify the expected folder is now present
    print("Contents of /content/ (your unzipped data):")
    !ls "/content/"
else:
    print(f"ERROR: Zip file not found at: {ZIP_PATH_ON_DRIVE}")
    print("Please check the file name and its location in your Google Drive.")

Mounted at /content/drive
Found zip file at: /content//Hackathon_Dataset_20251025T071740Z.zip
Unzipping to local disk... This may take a minute.
Unzipping complete!
Contents of /content/ (your unzipped data):
 drive		      Hackathon_Dataset_20251025T071740Z.zip
'Hackathon Dataset'   sample_data


In [7]:
#The Complete IQA + AI Pipeline

# --- 1. Imports ---
import torch
import torch.nn as nn
import cv2  # OpenCV
import numpy as np
import os
import glob
from tqdm.notebook import tqdm
import segmentation_models_pytorch as smp
from google.colab import drive

In [12]:
from google.colab.patches import cv2_imshow # To display images in Colab
from IPython.display import HTML # To embed videos in Colab

In [13]:
SAVE_VIDEO_PATH = os.path.join(
    DRIVE_PATH, "hackathon_submission/Results/Video1_output.mp4"
)
os.makedirs(os.path.dirname(SAVE_VIDEO_PATH), exist_ok=True)

In [14]:
DRIVE_PATH = "/content/drive/My Drive/"
# This is the path to the unzipped folder on the *local* Colab disk
LOCAL_DATA_PATH = "/content/Hackathon Dataset/"
TARGET_IMG_SIZE = (384, 384)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# --- Define Model Save Paths (on Google Drive) ---
SEG_MODEL_PATH = os.path.join(
    DRIVE_PATH,
    "hackathon_submission/task_1_segmentation/Model Weights/best_model_hypothesis_1.pth"
)
LAND_MODEL_PATH = os.path.join(
    DRIVE_PATH,
    "hackathon_submission/task_1_landmark/Model Weights/best_model_hypothesis_1.pth"
)

# --- Define Output Path (on Google Drive) ---
RESULTS_SAVE_DIR = os.path.join(
    DRIVE_PATH, "hackathon_submission/Results/Video_1_Frames/"
)
os.makedirs(RESULTS_SAVE_DIR, exist_ok=True)


In [18]:
# --- 3. Re-define Model Architectures (to load weights) ---

# Model 1: Segmentation
# --- Hyperparameters ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
NUM_CLASSES = 3
NUM_EPOCHS = 40  # <--- CHANGED: Increased from 25 to 40
LEARNING_RATE = 1e-4

# --- Model (U-Net) ---
# --- !!! THIS IS THE KEY CHANGE !!! ---
model = smp.Unet(
    encoder_name="efficientnet-b4",  # <--- CHANGED: Swapped for a powerful encoder
    encoder_weights="imagenet",   # We still use pre-trained weights
    in_channels=1,
    classes=NUM_CLASSES,
).to(DEVICE)

# Model 2: Landmark
class LandmarkUNet(nn.Module):
    def __init__(self, n_outputs=4):
        super().__init__()
        self.base_model = smp.Unet(
            encoder_name="resnet34", encoder_weights=None,
            in_channels=1, classes=3
        )
        self.encoder = self.base_model.encoder
        self.pooling = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.head = nn.Sequential(
            nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, n_outputs)
        )
    def forward(self, x):
        features = self.encoder(x)
        x = features[-1]
        x = self.pooling(x)
        x = self.flatten(x)
        x = self.head(x)
        return x

land_model = LandmarkUNet(n_outputs=4).to(DEVICE)

# --- 4. Load Saved Model Weights ---
print(f"Loading models from {DRIVE_PATH}...")
try:
    seg_model.load_state_dict(torch.load(SEG_MODEL_PATH, map_location=DEVICE))
    land_model.load_state_dict(torch.load(LAND_MODEL_PATH, map_location=DEVICE))
    seg_model.eval()
    land_model.eval()
    print("Models loaded successfully.")
except Exception as e:
    print(f"--- FATAL ERROR: Could not load models. Check paths. {e} ---")
    raise e

# --- 5. Helper Functions for Preprocessing & IQA ---

def letterbox_frame(image_gray, target_size=(384, 384)):
    h, w = image_gray.shape[:2]
    scale = min(target_size[0] / h, target_size[1] / w)
    new_w, new_h = int(w * scale), int(h * scale)

    resized_image = cv2.resize(
        image_gray, (new_w, new_h), interpolation=cv2.INTER_LINEAR
    )

    top = (target_size[0] - new_h) // 2
    left = (target_size[1] - new_w) // 2
    bottom = target_size[0] - new_h - top
    right = target_size[1] - new_w - left

    padded_image_gray = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=0
    )

    scale_info = (scale, left, top)
    tensor_image = torch.from_numpy(padded_image_gray).float().unsqueeze(0).unsqueeze(0)
    tensor_image = (tensor_image / 255.0).to(DEVICE)

    return tensor_image, padded_image_gray, scale_info

def get_circularity(mask):
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours: return 0.0
    contour = max(contours, key=cv2.contourArea)
    area = cv2.contourArea(contour)
    perimeter = cv2.arcLength(contour, True)
    if perimeter == 0: return 0.0
    return (4 * np.pi * area) / (perimeter**2)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/106 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/77.9M [00:00<?, ?B/s]

Loading models from /content/drive/My Drive/...
Models loaded successfully.


In [19]:
# --- 6. (UPGRADED) IQA Function with Debugging ---

# --- !!! TUNE THESE THRESHOLDS !!! ---
# Based on the debug output, you may need to change these.
# I've lowered them to be safer.
IQA_DEBUG_MODE = True       # SET TO TRUE to see why it's failing
MIN_CARDIAC_AREA = 1000     # Lowered
MIN_THORAX_AREA = 5000      # Lowered
MAX_CHAMBER_DARKNESS = 80   # Increased (allows brighter)
MIN_THORAX_CIRCULARITY = 0.6 # Lowered (allows less circular)

def run_iqa(frame_tensor, original_padded_frame, seg_model):
    with torch.no_grad():
        mask_logits = seg_model(frame_tensor)
        mask_pred = torch.argmax(mask_logits, dim=1).squeeze().cpu().numpy().astype(np.uint8)

    # --- 1. Size Check ---
    cardiac_area = np.sum(mask_pred == 1)
    thorax_area = np.sum(mask_pred == 2)

    # --- 2. Darkness Check ---
    cardiac_pixels = original_padded_frame[mask_pred == 1]
    avg_darkness = np.mean(cardiac_pixels) if len(cardiac_pixels) > 0 else 999

    # --- 3. Circularity Check ---
    thorax_mask = np.where(mask_pred == 2, 255, 0).astype(np.uint8)
    circularity = get_circularity(thorax_mask)

    # --- Debug Print ---
    if IQA_DEBUG_MODE:
        debug_msg = (
            f"  [IQA Values: "
            f"CardiacArea={cardiac_area} (Min={MIN_CARDIAC_AREA}), "
            f"ThoraxArea={thorax_area} (Min={MIN_THORAX_AREA}), "
            f"Darkness={avg_darkness:.1f} (Max={MAX_CHAMBER_DARKNESS}), "
            f"Circularity={circularity:.2f} (Min={MIN_THORAX_CIRCULARITY})]"
        )

    # --- Check Rules ---
    if (cardiac_area < MIN_CARDIAC_AREA) or (thorax_area < MIN_THORAX_AREA):
        return f"FAIL: Area too small {debug_msg if IQA_DEBUG_MODE else ''}"
    if avg_darkness > MAX_CHAMBER_DARKNESS:
        return f"FAIL: Chambers not dark {debug_msg if IQA_DEBUG_MODE else ''}"
    if circularity < MIN_THORAX_CIRCULARITY:
        return f"FAIL: Thorax not circular {debug_msg if IQA_DEBUG_MODE else ''}"

    return "PASS" # All checks passed

In [20]:
# --- 7. The Main AI Pipeline (Now saving to VIDEO) ---

VIDEO_PATH = os.path.join(LOCAL_DATA_PATH, "Videos/Video1.mp4")
print(f"\n--- Starting AI Pipeline for {VIDEO_PATH} ---")

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    print(f"ERROR: Could not open video file: {VIDEO_PATH}")
else:
    # Get video properties for VideoWriter
    frame_width = int(TARGET_IMG_SIZE[1])
    frame_height = int(TARGET_IMG_SIZE[0])
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # --- NEW: Setup VideoWriter ---
    # We will write our output video to this object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_video = cv2.VideoWriter(
        SAVE_VIDEO_PATH, fourcc, fps, (frame_width, frame_height)
    )

    frame_count = 0
    saved_frame_count = 0
    first_pass_shown = False

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame_tensor, frame_padded_gray, scale_info = letterbox_frame(frame_gray)

        # Create a 3-channel (color) image for drawing
        frame_to_draw = cv2.cvtColor(frame_padded_gray, cv2.COLOR_GRAY2BGR)

        quality_status = run_iqa(frame_tensor, frame_padded_gray, seg_model)

        if quality_status == "PASS":
            with torch.no_grad():
                norm_landmarks = land_model(frame_tensor)

            scale, pad_left, pad_top = scale_info
            landmarks = norm_landmarks.cpu().numpy().squeeze()
            landmarks = (landmarks * TARGET_IMG_SIZE[1])

            center_y = TARGET_IMG_SIZE[0] // 2
            for x_coord in landmarks:
                cv2.circle(
                    img=frame_to_draw, center=(int(x_coord), center_y),
                    radius=5, color=(0, 255, 0), thickness=-1
                )

            saved_frame_count += 1

            # --- NEW: Display first good frame in Colab ---
            if not first_pass_shown:
                print("\n--- FIRST FRAME THAT PASSED IQA ---")
                cv2_imshow(frame_to_draw)
                print("-----------------------------------")
                first_pass_shown = True

        # --- NEW: Write frame to video ---
        # We write every frame to the video.
        # If it's a "FAIL" frame, it will be the original padded frame.
        # If it's a "PASS" frame, it will have the green dots.
        # This shows the IQA working (dots will appear and disappear).
        out_video.write(frame_to_draw)

        frame_count += 1
        if frame_count % 10 == 0: # Print status every 10 frames
            print(f"Frame {frame_count}: {quality_status}")

    cap.release()
    out_video.release() # --- NEW: Finalize the video
    print("--- Pipeline Finished ---")
    print(f"Processed {frame_count} total frames.")
    print(f"Saved {saved_frame_count} high-quality frames to video.")
    print(f"Output video saved to: {SAVE_VIDEO_PATH}")

# --- 8. NEW: Display Final Video in Colab ---
print("\n--- Embedding Final Video ---")

# Create a relative path for the HTML player
# We need to strip "/content/drive/My Drive/" and add "drive/My Drive/"
relative_video_path = SAVE_VIDEO_PATH.replace("/content/drive/My Drive/", "drive/My Drive/")

HTML(f"""
<video width="640" height="480" controls>
  <source src="{relative_video_path}" type="video/mp4">
  Your browser does not support the video tag.
</video>
""")


--- Starting AI Pipeline for /content/Hackathon Dataset/Videos/Video1.mp4 ---
Frame 10: FAIL: Area too small   [IQA Values: CardiacArea=0 (Min=1000), ThoraxArea=4149 (Min=5000), Darkness=999.0 (Max=80), Circularity=0.87 (Min=0.6)]
Frame 20: FAIL: Area too small   [IQA Values: CardiacArea=0 (Min=1000), ThoraxArea=3787 (Min=5000), Darkness=999.0 (Max=80), Circularity=0.87 (Min=0.6)]
Frame 30: FAIL: Area too small   [IQA Values: CardiacArea=10 (Min=1000), ThoraxArea=4171 (Min=5000), Darkness=31.2 (Max=80), Circularity=0.86 (Min=0.6)]
Frame 40: FAIL: Area too small   [IQA Values: CardiacArea=57 (Min=1000), ThoraxArea=3896 (Min=5000), Darkness=34.1 (Max=80), Circularity=0.88 (Min=0.6)]
Frame 50: FAIL: Area too small   [IQA Values: CardiacArea=377 (Min=1000), ThoraxArea=3195 (Min=5000), Darkness=16.6 (Max=80), Circularity=0.85 (Min=0.6)]
Frame 60: FAIL: Area too small   [IQA Values: CardiacArea=112 (Min=1000), ThoraxArea=3384 (Min=5000), Darkness=21.5 (Max=80), Circularity=0.86 (Min=0.6)]
F