In [3]:
import os
import torch
import torch.nn as nn
import keras
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import train_test_split

# Paths to the data directories
image_dir = r"C:\Users\bjs\Downloads\datasets\forest\Images"
mask_dir = r"C:\Users\bjs\Downloads\datasets\forest\Masks"

# Get list of image and mask file paths
image_paths = sorted([os.path.join(image_dir, fname) for fname in os.listdir(image_dir)])
mask_paths = sorted([os.path.join(mask_dir, fname) for fname in os.listdir(mask_dir)])

# Split data
train_images, val_images, train_masks, val_masks = train_test_split(image_paths, mask_paths, test_size=0.2, random_state=42)

# Custom Dataset class
class SegmentationDataset(Dataset):
    def __init__(self, image_paths, mask_paths, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert("L")
        mask = Image.open(self.mask_paths[idx]).convert("L")
        
        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)
        
        return image, mask

# Define transforms
transform = transforms.Compose([
    transforms.Resize((448, 448)),
    transforms.ToTensor(),
])

# Set batch size for flexibility
batch_size = 8  # Change this value as needed

# Create Datasets and DataLoaders
train_dataset = SegmentationDataset(train_images, train_masks, transform=transform)
val_dataset = SegmentationDataset(val_images, val_masks, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Define UNet Model
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        # Encoder
        self.encoder1 = nn.Sequential(nn.Conv2d(1, 64, 3, padding=1), nn.ReLU(), nn.Conv2d(64, 64, 3, padding=1), nn.ReLU())
        self.pool1 = nn.MaxPool2d(2)
        
        self.encoder2 = nn.Sequential(nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.Conv2d(128, 128, 3, padding=1), nn.ReLU())
        self.pool2 = nn.MaxPool2d(2)
        
        # Bottleneck
        self.bottleneck = nn.Sequential(nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(), nn.Conv2d(256, 256, 3, padding=1), nn.ReLU())
        
        # Decoder
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder2 = nn.Sequential(nn.Conv2d(256, 128, 3, padding=1), nn.ReLU(), nn.Conv2d(128, 128, 3, padding=1), nn.ReLU())
        
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder1 = nn.Sequential(nn.Conv2d(128, 64, 3, padding=1), nn.ReLU(), nn.Conv2d(64, 64, 3, padding=1), nn.ReLU())
        
        # Output
        self.conv_last = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        e1 = self.encoder1(x)
        e1p = self.pool1(e1)
        
        e2 = self.encoder2(e1p)
        e2p = self.pool2(e2)
        
        b = self.bottleneck(e2p)
        
        d2 = self.upconv2(b)
        d2 = torch.cat((d2, e2), dim=1)
        d2 = self.decoder2(d2)
        
        d1 = self.upconv1(d2)
        d1 = torch.cat((d1, e1), dim=1)
        d1 = self.decoder1(d1)
        
        return torch.sigmoid(self.conv_last(d1))

model = UNet().cuda()




AssertionError: Torch not compiled with CUDA enabled

In [None]:

# Dice Loss function
def dice_loss(pred, target, smooth=1e-5):
    pred = pred.contiguous()
    target = target.contiguous()
    intersection = (pred * target).sum(dim=2).sum(dim=2)
    dice = (2. * intersection + smooth) / (pred.sum(dim=2).sum(dim=2) + target.sum(dim=2).sum(dim=2) + smooth)
    return 1 - dice.mean()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training Loop with model saving
num_epochs = 10
model_save_path = "unet_segmentation_model.pth"  # Path to save the trained model

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for images, masks in train_loader:
        images, masks = images.cuda(), masks.cuda()
        
        # Forward pass
        outputs = model(images)
        loss = dice_loss(outputs, masks)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss/len(train_loader)}")

# Save the trained model
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

In [None]:

# Inference function
def run_inference(model, image_path, transform=transform):
    model.eval()
    with torch.no_grad():
        image = Image.open(image_path).convert("L")
        input_tensor = transform(image).unsqueeze(0).cuda()  # Add batch dimension and move to GPU
        output = model(input_tensor)
        output = output.squeeze().cpu().numpy()  # Remove batch dimension and move to CPU
        return output  # Returns the output mask as a NumPy array

# Example usage for inference
# Replace 'sample_image_path' with an actual path to an image you want to test
sample_image_path = 'C:\Thesis\Road-Anomaly-Detector\road_anomaly_detector\main\model\forest_041.jpg'
output_mask = run_inference(model, sample_image_path)

# Displaying the result using matplotlib (optional)
import matplotlib.pyplot as plt

plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.title("Input Image")
plt.imshow(Image.open(sample_image_path).convert("L"), cmap='gray')

plt.subplot(1,2,2)
plt.title("Predicted Mask")
plt.imshow(output_mask, cmap='gray')
plt.show()