## Import all the necessary modules

In [1]:
import os
import numpy as np
import cv2
from tqdm import tqdm
from albumentations import HorizontalFlip, VerticalFlip, Rotate
from skimage import img_as_float
from skimage.filters import gabor
from PIL import Image
import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import unet_model
import cv2
from tqdm import tqdm  # For progress bar
import torchvision.transforms as transforms
import torchvision.models as models
import warnings
warnings.filterwarnings('ignore')

  check_for_updates()


In [2]:
# Directory that collects every intermediate and final image produced by the pipeline.
output_folder = r"pipeline-output"

# exist_ok=True makes the cell idempotent and avoids the check-then-create race
# of a separate os.path.exists() test.
os.makedirs(output_folder, exist_ok=True)

In [3]:
# Inference is currently pinned to the CPU. The commented-out line below is the
# automatic CUDA/CPU selection kept as an alternative.
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print("Device: ",device)

Device:  cpu


## Read the image

In [4]:
# Target (width, height) every image is resized to before further processing.
size = (512, 512)
# NOTE(review): machine-specific absolute path; point this at your own input image.
path = r"C:\Users\xerom\Documents\CAPSTONE\Classification\Data\Stage 3\2.png"
original_image = cv2.imread(path)
# cv2.imread returns None instead of raising when the file is missing or
# unreadable; fail here with a clear message rather than inside cv2.resize.
if original_image is None:
    raise FileNotFoundError(f"Could not read input image: {path}")
original_image = cv2.resize(original_image, size, interpolation=cv2.INTER_AREA)


# Keep a copy of the resized input next to the other pipeline outputs.
original_image_path = os.path.join(output_folder, "original_image.png")
cv2.imwrite(original_image_path,original_image)

True

## Removal of Non-Temporal Images

## Gabor Filter - used for Ridge Segmentation

In [6]:

def enhance_edges(image, kernel_size=(50, 50)):
    """Boost local contrast of an image with top-hat / black-hat morphology.

    Parameters
    ----------
    image : numpy.ndarray
        Input image. A 3-D array is converted with ``cv2.COLOR_BGR2GRAY``;
        a 2-D array is treated as already grayscale.
    kernel_size : tuple of int, optional
        Size passed to ``cv2.getStructuringElement`` for the rectangular
        structuring element. Defaults to ``(50, 50)``.

    Returns
    -------
    numpy.ndarray
        Grayscale uint8 image equal to ``image + top_hat - black_hat``,
        computed with ``cv2.add`` / ``cv2.subtract``.
    """
    # Stretch intensities to the 0..255 range and force 8-bit storage.
    image = cv2.normalize(image, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX)
    image = np.uint8(image)
    # Only colour inputs need the conversion; skipping it for 2-D input lets
    # callers pass an image that is already grayscale.
    if image.ndim == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Structuring element shared by both morphological transforms.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
    # Top-hat transformation of the grayscale image.
    top_hat = cv2.morphologyEx(image, cv2.MORPH_TOPHAT, kernel)
    # Black-hat (bottom-hat) transformation of the grayscale image.
    bottom_hat = cv2.morphologyEx(image, cv2.MORPH_BLACKHAT, kernel)
    # Add the top-hat to enhance bright regions, then subtract the black-hat
    # to enhance dark regions.
    enhanced_image = cv2.add(image, top_hat)
    enhanced_image = cv2.subtract(enhanced_image, bottom_hat)

    return enhanced_image

def gab(image):
    """Return the real part of a Gabor filter response for ``image``.

    The image is first passed through ``enhance_edges`` and then filtered with
    ``skimage.filters.gabor`` (frequency 1/4, theta 30, sigma_x = sigma_y = 1).
    The imaginary part of the response is discarded.
    """
    enhanced = enhance_edges(image)
    # NOTE(review): skimage documents theta in radians, so 30 is presumably
    # 30 rad rather than 30 degrees. Confirm before changing; the ridge model
    # may have been trained on images produced with this exact setting.
    real_part, _imaginary_part = gabor(enhanced, 1/4, 30, sigma_x=1, sigma_y=1)
    return real_part

