In [1]:
import os
import pandas as pd
import numpy as np
from skimage import io
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import cv2
from torchvision import transforms
from PIL import Image
import torch.optim as optim
from tqdm import tqdm

In [2]:
class CustomDataset(Dataset):
    """Image/mask pairs for binary ship segmentation, driven by an RLE CSV.

    The CSV must provide an ``ImageId`` column (file name relative to
    ``root_dir``) and an ``EncodedPixels`` column (run-length encoded mask,
    empty when the image contains no ship). Rows that share an ``ImageId``
    are merged so every image appears once with the union of its masks.

    Args:
        csv_file: Path to the annotation CSV.
        root_dir: Directory that contains the image files.
        target_size: ``(height, width)`` of the returned image and mask.
        ship_ratio: Desired fraction of images with a ship in the dataset.
            Images without ships are sub-sampled (fixed seed 42) to reach it;
            at least one is kept when any exist, and never more than exist.

    Raises:
        ValueError: If ``ship_ratio`` is not positive.
    """

    def __init__(self, csv_file, root_dir, target_size=(768, 768), ship_ratio=0.5):
        if ship_ratio <= 0:
            raise ValueError(f"ship_ratio must be positive, got {ship_ratio!r}")

        self.root_dir = root_dir
        self.target_size = target_size

        annotations = pd.read_csv(csv_file)
        has_mask = annotations['EncodedPixels'].notnull()

        # Several rows may describe the same image. Concatenating their RLE
        # strings yields one string that decodes to the union of the masks,
        # so each image is served once with a complete mask instead of
        # several times with contradictory partial masks.
        images_with_ship = (
            annotations[has_mask]
            .groupby('ImageId', sort=False)['EncodedPixels']
            .agg(' '.join)
            .reset_index()
        )
        num_images_with_ship = len(images_with_ship)

        # Candidate negatives: rows without a mask whose image has no mask
        # anywhere else in the file, one row per image.
        is_negative = ~has_mask & ~annotations['ImageId'].isin(images_with_ship['ImageId'])
        candidates = annotations[is_negative].drop_duplicates(subset='ImageId')

        # Number of negatives needed to hit the requested ship ratio.
        num_images_without_ship = int(num_images_with_ship / ship_ratio) - num_images_with_ship
        # Keep at least one negative ...
        num_images_without_ship = max(num_images_without_ship, 1)
        # ... but never ask for more than exist: sampling without replacement
        # raises ValueError when n exceeds the population.
        num_images_without_ship = min(num_images_without_ship, len(candidates))

        images_without_ship = candidates.sample(
            n=num_images_without_ship, random_state=42, replace=False)

        self.data = pd.concat([images_with_ship, images_without_ship], ignore_index=True)

    def __len__(self):
        """Return the number of images after merging and balancing."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict with ``'image'``, a float tensor of shape ``(3, H, W)``
            scaled to ``[0, 1]``, and ``'mask'``, a float tensor of shape
            ``(1, H, W)`` whose values are exactly 0.0 or 1.0.

        Raises:
            RuntimeError: If the image cannot be read or the mask cannot be
                decoded. The underlying exception is chained.
        """
        row = self.data.iloc[idx]
        img_name = os.path.join(self.root_dir, row['ImageId'])
        height, width = self.target_size

        try:
            with Image.open(img_name) as handle:
                image = handle.convert('RGB')

            # RLE runs index pixels of the image at its native resolution,
            # so decode there first and resize afterwards.
            native_width, native_height = image.size
            mask = rle_decode(row['EncodedPixels'],
                              target_size=(native_height, native_width))

            # PIL sizes are (width, height); target_size is (height, width).
            image = image.resize((width, height))
            if mask.shape != (height, width):
                # Nearest-neighbour keeps the mask strictly binary.
                mask = np.array(
                    Image.fromarray(np.ascontiguousarray(mask)).resize(
                        (width, height), resample=Image.NEAREST))

            image = transforms.ToTensor()(image)
            # Do not push the mask through ToTensor(): it rescales uint8
            # input by 1/255 and would turn every 1 into ~0.0039.
            mask = torch.from_numpy(np.ascontiguousarray(mask)).unsqueeze(0).float()
        except Exception as exc:
            # Returning None here would only make the DataLoader's collate
            # step fail later with an unrelated-looking error.
            raise RuntimeError(f"Could not load sample at {img_name}") from exc

        return {'image': image, 'mask': mask}


