In [None]:
# Import libraries
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torch.utils.data import Dataset, random_split
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, f1_score
import cv2

from torchvision.transforms.functional import resize, to_pil_image, to_tensor
from torchvision.transforms import Compose, ToPILImage, Resize, ToTensor
from albumentations import Compose, HorizontalFlip, VerticalFlip, Rotate, Normalize
from albumentations.pytorch import ToTensorV2



In [None]:
# Folders that hold the RGB tiles and the mask tiles (both stored as .npy arrays)
image_directory = 'processed_data/npy_tiles/rgbs'
mask_directory = 'processed_data/npy_tiles/masks'

# Collect the .npy file names of each folder in sorted order
image_files = sorted(
    name for name in os.listdir(image_directory)
    if name.lower().endswith('.npy')
)
mask_files = sorted(
    name for name in os.listdir(mask_directory)
    if name.lower().endswith('.npy')
)



In [None]:
# Preview one image tile next to the mask found at the same sorted position
sample_image = np.load(os.path.join(image_directory, image_files[6733]))
sample_mask = np.load(os.path.join(mask_directory, mask_files[6733]))

fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

# Left panel: the image, divided by 255.0 before display
ax[0].set_title("Sample Image")
ax[0].imshow(sample_image / 255.0)

# Right panel: the mask with singleton axes dropped, drawn in grayscale
ax[1].set_title("Sample Mask")
ax[1].imshow(sample_mask.squeeze(), cmap='gray')

plt.show()



In [None]:
import numpy as np
import os

# Tile folders (repeated here so the cell does not rely on the listing cell)
image_directory = 'processed_data/npy_tiles/rgbs'
mask_directory = 'processed_data/npy_tiles/masks'

# Sorted .npy file names of both folders
image_files = sorted(
    name for name in os.listdir(image_directory)
    if name.lower().endswith('.npy')
)
mask_files = sorted(
    name for name in os.listdir(mask_directory)
    if name.lower().endswith('.npy')
)

# Inspect the first image / mask pair
sample_image = np.load(os.path.join(image_directory, image_files[0]))
sample_mask = np.load(os.path.join(mask_directory, mask_files[0]))

# Image summary: shape, dtype and value range (one item per output line)
print(
    f"Sample Image:",
    f"Shape: {sample_image.shape}",
    f"Data type: {sample_image.dtype}",
    f"Min value: {sample_image.min()}",
    f"Max value: {sample_image.max()}",
    sep="\n",
)
print("\n")
# Mask summary: shape, dtype and the distinct label values present
print(
    f"Sample Mask:",
    f"Shape: {sample_mask.shape}",
    f"Data type: {sample_mask.dtype}",
    f"Unique values: {np.unique(sample_mask)}",
    sep="\n",
)


In [None]:
def calculate_dataset_statistics(image_directory, num_channels=3):
    """
    Calculate the per-channel mean and standard deviation of a tile dataset.

    Every ``.npy`` file in ``image_directory`` is loaded once and reduced over
    its first two axes; sums and squared sums are accumulated in float64 so the
    whole dataset never has to be held in memory. The statistics are returned
    in the same units as the stored pixel values (no rescaling is applied).

    Args:
        image_directory (str): Path to the directory with images.
        num_channels (int): Number of image channels.

    Returns:
        mean (list): Mean value for each channel.
        std (list): Standard deviation for each channel.

    Raises:
        ValueError: If the directory contains no ``.npy`` files, because the
            statistics would otherwise be NaN.
    """
    total_sum = np.zeros(num_channels, dtype=np.float64)
    total_sq_sum = np.zeros(num_channels, dtype=np.float64)
    total_num_pixels = 0

    image_files = sorted(f for f in os.listdir(image_directory) if f.lower().endswith('.npy'))
    if not image_files:
        raise ValueError(f"No .npy files found in {image_directory!r}")

    for image_file in image_files:
        # Promote to float64 BEFORE squaring: np.square keeps the input dtype,
        # so integer tiles (e.g. uint8) would wrap around and corrupt the std.
        image = np.load(os.path.join(image_directory, image_file)).astype(np.float64)
        total_sum += image.sum(axis=(0, 1))
        total_sq_sum += np.square(image).sum(axis=(0, 1))
        total_num_pixels += image.shape[0] * image.shape[1]

    mean = total_sum / total_num_pixels
    # Var[x] = E[x^2] - E[x]^2; clamp at zero because round-off can push a
    # (near-)constant channel slightly negative, which would give NaN in sqrt.
    variance = (total_sq_sum / total_num_pixels) - np.square(mean)
    std = np.sqrt(np.maximum(variance, 0.0))

    return mean.tolist(), std.tolist()