def refine_edges(image):
    """Keep only the pixels of ``image`` inside a centred filled circle.

    The circle is centred on the image and has radius ``width // 2 - 1``;
    everything outside it is zeroed through a bitwise AND with the mask.
    Assumes ``image`` is a single-channel uint8 array, since it is AND-ed
    directly with a 2-D uint8 mask (TODO confirm against callers).
    """
    height, width = image.shape[:2]
    circle_mask = np.zeros((height, width), dtype=np.uint8)
    # Thickness -1 asks OpenCV to draw a filled circle.
    cv2.circle(circle_mask, (int(width / 2), int(height / 2)), width // 2 - 1, 255, -1)
    return cv2.bitwise_and(circle_mask, image)


In [7]:
# Apply the Gabor filter to the resized input, then keep only the centred
# circular region of the response.
gab_image = refine_edges(gab(original_image))

In [8]:
gab_image_path = os.path.join(output_folder, "gabor_image.png")
# cv2.imwrite can report failure through its boolean return value instead of
# raising; check it so the "saved" message is never printed for a file that
# was not written (a later cell reads this file back from disk).
if not cv2.imwrite(gab_image_path, gab_image):
    raise IOError(f"Failed to write Gabor image to {gab_image_path}")
print(f"Gabor image saved at {gab_image_path}")


Gabor image saved at pipeline-output\gabor_image.png


## Apply Adaptive Sigmoid

In [None]:
def sigmoid_correction(image, k=10, x0=0.5):
    """Apply a sigmoid contrast curve to an image with values on a 0..255 scale.

    Parameters
    ----------
    image : numpy.ndarray
        Input intensities; they are divided by 255 before the curve is applied.
    k : float, optional
        Steepness of the sigmoid.
    x0 : float, optional
        Midpoint of the sigmoid on the normalised (0..1) scale.

    Returns
    -------
    numpy.ndarray
        uint8 array: the sigmoid output multiplied by 255 and cast to uint8.
    """
    # Exponent of the logistic function, evaluated on the normalised image.
    exponent = -k * (image / 255.0 - x0)
    response = 1 / (1 + np.exp(exponent))
    # Rescale to the 0..255 range; the cast truncates the fractional part.
    return (response * 255).astype(np.uint8)


def adaptive_sigmoid(image, k=15):
    """Sigmoid-correct ``image`` around the median intensity of its centre disc.

    A filled circular mask centred on the image (radius ``width // 2``) selects
    the pixels used to build a 256-bin histogram of channel index 1. The
    intensity at which the cumulative histogram first reaches 50% of the
    masked pixel count, divided by 255, becomes the sigmoid midpoint.

    Parameters
    ----------
    image : numpy.ndarray
        Multi-channel image; channel index 1 must exist because the histogram
        is computed on it.
    k : float, optional
        Sigmoid steepness forwarded to ``sigmoid_correction``. Defaults to 15.

    Returns
    -------
    numpy.ndarray
        The result of ``sigmoid_correction(image, k, x_0)``.
    """
    mask = np.zeros(image.shape[:2], dtype=np.uint8)
    center = (int(image.shape[1] / 2), int(image.shape[0] / 2))
    radius = image.shape[1] // 2
    # Thickness -1 asks OpenCV to draw a filled circle.
    cv2.circle(mask, center, radius, 255, -1)
    # Histogram of channel 1, restricted to the pixels inside the circle.
    hist = cv2.calcHist([image], [1], mask, [256], [0, 256])
    # Cumulative distribution function (CDF) of the masked histogram.
    cdf = hist.cumsum()
    # Intensity level where the CDF reaches 50% of the masked pixel count,
    # rescaled to the 0..1 range expected by sigmoid_correction.
    total_pixels = cdf[-1]
    x_0 = np.searchsorted(cdf, total_pixels * 0.5) / 255
    return sigmoid_correction(image, k, x_0)


In [None]:
# Run the histogram-driven sigmoid correction on the resized input image.
adaptive_sigmoid_image = adaptive_sigmoid(original_image)

In [None]:
adaptive_sigmoid_image_path = os.path.join(output_folder, "adaptive_sigmoid_image.png")
# cv2.imwrite can report failure through its boolean return value instead of
# raising; check it so the "saved" message is never printed for a file that
# was not written.
if not cv2.imwrite(adaptive_sigmoid_image_path, adaptive_sigmoid_image):
    raise IOError(f"Failed to write adaptive sigmoid image to {adaptive_sigmoid_image_path}")
print(f"Adaptive Sigmoid image saved at {adaptive_sigmoid_image_path}")


## Combine the Blood Vessel and Ridge Masks

In [None]:

# Turn a mask into a 3-channel array for saving
def mask_parse(mask):
    """Replicate ``mask`` along a new trailing axis of length 3.

    For example, a (512, 512) mask becomes a (512, 512, 3) array whose three
    channels are all copies of the input.
    """
    return np.stack([mask, mask, mask], axis=-1)

# Load models
# Checkpoint paths are built with os.path.join so they resolve on any OS
# (a literal "Models\..." only works on Windows).
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
bv_model = unet_model.build_unet()
bv_model.load_state_dict(
    torch.load(os.path.join("Models", "bv_no_enhancement.pth"), map_location=device)
)
# Keep the model on the same device as the input tensors built by load_image.
bv_model = bv_model.to(device)
bv_model.eval()

ridge_model = unet_model.build_unet()
ridge_model.load_state_dict(
    torch.load(os.path.join("Models", "gabor_ridge_aug.pth"), map_location=device)
)
ridge_model = ridge_model.to(device)
ridge_model.eval()

# Preprocessing applied to the segmentation inputs: ToTensor only, no normalisation.
transform = transforms.Compose([transforms.ToTensor()])

# Load and resize the image
def load_image(image_path):
    """Load an image file as a batched tensor for the segmentation models.

    The file is opened as RGB, resized to 512x512, passed through the
    module-level ``transform`` and given a leading batch dimension before
    being moved to ``device``.

    Parameters
    ----------
    image_path : str
        Path to a .png, .jpg, .jpeg or .tif file (extension matched
        case-insensitively).

    Returns
    -------
    torch.Tensor
        The transformed image with a batch dimension of 1, on ``device``.

    Raises
    ------
    ValueError
        If the file extension is not one of the supported formats. Previously
        the function silently returned None in that case, which only surfaced
        later as an opaque error inside the model call.
    """
    if not image_path.lower().endswith(('.png', '.jpg', '.jpeg', '.tif')):
        raise ValueError(f"Unsupported image format: {image_path}")
    image = Image.open(image_path).convert('RGB')
    image = image.resize((512, 512))
    # unsqueeze(0) adds the batch dimension; .to(device) places the tensor on
    # whichever device the notebook selected.
    return transform(image).unsqueeze(0).to(device)
    
# Predict a binary mask
def get_prediction(model, image_tensor):
    """Run ``model`` on ``image_tensor`` and return a 0/1 uint8 mask.

    The model output is passed through a sigmoid, the first batch element is
    taken, its leading axis (of size 1) is squeezed away, and values strictly
    greater than 0.5 become 1.
    """
    with torch.no_grad():
        probabilities = torch.sigmoid(model(image_tensor))
        # First batch element, moved to the CPU as a NumPy array.
        first_sample = probabilities[0].cpu().numpy()
        thresholded = np.squeeze(first_sample, axis=0) > 0.5
        return thresholded.astype(np.uint8)


# Paint both masks over the sigmoid-corrected original image
def superpose_masks(original_image, bv_mask, ridge_mask):
    """Overlay the two masks in colour on the sigmoid-corrected image.

    ``original_image`` is converted to a NumPy array and passed through
    ``adaptive_sigmoid``. Pixels where ``bv_mask == 1`` are coloured
    [179, 2, 2]; pixels where ``ridge_mask == 1`` are coloured [25, 10, 242]
    and take precedence because they are written second. The result is
    returned as a PIL image.
    """
    base = adaptive_sigmoid(np.array(original_image))

    # Three-channel overlay holding the colour-coded masks.
    rows, cols = bv_mask.shape[0], bv_mask.shape[1]
    overlay = np.zeros((rows, cols, 3), dtype=np.uint8)
    overlay[bv_mask == 1] = [179, 2, 2]
    overlay[ridge_mask == 1] = [25, 10, 242]

    # Copy every non-zero overlay value onto a copy of the corrected image.
    painted = overlay > 0
    blended = base.copy()
    blended[painted] = overlay[painted]

    return Image.fromarray(blended)

# Segment one image and save the colour overlay
def process_single_image(bv_image_path, ridge_image_path, output_path):
    """Predict both masks for one image and save the superposed result.

    ``bv_image_path`` feeds the blood-vessel model and also provides the
    background image; ``ridge_image_path`` feeds the ridge model. The overlay
    produced by ``superpose_masks`` is written to ``output_path``.
    """
    bv_input = load_image(bv_image_path)
    ridge_input = load_image(ridge_image_path)
    # Background for the overlay: the blood-vessel input as RGB at 512x512.
    background = Image.open(bv_image_path).convert('RGB').resize((512, 512))

    overlay = superpose_masks(
        background,
        get_prediction(bv_model, bv_input),
        get_prediction(ridge_model, ridge_input),
    )

    overlay.save(output_path)
    print(f"Superposed image saved at {output_path}")

# Single image input and output paths
# Built from output_folder with os.path.join so they match the paths used when
# the intermediate images were written and resolve on any OS.
bv_image_path = path
ridge_image_path = os.path.join(output_folder, "gabor_image.png")
output_path = os.path.join(output_folder, "superposed_image.png")

# Process the single image
process_single_image(bv_image_path, ridge_image_path, output_path)


## Send the Superposed image as input to Binary Classifier

In [None]:
# Class indices produced by the binary classifier:
#   0 -> No RoP
#   1 -> RoP

NUM_CLASSES = 2  # Either RoP / No RoP

# Build a ResNet50 and replace its final layer with a NUM_CLASSES-way head.
model = models.resnet50()
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)