# Function to decode RLE-encoded masks to NumPy arrays
def rle_decode(mask_rle, target_size=(768, 768)):
    """Turn a run-length encoded mask string into a binary ``uint8`` array.

    The string is a whitespace-separated sequence of ``start length`` pairs.
    Starts are 1-based positions in the flattened mask, and the flat buffer
    is laid out in column-major (Fortran) order.

    Args:
        mask_rle: The encoded string, or a missing value (NaN/None).
        target_size: Shape of the array to produce.

    Returns:
        Array of shape ``target_size`` holding 1 inside the encoded runs and
        0 elsewhere; all zeros when ``mask_rle`` is missing.
    """
    # A missing annotation means an empty mask.
    if pd.isna(mask_rle):
        return np.zeros(target_size, dtype=np.uint8)

    tokens = mask_rle.split()
    # Even positions are run starts (shifted to 0-based), odd are lengths.
    run_starts = np.asarray(tokens[0::2], dtype=int) - 1
    run_lengths = np.asarray(tokens[1::2], dtype=int)
    run_ends = run_starts + run_lengths

    flat_mask = np.zeros(np.prod(target_size), dtype=np.uint8)
    for begin, end in zip(run_starts, run_ends):
        flat_mask[begin:end] = 1

    return flat_mask.reshape(target_size, order='F')

# Dataset location. Set the AIRBUS_DATA_DIR environment variable to point at
# the extracted dataset; the fallback is the original local download folder,
# so existing setups keep working unchanged.
data_dir = os.environ.get(
    "AIRBUS_DATA_DIR",
    "C:\\Users\\Микола\\Downloads\\airbus-ship-detection")
csv_file = os.path.join(data_dir, "train_ship_segmentations_v2.csv")
root_dir = os.path.join(data_dir, "train_v2")

# Create an instance of the dataset
custom_dataset = CustomDataset(csv_file=csv_file, root_dir=root_dir)

# 80 % of the samples go to training, the remainder to validation
split_index = int(0.8 * len(custom_dataset))

# Seed the split so the train/validation partition is identical on every
# Restart & Run All; an unseeded split makes results incomparable across runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    custom_dataset,
    [split_index, len(custom_dataset) - split_index],
    generator=split_generator)

# Create DataLoaders for training and validation datasets
batch_size = 16
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)


