# Landslide Detection Using a CNN with Sentinel-2 (S2) Imagery

## A. CNN Model

In [None]:
# !pip install "geoai-py"

In [None]:
import os
import re

import geoai
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.metrics import confusion_matrix, classification_report

from libs.utils import get_device
# Resolve the compute device once via the project helper (libs.utils.get_device);
# the result is handed to geoai.train_segmentation_model in the training cell.
device = get_device()

### Training on the [Landslide Segmentation](https://www.kaggle.com/datasets/niyarrbarman/landslide-divided) dataset

This dataset is a simplified version of the one originally created by IARAI for the Landslide4Sense 2022 competition.

In [None]:
# Root folder of the Landslide Segmentation dataset. The training images, masks
# and the model output directory used below are all located under it.
out_folder = './data/Landslide_Segmentation'

In [None]:
# Test train_segmentation_model with automatic size detection
# Trains a U-Net with a ResNet-34 encoder initialised from ImageNet weights on
# the train/images + train/masks pairs. output_dir is train/unet_models, which
# is where the evaluation cell later looks for best_model.pth and
# training_history.pth.
geoai.train_segmentation_model(
    images_dir=f"{out_folder}/train/images",
    labels_dir=f"{out_folder}/train/masks",
    output_dir=f"{out_folder}/train/unet_models",
    architecture="unet",
    encoder_name="resnet34",
    encoder_weights="imagenet",
    num_channels=3,
    num_classes=2,  # landslide and non-landslide
    batch_size=8,
    num_epochs=50,
    learning_rate=0.0005,
    val_split=0.2,
    verbose=True,
    early_stopping_patience=10,
    device=device
)

### Evaluate

In [None]:
# Checkpoint used for inference by the cells further down (evaluate_set).
model_path = f"{out_folder}/train/unet_models/best_model.pth"

# Plot the metrics stored in the training history file found in the same
# output directory that was given to the training call.
history_path = f"{out_folder}/train/unet_models/training_history.pth"
geoai.plot_performance_metrics(
    history_path=history_path,
    figsize=(15, 5),
    verbose=True,
)

### Run model on dataset sample

In [None]:
def evaluate_set(model_path, data_dir, *, window_size=512, overlap=256, batch_size=4):
    """Run batch semantic segmentation over the images of a dataset folder.

    Every image found in ``<data_dir>/images`` is passed to
    ``geoai.semantic_segmentation_batch``; ``<data_dir>/predictions`` is given
    as the output directory. The model description (U-Net, ResNet-34 encoder,
    3 input channels, 2 classes) is fixed and mirrors the configuration used
    by the training cell of this notebook, so the checkpoint must have been
    produced with that configuration.

    Args:
        model_path: Path to the trained model checkpoint.
        data_dir: Dataset folder containing an ``images`` sub-directory.
        window_size: Sliding-window size forwarded to geoai (default 512).
        overlap: Window overlap forwarded to geoai (default 256).
        batch_size: Inference batch size forwarded to geoai (default 4).

    Returns:
        None. The results are produced by geoai as a side effect.
    """
    images_dir = f"{data_dir}/images"
    predictions_dir = f"{data_dir}/predictions"
    geoai.semantic_segmentation_batch(
        input_dir=images_dir,
        output_dir=predictions_dir,
        model_path=model_path,
        architecture="unet",
        encoder_name="resnet34",
        num_channels=3,
        num_classes=2,
        window_size=window_size,
        overlap=overlap,
        batch_size=batch_size,
        quiet=True,
    )

In [None]:
# Run batch inference on the validation split of the dataset. Despite its
# name, test_dir points at the "validation" folder.
test_dir = './data/Landslide_Segmentation/validation'
evaluate_set(model_path, test_dir)

### Evaluate on the validation set

In [None]:
def _load_single_channel_mask(path):
    """Load an image file as a numpy array, collapsing RGB/RGBA/LA to grayscale."""
    # The context manager guarantees the underlying file handle is released.
    with Image.open(path) as img:
        if img.mode in ('RGBA', 'RGB', 'LA'):
            img = img.convert('L')
        return np.array(img)


