# U-Net CNN Model for Ancient Wall Segmentation (4 Classes) - Normal Maps Only
## Classes:
- 0: Background (Black)
- 1: Ashlar (Blue)
- 2: Quarry (Yellow)
- 3: Polygonal (Red)


# (1) Python import section

In [None]:
# Import required modules
import os
import glob
import cv2
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib.patches import Patch

# PyTorch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder
from datetime import datetime
from tqdm import tqdm

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True

# Setup device
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    device = torch.device('cuda')
else:
    print("No GPU available, using CPU")
    device = torch.device('cpu')

# Enable TF32 for better performance
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# (2) Specify the parameters

In [None]:
# Define classes
CLASS_NAMES = ['Background', 'Ashlar', 'Polygonal', 'Quarry']
CLASS_COLORS = ['black', 'blue', 'red', 'yellow']
NUM_CLASSES = 4

# Data directories - Mac
#normalsDirectory = "/Users/nilsschnorr/Desktop/2025-03-12_Trainingsdaten/1280_normals"
#maskDirectory = "/Users/nilsschnorr/Desktop/2025-03-12_Trainingsdaten/1280_masks"

# Data directories - Windows
normalsDirectory = "C:/Users/admin/Desktop/AWS_TRAINING/2025-08-10_4classEX/snippets_normalmaps"
maskDirectory = "C:/Users/admin/Desktop/AWS_TRAINING/2025-08-10_4classEX/snippets_masks"

# Model parameters
modelTrainedName = "2025-08-11_3-channel_4-class-EX"
modelTrainedExt = "pth"
numEpochs = 300
debugModus = True

modelTrainedName = modelTrainedName + "_" + str(numEpochs) + "." + modelTrainedExt

print('=== 4-Class Segmentation Model (Normal Maps Only) ===')
print('Classes:', CLASS_NAMES)
print('Model name:', modelTrainedName)
print('Number of epochs:', numEpochs)
print('Input channels: 3 (Normal Maps)')

# (3) Define the U-Net Model

In [None]:
class DoubleConv(nn.Module):
    """Double convolution block"""
    def __init__(self, in_channels, out_channels, dropout_rate=0.1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=True)
        self.relu1 = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout2d(dropout_rate)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=True)
        self.relu2 = nn.ReLU(inplace=True)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.dropout(x)
        x = self.conv2(x)
        x = self.relu2(x)
        return x