# Load the trained weights. The path is built with os.path.join: the previous
# literal 'Models\ResNet50_Rop_NoRop.pth' contained the invalid escape "\R"
# and only resolved on Windows.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
model.load_state_dict(
    torch.load(os.path.join('Models', 'ResNet50_Rop_NoRop.pth'), map_location=device)
)
model = model.to(device)
model.eval()  # Set the model to evaluation mode

# Define transformations for the input image
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to the model's expected input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Use ImageNet's mean and std
])

# Read an image and turn it into a classifier input batch
def load_image(image_path):
    """Open ``image_path`` as RGB and return it as a batched tensor on ``device``.

    The module-level ``transform`` is applied and a leading batch dimension is
    added. Note that this definition replaces the segmentation ``load_image``
    defined in an earlier cell.
    """
    return transform(Image.open(image_path).convert('RGB')).unsqueeze(0).to(device)

# Classify a single image file
def predict(image_path, model):
    """Return the index of the highest-scoring class for the image at ``image_path``.

    The image is loaded with ``load_image`` and passed through ``model`` with
    gradient tracking disabled.
    """
    batch = load_image(image_path)
    with torch.no_grad():  # Inference only, no gradients needed
        scores = model(batch)
        # torch.max along dim 1 returns (values, indices); the indices are the classes.
        best_class = torch.max(scores, 1)[1]
    return best_class.item()