In [None]:
# Calculate dataset statistics
# calculate_dataset_statistics works in stored pixel units. albumentations'
# Normalize computes (img - mean * max_pixel_value) / (std * max_pixel_value)
# with max_pixel_value=255 by default, so it expects mean/std as fractions of
# the pixel range. Rescale here so that Normalize really standardises the data.
# NOTE(review): assumes the tiles use a 0-255 pixel range (the preview cell
# divides by 255.0 before display) - confirm against the printed min/max.
MAX_PIXEL_VALUE = 255.0

raw_mean, raw_std = calculate_dataset_statistics(image_directory)
mean = [channel_mean / MAX_PIXEL_VALUE for channel_mean in raw_mean]
std = [channel_std / MAX_PIXEL_VALUE for channel_std in raw_std]

print(f"Mean: {mean}")
print(f"Standard deviation: {std}")


In [None]:
# Transformation pipeline (albumentations): normalise each channel with the
# dataset statistics, then convert the array to a torch tensor.
# NOTE: the name `transforms` shadows the `torchvision.transforms` module
# imported at the top of the notebook; it is kept because the dataset
# instantiation below passes this variable by name.
transforms = Compose([
    # p=1.0 already applies the transform to every sample, so the deprecated
    # `always_apply` flag is not needed.
    Normalize(mean=mean, std=std, p=1.0),
    ToTensorV2()
])

