In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [20]:
import torch
import torch.nn as nn
from torchvision import transforms, models
import cv2
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score, confusion_matrix
#from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights



In [None]:
# # class ResNet18ContrastiveWithClassifier(nn.Module):
# #     def __init__(self, projection_dim=128):
# #         super(ResNet18ContrastiveWithClassifier, self).__init__()
# #         backbone = models.resnet18(pretrained=False)
# #         self.backbone = nn.Sequential(*list(backbone.children())[:-1])
# #         self.projection_head = nn.Linear(backbone.fc.in_features, projection_dim)
# #         self.classifier_head = nn.Linear(backbone.fc.in_features, 1)

# #     def forward(self, x):
# #         features = self.backbone(x).squeeze(-1).squeeze(-1)
# #         projection = self.projection_head(features)
# #         classification = torch.sigmoid(self.classifier_head(features))
# #         return projection, classification

# class ResNet34ContrastiveWithClassifier(nn.Module):
#     def __init__(self, projection_dim=128):
#         super(ResNet34ContrastiveWithClassifier, self).__init__()
#         backbone = models.resnet34(pretrained=True)
#         self.backbone = nn.Sequential(*list(backbone.children())[:-1])  
#         self.projection_head = nn.Linear(backbone.fc.in_features, projection_dim)
#         self.classifier_head = nn.Linear(backbone.fc.in_features, 1)

#     def forward(self, x):
#         features = self.backbone(x).squeeze(-1).squeeze(-1)  # shape: (B, 512)
#         projection = self.projection_head(features)
#         classification = torch.sigmoid(self.classifier_head(features))
#         return projection, classification
# class EfficientNetB0ContrastiveWithClassifier(nn.Module):
#     def __init__(self, projection_dim=128):
#         super(EfficientNetB0ContrastiveWithClassifier, self).__init__()
#         backbone = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
#         self.backbone_features = backbone.features
#         self.pooling = backbone.avgpool
#         self.feature_dim = 1280
#         self.projection_head = nn.Linear(self.feature_dim, projection_dim)
#         self.classifier_head = nn.Linear(self.feature_dim, 1)

#     def forward(self, x):
#         features = self.backbone_features(x)
#         features = self.pooling(features)
#         features = features.flatten(start_dim=1)
#         projection = self.projection_head(features)
#         classification = torch.sigmoid(self.classifier_head(features))
#         return projection, classification
class DenseNet121ContrastiveWithClassifier(nn.Module):
    def __init__(self, projection_dim=128):
        super(DenseNet121ContrastiveWithClassifier, self).__init__()
        densenet = models.densenet121(pretrained=True)
        self.features = densenet.features
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.feature_dim = 1024
        self.projection_head = nn.Linear(self.feature_dim, projection_dim)
        self.classifier_head = nn.Linear(self.feature_dim, 1)

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        projection = self.projection_head(x)
        classification = torch.sigmoid(self.classifier_head(x))
        return projection, classification


In [22]:
# class GradCAM:
#     def __init__(self, model, target_layer_idx):
#         self.model = model
#         self.target_layer_idx = target_layer_idx
#         self.gradient = None
#         self.activation = None
#         self.hook_layers()

#     def hook_layers(self):
#         def backward_hook(module, grad_in, grad_out):
#             self.gradient = grad_out[0]

#         def forward_hook(module, input, output):
#             self.activation = output

#         target = self.model.backbone[self.target_layer_idx]
#         target.register_forward_hook(forward_hook)
#         target.register_backward_hook(backward_hook)

#     def generate_cam(self, input_tensor):
#         _, output = self.model(input_tensor)
#         score = output.squeeze()
#         self.model.zero_grad()
#         score.backward(retain_graph=True)

#         grads = self.gradient[0].detach().cpu().numpy()
#         acts = self.activation[0].detach().cpu().numpy()
#         weights = np.mean(grads, axis=(1, 2))

#         cam = np.zeros(acts.shape[1:], dtype=np.float32)
#         for i, w in enumerate(weights):
#             cam += w * acts[i]

#         cam = np.maximum(cam, 0)
#         cam = cv2.resize(cam, (224, 224))
#         return (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)
# class GradCAM_EfficientNet:
#     def __init__(self, model, target_layer):
#         self.model = model
#         self.target_layer = target_layer
#         self.gradients = None
#         self.activations = None
#         self.hook()

#     def hook(self):
#         def forward_hook(module, input, output):
#             self.activations = output

#         def backward_hook(module, grad_input, grad_output):
#             self.gradients = grad_output[0]

