In [1]:
import os
import torch
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from tqdm import tqdm

# Set Dataset Path
DATASET_PATH = "/kaggle/input/semantic-segmentation-of-aerial-imagery/Semantic segmentation dataset"

#  Define Device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f" Using device: {device}")

 Using device: cuda


In [4]:
#  Define Transformations for Images
image_transform = transforms.Compose([
    transforms.Resize((512, 512)),  # Resize images
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize RGB channels
])

#  Class to Load Dataset
class AerialDataset(Dataset):
    def __init__(self, dataset_path, transform=None):
        self.dataset_path = dataset_path
        self.transform = transform
        self.image_paths = []
        self.mask_paths = []

        # Loop through all tiles (Tile 1 to Tile 8)
        for tile_num in range(1, 9):  
            tile_path = os.path.join(self.dataset_path, f"Tile {tile_num}")
            img_folder = os.path.join(tile_path, "images")
            mask_folder = os.path.join(tile_path, "masks")

            if os.path.exists(img_folder) and os.path.exists(mask_folder):
                images = sorted(os.listdir(img_folder))
                masks = sorted(os.listdir(mask_folder))

                for img_file, mask_file in zip(images, masks):
                    self.image_paths.append(os.path.join(img_folder, img_file))
                    self.mask_paths.append(os.path.join(mask_folder, mask_file))

        print(f" Loaded {len(self.image_paths)} images & {len(self.mask_paths)} masks")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]

        #  Load image & mask
        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")  # Convert mask to grayscale

        #  Resize both image and mask to 512x512
        image = image.resize((512, 512), Image.BILINEAR)
        mask = mask.resize((512, 512), Image.NEAREST)  # Keep mask integer values

        #  Convert to tensor
        image = image_transform(image)
        mask = np.array(mask, dtype=np.uint8)  # Convert mask to NumPy array

        #  Fix Mask Values: Convert grayscale [45, 92, ...] → Class Labels [0-5]
        mapping = {45: 0, 92: 1, 155: 2, 171: 3, 172: 4, 212: 5}  # Define mapping
        mask_mapped = np.zeros_like(mask, dtype=np.uint8)  # Empty mask
        for k, v in mapping.items():
            mask_mapped[mask == k] = v  # Apply mapping

        #  Convert back to tensor
        mask_tensor = torch.tensor(mask_mapped, dtype=torch.long)

        return image, mask_tensor

#  Load Dataset
dataset = AerialDataset(DATASET_PATH, transform=image_transform)

#  Create DataLoader
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)
print(" Dataset loaded with resized images & corrected masks!")

 Loaded 0 images & 0 masks


ValueError: num_samples should be a positive integer value, but got num_samples=0

In [None]:
sample_image, sample_mask = dataset[0]  # Load first image-mask pair
print(f"Fixed unique values in the mask: {torch.unique(sample_mask)}")

In [None]:
#  Load Pretrained DeepLabV3+ Model
model = models.segmentation.deeplabv3_resnet101(weights=models.segmentation.DeepLabV3_ResNet101_Weights.DEFAULT)

#  Modify final classifier for number of classes
num_classes = 6  # Change this based on your dataset
model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=(1,1))

#  Move Model to GPU if available
model.to(device)

print(" Model loaded and ready for training!")

In [None]:
#  Define Loss Function (CrossEntropyLoss for segmentation)
criterion = nn.CrossEntropyLoss()

#  Use Adam Optimizer for better learning
optimizer = optim.Adam(model.parameters(), lr=0.0001)

print(" Loss function & optimizer set up!")

In [None]:
num_epochs = 10  # Change based on your needs

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    epoch_loss = 0

    for images, masks in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images, masks = images.to(device), masks.to(device)  # Move to GPU if available
        
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(images)["out"]
        
        #  Compute loss
        loss = criterion(outputs, masks.long())

        #  Backpropagation
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f" Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss / len(dataloader)}")

#  Save the trained model
torch.save(model.state_dict(), "deeplabv3_segmentation.pth")
print(" Model training complete & saved!")