class Down(nn.Module):
    """Downscaling with maxpool then double conv"""
    def __init__(self, in_channels, out_channels, dropout_rate=0.1):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels, dropout_rate)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    """Upscaling with bilinear interpolation"""
    def __init__(self, in_channels, out_channels, dropout_rate=0.1):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        self.conv_after_up = nn.Conv2d(in_channels, in_channels // 2, kernel_size=3, padding=1)
        self.conv = DoubleConv(in_channels, out_channels, dropout_rate)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        x1 = self.conv_after_up(x1)
        
        # Handle size mismatch
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        if diffX != 0 or diffY != 0:
            x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                            diffY // 2, diffY - diffY // 2])
        
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class MultiUNet(nn.Module):
    def __init__(self, n_channels=3, n_classes=4):
        super(MultiUNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes

        # Encoder
        self.inc = DoubleConv(n_channels, 16, 0.1)
        self.down1 = Down(16, 32, 0.1)
        self.down2 = Down(32, 64, 0.2)
        self.down3 = Down(64, 128, 0.2)
        self.down4 = Down(128, 256, 0.3)
        
        # Decoder
        self.up1 = Up(256, 128, 0.2)
        self.up2 = Up(128, 64, 0.2)
        self.up3 = Up(64, 32, 0.1)
        self.up4 = Up(32, 16, 0.1)
        self.outc = nn.Conv2d(16, n_classes, kernel_size=1)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

def initialize_weights(model):
    """Initialize model weights"""
    for m in model.modules():
        if isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

# (4) Load the input images and masks

In [None]:
# Define image size
SIZE_X, SIZE_Y = 512, 512

# Get sorted lists of files
normal_paths = sorted(glob.glob(os.path.join(normalsDirectory, "*.png")))

# Load training images (Normal maps only)
train_images = []
for i, normals_path in enumerate(normal_paths):
    # Load normals image
    normals = cv2.imread(normals_path, cv2.IMREAD_COLOR)
    if normals is None or normals.shape[-1] != 3:
        print(f"Error loading normals image: {normals_path}")
        continue
    normals = cv2.resize(normals, (SIZE_Y, SIZE_X), interpolation=cv2.INTER_AREA)
    
    train_images.append(normals)
    print(f"{i}: {normals_path}")

train_images = np.array(train_images)
print(f'\nTrain images shape: {train_images.shape}')

# Load masks
train_masks = []
mask_paths = sorted(glob.glob(os.path.join(maskDirectory, "*.png")))
for i, mask_path in enumerate(mask_paths):
    mask = cv2.imread(mask_path, 0)
    mask = cv2.resize(mask, (SIZE_Y, SIZE_X), interpolation=cv2.INTER_NEAREST)
    train_masks.append(mask)
    print(f"{i}: {mask_path}")

train_masks = np.array(train_masks)
print(f'\nTrain masks shape: {train_masks.shape}')
print(f'Unique mask values: {np.unique(train_masks)}')

# (5) Prepare the data for training

In [None]:
# Encode labels
labelencoder = LabelEncoder()
n, h, w = train_masks.shape
train_masks_reshaped = train_masks.reshape(-1)
train_masks_reshaped_encoded = labelencoder.fit_transform(train_masks_reshaped)
train_masks_encoded_original_shape = train_masks_reshaped_encoded.reshape(n, h, w)

# Normalize images
train_images = train_images.astype('float32') / 255.0
train_masks_input = train_masks_encoded_original_shape

# Split data: 90% train, 10% validation
X_train, X_val, y_train, y_val = train_test_split(
    train_images, train_masks_input, 
    test_size=0.1,  # 10% for validation
    random_state=0
)

print(f"Training set: {X_train.shape[0]} images")
print(f"Validation set: {X_val.shape[0]} images")
print(f"Classes in dataset: {np.unique(y_train)}")

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
y_train_tensor = torch.LongTensor(y_train)
X_val_tensor = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
y_val_tensor = torch.LongTensor(y_val)

# Calculate class weights
class_labels = np.unique(train_masks_reshaped_encoded)
class_weights_balanced = compute_class_weight('balanced', classes=class_labels, y=train_masks_reshaped_encoded)

# Use equal weights (set to False for balanced weights)
USE_EQUAL_WEIGHTS = True

if USE_EQUAL_WEIGHTS:
    print("\nUsing EQUAL class weights")
    class_weights = np.ones(NUM_CLASSES)
    class_weights_tensor = None
else:
    print("\nUsing BALANCED class weights")
    class_weights = class_weights_balanced
    class_weights_tensor = torch.FloatTensor(class_weights).to(device)

print("\nClass weights:")
for i, weight in enumerate(class_weights):
    print(f"Class {i} ({CLASS_NAMES[i]}): {weight:.4f}")

# (6) Compile the model

In [None]:
# Model configuration
IMG_HEIGHT = X_train.shape[1]
IMG_WIDTH = X_train.shape[2]
IMG_CHANNELS = X_train.shape[3]
n_classes = NUM_CLASSES

print(f"Model input: {IMG_HEIGHT}x{IMG_WIDTH}x{IMG_CHANNELS}")
print(f"Output classes: {n_classes}")

# Create model
model = MultiUNet(n_channels=IMG_CHANNELS, n_classes=n_classes).to(device)
initialize_weights(model)

# Loss function
class CEDiceLoss(nn.Module):
    def __init__(self, weight=None, smooth=1e-5):
        super().__init__()
        self.ce = nn.CrossEntropyLoss(weight=weight)
        self.smooth = smooth
        
    def forward(self, inputs, targets):
        ce_loss = self.ce(inputs, targets)
        
        # Dice loss
        inputs_soft = F.softmax(inputs, dim=1)
        targets_one_hot = F.one_hot(targets, num_classes=inputs.shape[1]).permute(0,3,1,2).float()
        
        dims = (2, 3)
        intersection = (inputs_soft * targets_one_hot).sum(dims)
        cardinality = inputs_soft.sum(dims) + targets_one_hot.sum(dims)
        dice_score = (2. * intersection + self.smooth) / (cardinality + self.smooth)
        dice_loss = 1. - dice_score.mean()
        
        return 0.5 * ce_loss + 0.5 * dice_loss

criterion = CEDiceLoss(weight=class_weights_tensor)
optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-7)

# Model summary
total_params = sum(p.numel() for p in model.parameters())
print(f"\nTotal parameters: {total_params:,}")

# IoU metric
def calculate_iou(predictions, labels, n_classes):
    ious = []
    predictions = torch.argmax(predictions, dim=1)
    
    for cls in range(n_classes):
        pred_mask = (predictions == cls)
        true_mask = (labels == cls)
        
        intersection = (pred_mask & true_mask).float().sum()
        union = (pred_mask | true_mask).float().sum()
        
        if union == 0:
            iou = 1.0 if intersection == 0 else 0.0
        else:
            iou = intersection / union
        
        ious.append(iou.item() if isinstance(iou, torch.Tensor) else iou)
    
    return np.mean(ious)

# (7) Train and save the model

In [None]:
print(f'Starting training at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')

# Create data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)  # Added shuffle=True for better training
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Training history
history = {
    'loss': [], 'val_loss': [], 'accuracy': [], 'val_accuracy': [],
    'iou_metric': [], 'val_iou_metric': []
}

# Training loop
for epoch in range(numEpochs):
    # Training phase
    model.train()
    train_loss = train_correct = train_total = train_iou = 0.0
    
    data_iterator = tqdm(train_loader, desc=f'Epoch {epoch+1}/{numEpochs}') if epoch % 10 == 0 else train_loader
    
    for data, target in data_iterator:
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = torch.max(output.data, 1)
        train_total += target.numel()
        train_correct += (predicted == target).sum().item()
        
        with torch.no_grad():
            train_iou += calculate_iou(output, target, n_classes)

    # Validation phase
    model.eval()
    val_loss = val_correct = val_total = val_iou = 0.0
    
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            
            val_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            val_total += target.numel()
            val_correct += (predicted == target).sum().item()
            val_iou += calculate_iou(output, target, n_classes)
    
    # Calculate metrics
    epoch_train_loss = train_loss / len(train_loader)
    epoch_val_loss = val_loss / len(val_loader)
    epoch_train_acc = train_correct / train_total
    epoch_val_acc = val_correct / val_total
    epoch_train_iou = train_iou / len(train_loader)
    epoch_val_iou = val_iou / len(val_loader)
    
    # Store history
    history['loss'].append(epoch_train_loss)
    history['val_loss'].append(epoch_val_loss)
    history['accuracy'].append(epoch_train_acc)
    history['val_accuracy'].append(epoch_val_acc)
    history['iou_metric'].append(epoch_train_iou)
    history['val_iou_metric'].append(epoch_val_iou)
    
    # Print progress
    if (epoch + 1) % 10 == 0:
        print(f'\nEpoch [{epoch+1}/{numEpochs}]')
        print(f'Loss: {epoch_train_loss:.4f} -> {epoch_val_loss:.4f}')
        print(f'Acc: {epoch_train_acc:.4f} -> {epoch_val_acc:.4f}')
        print(f'IoU: {epoch_train_iou:.4f} -> {epoch_val_iou:.4f}')

# Save model
torch.save({
    'epoch': numEpochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': epoch_train_loss,
    'history': history,
    'n_classes': n_classes,
    'class_names': CLASS_NAMES,
    'class_colors': CLASS_COLORS,
    'img_channels': IMG_CHANNELS
}, modelTrainedName)

print(f'\nTraining completed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
print(f'Final results - Loss: {history["val_loss"][-1]:.4f}, Acc: {history["val_accuracy"][-1]:.2%}, IoU: {history["val_iou_metric"][-1]:.4f}')

In [None]:
# Plot training history
epochs = range(1, len(history['loss']) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 3, 1)
plt.plot(epochs, history['loss'], 'y-', label='Training')
plt.plot(epochs, history['val_loss'], 'r-', label='Validation')
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 3, 2)
plt.plot(epochs, history['accuracy'], 'y-', label='Training')
plt.plot(epochs, history['val_accuracy'], 'r-', label='Validation')
plt.title('Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.subplot(1, 3, 3)
plt.plot(epochs, history['iou_metric'], 'y-', label='Training')
plt.plot(epochs, history['val_iou_metric'], 'r-', label='Validation')
plt.title('IoU')
plt.xlabel('Epochs')
plt.ylabel('IoU')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# (8) Evaluate and visualize results

In [None]:
# Set up colormap
cmap_seg = ListedColormap(CLASS_COLORS)
model.eval()

# Visualize predictions on validation set
for idx in range(min(5, len(X_val))):
    val_img = X_val[idx]
    ground_truth = y_val[idx]
    
    # Make prediction
    with torch.no_grad():
        val_img_tensor = torch.FloatTensor(val_img).permute(2, 0, 1).unsqueeze(0).to(device)
        prediction = model(val_img_tensor)
        predicted_img = torch.argmax(prediction, dim=1)[0].cpu().numpy()
    
    # Convert normal map for display
    normals_disp = cv2.cvtColor((val_img * 255).astype(np.uint8), cv2.COLOR_BGR2RGB)
    
    # Plot
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    axes[0].imshow(normals_disp)
    axes[0].set_title("Normal Map Input")
    axes[0].axis("off")
    
    axes[1].imshow(ground_truth, cmap=cmap_seg, vmin=0, vmax=n_classes-1)
    axes[1].set_title("Ground Truth")
    axes[1].axis("off")
    
    axes[2].imshow(predicted_img, cmap=cmap_seg, vmin=0, vmax=n_classes-1)
    axes[2].set_title("Prediction")
    axes[2].axis("off")
    
    legend_elements = [Patch(facecolor=CLASS_COLORS[i], label=CLASS_NAMES[i]) for i in range(n_classes)]
    fig.legend(handles=legend_elements, loc='center right', bbox_to_anchor=(0.98, 0.5))
    
    plt.show()

# (9) Save validation predictions

In [None]:
# Create output directory
model_folder_name = modelTrainedName.rsplit('.', 1)[0]
output_dir = f"C:/Users/admin/Desktop/{model_folder_name}"
os.makedirs(output_dir, exist_ok=True)

model.eval()
cmap_seg = ListedColormap(CLASS_COLORS)

# Save all validation predictions
for idx in range(len(X_val)):
    val_img = X_val[idx]
    ground_truth = y_val[idx]
    
    # Make prediction
    with torch.no_grad():
        val_img_tensor = torch.FloatTensor(val_img).permute(2, 0, 1).unsqueeze(0).to(device)
        prediction = model(val_img_tensor)
        predicted_img = torch.argmax(prediction, dim=1)[0].cpu().numpy()
    
    # Convert normal map for display
    normals_disp = cv2.cvtColor((val_img * 255).astype(np.uint8), cv2.COLOR_BGR2RGB)
    
    # Create figure
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    axes[0].imshow(normals_disp)
    axes[0].set_title("Normal Map Input")
    axes[0].axis("off")
    
    axes[1].imshow(ground_truth, cmap=cmap_seg, vmin=0, vmax=n_classes-1)
    axes[1].set_title("Ground Truth")
    axes[1].axis("off")
    
    axes[2].imshow(predicted_img, cmap=cmap_seg, vmin=0, vmax=n_classes-1)
    axes[2].set_title("Prediction")
    axes[2].axis("off")
    
    legend_elements = [Patch(facecolor=CLASS_COLORS[i], label=CLASS_NAMES[i]) for i in range(n_classes)]
    fig.legend(handles=legend_elements, loc='center right', bbox_to_anchor=(0.98, 0.5))
    
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f"val_image_{idx:04d}.png"), dpi=150, bbox_inches='tight')
    plt.close(fig)

print(f"Saved {len(X_val)} validation predictions to {output_dir}")