# Deforestation Detection - Land Cover Segmentation

This notebook trains a U-Net model for land cover classification using satellite imagery.

## Dataset
- **Training**: 60 image-mask pairs
- **Validation**: 14 image-mask pairs
- **Classes**: 5 (urban, water, forest, agriculture, road)


In [None]:
# Install required packages
!pip install torch torchvision torchaudio
!pip install segmentation-models-pytorch
!pip install albumentations
!pip install tensorboard


In [None]:
# Install required packages
!pip install scikit-learn

# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.transforms import functional as F

import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import os
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split

# Segmentation models
import segmentation_models_pytorch as smp

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)


In [None]:
# Upload your dataset to Colab
# Method 1: Upload zip file and extract
from google.colab import files
import zipfile

# Upload the dataset zip file
print("Please upload your dataset zip file...")
uploaded = files.upload()

# Extract the zip file
for filename in uploaded.keys():
    if filename.endswith('.zip'):
        with zipfile.ZipFile(filename, 'r') as zip_ref:
            zip_ref.extractall('dataset')
        print(f"Extracted {filename} to dataset/ folder")

# Method 2: If you have the dataset in Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# dataset_path = '/content/drive/MyDrive/your_dataset_path'


In [None]:
# Dataset configuration
class_dict = {
    'urban': (0, 255, 255),
    'water': (0, 0, 255), 
    'forest': (0, 255, 0),
    'agriculture': (255, 255, 0),
    'road': (255, 0, 255)
}

num_classes = len(class_dict)
print(f"Number of classes: {num_classes}")
print("Class mapping:", class_dict)

# Dataset paths (adjust based on your upload method)
train_image_dir = 'dataset/train_image'
train_mask_dir = 'dataset/train_mask'
val_image_dir = 'dataset/val_image'
val_mask_dir = 'dataset/val_mask'

# Check if paths exist
print("Checking dataset paths...")
print(f"Train images exist: {os.path.exists(train_image_dir)}")
print(f"Train masks exist: {os.path.exists(train_mask_dir)}")
print(f"Val images exist: {os.path.exists(val_image_dir)}")
print(f"Val masks exist: {os.path.exists(val_mask_dir)}")

# If validation doesn't exist, use test data or split training data
if not os.path.exists(val_image_dir) or not os.path.exists(val_mask_dir):
    print("Validation data not found. Using test data or splitting training data...")
    # Try test data first
    test_image_dir = 'dataset/test_image'
    test_mask_dir = 'dataset/test_mask'
    
    if os.path.exists(test_image_dir) and os.path.exists(test_mask_dir):
        val_image_dir = test_image_dir
        val_mask_dir = test_mask_dir
        print("Using test data for validation")
    else:
        # Split training data
        from sklearn.model_selection import train_test_split
        print("Splitting training data for validation...")
        # This will be handled in the dataset creation


In [None]:
# Custom Dataset Class
class LandCoverDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None, num_classes=5, split_ratio=None):
        self.image_dir = Path(image_dir)
        self.mask_dir = Path(mask_dir)
        self.transform = transform
        self.num_classes = num_classes
        
        # Get all image files
        self.image_files = list(self.image_dir.glob('*.jpg'))
        self.mask_files = list(self.mask_dir.glob('*.png'))
        
        # Filter for matching pairs
        self.valid_pairs = []
        for img_file in self.image_files:
            mask_file = self.mask_dir / f"{img_file.stem}.png"
            if mask_file.exists():
                self.valid_pairs.append((img_file, mask_file))
        
        # If split_ratio is provided, split the data
        if split_ratio is not None:
            from sklearn.model_selection import train_test_split
            train_pairs, val_pairs = train_test_split(
                self.valid_pairs, 
                test_size=split_ratio, 
                random_state=42
            )
            self.valid_pairs = train_pairs if split_ratio < 0.5 else val_pairs
        
        print(f"Found {len(self.valid_pairs)} valid image-mask pairs")
    
    def __len__(self):
        return len(self.valid_pairs)
    
    def __getitem__(self, idx):
        img_path, mask_path = self.valid_pairs[idx]
        
        # Load image and mask
        image = np.array(Image.open(img_path).convert('RGB'))
        mask = np.array(Image.open(mask_path).convert('RGB'))
        
        # Convert mask to class indices
        mask_indices = self.rgb_to_class_indices(mask)
        
        if self.transform:
            augmented = self.transform(image=image, mask=mask_indices)
            image = augmented['image']
            mask_indices = augmented['mask']
        
        return image, mask_indices.long()
    
    def rgb_to_class_indices(self, mask_rgb):
        """Convert RGB mask to class indices"""
        mask_indices = np.zeros((mask_rgb.shape[0], mask_rgb.shape[1]), dtype=np.uint8)
        
        for i, (class_name, rgb_color) in enumerate(class_dict.items()):
            # Find pixels matching this class color
            class_mask = np.all(mask_rgb == rgb_color, axis=2)
            mask_indices[class_mask] = i
        
        return mask_indices