In [2]:
#  Load the trained model
model.load_state_dict(torch.load("deeplabv3_segmentation.pth"))
model.eval()  # Set model to evaluation mode

print(" Model loaded for inference!")

NameError: name 'model' is not defined

In [None]:
import matplotlib.pyplot as plt

#  Pick a test image
test_img_path = dataset.image_paths[1]  # Choose any image from dataset
test_mask_path = dataset.mask_paths[1]

#  Load the image
image = Image.open(test_img_path).convert("RGB")
image_resized = image.resize((512, 512), Image.BILINEAR)
input_tensor = image_transform(image_resized).unsqueeze(0).to(device)  # Add batch dimension

#  Run the model on the image
with torch.no_grad():
    output = model(input_tensor)["out"]
    predicted_mask = torch.argmax(output, dim=1).squeeze().cpu().numpy()  # Get class with highest probability

#  Load ground truth mask
ground_truth_mask = Image.open(test_mask_path).resize((512, 512), Image.NEAREST)
ground_truth_mask = np.array(ground_truth_mask, dtype=np.uint8)

#  Show Image, Prediction, and Ground Truth Mask
fig, ax = plt.subplots(1, 3, figsize=(18, 6))
ax[0].imshow(image)
ax[0].set_title("Original Image")
ax[1].imshow(predicted_mask, cmap="jet")
ax[1].set_title("Predicted Mask")
ax[2].imshow(ground_truth_mask, cmap="jet")
ax[2].set_title("Ground Truth Mask")
plt.show()

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import torch
import numpy as np

# Custom image path
test_img_path = '/kaggle/input/semantic-segmentation-of-aerial-imagery/Semantic segmentation dataset/Tile 1/images/image_part_001.jpg'  # Path to your image

#  Load the image
image = Image.open(test_img_path).convert("RGB")
image_resized = image.resize((512, 512), Image.BILINEAR)
input_tensor = image_transform(image_resized).unsqueeze(0).to(device)  # Add batch dimension

#  Run the model on the image
with torch.no_grad():
    output = model(input_tensor)["out"]
    predicted_mask = torch.argmax(output, dim=1).squeeze().cpu().numpy()  # Get class with highest probability

#  Show Image and Predicted Mask
fig, ax = plt.subplots(1, 2, figsize=(12, 6))
ax[0].imshow(image)
ax[0].set_title("Original Image")
ax[1].imshow(predicted_mask, cmap="jet")
ax[1].set_title("Predicted Mask")
plt.show()


In [None]:
import os
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
import h5py
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import (Conv2D, BatchNormalization, Activation, UpSampling2D, 
                                     AveragePooling2D, Conv2DTranspose, Concatenate, Input)
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras import backend as K

In [None]:
with h5py.File('/kaggle/input/landslide4sense/TrainData/img/image_1.h5', 'r') as hf:
    print(hf.keys())

In [None]:
with h5py.File('/kaggle/input/landslide4sense/TrainData/mask/mask_10.h5', 'r') as hf1:
    print(hf1.keys())

In [None]:
path_single = r'/kaggle/input/landslide4sense/TrainData/img/image_1.h5'
path_single_mask = r'/kaggle/input/landslide4sense/TrainData/mask/mask_1.h5'

f_data = np.zeros((1, 128, 128, 3))