In [3]:
class UNet(nn.Module):
    """Compact four-stage encoder/decoder with concatenated skip features.

    Every encoder stage ends in a 2x2 max-pool and every decoder stage starts
    with a stride-2 transposed convolution. The network takes a 3-channel
    input and returns a single-channel map with no activation applied.
    """

    def __init__(self):
        super().__init__()

        # Encoder stages (channel widths 3 -> 8 -> 16 -> 32 -> 64)
        self.conv1 = self.conv_block(3, 8)
        self.conv2 = self.conv_block(8, 16)
        self.conv3 = self.conv_block(16, 32)
        self.conv4 = self.conv_block(32, 64)

        # Decoder stages; skip_channels is the width of the encoder feature
        # concatenated onto the stage's input.
        self.upconv4 = self.upconv_block(64, 32)
        self.upconv3 = self.upconv_block(32, 16, skip_channels=32)
        self.upconv2 = self.upconv_block(16, 8, skip_channels=16)
        self.upconv1 = self.upconv_block(8, 8, skip_channels=8)

        # 1x1 projection down to one output channel
        self.final_conv = nn.Conv2d(8, 1, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Build an encoder stage: two 3x3 conv + ReLU pairs, then a 2x2 max-pool."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        return nn.Sequential(*layers)

    def upconv_block(self, in_channels, out_channels, skip_channels=0):
        """Build a decoder stage: stride-2 transposed conv + ReLU, then 3x3 conv + ReLU.

        The transposed convolution consumes ``in_channels + skip_channels``
        channels, i.e. the previous decoder output joined with a skip feature.
        """
        merged_channels = in_channels + skip_channels
        layers = [
            nn.ConvTranspose2d(merged_channels, out_channels, kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the encoder, then decode while concatenating encoder features."""
        # Encoder: keep each stage's output for the skip connections
        enc1 = self.conv1(x)
        enc2 = self.conv2(enc1)
        enc3 = self.conv3(enc2)
        bottleneck = self.conv4(enc3)

        # Decoder: the first stage has no skip; the rest concatenate the
        # encoder feature of matching resolution along the channel axis.
        decoded = self.upconv4(bottleneck)
        skip_stages = (
            (self.upconv3, enc3),
            (self.upconv2, enc2),
            (self.upconv1, enc1),
        )
        for stage, skip in skip_stages:
            decoded = stage(torch.cat([decoded, skip], dim=1))

        return self.final_conv(decoded)

model = UNet()

# Smoke test: push one random batch through the untrained network and check
# that the output has the input's spatial size and a single channel.
# NOTE: this rebinds the notebook-level `batch_size`; the DataLoaders built in
# the data-loading cell were created earlier and keep their own batch size.
batch_size = 8
input_channels = 3
input_height = 768
input_width = 768
sample_input = torch.randn((batch_size, input_channels, input_height, input_width))

# no_grad: this is only a shape check, so skip building the autograd graph,
# which would otherwise keep every intermediate activation in memory.
with torch.no_grad():
    output_conv = model(sample_input)

assert output_conv.shape == (batch_size, 1, input_height, input_width), (
    f"Unexpected UNet output shape: {tuple(output_conv.shape)}")

In [None]:
# Define the Dice Score function
def dice_score(pred, target, smooth=1.0):
    """Compute a smoothed Dice coefficient over all elements of two tensors.

    Args:
        pred: Predicted mask tensor.
        target: Reference mask tensor, multiplied element-wise with ``pred``.
        smooth: Constant added to numerator and denominator.

    Returns:
        ``(2 * sum(pred * target) + smooth) / (sum(pred) + sum(target) + smooth)``
        as a Python float.
    """
    overlap = (pred * target).sum()
    total_mass = pred.sum() + target.sum()
    score = (2.0 * overlap + smooth) / (total_mass + smooth)
    return score.item()

# Train on the GPU when one is present, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# The network emits a single logit per pixel, so the matching loss is binary
# cross-entropy on logits. CrossEntropyLoss applies a softmax over the channel
# axis; with only one channel that softmax is constant, so it provides no
# learning signal.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    total_dice = 0.0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        images = batch['image'].to(device)
        # Binarize so the target is exactly 0.0 / 1.0 whatever scale the
        # dataset stored the mask in.
        masks = (batch['mask'].to(device) > 0).float()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, masks)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Dice on the thresholded prediction; pure bookkeeping, so keep it
        # out of the autograd graph.
        with torch.no_grad():
            predictions = torch.sigmoid(outputs)
            dice = dice_score(predictions > 0.5, masks)

        total_loss += loss.item()
        total_dice += dice

    # max(..., 1) avoids a ZeroDivisionError when the loader is empty
    num_train_batches = max(len(train_loader), 1)
    average_loss = total_loss / num_train_batches
    average_dice = total_dice / num_train_batches
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss:.4f}, Average Dice Score: {average_dice:.4f}")

# Validation loop (runs once, after the final epoch)
model.eval()
total_dice_val = 0.0

with torch.no_grad():
    for batch_val in tqdm(val_loader, desc="Validation"):
        images_val = batch_val['image'].to(device)
        masks_val = (batch_val['mask'].to(device) > 0).float()

        # Forward pass
        outputs_val = model(images_val)

        # Dice on the thresholded prediction
        predictions_val = torch.sigmoid(outputs_val)
        dice_val = dice_score(predictions_val > 0.5, masks_val)

        total_dice_val += dice_val

average_dice_val = total_dice_val / max(len(val_loader), 1)
print(f"Average Dice Score on Validation Set: {average_dice_val:.4f}")