def calculate_confusion_matrix(masks_dir, predictions_dir):
    """
    Calculate, plot and report the pixel-wise confusion matrix of a
    binary (non-landslide / landslide) segmentation.

    Ground-truth files are expected to be named ``mask_<n>.png`` and are paired
    with ``image_<n>_mask.png`` in ``predictions_dir``. Files that do not follow
    the naming scheme, have no prediction, or whose prediction has a different
    shape are skipped with a warning. Before scoring, both masks are binarised
    (any non-zero pixel counts as landslide), so that 0/255 and 0/1 encodings
    can be compared with each other.

    Args:
        masks_dir: Directory containing ground truth masks
        predictions_dir: Directory containing prediction masks

    Returns:
        The 2x2 confusion matrix laid out as ``[[tn, fp], [fn, tp]]``.

    Raises:
        ValueError: If no usable mask/prediction pair is found.
    """
    pattern = re.compile(r'mask_(\d+)\.png')

    # Get all mask files
    mask_files = sorted(
        f for f in os.listdir(masks_dir) if f.endswith(('.png', '.jpg', '.tif'))
    )

    # Per-image flattened arrays; concatenated once at the end instead of
    # extending a Python list pixel by pixel.
    true_chunks = []
    pred_chunks = []

    for mask_file in mask_files:
        match = pattern.search(mask_file)
        if match is None:
            print(f"Warning: Skipping mask with unexpected name: {mask_file}")
            continue

        # Check for the prediction first so no image is decoded needlessly.
        pred_mask_path = os.path.join(predictions_dir, f'image_{match.group(1)}_mask.png')
        if not os.path.exists(pred_mask_path):
            print(f"Warning: Prediction not found for {pred_mask_path}")
            continue

        true_mask = _load_single_channel_mask(os.path.join(masks_dir, mask_file))
        pred_mask = _load_single_channel_mask(pred_mask_path)

        # Differently sized arrays would misalign every pixel that follows.
        if true_mask.shape != pred_mask.shape:
            print(
                f"Warning: Shape mismatch for {mask_file}: "
                f"{true_mask.shape} vs {pred_mask.shape}; skipping"
            )
            continue

        true_chunks.append(true_mask.ravel())
        pred_chunks.append(pred_mask.ravel())

    if not true_chunks:
        raise ValueError(
            f"No matching mask/prediction pairs found in "
            f"{masks_dir} and {predictions_dir}"
        )

    all_true = np.concatenate(true_chunks)
    all_pred = np.concatenate(pred_chunks)

    # Report the raw encodings before they are normalised below.
    print(f"Total samples: {len(all_true)}")
    print(f"Unique values in ground truth: {np.unique(all_true)}")
    print(f"Unique values in predictions: {np.unique(all_pred)}")

    # Normalise both sides to {0, 1}; otherwise e.g. 0/255 masks compared with
    # 0/1 predictions would yield three labels instead of two.
    all_true = (all_true > 0).astype(np.uint8)
    all_pred = (all_pred > 0).astype(np.uint8)

    # Fixing the labels keeps the matrix 2x2 even if one class never occurs.
    class_ids = [0, 1]
    class_names = ['Non-Landslide', 'Landslide']
    cm = confusion_matrix(all_true, all_pred, labels=class_ids)

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix - Landslide Segmentation')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

    # Calculate metrics
    print("\nClassification Report:")
    print(classification_report(all_true, all_pred,
                                labels=class_ids,
                                target_names=class_names,
                                zero_division=0))

    # Calculate additional metrics for the positive (landslide) class
    tn, fp, fn, tp = cm.ravel()
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    iou = tp / (tp + fp + fn) if (tp + fp + fn) > 0 else 0

    print("\nAdditional Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1_score:.4f}")
    print(f"IoU (Intersection over Union): {iou:.4f}")

    return cm

In [None]:
# Despite its name, test_dir points at the "validation" split.
test_dir = './data/Landslide_Segmentation/validation'
# Calculate confusion matrix for validation set
# Ground truth comes from <test_dir>/masks; predictions are read from
# <test_dir>/predictions, the output directory evaluate_set uses.
masks_dir = f"{test_dir}/masks"
predictions_dir = f"{test_dir}/predictions"