# Open the HDF5 file
with h5py.File(path_single, 'r') as hdf:
    # Print keys in the HDF5 file
    ls = list(hdf.keys())
    print("Available keys in the HDF5 file:", ls)
    
    # Check if 'img' key exists
    if 'img' not in ls:
        raise KeyError("'img' key not found in HDF5 file")
    
    # Load the image data
    data = np.array(hdf.get('img'))
    print("Input data shape:", data.shape)
    
    # Check the shape to avoid indexing errors
    if data.shape[2] < 14:
        raise ValueError("The data has fewer than 14 channels. Shape:", data.shape)

    # Display a sample image (e.g., Red channel)
    plt.imshow(data[:, :, 3])  # Red channel (adjust as needed)
    plt.title("Red Channel")
    plt.show()

    # Extract specific bands for NDVI calculation
    data_red = data[:, :, 3]
    data_green = data[:, :, 2]
    data_blue = data[:, :, 1]
    data_nir = data[:, :, 7]

    # Calculate NDVI (Normalized Difference Vegetation Index)
    data_ndvi = np.divide(data_nir - data_red, np.add(data_nir, data_red), where=(data_nir + data_red) != 0)
    
    # Store NDVI and other bands in f_data
    f_data[0, :, :, 0] = data_ndvi
    f_data[0, :, :, 1] = data[:, :, 12]
    f_data[0, :, :, 2] = data[:, :, 13]
    print("data_ndvi shape:", data_ndvi.shape, "f_data shape:", f_data.shape)
    
    # Plot NDVI
    plt.imshow(data_ndvi, cmap='viridis')
    plt.title("NDVI")
    plt.show()

In [None]:
with h5py.File(path_single_mask) as hdf:

    ls = list(hdf.keys())

    print("ls", ls)

    data = np.array(hdf.get('mask'))

    print("input data shape:", data.shape)

    plt.imshow(data)

In [None]:
class DataGenerator(Sequence):
    def __init__(self, image_dir, mask_dir, batch_size=1, img_size=(128, 128), shuffle=True):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.batch_size = batch_size
        self.img_size = img_size
        self.shuffle = shuffle

        # List all the image and mask file names
        self.image_files = sorted(os.listdir(image_dir))
        self.mask_files = sorted(os.listdir(mask_dir))

    def __len__(self):
        # Number of batches per epoch
        return int(np.floor(len(self.image_files) / self.batch_size))

    def on_epoch_end(self):
        # Shuffle data at the end of each epoch
        if self.shuffle:
            temp = list(zip(self.image_files, self.mask_files))
            np.random.shuffle(temp)
            self.image_files, self.mask_files = zip(*temp)

    def __getitem__(self, index):
        # Get the list of image and mask files for this batch
        batch_images = self.image_files[index * self.batch_size:(index + 1) * self.batch_size]
        batch_masks = self.mask_files[index * self.batch_size:(index + 1) * self.batch_size]

        # Load and preprocess the images and masks
        images = []
        masks = []
        for img_file, mask_file in zip(batch_images, batch_masks):
            img = self.load_image(os.path.join(self.image_dir, img_file))
            mask = self.load_mask(os.path.join(self.mask_dir, mask_file))
            images.append(img)
            masks.append(mask)

        # Return as numpy arrays
        return np.array(images), np.array(masks)

    def load_image(self, img_path):
        with h5py.File(img_path, 'r') as hdf:
            img_data = np.array(hdf.get('img'))  # (128, 128, 14)
            img_data = img_data / 255.0  # Normalize to [0, 1]
            return img_data

    def load_mask(self, mask_path):
        with h5py.File(mask_path, 'r') as hdf:
            mask_data = np.array(hdf.get('mask'))  # (128, 128)
            mask_data = np.expand_dims(mask_data, axis=-1)  # (128, 128, 1)
            return mask_data