#         self.target_layer.register_forward_hook(forward_hook)
#         self.target_layer.register_backward_hook(backward_hook)

#     def generate_cam(self, input_tensor):
#         self.model.eval()
#         _, output = self.model(input_tensor)
#         score = output.squeeze()
#         self.model.zero_grad()
#         score.backward(retain_graph=True)

#         grads = self.gradients[0].cpu().detach().numpy()
#         acts = self.activations[0].cpu().detach().numpy()
#         weights = grads.mean(axis=(1, 2))

#         cam = np.zeros(acts.shape[1:], dtype=np.float32)
#         for i, w in enumerate(weights):
#             cam += w * acts[i]

#         cam = np.maximum(cam, 0)
#         cam = cv2.resize(cam, (224, 224))
#         return (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)
class GradCAM_DenseNet:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        self.hook()

    def hook(self):
        def forward_hook(module, input, output):
            self.activations = output

        def backward_hook(module, grad_input, grad_output):
            self.gradients = grad_output[0]

        self.target_layer.register_forward_hook(forward_hook)
        self.target_layer.register_backward_hook(backward_hook)

    def generate_cam(self, input_tensor):
        self.model.eval()
        _, output = self.model(input_tensor)
        score = output.squeeze()
        self.model.zero_grad()
        score.backward(retain_graph=True)

        grads = self.gradients[0].cpu().detach().numpy()
        acts = self.activations[0].cpu().detach().numpy()
        weights = grads.mean(axis=(1, 2))

        cam = np.zeros(acts.shape[1:], dtype=np.float32)
        for i, w in enumerate(weights):
            cam += w * acts[i]

        cam = np.maximum(cam, 0)
        cam = cv2.resize(cam, (224, 224))
        return (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)



In [23]:
import matplotlib.pyplot as plt
import cv2
import numpy as np

def apply_grad_cam_with_colormap(frame, cam, save_path, reverse_colormap=False):
    frame_resized = cv2.resize(frame, (224, 224))
    heatmap = cv2.applyColorMap(
        np.uint8(255 * (1 - cam) if reverse_colormap else 255 * cam),
        cv2.COLORMAP_JET,
    )
    overlayed = cv2.addWeighted(cv2.cvtColor(frame_resized, cv2.COLOR_GRAY2BGR), 0.5, heatmap, 0.5, 0)

    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1)
    plt.imshow(frame_resized, cmap='gray')
    plt.axis('off')
    plt.title("Original")

    im = plt.subplot(1, 2, 2)
    img = plt.imshow(overlayed)
    plt.axis('off')
    plt.title("Grad-CAM")
    plt.colorbar(img, fraction=0.046, pad=0.04)
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

In [None]:
# Paths
#model_path = '/content/trained_resnet34_contrastive_with_classifier_ssl.pth'
label_csv = '/content/25_97_labels for SSL.csv'
video_dir = '/content/drive/Shareddrives/mBSUS/Data/Raw Data/mbsus_case_videos'
output_dir = '/content/grad_cam_frames'
os.makedirs(output_dir, exist_ok=True)

# model = ResNet34ContrastiveWithClassifier().cuda()
# model.load_state_dict(torch.load(model_path))
# model.eval()
# model_path = '/content/trained_efficientnet_contrastive_with_classifier_ssl .pth'  # update path
# model = EfficientNetB0ContrastiveWithClassifier().cuda()
# model.load_state_dict(torch.load(model_path))
# model.eval()
model_path = '/content/trained_densenet121_contrastive_with_classifier_ssl.pth'
model = DenseNet121ContrastiveWithClassifier().cuda()
model.load_state_dict(torch.load(model_path))
model.eval()


Downloading: "https://download.pytorch.org/models/densenet121-a639ec97.pth" to /root/.cache/torch/hub/checkpoints/densenet121-a639ec97.pth
100%|██████████| 30.8M/30.8M [00:00<00:00, 162MB/s]