class ConstructionDataset(Dataset):
    """Dataset of image / mask tiles stored as ``.npy`` files.

    Images and masks are paired by their position in the sorted directory
    listings, so both folders must contain the same number of ``.npy`` files.

    Args:
        image_directory (str): Folder with the image tiles.
        mask_directory (str): Folder with the mask tiles.
        transforms (callable, optional): Called as
            ``transforms(image=image, mask=mask)`` and expected to return a
            mapping with ``'image'`` and ``'mask'`` entries (the albumentations
            calling convention). ``None`` returns the raw arrays.

    Raises:
        ValueError: If the two folders hold a different number of ``.npy``
            files, since index-based pairing would then be misaligned.
    """

    def __init__(self, image_directory, mask_directory, transforms=None):
        self.image_files = self._list_npy_files(image_directory)
        self.mask_files = self._list_npy_files(mask_directory)
        if len(self.image_files) != len(self.mask_files):
            raise ValueError(
                f"Found {len(self.image_files)} image files but "
                f"{len(self.mask_files)} mask files; they are paired by index "
                "and must match in number."
            )
        self.image_directory = image_directory
        self.mask_directory = mask_directory
        self.transforms = transforms

    @staticmethod
    def _list_npy_files(directory):
        """Return the sorted names of all ``.npy`` files in ``directory``."""
        return sorted(f for f in os.listdir(directory) if f.lower().endswith('.npy'))

    def __len__(self):
        """Number of image / mask pairs."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load one pair and return ``{'image': ..., 'mask': ...}``.

        The mask is returned as float32 with a leading channel axis,
        i.e. shape ``(1, H, W)``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Load image and mask
        image = np.load(os.path.join(self.image_directory, self.image_files[idx]))
        mask = np.load(os.path.join(self.mask_directory, self.mask_files[idx]))

        # Keep the mask 2-D (H, W) while the transforms run: albumentations
        # reads the first two axes of a mask as height and width, so a
        # (1, H, W) mask would not line up with the (H, W, C) image.
        mask = mask.reshape(mask.shape[0], mask.shape[1]).astype(np.float32)

        # Apply transformations
        if self.transforms is not None:
            augmented = self.transforms(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # Add the channel axis afterwards -> (1, H, W). Indexing with None
        # works for both numpy arrays and torch tensors.
        mask = mask[None, ...]

        return {
            'image': image,
            'mask': mask
        }




# Build the dataset over both tile folders, using the pipeline defined above
dataset = ConstructionDataset(
    image_directory=image_directory,
    mask_directory=mask_directory,
    transforms=transforms,
)


In [None]:
# Split the dataset into training (80%) and validation (20%) sets.
# A seeded generator makes the split identical after every kernel restart;
# without it the validation set changes from run to run, so reported metrics
# are not comparable.
SPLIT_SEED = 42
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create data loaders for training and validation sets
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=0)

In [None]:
# Get one batch of training data
data = next(iter(train_loader))

# Extract image and mask from the batch
image_ds_viz = data['image'][0].cpu().numpy()  # Get the first image of the batch
mask = data['mask'][0].cpu().numpy()  # Get the corresponding mask

# For visualization purposes, transpose the image back to HxWxC format from CxHxW
image_ds_viz = np.transpose(image_ds_viz, (1, 2, 0))
image_ds_viz = (image_ds_viz * std) + mean  # denormalize
image_ds_viz = image_ds_viz * 255.0
# Round and clip before the integer cast: a plain astype(np.uint8) truncates
# (e.g. 127.9999 -> 127) and out-of-range floats do not convert reliably.
image_ds_viz = np.clip(np.rint(image_ds_viz), 0, 255).astype(np.uint8)

if mask.ndim == 3:  # Only transpose if mask has 3 dimensions
    mask = np.transpose(mask, (1, 2, 0))

# Visualize the image and mask
fig, ax = plt.subplots(1, 2, figsize=(12, 6))

ax[0].imshow(image_ds_viz)
ax[0].set_title('Image')

ax[1].imshow(mask.squeeze(), cmap='gray')  # squeeze() drops the singleton channel axis
ax[1].set_title('Mask')

plt.show()

In [None]:
# Report whether a CUDA device is usable by PyTorch; as the last expression of
# the cell, the boolean is displayed as the cell output.
torch.cuda.is_available()

In [None]:
class UNet(nn.Module):
    """U-Net architecture for image segmentation.

    Six encoder levels, a middle block and six decoder levels with skip
    connections, followed by one extra convolutional block and a 1x1 output
    convolution. The output has the same height and width as the input and
    contains raw (un-activated) values.

    Args:
        in_channels (int): Number of channels of the input image.
        out_channels (int): Number of channels of the output map.
        base_channels (int, optional): Width of the first encoder block; the
            width doubles at every level. The default of 64 gives the widths
            64, 128, ..., 2048 with a 4096-channel middle block. Must be an
            even number >= 2 because the last decoder block halves it.

    Raises:
        ValueError: If ``base_channels`` is not an even number >= 2, or (in
            ``forward``) if the input height or width is not a multiple of 64.
    """

    # forward() applies the 2x2 pooling six times, so both spatial sizes must
    # be divisible by 2 ** 6 for the skip connections to line up.
    _SIZE_MULTIPLE = 64

    def __init__(self, in_channels, out_channels, base_channels=64):
        super(UNet, self).__init__()

        if base_channels < 2 or base_channels % 2 != 0:
            raise ValueError(f"base_channels must be an even number >= 2, got {base_channels}")

        def conv_block(in_channels, out_channels):
            """Create a convolutional block with two convolutional layers and ReLU activations."""
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(inplace=True)
            )

        def upsample(in_channels, out_channels):
            """Create an upsampling layer using transposed convolution."""
            return nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)

        # Channel width per level: base, 2*base, ..., 64*base
        # (64 ... 4096 for the default base of 64).
        w1, w2, w3, w4, w5, w6, w_mid = (base_channels * 2 ** level for level in range(7))
        w_out = base_channels // 2

        # Define encoder blocks
        self.encoder1 = conv_block(in_channels, w1)
        self.encoder2 = conv_block(w1, w2)
        self.encoder3 = conv_block(w2, w3)
        self.encoder4 = conv_block(w3, w4)
        self.encoder5 = conv_block(w4, w5)
        self.encoder6 = conv_block(w5, w6)

        # Define middle block
        self.middle = conv_block(w6, w_mid)

        # Define decoder blocks. Each one receives the upsampled features
        # concatenated with the skip connection, hence twice its output width.
        self.decoder6 = conv_block(w_mid, w6)
        self.decoder5 = conv_block(w6, w5)
        self.decoder4 = conv_block(w5, w4)
        self.decoder3 = conv_block(w4, w3)
        self.decoder2 = conv_block(w3, w2)
        self.decoder1 = conv_block(w2, w1)
        self.decoder0 = conv_block(w1, w_out)

        # Define upsampling layers
        self.upsample6 = upsample(w_mid, w6)
        self.upsample5 = upsample(w6, w5)
        self.upsample4 = upsample(w5, w4)
        self.upsample3 = upsample(w4, w3)
        self.upsample2 = upsample(w3, w2)
        self.upsample1 = upsample(w2, w1)

        # Define pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Define output convolution
        self.out_conv = nn.Conv2d(w_out, out_channels, kernel_size=1)

    def forward(self, x):
        """Run the network on a batch of shape (N, in_channels, H, W)."""
        height, width = x.shape[-2:]
        if height % self._SIZE_MULTIPLE or width % self._SIZE_MULTIPLE:
            # Otherwise torch.cat on a skip connection fails with a size mismatch.
            raise ValueError(
                f"Input height and width must be multiples of {self._SIZE_MULTIPLE}, "
                f"got {height}x{width}"
            )

        # Encode input through encoder blocks
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))
        enc5 = self.encoder5(self.pool(enc4))
        enc6 = self.encoder6(self.pool(enc5))

        # Pass through middle block
        middle = self.middle(self.pool(enc6))

        # Decode: upsample, concatenate the matching encoder output, convolve
        dec6 = self.upsample6(middle)
        dec6 = torch.cat((dec6, enc6), dim=1)
        dec6 = self.decoder6(dec6)

        dec5 = self.upsample5(dec6)
        dec5 = torch.cat((dec5, enc5), dim=1)
        dec5 = self.decoder5(dec5)

        dec4 = self.upsample4(dec5)
        dec4 = torch.cat((dec4, enc4), dim=1)
        dec4 = self.decoder4(dec4)

        dec3 = self.upsample3(dec4)
        dec3 = torch.cat((dec3, enc3), dim=1)
        dec3 = self.decoder3(dec3)

        dec2 = self.upsample2(dec3)
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.decoder2(dec2)

        dec1 = self.upsample1(dec2)
        dec1 = torch.cat((dec1, enc1), dim=1)
        dec1 = self.decoder1(dec1)

        dec0 = self.decoder0(dec1)

        # Generate output mask
        out = self.out_conv(dec0)
        return out

# Build the network: 3 input channels, 1 output channel
model = UNet(in_channels=3, out_channels=1)




In [None]:
# Set up training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# BCEWithLogitsLoss applies the sigmoid itself, so the model outputs raw logits
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# The scheduler is stepped with the validation loss at the end of each epoch
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, verbose=True)

num_epochs = 50

# Train and validate the model
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    model.train()
    train_loss = 0.0

    # Training loop. Each batch is a dict with 'image' and 'mask' entries, so
    # it must be indexed by key: tuple-unpacking it only yields the key
    # strings, and reading a batch variable from another cell would train on
    # the same samples at every step.
    for batch in train_loader:
        images = batch['image'].to(device)
        masks = batch['mask'].to(device)

        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        # Weight by batch size so the epoch average is per sample even when
        # the last batch is smaller
        train_loss += loss.item() * images.size(0)

    train_loss /= len(train_loader.dataset)
    print(f"Training Loss: {train_loss:.4f}")

    model.eval()
    val_loss = 0.0

    # Validation loop (no gradients needed)
    with torch.no_grad():
        for batch in val_loader:
            images = batch['image'].to(device)
            masks = batch['mask'].to(device)

            outputs = model(images)
            loss = criterion(outputs, masks)

            val_loss += loss.item() * images.size(0)

    val_loss /= len(val_loader.dataset)
    scheduler.step(val_loss)
    print(f"Validation Loss: {val_loss:.4f}\n")




In [None]:
# Display a sample prediction
# The dataset returns a dict, so read the tensors by key (tuple-unpacking a
# dict would only give the key strings).
sample = val_dataset[0]
sample_mask = sample['mask']
sample_image = sample['image'].unsqueeze(0).to(device)  # add the batch axis

with torch.no_grad():
    output = model(sample_image)

# A logit above 0 corresponds to a sigmoid probability above 0.5
output_mask = (output > 0).squeeze().cpu().numpy()

fig, ax = plt.subplots(1, 3, figsize=(15, 5))
# The tensor is still normalised here, so matplotlib clips values outside [0, 1]
ax[0].imshow(sample_image.squeeze().permute(1, 2, 0).cpu().numpy())
ax[0].set_title("Input Image")
ax[1].imshow(sample_mask.squeeze(), cmap="gray")
ax[1].set_title("Ground Truth Mask")
ax[2].imshow(output_mask, cmap="gray")
ax[2].set_title("Predicted Mask")
plt.show()


In [None]:
# Display a sample prediction, with the input rescaled for viewing
# The dataset returns a dict, so read the tensors by key (tuple-unpacking a
# dict would only give the key strings).
sample = val_dataset[0]
sample_mask = sample['mask']
sample_image = sample['image'].unsqueeze(0).to(device)  # add the batch axis

with torch.no_grad():
    output = model(sample_image)

# A logit above 0 corresponds to a sigmoid probability above 0.5
output_mask = (output > 0).squeeze().cpu().numpy()

# Min-max scale the image to [0, 1] for display
image_display = sample_image.squeeze().permute(1, 2, 0).cpu().numpy()
value_range = image_display.max() - image_display.min()
if value_range > 0:
    image_display = (image_display - image_display.min()) / value_range
else:
    # Constant image: avoid dividing by zero and show it as black
    image_display = np.zeros_like(image_display)

fig, ax = plt.subplots(1, 3, figsize=(15, 5))
ax[0].imshow(image_display)
ax[0].set_title("Input Image")
ax[1].imshow(sample_mask.squeeze(), cmap="gray")
ax[1].set_title("Ground Truth Mask")
ax[2].imshow(output_mask, cmap="gray")
ax[2].set_title("Predicted Mask")
plt.show()

In [None]:
# Calculate evaluation metrics
def sigmoid(x):
    """Numerically stable logistic function ``1 / (1 + exp(-x))``.

    Args:
        x: Scalar or array of real values (e.g. model logits).

    Returns:
        Values in [0, 1] with the same shape as ``x``. Floating inputs keep
        their dtype; other inputs are computed in float64.
    """
    x = np.asarray(x)
    if not np.issubdtype(x.dtype, np.floating):
        x = x.astype(np.float64)

    # exp() is only ever taken of a non-positive number, so it cannot overflow
    # (the naive form overflows for large negative x).
    decay = np.exp(-np.abs(x))
    result = np.where(x >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    # [()] turns a 0-d result back into a scalar and leaves arrays untouched
    return result[()]

def output_to_binary_mask(output, threshold=0.5):
    """Turn raw model outputs into a 0/1 ``uint8`` mask.

    The outputs are passed through ``sigmoid``; entries whose probability is
    strictly greater than ``threshold`` become 1, all others 0.
    """
    return (sigmoid(output) > threshold).astype(np.uint8)

def iou(y_true, y_pred):
    """Intersection over union of two binary masks.

    Args:
        y_true: Ground-truth mask; non-zero entries count as foreground.
        y_pred: Predicted mask of the same shape; non-zero entries count as
            foreground.

    Returns:
        Number of pixels set in both masks divided by the number set in at
        least one. When neither mask has any foreground the union is empty and
        1.0 is returned (the masks agree) instead of dividing by zero.
    """
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_pred).astype(bool)

    union = np.logical_or(y_true, y_pred).sum()
    if union == 0:
        return 1.0

    intersection = np.logical_and(y_true, y_pred).sum()
    return intersection / union

model.eval()
y_true_list = []
y_pred_list = []

with torch.no_grad():
    # Each batch is a dict with 'image' and 'mask' entries; tuple-unpacking it
    # would only give the key strings.
    for batch in val_loader:
        images = batch['image'].to(device)
        masks = batch['mask']

        outputs = model(images)

        # Collect one flat array per batch; extending a Python list with one
        # element per pixel is far slower and uses far more memory.
        y_true_list.append(masks.cpu().numpy().ravel())
        y_pred_list.append(outputs.cpu().numpy().ravel())

y_true = np.concatenate(y_true_list)
y_pred_list = np.concatenate(y_pred_list)  # raw outputs for every validation pixel
y_pred_binary = output_to_binary_mask(y_pred_list)

# NOTE(review): the sklearn scores assume y_true only holds the labels 0 and 1
# - confirm against the unique mask values printed earlier.
precision = precision_score(y_true, y_pred_binary)
recall = recall_score(y_true, y_pred_binary)
f1 = f1_score(y_true, y_pred_binary)
iou_score = iou(y_true, y_pred_binary)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"IoU: {iou_score:.4f}")


In [None]:
# Write the model's state dict to disk
torch.save(obj=model.state_dict(), f="unet_model.pth")



In [None]:
# Load the model for further use or evaluation
loaded_model = UNet(3, 1)
# map_location remaps the stored tensors onto `device`; without it a checkpoint
# written from a GPU cannot be loaded on a CPU-only machine.
loaded_model.load_state_dict(torch.load("unet_model.pth", map_location=device))
loaded_model = loaded_model.to(device)



In [None]:
# Display predictions for a number of batches from the validation set
num_samples = 5

with torch.no_grad():
    # Each batch is a dict with 'image' and 'mask' entries; tuple-unpacking it
    # would only give the key strings.
    for i, batch in enumerate(val_loader):
        if i == num_samples:
            break

        images = batch['image'].to(device)
        masks = batch['mask'].cpu().numpy()

        outputs = model(images)  # Ensure you're using the correct model here
        # A logit above 0 corresponds to a sigmoid probability above 0.5
        predicted_masks = (outputs > 0).cpu().numpy()

        for idx in range(images.size(0)):
            fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(12, 4))

            # Min-max scale the image to [0, 1] for display
            image_display = images[idx].permute(1, 2, 0).cpu().numpy()
            value_range = image_display.max() - image_display.min()
            if value_range > 0:
                image_display = (image_display - image_display.min()) / value_range
            else:
                # Constant image: avoid dividing by zero and show it as black
                image_display = np.zeros_like(image_display)

            ax1.imshow(image_display)
            ax1.set_title("Input Image")
            ax1.axis("off")

            ax2.imshow(masks[idx][0], cmap="gray")
            ax2.set_title("Ground Truth")
            ax2.axis("off")

            ax3.imshow(predicted_masks[idx][0], cmap="gray")
            ax3.set_title("Predicted Mask")
            ax3.axis("off")

            plt.show()
            # Release the figure so figures do not pile up across the loop
            plt.close(fig)