In [None]:
# Data augmentation and transforms
train_transform = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

val_transform = A.Compose([
    A.Resize(256, 256),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

# Create datasets
# Check if validation data exists, if not split training data
if os.path.exists(val_image_dir) and os.path.exists(val_mask_dir):
    print("Using separate validation data")
    train_dataset = LandCoverDataset(train_image_dir, train_mask_dir, transform=train_transform)
    val_dataset = LandCoverDataset(val_image_dir, val_mask_dir, transform=val_transform)
else:
    print("Splitting training data for validation (80% train, 20% val)")
    # Create full dataset first
    full_dataset = LandCoverDataset(train_image_dir, train_mask_dir, transform=None)
    
    # Split into train and val
    from sklearn.model_selection import train_test_split
    train_pairs, val_pairs = train_test_split(full_dataset.valid_pairs, test_size=0.2, random_state=42)
    
    # Create datasets with split data
    train_dataset = LandCoverDataset(train_image_dir, train_mask_dir, transform=train_transform)
    train_dataset.valid_pairs = train_pairs
    
    val_dataset = LandCoverDataset(train_image_dir, train_mask_dir, transform=val_transform)
    val_dataset.valid_pairs = val_pairs
    
    print(f"Training samples: {len(train_dataset.valid_pairs)}")
    print(f"Validation samples: {len(val_dataset.valid_pairs)}")

# Create data loaders
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Batch size: {batch_size}")

# Verify data loaders
print(f"Train loader batches: {len(train_loader)}")
print(f"Val loader batches: {len(val_loader)}")

# Test loading a batch
if len(val_loader) > 0:
    print("Validation data loaded successfully!")
else:
    print("ERROR: No validation data found!")


In [None]:
# Create U-Net model
model = smp.Unet(
    encoder_name="resnet34",        # Use ResNet34 as backbone
    encoder_weights="imagenet",     # Use ImageNet pretrained weights
    in_channels=3,                  # RGB input
    classes=num_classes,            # 5 classes
    activation=None,                # No activation (we'll use CrossEntropyLoss)
)

model = model.to(device)
print(f"Model created with {sum(p.numel() for p in model.parameters())} parameters")

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)


In [None]:
# Training function
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct_pixels = 0
    total_pixels = 0
    
    for images, masks in tqdm(train_loader, desc="Training"):
        images, masks = images.to(device), masks.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # Calculate pixel accuracy
        _, predicted = torch.max(outputs, 1)
        correct_pixels += (predicted == masks).sum().item()
        total_pixels += masks.numel()
    
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct_pixels / total_pixels
    
    return epoch_loss, epoch_acc

# Validation function
def validate_epoch(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct_pixels = 0
    total_pixels = 0
    
    # Check if validation loader is empty
    if len(val_loader) == 0:
        print("Warning: Validation loader is empty!")
        return 0.0, 0.0
    
    with torch.no_grad():
        for images, masks in tqdm(val_loader, desc="Validation"):
            images, masks = images.to(device), masks.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, masks)
            
            running_loss += loss.item()
            
            # Calculate pixel accuracy
            _, predicted = torch.max(outputs, 1)
            correct_pixels += (predicted == masks).sum().item()
            total_pixels += masks.numel()
    
    epoch_loss = running_loss / len(val_loader)
    epoch_acc = correct_pixels / total_pixels if total_pixels > 0 else 0.0
    
    return epoch_loss, epoch_acc


In [None]:
# Training loop
num_epochs = 50
best_val_loss = float('inf')
train_losses = []
val_losses = []
train_accs = []
val_accs = []

print("Starting training...")
print(f"Number of epochs: {num_epochs}")
print(f"Device: {device}")

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    print("-" * 50)
    
    # Training
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    
    # Validation
    val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
    
    # Learning rate scheduling
    scheduler.step(val_loss)
    
    # Store metrics
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)
    
    # Print metrics
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    print(f"Learning Rate: {optimizer.param_groups[0]['lr']:.6f}")
    
    # Save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        print("New best model saved!")
    
    # Early stopping (optional)
    if epoch > 10 and val_loss > best_val_loss * 1.1:
        print("Early stopping triggered!")
        break