cm = calculate_confusion_matrix(masks_dir, predictions_dir)

#### Visualize results

In [None]:
def visualize_segmentation_evaluation(img, gt_mask, pred_mask, save_path, figsize=(15, 5)):
    """Plot an image next to its predicted and ground-truth masks and save the figure.

    Thin wrapper around ``geoai.plot_prediction_comparison`` that first makes
    sure the directory of ``save_path`` exists.

    Args:
        img: Path to the original image (read as 3 bands, see ``indexes``).
        gt_mask: Path to the ground-truth mask.
        pred_mask: Path to the predicted mask.
        save_path: File the comparison figure is saved to.
        figsize: Figure size forwarded to geoai (default ``(15, 5)``).

    Returns:
        Whatever ``geoai.plot_prediction_comparison`` returns.
    """
    # dirname is '' for a bare file name, and os.makedirs('') would raise.
    out_dir = os.path.dirname(save_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    return geoai.plot_prediction_comparison(
        original_image=img,
        ground_truth_image=gt_mask,
        prediction_image=pred_mask,
        titles=["Original", "Prediction", "Ground Truth"],
        figsize=figsize,  # honour the caller's value instead of a fixed size
        save_path=save_path,
        show_plot=True,
        indexes=[0, 1, 2],  # Assuming RGB images
        divider=255,
    )

In [None]:
# Compare image 8 of the validation split with its ground-truth mask and its
# prediction; the figure is saved under <test_dir>/segmentation_comparison_image_8/.
fig = visualize_segmentation_evaluation(
    f"{test_dir}/images/image_8.png",
    f"{test_dir}/masks/mask_8.png",
    f"{test_dir}/predictions/image_8_mask.png",
    f"{test_dir}/segmentation_comparison_image_8/image_8.png",
    figsize=(15, 5)
)

## Auckland data

In [None]:
# Apply the same trained checkpoint to the Auckland images; evaluate_set reads
# <auckland_dir>/images and uses <auckland_dir>/predictions as output directory.
auckland_dir = './data/2-data-explore-events'
evaluate_set(model_path, auckland_dir)

In [None]:
def visualize_segmentation_results(img_path, pred_img_path):
    """Show an image and its predicted mask side by side.

    Args:
        img_path: Path to the original image.
        pred_img_path: Path to the predicted mask, drawn with a gray colormap.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # NOTE: this sets the seaborn style globally, so later plots are affected too.
    sns.set_style("whitegrid")

    # One row, two panels: original on the left, prediction on the right.
    _, axes = plt.subplots(1, 2, figsize=(15, 6))

    # Each file is opened in a context manager so its handle is released as
    # soon as the image has been handed to matplotlib.
    with Image.open(img_path) as img:
        axes[0].imshow(img)
    axes[0].set_title('Original Image', fontsize=14, fontweight='bold')
    axes[0].axis('off')

    with Image.open(pred_img_path) as pred_img:
        axes[1].imshow(pred_img, cmap='gray')
    axes[1].set_title('Prediction Mask', fontsize=14, fontweight='bold')
    axes[1].axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
auckland_dir = './data/2-data-explore-events'

# Base names (without extension) of the Auckland images to inspect.
# NOTE(review): the names look like <id>_<longitude>_<latitude>_<start date>_<end date>
# — confirm against the code that produced these files.
imgs = [
    '0_174.701880487593_-36.931306466746_2023-02-13_2023-03-06',
    '16_174.603559053774_-37.0492662414968_2023-01-29_2023-02-19',
    '7968_174.575889109309_-36.9510233656217_2023-01-29_2023-02-19',
    '7975_174.575279574077_-36.9584351646335_2023-01-29_2023-02-19',
    '7996_174.59173891446_-36.9652471775134_2023-01-29_2023-02-19',
    '8866_174.567747838123_-36.5191367689426_2023-01-29_2023-02-19'
]

# Show each selected image next to the mask predicted for it
# (<name>_mask.png in the predictions folder).
for img_name in imgs:
    img = f"{auckland_dir}/images/{img_name}.png"
    predict = f"{auckland_dir}/predictions/{img_name}_mask.png"
    visualize_segmentation_results(img, predict)