DenseNet121ContrastiveWithClassifier(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_runni

In [None]:
#grad_cam = GradCAM(model, target_layer_idx=7)  
target_layer = model.features[7]  
grad_cam = GradCAM_DenseNet(model, target_layer)



In [None]:
# labels_df = pd.read_csv(label_csv).fillna(-1)
# frame_labels = {}
# for _, row in labels_df.iterrows():
#     key = f"{row['Video_ID']}_{row['Region']}.mp4"
#     if row['ConsolidationStartFrame'] == -1 or row['ConsolidationEndFrame'] == -1:
#         frame_labels[key] = set()
#     else:
#         frame_labels[key] = set(range(int(row['ConsolidationStartFrame']), int(row['ConsolidationEndFrame']) + 1))

labels_df = pd.read_csv(label_csv).fillna(-1)

# Build dictionaries
frame_labels = {}
videos_with_consolidation = set()

for _, row in labels_df.iterrows():
    key = f"{row['Video_ID']}_{row['Region']}.mp4"

    if row['ConsolidationStartFrame'] == -1 or row['ConsolidationEndFrame'] == -1:
        frame_labels[key] = set()
    else:
        start = int(row['ConsolidationStartFrame'])
        end = int(row['ConsolidationEndFrame'])
        frame_labels[key] = set(range(start, end + 1))

    if row['Percent of video with consolidation '] > 0:
        videos_with_consolidation.add(key)

In [33]:
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])
y_true, y_scores = [], []


In [34]:
# for video_file, frame_set in tqdm(frame_labels.items(), desc="Processing Videos"):
#     video_path = os.path.join(video_dir, video_file)
#     if not os.path.exists(video_path):
#         print(f"{video_file} not found")
#         continue

#     cap = cv2.VideoCapture(video_path)
#     total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

#     for idx in range(total_frames):
#         ret, frame = cap.read()
#         if not ret:
#             break

#         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#         input_tensor = transform(gray).unsqueeze(0).cuda()

#         with torch.no_grad():
#             _, pred = model(input_tensor)
#             score = pred.item()

#         y_scores.append(score)
#         y_true.append(1 if idx in frame_set else 0)

#         if score > 0.5:
#             cam = grad_cam.generate_cam(input_tensor)
#             save_path = os.path.join(output_dir, f"{video_file[:-4]}_frame_{idx}.jpg")
#             apply_grad_cam_with_colormap(gray, cam, save_path)

#     cap.release()

# for video_file, frame_set in tqdm(frame_labels.items(), desc="Processing Videos"):
#     video_path = os.path.join(video_dir, video_file)
#     if not os.path.exists(video_path):
#         print(f"{video_file} not found")
#         continue

#     cap = cv2.VideoCapture(video_path)
#     total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

#     for idx in range(total_frames):
#         ret, frame = cap.read()
#         if not ret:
#             break

#         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#         input_tensor = transform(gray).unsqueeze(0).cuda()

#         with torch.no_grad():
#             _, pred = model(input_tensor)
#             score = pred.item()

#         y_scores.append(score)
#         y_true.append(1 if idx in frame_set else 0)

#         # Generate Grad-CAM only for relevant videos
#         if video_file in videos_with_consolidation and score > 0.5:
#             cam = grad_cam.generate_cam(input_tensor)

#             # Red = High Activation ⇒ reverse_colormap = True
#             save_path = os.path.join(output_dir, f"{video_file[:-4]}_frame_{idx}.jpg")
#             apply_grad_cam_with_colormap(gray, cam, save_path, reverse_colormap=True)

#     cap.release()

for video_file, frame_set in tqdm(frame_labels.items(), desc="Processing Videos"):
    video_path = os.path.join(video_dir, video_file)
    if not os.path.exists(video_path):
        print(f"{video_file} not found")
        continue

    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    for idx in range(total_frames):
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        input_tensor = transform(gray).unsqueeze(0).cuda()

        with torch.no_grad():
            _, pred = model(input_tensor)
            score = pred.item()

        y_scores.append(score)
        y_true.append(1 if idx in frame_set else 0)

        if video_file in videos_with_consolidation and score > 0.5:
            cam = grad_cam.generate_cam(input_tensor)
            save_path = os.path.join(output_dir, f"{video_file[:-4]}_frame_{idx}.jpg")
            apply_grad_cam_with_colormap(gray, cam, save_path, reverse_colormap=True)

    cap.release()


  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_fu

052-0_LPT.mp4 not found


  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_full_backward_hook(args, result, grad_fn)
  self._maybe_warn_non_fu

In [None]:
threshold = np.percentile(y_scores, 95)
y_pred = [1 if s >= threshold else 0 for s in y_scores]

print(f"\n--- Frame-level Metrics (Classifier Head) ---")
print(f"AUC: {roc_auc_score(y_true, y_scores):.3f}")
print(f"Accuracy: {accuracy_score(y_true, y_pred):.3f}")
print("Confusion Matrix:")
print(confusion_matrix(y_true, y_pred))
print(f"Threshold (85th percentile): {threshold:.3f}")