print("\nTraining completed!")


In [None]:
# Plot training curves
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(train_accs, label='Train Accuracy')
plt.plot(val_accs, label='Val Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

print(f"Best validation loss: {best_val_loss:.4f}")
print(f"Final training accuracy: {train_accs[-1]:.4f}")
print(f"Final validation accuracy: {val_accs[-1]:.4f}")


In [None]:
# Load best model and make predictions
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

# Visualize predictions
def visualize_predictions(model, val_loader, device, num_samples=4):
    model.eval()
    fig, axes = plt.subplots(num_samples, 4, figsize=(16, 4*num_samples))
    
    with torch.no_grad():
        for i, (images, masks) in enumerate(val_loader):
            if i >= num_samples:
                break
                
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            predictions = torch.argmax(outputs, dim=1)
            
            # Convert to numpy for visualization
            img = images[0].cpu().permute(1, 2, 0).numpy()
            img = (img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406]))
            img = np.clip(img, 0, 1)
            
            true_mask = masks[0].cpu().numpy()
            pred_mask = predictions[0].cpu().numpy()
            
            # Plot original image
            axes[i, 0].imshow(img)
            axes[i, 0].set_title('Original Image')
            axes[i, 0].axis('off')
            
            # Plot true mask
            axes[i, 1].imshow(true_mask, cmap='tab10')
            axes[i, 1].set_title('True Mask')
            axes[i, 1].axis('off')
            
            # Plot predicted mask
            axes[i, 2].imshow(pred_mask, cmap='tab10')
            axes[i, 2].set_title('Predicted Mask')
            axes[i, 2].axis('off')
            
            # Plot difference
            diff = (true_mask != pred_mask).astype(float)
            axes[i, 3].imshow(diff, cmap='Reds')
            axes[i, 3].set_title('Difference (Red = Wrong)')
            axes[i, 3].axis('off')
    
    plt.tight_layout()
    plt.show()

# Show predictions
visualize_predictions(model, val_loader, device)


In [None]:
# Download the trained model
from google.colab import files

# Save the model
torch.save({
    'model_state_dict': model.state_dict(),
    'class_dict': class_dict,
    'num_classes': num_classes,
    'model_config': {
        'encoder_name': 'resnet34',
        'encoder_weights': 'imagenet',
        'in_channels': 3,
        'classes': num_classes
    }
}, 'deforestation_model.pth')

# Download the model
files.download('deforestation_model.pth')
print("Model downloaded successfully!")

# Also download the training curves data
import json
training_data = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'train_accs': train_accs,
    'val_accs': val_accs,
    'best_val_loss': best_val_loss
}

with open('training_curves.json', 'w') as f:
    json.dump(training_data, f)

files.download('training_curves.json')
print("Training curves data downloaded!")