In [None]:
""" Atrous Spatial Pyramid Pooling """
def ASPP(inputs):
    shape = inputs.shape

    y_pool = AveragePooling2D(pool_size=(shape[1], shape[2]), name='average_pooling')(inputs)
    y_pool = Conv2D(filters=256, kernel_size=1, padding='same', use_bias=False)(y_pool)
    y_pool = BatchNormalization(name=f'bn_1')(y_pool)
    y_pool = Activation('relu', name=f'relu_1')(y_pool)
    y_pool = UpSampling2D((shape[1], shape[2]), interpolation="bilinear")(y_pool)

    y_1 = Conv2D(filters=256, kernel_size=1, dilation_rate=1, padding='same', use_bias=False)(inputs)
    y_1 = BatchNormalization()(y_1)
    y_1 = Activation('relu')(y_1)

    y_6 = Conv2D(filters=256, kernel_size=3, dilation_rate=6, padding='same', use_bias=False)(inputs)
    y_6 = BatchNormalization()(y_6)
    y_6 = Activation('relu')(y_6)

    y_12 = Conv2D(filters=256, kernel_size=3, dilation_rate=12, padding='same', use_bias=False)(inputs)
    y_12 = BatchNormalization()(y_12)
    y_12 = Activation('relu')(y_12)

    y_18 = Conv2D(filters=256, kernel_size=3, dilation_rate=18, padding='same', use_bias=False)(inputs)
    y_18 = BatchNormalization()(y_18)
    y_18 = Activation('relu')(y_18)

    y = Concatenate()([y_pool, y_1, y_6, y_12, y_18])

    y = Conv2D(filters=256, kernel_size=1, dilation_rate=1, padding='same', use_bias=False)(y)
    y = BatchNormalization()(y)
    y = Activation('relu')(y)
    return y

