## DATA EXTRACTION AND SETUP

In [None]:

import rasterio
from rasterio.plot import show
from PIL import Image
import numpy as np
import os


# Folder scanned for GeoTIFFs and folder that receives the JPEG previews.
input_folder = '/home/praveen-murugan/Downloads/TM_B13/data/'
output_folder = '/home/praveen-murugan/Downloads/TM_B13/data/jpg'

# Only files whose name contains one of these markers are converted.
substrings = ['_True_color', 'B06', 'B07', 'B08', 'B09']


def band_to_uint8(band):
    """Convert one raster band to 8-bit so it can be saved as a JPEG.

    Integer bands that already fit in 0..255 are cast unchanged.  Bands that
    exceed that range, contain negative values, or are floating point (for
    example values stored in [0, 1], which a plain cast would truncate to 0)
    are stretched linearly from their own min/max onto 0..255.

    Args:
        band: 2-D array read from the raster.

    Returns:
        ``np.uint8`` array with the same shape as ``band``.
    """
    band = np.asarray(band)
    if band.size == 0:
        return band.astype(np.uint8)

    # nanmin/nanmax so that NaN pixels do not poison the range.
    lo, hi = np.nanmin(band), np.nanmax(band)
    is_float = np.issubdtype(band.dtype, np.floating)
    if not (is_float or lo < 0 or hi > 255):
        return band.astype(np.uint8)

    # A constant (or all-NaN) band has no range to stretch.
    if not np.isfinite(lo) or hi == lo:
        return np.zeros(band.shape, dtype=np.uint8)

    scaled = np.interp(band, (lo, hi), (0, 255))
    # Casting NaN to an integer type is undefined, so map NaN pixels to 0.
    return np.nan_to_num(scaled, nan=0.0).astype(np.uint8)


os.makedirs(output_folder, exist_ok=True)

for file_name in os.listdir(input_folder):
    if not any(substring in file_name for substring in substrings):
        continue

    tiff_path = os.path.join(input_folder, file_name)

    # Best-effort batch job: report a failing file and carry on with the rest.
    try:
        with rasterio.open(tiff_path) as src:
            # Only the first band of each raster is exported.
            band = band_to_uint8(src.read(1))

        jpg_file_name = os.path.splitext(file_name)[0] + '.jpg'
        jpg_path = os.path.join(output_folder, jpg_file_name)
        Image.fromarray(band).save(jpg_path, 'JPEG')

        print(f"Converted {file_name} to {jpg_file_name}")

    except Exception as e:
        print(f"Error converting {file_name}: {e}")

print("Conversion completed.")




### LAND MASK APP

In [None]:
import os
import zipfile
import numpy as np
from PIL import Image
import rasterio


# Folder scanned for the .zip archives to process.
input_dir = '/home/praveen-murugan/Downloads/TM_B13/zips/'
# Folder that receives both the extracted archives and the masked rasters.
output_dir = '/home/praveen-murugan/Downloads/TM_B13/data/'
# Image file opened with PIL and used as the land mask.
mask_path = '/home/praveen-murugan/Downloads/TM_B13/mask/nland_mask.png'


# Make sure the output folder exists before anything is written to it.
os.makedirs(output_dir, exist_ok=True)

#  mask 
def apply_mask(image, mask):
    """Blank out the pixels of ``image`` that the mask marks as non-zero.

    Args:
        image: 2-D array, or 3-D array whose leading axis is repeated over
            the mask (presumably bands-first, as read by rasterio - confirm).
        mask: 2-D array; if a 3-D mask is given only ``mask[:, :, 0]`` is used.

    Returns:
        Array shaped like ``image`` in which pixels with ``mask == 0`` keep
        their value and all other pixels are set to 0.
    """
    plane = mask[:, :, 0] if mask.ndim == 3 else mask

    # Add a leading axis so the single plane broadcasts over every layer.
    if image.ndim == 3:
        plane = plane[np.newaxis, :, :]

    return np.where(plane == 0, image, 0)


def process_zip_file(zip_file_path, mask):
    """Extract one archive and write a masked copy of every GeoTIFF in it.

    The archive is unpacked into ``<output_dir>/<archive name>/``.  Each
    ``.tif``/``.tiff`` file found at the top level of that folder is read,
    passed through ``apply_mask`` and written to
    ``<output_dir>/<archive name>_<file name>`` with the source raster profile.

    Args:
        zip_file_path: Path of the .zip archive to process.
        mask: Mask array handed to ``apply_mask`` for every raster.
    """
    # splitext removes only the trailing extension; str.replace would also
    # strip a ".zip" that happens to appear in the middle of the name.
    archive_name = os.path.splitext(os.path.basename(zip_file_path))[0]
    extract_dir = os.path.join(output_dir, archive_name)
    os.makedirs(extract_dir, exist_ok=True)

    # The archive only needs to stay open while it is being unpacked.
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

    for entry in sorted(os.listdir(extract_dir)):
        # Accept both common GeoTIFF extensions, regardless of case.
        if not entry.lower().endswith(('.tif', '.tiff')):
            continue

        image_path = os.path.join(extract_dir, entry)
        with rasterio.open(image_path) as src:
            image = src.read()
            profile = src.profile

        masked_image = apply_mask(image, mask)

        output_path = os.path.join(output_dir, f'{archive_name}_{entry}')
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(masked_image)

# Load the land mask once; the same array is reused for every archive.
mask = np.array(Image.open(mask_path))

# Process every .zip archive found directly inside the input folder.
zip_names = [name for name in os.listdir(input_dir) if name.endswith('.zip')]
for zip_file in zip_names:
    process_zip_file(os.path.join(input_dir, zip_file), mask)


## NIR mask test

In [None]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import rasterio


# Root data folder; each sub-directory is scanned for Sentinel-2 band files.
data_dir = "/home/praveen-murugan/Downloads/TM_B13/data"

def min_max_scale(band):
    """Rescale ``band`` linearly so its minimum maps to 0 and its maximum to 1.

    A constant band has zero range; it is returned as all zeros instead of
    dividing by zero, which would fill the result with NaN.

    Args:
        band: Numeric NumPy array.

    Returns:
        Array with the same shape as ``band``.
    """
    lo = band.min()
    span = band.max() - lo
    if span == 0:
        return np.zeros_like(band)
    return (band - lo) / span


def load_and_normalize_band(file_path):
    """Load the first band of a raster and min-max normalize it.

    Args:
        file_path: Path of a raster file readable by rasterio.

    Returns:
        2-D ``float32`` array scaled by ``min_max_scale``.
    """
    with rasterio.open(file_path) as src:
        band = src.read(1).astype(np.float32)
    return min_max_scale(band)

def load_image_bands_and_normalize(folder_path):
    """Build a normalized 4-channel image from the band files in a folder.

    Args:
        folder_path: Directory searched (non-recursively) for the band files.

    Returns:
        Array stacking the normalized B02, B03, B04 and B08 bands, in that
        order, along the last axis.

    Raises:
        FileNotFoundError: If no file name matches one or more of the bands.
    """
    # Band id -> substring identifying its file.  The dict order also fixes
    # the channel order of the stacked image (B08 is the NIR band).
    markers = {
        'B02': 'Sentinel-2_L2A_B02_(Raw)',
        'B03': 'Sentinel-2_L2A_B03_(Raw)',
        'B04': 'Sentinel-2_L2A_B04_(Raw)',
        'B08': 'Sentinel-2_L2A_B08_(Raw)',
    }
    found = dict.fromkeys(markers)

    for entry in os.listdir(folder_path):
        # The first marker contained in the name decides which band it is.
        matched = next((band for band, marker in markers.items() if marker in entry), None)
        if matched is not None:
            found[matched] = os.path.join(folder_path, entry)

    absent = [band for band in found if found[band] is None]
    if absent:
        raise FileNotFoundError(f"Missing bands: {', '.join(absent)} in folder {folder_path}")

    channels = [load_and_normalize_band(found[band]) for band in markers]
    return np.stack(channels, axis=-1)

def preprocess_and_cluster_by_band(data_dir, target_size=(256, 256), n_clusters=3):
    """Load every sample folder and derive a mask by clustering its NIR band.

    For each sub-directory of ``data_dir`` the bands are loaded, resized and
    the NIR channel (index 3) is clustered with K-Means.  The smallest cluster
    is taken as the mask (named "hyacinth" in this notebook).  Folders that
    raise ``FileNotFoundError`` or ``ValueError`` are reported and skipped.

    Args:
        data_dir: Directory whose sub-directories each hold one sample.
        target_size: ``(width, height)`` passed to ``cv2.resize``.
        n_clusters: Number of K-Means clusters.

    Returns:
        Tuple ``(images, masks)`` of NumPy arrays; both are empty when no
        folder could be processed.
    """
    image_data = []
    mask_data = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and any later seeded split) reproducible between runs.
    for folder_name in sorted(os.listdir(data_dir)):
        folder_path = os.path.join(data_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue

        try:
            normalized_image = load_image_bands_and_normalize(folder_path)
            normalized_image = cv2.resize(normalized_image, target_size)
            height, width = normalized_image.shape[:2]

            # One row per pixel, a single NIR feature per row.
            nir_band = normalized_image[:, :, 3].reshape(-1, 1)

            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            kmeans.fit(nir_band)
            clustered_image = kmeans.labels_.reshape((height, width))

            # Smallest cluster wins; ties resolve to the lowest label.
            labels, counts = np.unique(clustered_image, return_counts=True)
            hyacinth_cluster = labels[np.argmin(counts)]

            mask = (clustered_image == hyacinth_cluster).astype(np.uint8)

            image_data.append(normalized_image)
            mask_data.append(mask)

        except (FileNotFoundError, ValueError) as e:
            print(f"Error processing folder {folder_name}: {e}")

    return np.array(image_data), np.array(mask_data)

X_data, y_data = preprocess_and_cluster_by_band(data_dir)

# Add a trailing channel axis and store the masks as float32.
y_data = np.expand_dims(y_data, axis=-1)
y_data = y_data.astype(np.float32)

# Show at most 38 samples, but never more than were actually loaded
# (a fixed count raises IndexError when fewer folders were processed).
num_files = min(38, len(X_data))

for i in range(num_files):
    plt.figure(figsize=(18, 6))

    plt.subplot(1, 3, 1)
    plt.title(f"Original Image {i+1}")
    # Channels are stacked B02, B03, B04 (blue, green, red); imshow expects
    # red first, so reverse the first three channels for display.
    plt.imshow(X_data[i][:, :, 2::-1])
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title(f"NIR Band {i+1}")
    plt.imshow(X_data[i][:, :, 3], cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title(f"K-Means Mask {i+1} (NIR Band Only)")
    plt.imshow(y_data[i].squeeze(), cmap='gray')
    plt.axis('off')

    plt.show()


# UNET setup with RGB Bands

Imports and Initial Setup

In [None]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score, confusion_matrix
import rasterio
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
import albumentations as A
import seaborn as sns

# Root data folder; each sub-directory is scanned for Sentinel-2 band files.
data_dir = "/home/praveen-murugan/Downloads/TM_B13/data"


Functions for Loading and Normalizing Data

In [None]:
def min_max_scale(band):
    """Rescale ``band`` linearly so its minimum maps to 0 and its maximum to 1.

    A constant band has zero range; it is returned as all zeros instead of
    dividing by zero, which would fill the result with NaN.

    Args:
        band: Numeric NumPy array.

    Returns:
        Array with the same shape as ``band``.
    """
    lo = band.min()
    span = band.max() - lo
    if span == 0:
        return np.zeros_like(band)
    return (band - lo) / span


def load_and_normalize_band(file_path):
    """Load the first band of a raster and min-max normalize it.

    Args:
        file_path: Path of a raster file readable by rasterio.

    Returns:
        2-D ``float32`` array scaled by ``min_max_scale``.
    """
    with rasterio.open(file_path) as src:
        band = src.read(1).astype(np.float32)
    return min_max_scale(band)

def load_image_bands_and_normalize(folder_path):
    """Build a normalized 3-channel image from the band files in a folder.

    Args:
        folder_path: Directory searched (non-recursively) for the band files.

    Returns:
        Array stacking the normalized B02, B03 and B04 bands, in that order,
        along the last axis.

    Raises:
        FileNotFoundError: If no file name matches one or more of the bands.
    """
    # Band id -> substring identifying its file.  The dict order also fixes
    # the channel order of the stacked image.
    markers = {
        'B02': 'Sentinel-2_L2A_B02_(Raw)',
        'B03': 'Sentinel-2_L2A_B03_(Raw)',
        'B04': 'Sentinel-2_L2A_B04_(Raw)',
    }
    found = dict.fromkeys(markers)

    for entry in os.listdir(folder_path):
        # The first marker contained in the name decides which band it is.
        matched = next((band for band, marker in markers.items() if marker in entry), None)
        if matched is not None:
            found[matched] = os.path.join(folder_path, entry)

    absent = [band for band in found if found[band] is None]
    if absent:
        raise FileNotFoundError(f"Missing bands: {', '.join(absent)} in folder {folder_path}")

    channels = [load_and_normalize_band(found[band]) for band in markers]
    return np.stack(channels, axis=-1)


Preprocessing and Clustering

In [None]:
def preprocess_and_cluster_by_band(data_dir, target_size=(256, 256), n_clusters=3):
    """Load every sample folder and derive a mask by clustering its pixels.

    For each sub-directory of ``data_dir`` the bands are loaded, resized and
    all channels of every pixel are clustered with K-Means.  The smallest
    cluster is taken as the mask (named "hyacinth" in this notebook).  Folders
    that raise ``FileNotFoundError`` or ``ValueError`` are reported and
    skipped.  No augmentation is applied here.

    Args:
        data_dir: Directory whose sub-directories each hold one sample.
        target_size: ``(width, height)`` passed to ``cv2.resize``.
        n_clusters: Number of K-Means clusters.

    Returns:
        Tuple ``(images, masks)`` of NumPy arrays; both are empty when no
        folder could be processed.
    """
    image_data = []
    mask_data = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and the seeded train/validation split) reproducible between runs.
    for folder_name in sorted(os.listdir(data_dir)):
        folder_path = os.path.join(data_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue

        try:
            normalized_image = load_image_bands_and_normalize(folder_path)
            normalized_image = cv2.resize(normalized_image, target_size)
            height, width = normalized_image.shape[:2]

            # One row per pixel, one feature per channel.
            image_flat = normalized_image.reshape(-1, normalized_image.shape[-1])

            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            kmeans.fit(image_flat)
            clustered_image = kmeans.labels_.reshape((height, width))

            # Smallest cluster wins; ties resolve to the lowest label.
            labels, counts = np.unique(clustered_image, return_counts=True)
            hyacinth_cluster = labels[np.argmin(counts)]

            mask = (clustered_image == hyacinth_cluster).astype(np.uint8)

            image_data.append(normalized_image)
            mask_data.append(mask)

        except (FileNotFoundError, ValueError) as e:
            print(f"Error processing folder {folder_name}: {e}")

    return np.array(image_data), np.array(mask_data)


Load and Display Data

In [None]:

image_data, mask_data = preprocess_and_cluster_by_band(data_dir)

# Add a trailing channel axis and store the masks as float32.
mask_data = np.expand_dims(mask_data, axis=-1)
mask_data = mask_data.astype(np.float32)


print(f"Total number of images: {len(image_data)}")


plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.title("Sample Image")
# Channels are stacked B02, B03, B04 (blue, green, red); imshow expects red
# first, so reverse the channel axis for display only.
plt.imshow(image_data[0][:, :, ::-1])
plt.axis('off')

plt.subplot(1, 2, 2)
plt.title("Sample Mask")
# Drop the trailing channel axis so the mask is drawn as a plain 2-D image.
plt.imshow(mask_data[0].squeeze(), cmap='gray')
plt.axis('off')

plt.show()


Define Custom Data Generator

In [None]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves augmented (image, mask) batches.

    Args:
        image_data: Array of images, indexed along the first axis.
        mask_data: Array of masks aligned with ``image_data``.
        batch_size: Number of samples per batch.
        augmentations: Callable invoked as ``augmentations(image=..., mask=...)``
            and returning a dict with ``'image'`` and ``'mask'`` entries.
            ``None`` disables augmentation.
        **kwargs: Forwarded to the base-class constructor.
    """

    def __init__(self, image_data, mask_data, batch_size, augmentations, **kwargs):
        # Newer Keras versions expect the base class to be initialised.
        super().__init__(**kwargs)
        self.image_data = image_data
        self.mask_data = mask_data
        self.batch_size = batch_size
        self.augmentations = augmentations
        self.indices = np.arange(len(image_data))

    def __len__(self):
        # Ceiling division: keep the final partial batch instead of silently
        # dropping it (floor division yields 0 batches for small datasets).
        return (len(self.image_data) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(images, masks)`` arrays."""
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_images = self.image_data[batch_indices]
        batch_masks = self.mask_data[batch_indices]

        if self.augmentations is None:
            return np.array(batch_images), np.array(batch_masks)

        augmented_images = []
        augmented_masks = []

        for img, mask in zip(batch_images, batch_masks):
            augmented = self.augmentations(image=img, mask=mask)
            augmented_images.append(augmented['image'])
            augmented_masks.append(augmented['mask'])

        return np.array(augmented_images), np.array(augmented_masks)

    def on_epoch_end(self):
        """Reshuffle the sample order for the next epoch."""
        np.random.shuffle(self.indices)


Define U-Net Model

In [None]:
def dice_coef(y_true, y_pred, smooth=1):
    """Compute the smoothed Dice coefficient over all flattened elements.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.
        smooth: Constant added to numerator and denominator.

    Returns:
        ``(2 * sum(y_true * y_pred) + smooth) / (sum(y_true) + sum(y_pred) + smooth)``.
    """
    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth) + K.sum(pred) + smooth
    return numerator / denominator

def dice_loss(y_true, y_pred):
    """Return the Dice loss, i.e. one minus ``dice_coef(y_true, y_pred)``."""
    score = dice_coef(y_true, y_pred)
    return 1 - score

def unet(pretrained_weights=None, input_size=(256, 256, 3)):
    """Build and compile a U-Net style encoder-decoder with a sigmoid output.

    Args:
        pretrained_weights: Optional path passed to ``model.load_weights``
            after compilation; skipped when falsy.
        input_size: Shape passed to the ``Input`` layer.

    Returns:
        Model compiled with Adam (learning rate 1e-4), ``dice_loss`` as the
        loss and ``dice_coef`` as the metric.
    """
    inputs = Input(input_size)
    # Encoder: four stages of two 3x3 convolutions, each followed by 2x2 pooling.
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # Bottleneck: 1024 filters, followed by dropout.
    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv5)
    drop5 = Dropout(0.5)(conv5)

    # Decoder: upsample, concatenate with the matching encoder stage, convolve twice.
    up6 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(drop5))
    merge6 = concatenate([drop4, up6], axis=3)
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv6)

    up7 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv6))
    merge7 = concatenate([conv3, up7], axis=3)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv7)

    up8 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv7))
    merge8 = concatenate([conv2, up8], axis=3)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv8)

    up9 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv8))
    merge9 = concatenate([conv1, up9], axis=3)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    conv9 = Conv2D(2, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    # 1x1 convolution with sigmoid: a single output channel with values in (0, 1).
    conv10 = Conv2D(1, 1, activation='sigmoid')(conv9)

    model = Model(inputs=inputs, outputs=conv10)

    model.compile(optimizer=Adam(learning_rate=1e-4), loss=dice_loss, metrics=[dice_coef])

    # Weights are loaded after compilation, and only when a path was given.
    if pretrained_weights:
        model.load_weights(pretrained_weights)

    return model


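Evaluate Model Performance (run after the "Model Training and Evaluation" cell below, which defines `X_val`, `y_val` and `predicted_masks`)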

In [None]:

# Per-sample metrics, collected over the whole validation set.
precision_scores = []
recall_scores = []
f1_scores = []
iou_scores = []
confusion_matrices = []

for true_sample, pred_sample in zip(y_val, predicted_masks):
    # Flatten to 1-D and use one integer dtype for both label vectors
    # (the ground truth is stored as float32, the prediction as uint8).
    true_mask = true_sample.squeeze().flatten().astype(np.uint8)
    pred_mask = pred_sample.squeeze().flatten().astype(np.uint8)

    precision = precision_score(true_mask, pred_mask)
    recall = recall_score(true_mask, pred_mask)
    f1 = f1_score(true_mask, pred_mask)
    iou = jaccard_score(true_mask, pred_mask)
    # Fix the label set so the matrix is always 2x2, even when one class is
    # absent from both the ground truth and the prediction.
    cm = confusion_matrix(true_mask, pred_mask, labels=[0, 1])

    precision_scores.append(precision)
    recall_scores.append(recall)
    f1_scores.append(f1)
    iou_scores.append(iou)
    confusion_matrices.append(cm)

precision_scores = np.array(precision_scores)
recall_scores = np.array(recall_scores)
f1_scores = np.array(f1_scores)
iou_scores = np.array(iou_scores)

print(f"Average Precision: {np.mean(precision_scores):.4f}")
print(f"Average Recall: {np.mean(recall_scores):.4f}")
print(f"Average F1 Score: {np.mean(f1_scores):.4f}")
print(f"Average IoU: {np.mean(iou_scores):.4f}")


print("Sample Confusion Matrix:")
print(confusion_matrices[0])


def overlay_masks(image, mask):
    """Return a copy of ``image`` with the masked pixels painted red.

    The highlight value follows the image's value range: ``[1, 0, 0]`` for
    floating-point images whose values lie within [0, 1], ``[255, 0, 0]``
    otherwise.  Writing 255 into a [0, 1] image pushes it out of range.

    Args:
        image: ``(H, W, 3)`` array; it is not modified.
        mask: ``(H, W, 1)`` or ``(H, W)`` array; pixels equal to 1 are painted.

    Returns:
        New array with the same shape and dtype as ``image``.
    """
    overlay = image.copy()
    selected = (mask[:, :, 0] if mask.ndim == 3 else mask) == 1

    unit_range = (
        np.issubdtype(overlay.dtype, np.floating)
        and overlay.size > 0
        and overlay.max() <= 1.0
    )
    overlay[selected] = [1.0, 0.0, 0.0] if unit_range else [255, 0, 0]
    return overlay

for i in range(len(X_val)):
    # Channels are stacked B02, B03, B04 (blue, green, red); imshow expects
    # red first, so reverse the channel axis for display only.
    display_image = X_val[i][:, :, ::-1]

    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.title("Original Image")
    plt.imshow(display_image)
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Ground Truth Mask")
    plt.imshow(y_val[i].squeeze(), cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Predicted Mask Overlay")
    overlay = overlay_masks(display_image, predicted_masks[i])
    plt.imshow(overlay)
    plt.axis('off')

    plt.show()


# Heatmap of the confusion matrix of the first validation sample.
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrices[0], annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix for First Sample")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()



Model Training and Evaluation

In [None]:

model = unet(input_size=(256, 256, 3))


X_train, X_val, y_train, y_val = train_test_split(image_data, mask_data, test_size=0.2, random_state=42)

# Random geometric augmentations, applied to the training samples only.
augmentations = A.Compose([
    A.Rotate(limit=90, p=0.5),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=0.1, p=0.5)
])

# Validation samples are scored unchanged: randomly transforming them makes
# the validation loss differ from epoch to epoch for reasons unrelated to
# the model.
val_augmentations = A.Compose([A.NoOp()])


train_generator = DataGenerator(X_train, y_train, batch_size=4, augmentations=augmentations)
val_generator = DataGenerator(X_val, y_val, batch_size=4, augmentations=val_augmentations)


# Keep the weights that do best on held-out data.  Fall back to the training
# loss when the validation generator yields no batches, because no
# 'val_loss' would be reported in that case.
checkpoint_metric = 'val_loss' if len(val_generator) > 0 else 'loss'
model_checkpoint = ModelCheckpoint('1unet_water_hyacinth.keras', monitor=checkpoint_metric, verbose=1, save_best_only=True)


model.fit(train_generator,
          epochs=50,
          validation_data=val_generator,
          callbacks=[model_checkpoint])

# Rebuild the network and restore the best checkpoint before predicting.
model = unet(input_size=(256, 256, 3))
model.load_weights('1unet_water_hyacinth.keras')


predictions = model.predict(X_val)

# Binarise the sigmoid output.
threshold = 0.5
predicted_masks = (predictions > threshold).astype(np.uint8)


# UNET setup with RGB & NIR Bands 

Imports and Band Normalisation

In [None]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score, confusion_matrix
import rasterio
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
import albumentations as A
import seaborn as sns

# Root data folder; each sub-directory is scanned for Sentinel-2 band files.
data_directory = "/home/praveen-murugan/Downloads/TM_B13/data"

def min_max_scale(band):
    """Rescale ``band`` linearly so its minimum maps to 0 and its maximum to 1.

    A constant band has zero range; it is returned as all zeros instead of
    dividing by zero, which would fill the result with NaN.

    Args:
        band: Numeric NumPy array.

    Returns:
        Array with the same shape as ``band``.
    """
    lo = band.min()
    span = band.max() - lo
    if span == 0:
        return np.zeros_like(band)
    return (band - lo) / span


def load_normalize_band(file_path):
    """Load the first band of a raster and min-max normalize it.

    Args:
        file_path: Path of a raster file readable by rasterio.

    Returns:
        2-D ``float32`` array scaled by ``min_max_scale``.
    """
    with rasterio.open(file_path) as src:
        band_data = src.read(1).astype(np.float32)
    return min_max_scale(band_data)

def load_normalize_bands(folder_path):
    """Build a normalized 4-channel image from the band files in a folder.

    Args:
        folder_path: Directory searched (non-recursively) for the band files.

    Returns:
        Array stacking the normalized blue, green, red and nir bands, in that
        order, along the last axis.

    Raises:
        FileNotFoundError: If no file name matches one or more of the bands.
    """
    # Band name -> substring identifying its file.  The dict order also fixes
    # the channel order of the stacked image.
    markers = {
        'blue': 'Sentinel-2_L2A_B02_(Raw)',
        'green': 'Sentinel-2_L2A_B03_(Raw)',
        'red': 'Sentinel-2_L2A_B04_(Raw)',
        'nir': 'Sentinel-2_L2A_B08_(Raw)',
    }
    found = dict.fromkeys(markers)

    for entry in os.listdir(folder_path):
        # The first marker contained in the name decides which band it is.
        matched = next((band for band, marker in markers.items() if marker in entry), None)
        if matched is not None:
            found[matched] = os.path.join(folder_path, entry)

    absent = [band for band in found if found[band] is None]
    if absent:
        raise FileNotFoundError(f"Missing bands: {', '.join(absent)} in folder {folder_path}")

    channels = [load_normalize_band(found[band]) for band in markers]
    return np.stack(channels, axis=-1)

def process_and_cluster(data_directory, target_size=(256, 256), n_clusters=3):
    """Load every sample folder and derive a mask by clustering its NIR band.

    For each sub-directory of ``data_directory`` the bands are loaded, resized
    and the NIR channel (index 3) is clustered with K-Means.  The smallest
    cluster is taken as the mask (named "hyacinth" in this notebook).  Folders
    that raise ``FileNotFoundError`` or ``ValueError`` are reported and skipped.

    Args:
        data_directory: Directory whose sub-directories each hold one sample.
        target_size: ``(width, height)`` passed to ``cv2.resize``.
        n_clusters: Number of K-Means clusters.

    Returns:
        Tuple ``(images, masks)`` of NumPy arrays; both are empty when no
        folder could be processed.
    """
    image_list = []
    mask_list = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and the seeded train/validation split) reproducible between runs.
    for folder_name in sorted(os.listdir(data_directory)):
        folder_path = os.path.join(data_directory, folder_name)
        if not os.path.isdir(folder_path):
            continue

        try:
            normalized_image = load_normalize_bands(folder_path)
            resized_image = cv2.resize(normalized_image, target_size)
            height, width = resized_image.shape[:2]

            # One row per pixel, a single NIR feature per row.
            nir_band = resized_image[:, :, 3].reshape(-1, 1)

            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            kmeans.fit(nir_band)
            clustered_image = kmeans.labels_.reshape((height, width))

            # Smallest cluster wins; ties resolve to the lowest label.
            labels, counts = np.unique(clustered_image, return_counts=True)
            hyacinth_cluster = labels[np.argmin(counts)]
            hyacinth_mask = (clustered_image == hyacinth_cluster).astype(np.uint8)

            image_list.append(resized_image)
            mask_list.append(hyacinth_mask)

        except (FileNotFoundError, ValueError) as e:
            print(f"Error processing folder {folder_name}: {e}")

    return np.array(image_list), np.array(mask_list)

image_data, mask_data = process_and_cluster(data_directory)

# Add a trailing channel axis and store the masks as float32.
mask_data = np.expand_dims(mask_data, axis=-1)
mask_data = mask_data.astype(np.float32)

print(f"Total number of images: {len(image_data)}")

plt.figure(figsize=(10, 5))

plt.subplot(1, 2, 1)
plt.title("Sample Image")
# Channels are stacked blue, green, red, nir; imshow expects red first, so
# take the first three channels in reverse order for display.
plt.imshow(image_data[0][:, :, 2::-1])
plt.axis('off')

plt.subplot(1, 2, 2)
plt.title("Sample Mask")
# Drop the trailing channel axis so the mask is drawn as a plain 2-D image.
plt.imshow(mask_data[0].squeeze(), cmap='gray')
plt.axis('off')

plt.show()


Data Generator

In [None]:
class CustomDataGenerator(Sequence):
    """Keras ``Sequence`` that serves augmented (image, mask) batches.

    Args:
        images: Array of images, indexed along the first axis.
        masks: Array of masks aligned with ``images``.
        batch_size: Number of samples per batch.
        augmentations: Callable invoked as ``augmentations(image=..., mask=...)``
            and returning a dict with ``'image'`` and ``'mask'`` entries.
            ``None`` disables augmentation.
        **kwargs: Forwarded to the base-class constructor.
    """

    def __init__(self, images, masks, batch_size, augmentations, **kwargs):
        # Newer Keras versions expect the base class to be initialised.
        super().__init__(**kwargs)
        self.images = images
        self.masks = masks
        self.batch_size = batch_size
        self.augmentations = augmentations
        self.indices = np.arange(len(images))

    def __len__(self):
        # Ceiling division: keep the final partial batch instead of silently
        # dropping it (floor division yields 0 batches for small datasets).
        return (len(self.images) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(images, masks)`` arrays."""
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch_images = self.images[batch_indices]
        batch_masks = self.masks[batch_indices]

        if self.augmentations is None:
            return np.array(batch_images), np.array(batch_masks)

        augmented_images = []
        augmented_masks = []

        for img, mask in zip(batch_images, batch_masks):
            augmented = self.augmentations(image=img, mask=mask)
            augmented_images.append(augmented['image'])
            augmented_masks.append(augmented['mask'])

        return np.array(augmented_images), np.array(augmented_masks)

    def on_epoch_end(self):
        """Reshuffle the sample order for the next epoch."""
        np.random.shuffle(self.indices)


Model Setup

In [None]:
def dice_coefficient(y_true, y_pred, smooth=1):
    """Compute the smoothed Dice coefficient over all flattened elements.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.
        smooth: Constant added to numerator and denominator.

    Returns:
        ``(2 * sum(y_true * y_pred) + smooth) / (sum(y_true) + sum(y_pred) + smooth)``.
    """
    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth) + K.sum(pred) + smooth
    return numerator / denominator

def dice_loss(y_true, y_pred):
    """Return the Dice loss, i.e. one minus ``dice_coefficient(y_true, y_pred)``."""
    score = dice_coefficient(y_true, y_pred)
    return 1 - score

def unet_model(pretrained_weights=None, input_size=(256, 256, 4)):
    """Build and compile a U-Net style encoder-decoder with a sigmoid output.

    Args:
        pretrained_weights: Optional path passed to ``model.load_weights``
            after compilation; skipped when falsy.
        input_size: Shape passed to the ``Input`` layer.

    Returns:
        Model compiled with Adam (learning rate 1e-4), ``dice_loss`` as the
        loss and ``dice_coefficient`` as the metric.
    """
    inputs = Input(input_size)
    # Encoder: four stages of two 3x3 convolutions, each followed by 2x2 pooling.
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # Bottleneck: 1024 filters, followed by dropout.
    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv5)
    drop5 = Dropout(0.5)(conv5)

    # Decoder: upsample, concatenate with the matching encoder stage, convolve twice.
    up6 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(drop5))
    merge6 = concatenate([drop4, up6], axis=3)
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv6)

    up7 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv6))
    merge7 = concatenate([conv3, up7], axis=3)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv7)

    up8 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv7))
    merge8 = concatenate([conv2, up8], axis=3)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv8)

    up9 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv8))
    merge9 = concatenate([conv1, up9], axis=3)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    conv9 = Conv2D(2, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    # 1x1 convolution with sigmoid: a single output channel with values in (0, 1).
    output = Conv2D(1, 1, activation='sigmoid')(conv9)

    model = Model(inputs=inputs, outputs=output)

    model.compile(optimizer=Adam(learning_rate=1e-4), loss=dice_loss, metrics=[dice_coefficient])

    # Weights are loaded after compilation, and only when a path was given.
    if pretrained_weights:
        model.load_weights(pretrained_weights)

    return model


Training and Evaluation

In [None]:
model = unet_model(input_size=(256, 256, 4))

X_train, X_val, y_train, y_val = train_test_split(image_data, mask_data, test_size=0.2, random_state=42)

# Random geometric augmentations, applied to the training samples only.
augmentations = A.Compose([
    A.Rotate(limit=90, p=0.5),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=0.1, p=0.5)
])

# Validation samples are scored unchanged: randomly transforming them makes
# the validation loss differ from epoch to epoch for reasons unrelated to
# the model.
val_augmentations = A.Compose([A.NoOp()])

train_generator = CustomDataGenerator(X_train, y_train, batch_size=4, augmentations=augmentations)
val_generator = CustomDataGenerator(X_val, y_val, batch_size=4, augmentations=val_augmentations)

# Keep the weights that do best on held-out data.  Fall back to the training
# loss when the validation generator yields no batches, because no
# 'val_loss' would be reported in that case.
checkpoint_metric = 'val_loss' if len(val_generator) > 0 else 'loss'
model_checkpoint = ModelCheckpoint('unet_water_hyacinth.keras', monitor=checkpoint_metric, verbose=1, save_best_only=True)

model.fit(train_generator,
          epochs=50,
          validation_data=val_generator,
          callbacks=[model_checkpoint])

# Rebuild the network and restore the best checkpoint before predicting.
model = unet_model(input_size=(256, 256, 4))
model.load_weights('unet_water_hyacinth.keras')

predictions = model.predict(X_val)

# Binarise the sigmoid output.
threshold = 0.5
predicted_masks = (predictions > threshold).astype(np.uint8)

# Per-sample metrics, collected over the whole validation set.
precision_scores = []
recall_scores = []
f1_scores = []
iou_scores = []
confusion_matrices = []

for true_sample, pred_sample in zip(y_val, predicted_masks):
    # Flatten to 1-D and use one integer dtype for both label vectors
    # (the ground truth is stored as float32, the prediction as uint8).
    true_mask = true_sample.squeeze().flatten().astype(np.uint8)
    pred_mask = pred_sample.squeeze().flatten().astype(np.uint8)

    precision = precision_score(true_mask, pred_mask)
    recall = recall_score(true_mask, pred_mask)
    f1 = f1_score(true_mask, pred_mask)
    iou = jaccard_score(true_mask, pred_mask)
    # Fix the label set so the matrix is always 2x2, even when one class is
    # absent from both the ground truth and the prediction.
    cm = confusion_matrix(true_mask, pred_mask, labels=[0, 1])

    precision_scores.append(precision)
    recall_scores.append(recall)
    f1_scores.append(f1)
    iou_scores.append(iou)
    confusion_matrices.append(cm)

precision_scores = np.array(precision_scores)
recall_scores = np.array(recall_scores)
f1_scores = np.array(f1_scores)
iou_scores = np.array(iou_scores)

print(f"Average Precision: {np.mean(precision_scores):.4f}")
print(f"Average Recall: {np.mean(recall_scores):.4f}")
print(f"Average F1 Score: {np.mean(f1_scores):.4f}")
print(f"Average IoU: {np.mean(iou_scores):.4f}")

print("Sample Confusion Matrix:")
print(confusion_matrices[0])


In [None]:
def overlay_predicted_mask(image, mask):
    """Return the first three channels of ``image`` with masked pixels in red.

    The highlight value follows the image's value range: ``[1, 0, 0]`` for
    floating-point images whose values lie within [0, 1], ``[255, 0, 0]``
    otherwise.  Writing 255 into a [0, 1] image pushes it out of range.

    Args:
        image: ``(H, W, C)`` array with ``C >= 3``; it is not modified.
        mask: ``(H, W, 1)`` or ``(H, W)`` array; pixels equal to 1 are painted.

    Returns:
        New ``(H, W, 3)`` array with the same dtype as ``image``.
    """
    overlay = image[:, :, :3].copy()
    selected = (mask[:, :, 0] if mask.ndim == 3 else mask) == 1

    unit_range = (
        np.issubdtype(overlay.dtype, np.floating)
        and overlay.size > 0
        and overlay.max() <= 1.0
    )
    overlay[selected] = [1.0, 0.0, 0.0] if unit_range else [255, 0, 0]
    return overlay

for i in range(len(X_val)):
    # Channels are stacked blue, green, red, nir; imshow expects red first,
    # so take the first three channels in reverse order for display only.
    display_image = X_val[i][:, :, 2::-1]

    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.title("Original Image")
    plt.imshow(display_image)
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Ground Truth Mask")
    plt.imshow(y_val[i].squeeze(), cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Predicted Mask Overlay")
    overlay = overlay_predicted_mask(display_image, predicted_masks[i])
    plt.imshow(overlay)
    plt.axis('off')

    plt.show()

# Heatmap of the confusion matrix of the first validation sample.
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrices[0], annot=True, fmt='d', cmap='Blues')
plt.title("Confusion Matrix for First Sample")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()