# Test on a single image
# Built from output_folder with os.path.join so the path resolves on any OS.
image_path = os.path.join(output_folder, 'superposed_image.png')
predicted_class = predict(image_path, model)
print(f'Predicted class index: {predicted_class}')


## If Predicted Class == 0 -> NO RoP

In [None]:
## TODO: Take the PMA as input: if PMA > 40, classify the case as FVR; otherwise classify it as TAR.

## If Predicted Class == 1 -> RoP

In [None]:
# Class indices produced by the stage classifier:
#   0 -> Stage 1
#   1 -> Stage 2
#   2 -> Stage 3

NUM_CLASSES = 3  # Stage1, Stage2, Stage3

# Build a ResNet50 and replace its final layer with a NUM_CLASSES-way head.
# This rebinds `model`, `transform` and NUM_CLASSES from the binary-classifier cell.
model = models.resnet50()
model.fc = torch.nn.Linear(model.fc.in_features, NUM_CLASSES)

# Load the trained weights. The path is built with os.path.join: the previous
# literal 'Models\ResNet50_3Stages.pth' contained the invalid escape "\R"
# and only resolved on Windows.
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
model.load_state_dict(
    torch.load(os.path.join('Models', 'ResNet50_3Stages.pth'), map_location=device)
)
model = model.to(device)
model.eval()  # Set the model to evaluation mode

# Define transformations for the input image
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to the model's expected input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Use ImageNet's mean and std
])

# Read an image and turn it into a classifier input batch
def load_image(image_path):
    """Open ``image_path`` as RGB and return it as a batched tensor on ``device``.

    The module-level ``transform`` is applied and a leading batch dimension is
    added. This redefinition behaves the same as the ``load_image`` in the
    RoP / No-RoP classifier cell.
    """
    return transform(Image.open(image_path).convert('RGB')).unsqueeze(0).to(device)

# Classify a single image file
def predict(image_path, model):
    """Return the index of the highest-scoring class for the image at ``image_path``.

    The image is loaded with ``load_image`` and passed through ``model`` with
    gradient tracking disabled. This redefinition behaves the same as the
    ``predict`` in the RoP / No-RoP classifier cell.
    """
    batch = load_image(image_path)
    with torch.no_grad():  # Inference only, no gradients needed
        scores = model(batch)
        # torch.max along dim 1 returns (values, indices); the indices are the classes.
        best_class = torch.max(scores, 1)[1]
    return best_class.item()

# Test on a single image
# NOTE(review): this cell runs unconditionally and overwrites the
# `predicted_class` set by the RoP / No-RoP cell; the section heading suggests
# it should only apply when that earlier prediction was 1. Confirm intent.
# Built from output_folder with os.path.join so the path resolves on any OS.
image_path = os.path.join(output_folder, 'superposed_image.png')
predicted_class = predict(image_path, model)
print(f'Predicted class index: {predicted_class}')
