In [2]:
import numpy as np
import rasterio
from shapely.geometry import Polygon
from shapely.ops import transform
import cv2
from PIL import Image, ImageDraw
import os
import torch
from torch.utils.data import Dataset



In [3]:
def load_image(image_s3_uri):
    """Load an image from S3 and convert it to a NumPy array."""
    # Open the image using rasterio
    with rasterio.open(image_s3_uri) as src:
        # Read the image data
        image = src.read()  # This will read all the bands
        image = np.moveaxis(image, 0, -1)  # Move channels to the last dimension
        image = image[:, :, :3]  # Assuming you want to use only the first 3 bands (R, G, B)
        image_normalized = (image - np.min(image)) / (np.max(image) - np.min(image)) * 255
        image_normalized = image_normalized.astype(np.uint8)
    return image_normalized


def load_annotations(annotation_s3_uri,Model_Latestimage_filename):
    """Load annotations for a specific image from a JSON file on S3."""
    fs = s3fs.S3FileSystem()
    with fs.open(annotation_s3_uri, 'r') as f:
        data = json.load(f)
    
    for img in data['images']:
        if img['file_name'] == image_filename:
            return img['annotations']
    return None
    
def load_original_annotations(annotation_s3_uri):
    """Load annotations for a specific image from a JSON file on S3."""
    fs = s3fs.S3FileSystem()
    with fs.open(annotation_s3_uri, 'r') as f:
        data = json.load(f)
    return data


def polygon_to_mask(polygon, width, height):
    mask = Image.new('L', (width, height), 0)
    ImageDraw.Draw(mask).polygon(polygon, outline=1, fill=1)
    return torch.tensor(np.array(mask), dtype=torch.float32)


def convert_to_binary_masks(predicted_masks):
    """
    Converts boolean masks from SAM2's output to binary masks.

    Args:
    - predicted_masks (list of dict): List of predicted masks with 'segmentation' key containing boolean arrays.

    Returns:
    - binary_masks (list of np.array): List of binary masks (1 and 0).
    """
    binary_masks = []
    for mask_data in predicted_masks:
        # Convert the boolean segmentation mask to an integer binary mask
        binary_mask = mask_data['segmentation'].astype(np.uint8)
        binary_masks.append(binary_mask)
    
    return binary_masks

In [4]:
# Load Json
import s3fs
import json
train_annotation_s3_uri = 's3://solafune/train_annotation.json'

# Building the U-net And Predicting the Input Masks

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),  # Batch Normalization
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),  # Batch Normalization
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Down, self).__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    def __init__(self, in_channels, out_channels, bilinear=True):
        super(Up, self).__init__()
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            self.up = nn.ConvTranspose2d(in_channels // 2, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]
        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes):
        super(UNet, self).__init__()
        self.inc = nn.Sequential(DoubleConv(n_channels, 64), nn.Dropout(0.5))  # Added Dropout
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 512)
        self.up1 = Up(1024, 256)
        self.up2 = Up(512, 128)
        self.up3 = Up(256, 64)
        self.up4 = Up(128, 64)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

# Initialize the model weights
def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)




In [8]:
# Load the trained model (use the latest checkpoint)
device = torch.device("cuda")
model = UNet(n_channels=3, n_classes=1).to(device)
model.load_state_dict(torch.load("unet_epoch_20.pth"))  # Adjust filename if needed
model.eval()  # Set the model to evaluation mode

  model.load_state_dict(torch.load("unet_epoch_20.pth"))  # Adjust filename if needed


UNet(
  (inc): Sequential(
    (0): DoubleConv(
      (double_conv): Sequential(
        (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU(inplace=True)
      )
    )
    (1): Dropout(p=0.5, inplace=False)
  )
  (down1): Down(
    (maxpool_conv): Sequential(
      (0): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (1): DoubleConv(
        (double_conv): Sequential(
          (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
          (3): Conv2d(128, 128, kernel_size=(3, 3)

In [9]:
# Example of loading a single test image
test_image_filename = "s3://solafune/test_images/images/test_0.tif"  # Replace with your test image path

In [10]:
def visualize_test(test_image_filename):
    # Load and preprocess the test image
    image = load_image(test_image_filename)
    original_shape = image.shape[:2]  # Capture the original shape of the image
    image = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1) / 255.0
    image = image.unsqueeze(0)  # Add batch dimension
    
    image = image.to(device)
    
    with torch.no_grad():  # Disable gradient computation for inference
        output = model(image)
        prediction = torch.sigmoid(output)  # Apply sigmoid to get probabilities
        prediction = prediction.squeeze(0).cpu().numpy()  # Remove batch dimension and move to CPU
    
    threshold = 0.5
    binary_mask = (prediction > threshold).astype(np.uint8)  # Apply threshold to get binary mask
    
    # Resize the binary mask to match the original image dimensions (if necessary)
    binary_mask = F.interpolate(torch.tensor(binary_mask[0]).unsqueeze(0).unsqueeze(0).float(), size=original_shape, mode='bilinear', align_corners=False).squeeze().numpy()
    
    # Load the original image for visualization (without preprocessing)
    original_image = load_image(test_image_filename)
    
    # Plot the results
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 3, 1)
    plt.title("Original Image")
    plt.imshow(original_image)
    
    plt.subplot(1, 3, 2)
    plt.title("Predicted Mask")
    plt.imshow(binary_mask, cmap='gray')
    
    # Create a red colormap for the mask
    red_cmap = ListedColormap(['black', 'red'])
    
    plt.subplot(1, 3, 3)
    plt.title("Original Image with Predicted Mask Overlay")
    plt.imshow(original_image)
    plt.imshow(binary_mask, cmap=red_cmap, alpha=0.4)  # alpha controls the transparency of the mask
    
    plt.show()


In [12]:
#visualize_test("s3://solafune/test_images/images/test_0.tif")

## Calling SAM2 to predict masks Using the Custom Class to predict using Mask