In [None]:
def DeepLabV3Plus(shape):
    """ Inputs """
    inputs = Input(shape)

    """ Pre-trained ResNet50 """
    base_model = ResNet50(weights=None, include_top=False, input_tensor=inputs)

    """ Pre-trained ResNet50 Output """
    image_features = base_model.get_layer('conv4_block6_out').output
    x_a = ASPP(image_features)
    x_a = UpSampling2D((4, 4), interpolation="bilinear")(x_a)

    """ Get low-level features """
    x_b = base_model.get_layer('conv2_block2_out').output
    x_b = Conv2D(filters=48, kernel_size=1, padding='same', use_bias=False)(x_b)
    x_b = BatchNormalization()(x_b)
    x_b = Activation('relu')(x_b)

    x = Concatenate()([x_a, x_b])

    x = Conv2D(filters=256, kernel_size=3, padding='same', activation='relu',use_bias=False)(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(filters=256, kernel_size=3, padding='same', activation='relu', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = UpSampling2D((4, 4), interpolation="bilinear")(x)

    """ Outputs """
    x = Conv2D(1, (1, 1), name='output_layer')(x)
    x = Activation('sigmoid')(x)

    """ Model """
    model = Model(inputs=inputs, outputs=x)
    return model

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-6):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return 1 - (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true_f = K.cast(y_true, dtype='float32')  # Ensure both are float32
    y_pred_f = K.flatten(y_pred)
    y_true_f = K.flatten(y_true_f)  # Flatten after conversion

    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

In [None]:
train_generator = DataGenerator(
    image_dir='/kaggle/input/landslide4sense/TrainData/img',
    mask_dir='/kaggle/input/landslide4sense/TrainData/mask',
    batch_size=32,  # Adjust based on memory
    img_size=(128, 128),  # Match your input data
    shuffle=True
)

In [None]:
strategy = tf.distribute.MirroredStrategy()
input_shape = (128, 128, 14)
with strategy.scope():
    model = DeepLabV3Plus(input_shape)
    model.compile(optimizer=Adam(learning_rate=1e-4), 
                  loss=dice_loss, 
                  metrics=['accuracy'])
    model.summary()

In [None]:
history = model.fit(train_generator, epochs=20)

In [None]:
model.save("deeplabv3_landslide.h5")

In [None]:
model.save('deeplabv3_landslide_keras.h5')  # Save model in Keras .h5 format


In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import h5py

# Load the pre-trained model without compilation or custom objects
model = tf.keras.models.load_model('/kaggle/working/deeplabv3_landslide_keras.h5', compile=False)

# Load images from HDF5
def load_images_from_h5(file_path, dataset_name='img'):
    with h5py.File(file_path, 'r') as f:
        images = np.array(f[dataset_name])  # Load images from the specified dataset
        print("Images shape:", images.shape)
    return images

image_file_path = '/kaggle/input/landslide4sense/TrainData/img/image_10.h5'  # Path to H5 file containing images
images = load_images_from_h5(image_file_path)

# Preprocess the images (resize, normalize)
images_resized = tf.image.resize(images, (128, 128)) / 255.0  # Adjust size and normalization

# Perform inference
predictions = model.predict(images_resized)

# Visualize a sample result (the predicted mask for the first image)
plt.subplot(1, 2, 1)
plt.imshow(images_resized[0])  # Display original image
plt.title('Original Image')

plt.subplot(1, 2, 2)
plt.imshow(predictions[0], cmap='jet')  # Display predicted segmentation mask
plt.title('Predicted Mask')

plt.show()


In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import h5py

# Load the pre-trained model without compilation or custom objects
model = tf.keras.models.load_model('/kaggle/working/deeplabv3_landslide_keras.h5', compile=False)

# Load images from HDF5
def load_images_from_h5(file_path, dataset_name='img'):
    with h5py.File(file_path, 'r') as f:
        images = np.array(f[dataset_name])  # Load images from the specified dataset
        print("Images shape:", images.shape)
    return images

image_file_path = '/kaggle/input/landslide4sense/TrainData/img/image_10.h5'  # Path to H5 file containing images
images = load_images_from_h5(image_file_path)

# Preprocess the images (resize, normalize)
images_resized = tf.image.resize(images, (128, 128)) / 255.0  # Adjust size and normalization

# Add the batch dimension (None, 128, 128, 14)
images_resized = np.expand_dims(images_resized, axis=0)  # This adds the batch dimension

# Perform inference
predictions = model.predict(images_resized)

# Visualize a sample result
# Visualize the first channel (e.g., the first channel in the image)
plt.subplot(1, 2, 1)
plt.imshow(images_resized[0, :, :, 0], cmap='gray')  # Display the first channel
plt.title('Original Image - Channel 0')

# If the output is a multi-class prediction, choose the most probable class
predicted_mask = predictions[0]  # Get the first image's prediction
predicted_mask_class = np.argmax(predicted_mask, axis=-1)  # Get the most probable class

# Visualize the predicted segmentation mask
plt.subplot(1, 2, 2)
plt.imshow(predicted_mask_class, cmap='jet')  # Display predicted segmentation mask
plt.title('Predicted Mask')

plt.show()


In [None]:
import numpy as np
import h5py
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Path to the trained model
model_path = 'deeplabv3_landslide_keras.h5'
# Path to the input image for inference
input_image_path = '/kaggle/input/landslide4sense/TrainData/img/image_101.h5'

# Load the trained model
model = load_model(model_path, custom_objects={'dice_loss': dice_loss, 'dice_coefficient': dice_coefficient})

# Function to load and preprocess the image
def load_and_preprocess_image(image_path, img_size=(128, 128)):
    with h5py.File(image_path, 'r') as hdf:
        img_data = np.array(hdf.get('img'))  # Shape (128, 128, 14)
        
        # Normalize image data (same as done during training)
        img_data = img_data / 255.0  # Normalize to [0, 1]
        
        # Resize the image (if necessary) to match input size
        if img_data.shape[:2] != img_size:
            img_data = np.resize(img_data, (*img_size, img_data.shape[2]))
        
        return img_data

# Load and preprocess the image
image = load_and_preprocess_image(input_image_path)

# Expand dimensions to match the model's input (batch size, height, width, channels)
image_input = np.expand_dims(image, axis=0)  # Shape becomes (1, 128, 128, 14)

# Run the model for prediction
prediction = model.predict(image_input)

# Get the predicted mask (for binary segmentation, threshold the output)
predicted_mask = (prediction > 0.5).astype(np.uint8)  # Apply threshold

# Plot the original image and the predicted mask
fig, ax = plt.subplots(1, 2, figsize=(18, 6))

# Show original image (for visual inspection)
ax[0].imshow(image[:, :, 0], cmap='viridis')  # Show the first channel (e.g., NDVI)
ax[0].set_title('Original Image (First Channel)')

# Show predicted mask
ax[1].imshow(predicted_mask[0, :, :, 0], cmap='jet')  # Predicted mask for the first image in the batch
ax[1].set_title('Predicted Mask')

plt.show()
