In [1]:
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"
import cv2
import numpy as np
from tensorflow import keras
import glob as gb
from matplotlib import pyplot as plt
from patchify import patchify
from PIL import Image
import segmentation_models as sm
from tensorflow.keras.metrics import MeanIoU

from sklearn.preprocessing import MinMaxScaler, StandardScaler
scaler = MinMaxScaler()


Segmentation Models: using `tf.keras` framework.


In [2]:
root_directory = 'Semantic dataset/'
patch_size = 224

In [None]:
image_dataset = []  
for path, subdirs, files in os.walk(root_directory):
    #print(path)  
    dirname = path.split(os.path.sep)[-1]
    if dirname == 'images':   #Find all 'images' directories
        images = os.listdir(path)  #List of all image names in this subdirectory
        for i, image_name in enumerate(images):  
            if image_name.endswith(".png"):   #Only read jpg images...
               
                image = cv2.imread(path+"/"+image_name, 1)  #Read each image as BGR
                SIZE_X = (image.shape[1]//patch_size)*patch_size #Nearest size divisible by our patch size
                SIZE_Y = (image.shape[0]//patch_size)*patch_size #Nearest size divisible by our patch size
                image = Image.fromarray(image)
                image = image.crop((0 ,0, SIZE_X, SIZE_Y))  #Crop from top left corner
                #image = image.resize((SIZE_X, SIZE_Y))  #Try not to resize for semantic segmentation
                image = np.array(image)             
       
                #Extract patches from each image
                print("Now patchifying image:", path+"/"+image_name)
                patches_img = patchify(image, (patch_size, patch_size, 3), step=patch_size)  #Step=256 for 256 patches means no overlap
        
                for i in range(patches_img.shape[0]):
                    for j in range(patches_img.shape[1]):
                        
                        single_patch_img = patches_img[i,j,:,:]
                        
                        #Use minmaxscaler instead of just dividing by 255. 
                        single_patch_img = scaler.fit_transform(single_patch_img.reshape(-1, single_patch_img.shape[-1])).reshape(single_patch_img.shape)
                        
                        #single_patch_img = (single_patch_img.astype('float16')) / 255. 
                        single_patch_img = single_patch_img[0] #Drop the extra unecessary dimension that patchify adds.                               
                        image_dataset.append(single_patch_img)

Now patchifying image: Semantic dataset/Tile 1\images/images0.png
Now patchifying image: Semantic dataset/Tile 1\images/images1.png
Now patchifying image: Semantic dataset/Tile 1\images/images10.png
Now patchifying image: Semantic dataset/Tile 1\images/images100.png
Now patchifying image: Semantic dataset/Tile 1\images/images103.png
Now patchifying image: Semantic dataset/Tile 1\images/images104.png
Now patchifying image: Semantic dataset/Tile 1\images/images105.png
Now patchifying image: Semantic dataset/Tile 1\images/images106.png
Now patchifying image: Semantic dataset/Tile 1\images/images107.png
Now patchifying image: Semantic dataset/Tile 1\images/images108.png
Now patchifying image: Semantic dataset/Tile 1\images/images109.png
Now patchifying image: Semantic dataset/Tile 1\images/images11.png
Now patchifying image: Semantic dataset/Tile 1\images/images110.png
Now patchifying image: Semantic dataset/Tile 1\images/images111.png
Now patchifying image: Semantic dataset/Tile 1\images/

In [None]:
mask_dataset = []  
for path, subdirs, files in os.walk(root_directory):
    #print(path)  
    dirname = path.split(os.path.sep)[-1]
    if dirname == 'masks':   #Find all 'images' directories
        masks = os.listdir(path)  #List of all image names in this subdirectory
        for i, mask_name in enumerate(masks):  
            if mask_name.endswith(".png"):   #Only read png images... (masks in this dataset)
               
                mask = cv2.imread(path+"/"+mask_name, 1)  #Read each image as Grey (or color but remember to map each color to an integer)
                mask = cv2.cvtColor(mask,cv2.COLOR_BGR2RGB)
                SIZE_X = (mask.shape[1]//patch_size)*patch_size #Nearest size divisible by our patch size
                SIZE_Y = (mask.shape[0]//patch_size)*patch_size #Nearest size divisible by our patch size
                mask = Image.fromarray(mask)
                mask = mask.crop((0 ,0, SIZE_X, SIZE_Y))  #Crop from top left corner
                #mask = mask.resize((SIZE_X, SIZE_Y))  #Try not to resize for semantic segmentation
                mask = np.array(mask)             
       
                #Extract patches from each image
                print("Now patchifying mask:", path+"/"+mask_name)
                patches_mask = patchify(mask, (patch_size, patch_size, 3), step=patch_size)  #Step=256 for 256 patches means no overlap
        
                for i in range(patches_mask.shape[0]):
                    for j in range(patches_mask.shape[1]):
                        
                        single_patch_mask = patches_mask[i,j,:,:]
                        #single_patch_img = (single_patch_img.astype('float32')) / 255. #No need to scale masks, but you can do it if you want
                        single_patch_mask = single_patch_mask[0] #Drop the extra unecessary dimension that patchify adds.                               
                        mask_dataset.append(single_patch_mask) 

In [None]:
image_dataset = np.array(image_dataset)
mask_dataset =  np.array(mask_dataset)

#Sanity check, view few mages
import random
import numpy as np
image_number = random.randint(0, len(image_dataset))
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(np.reshape(image_dataset[image_number], (patch_size, patch_size, 3)))
plt.subplot(122)
plt.imshow(np.reshape(mask_dataset[image_number], (patch_size, patch_size, 3)))
plt.show()

In [None]:
afforestation = '#32cd32'.lstrip('#')
afforestation = np.array(tuple(int(afforestation[i:i+2], 16) for i in (0, 2, 4))) #132, 41, 246

deforestation = '#ff0000'.lstrip('#') 
deforestation = np.array(tuple(int(deforestation[i:i+2], 16) for i in (0, 2, 4))) #110, 193, 228

degradation =  '#ffa500'.lstrip('#') 
degradation = np.array(tuple(int(degradation[i:i+2], 16) for i in (0, 2, 4))) #254, 221, 58

other_area = '#ffff00'.lstrip('#')
other_area = np.array(tuple(int(other_area[i:i+2], 16) for i in (0, 2, 4))) # 60, 16, 152

forest = '#006400'.lstrip('#') 
forest = np.array(tuple(int(forest[i:i+2], 16) for i in (0, 2, 4))) #226, 169, 41

Unlabeled = '#808080'.lstrip('#')
Unlabeled = np.array(tuple(int(Unlabeled[i:i+2], 16) for i in (0, 2, 4))) # 60, 16, 152

In [None]:
label = single_patch_mask
label.shape

In [None]:
def rgb_to_2D_label(label):
    
    label_seg = np.zeros(label.shape,dtype=np.uint8)
    label_seg [np.all(label==afforestation,axis=-1)] = 0
    label_seg [np.all(label==deforestation,axis=-1)] = 1
    label_seg [np.all(label==degradation,axis=-1)] = 2
    label_seg [np.all(label==forest,axis=-1)] = 3
    label_seg [np.all(label == other_area,axis=-1)] = 4
    label_seg [np.all(label == Unlabeled,axis=-1)] = 5
    
    label_seg = label_seg[:,:,0]  #Just take the first channel, no need for all 3 channels
    
    return label_seg

In [None]:
labels = []
for i in range(mask_dataset.shape[0]):
    label = rgb_to_2D_label(mask_dataset[i])
    labels.append(label)    

labels = np.array(labels)   
labels = np.expand_dims(labels, axis=3)
 
print("Unique labels in label dataset are: ", np.unique(labels))

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Assuming 'labels' contains the class labels for your dataset
plt.figure(figsize=(8, 8))
labels_name = ['afforestation', 'deforestation', 'degradation', 'forest', 'other_area', 'Unlabeled']
unique, counts = np.unique(labels, return_counts=True)

# Define colors for each class label
color_mapping = {
    'afforestation': '#32cd32',
    'deforestation': '#ff0000',
    'degradation': '#ffa500',
    'forest': '#006400',
    'other_area': '#ffff00',
    'Unlabeled': '#808080'
}

colors = [color_mapping[label] for label in labels_name]

sns.barplot(x=labels_name, y=counts, palette=colors)
plt.title('ARVI: Number of Data Available for Each Class')
plt.xlabel('Different land cover classes', fontsize=12)
plt.ylabel('Pixel distribution for various land cover classes', fontsize=12)
#plt.xticks(rotation=45)  # Rotate x-axis labels for better visibility
plt.show()

In [None]:
import random
import numpy as np
image_number = random.randint(0, len(image_dataset))
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(image_dataset[image_number])
plt.subplot(122)
plt.imshow(labels[image_number][:,:,0])
plt.show()

In [None]:
n_classes = len(np.unique(labels))
print(n_classes)
from keras.utils import to_categorical
labels_cat = to_categorical(labels, num_classes=n_classes)

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(image_dataset, labels_cat, test_size = 0.30, random_state = 42)

In [None]:
# One-hot encode the labels
n_classes = len(np.unique(labels))
y_train_cat = to_categorical(y_train, num_classes=n_classes)
y_test_cat = to_categorical(y_test, num_classes=n_classes)

In [None]:
import segmentation_models as sm

# Provided class weights
weights = [1.666666, 1.666666, 1.666666, 1.666666, 1.666666, 1.666666]

# Define Dice Loss with class weights
dice_loss = sm.losses.DiceLoss(class_weights=weights)

# Define Categorical Focal Loss
focal_loss = sm.losses.CategoricalFocalLoss()

# Combine the two losses with appropriate weights
total_loss = dice_loss + (1 * focal_loss)

In [None]:
IMG_HEIGHT = X_train.shape[1]
IMG_WIDTH  = X_train.shape[2]
IMG_CHANNELS = X_train.shape[3]
total_classe = y_train.shape[3]
print(IMG_HEIGHT)
print(IMG_WIDTH)
print(IMG_CHANNELS)
print(total_classe)

In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda
from keras import backend as K

In [None]:
def jacard_coef(y_true, y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (intersection + 1.0) / (K.sum(y_true_f) + K.sum(y_pred_f) - intersection + 1.0)

In [None]:
def unet_model(n_classes=6, IMG_HEIGHT=224, IMG_WIDTH=224, IMG_CHANNELS=3):
#Build the model
    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
    #s = Lambda(lambda x: x / 255)(inputs)   #No need for this if we normalize our inputs beforehand
    s = inputs

    #Contraction path
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(s)
    c1 = Dropout(0.2)(c1)  # Original 0.1
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)
    
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
    c2 = Dropout(0.2)(c2)  # Original 0.1
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)
     
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
    c3 = Dropout(0.2)(c3)
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)
     
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
    c4 = Dropout(0.2)(c4)
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
    p4 = MaxPooling2D(pool_size=(2, 2))(c4)
     
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
    c5 = Dropout(0.3)(c5)
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)
    
    #Expansive path 
    u6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
    c6 = Dropout(0.2)(c6)
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
     
    u7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
    c7 = Dropout(0.2)(c7)
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
     
    u8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
    c8 = Dropout(0.2)(c8)  # Original 0.1
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
     
    u9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
    c9 = Dropout(0.2)(c9)  # Original 0.1
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
     
    outputs = Conv2D(n_classes, (1, 1), activation='softmax')(c9)
     
    model = Model(inputs=[inputs], outputs=[outputs])
    
    #NOTE: Compile the model in the main program to make it easy to test with various loss functions
    #model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    
    #model.summary()
    
    return model

In [None]:
metrics=['accuracy', jacard_coef]

In [None]:
def get_model():
    return unet_model(n_classes=n_classes, IMG_HEIGHT=IMG_HEIGHT, IMG_WIDTH=IMG_WIDTH, IMG_CHANNELS=IMG_CHANNELS)

In [None]:
model = get_model()
model.compile(optimizer='adam', loss=total_loss, metrics=metrics)
#model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=metrics)
model.summary()

In [None]:
history1 = model.fit(X_train, y_train_cat,
                    batch_size = 8, 
                    verbose=1, 
                    epochs=5, 
                    validation_data=(X_test, y_test_cat), 
                    shuffle=False)

In [None]:
train_accuracy = history1.history['accuracy']
val_accuracy = history1.history['val_accuracy']

train_loss = history1.history['loss']
val_loss = history1.history['val_loss']
#learning_rate = history1.history['lr']

fig, ax = plt.subplots(nrows=2, ncols=1, figsize=(12, 10))

ax[0].set_title('Training Accuracy vs. Epochs')
ax[0].plot(train_accuracy, 'o-', label='Train Accuracy')
ax[0].plot(val_accuracy, 'o-', label='Validation Accuracy')
ax[0].set_xlabel('Epochs')
ax[0].set_ylabel('Accuracy')
ax[0].legend(loc='best')

ax[1].set_title('Training/Validation Loss vs. Epochs')
ax[1].plot(train_loss, 'o-', label='Train Loss')
ax[1].plot(val_loss, 'o-', label='Validation Loss')
ax[1].set_xlabel('Epochs')
ax[1].set_ylabel('Loss')
ax[1].legend(loc='best')

#ax[2].set_title('Learning Rate vs. Epochs')
#ax[2].plot(learning_rate, 'o-', label='Learning Rate')
#ax[2].set_xlabel('Epochs')
#ax[2].set_ylabel('Loss')
#ax[2].legend(loc='best')

plt.tight_layout()
plt.show()

#out_dir = 'C:/Users/best/Sattelite/Results'
#conf_matrix_path = os.path.join(out_dir, 'tight_layout.png')
#plt.imsave(conf_matrix_path)

In [None]:
# Evaluate the model
score = model.evaluate(X_test, y_test_cat)
print("Test Loss:", score[0])
print("Test Accuracy:", score[1])

In [None]:
# Predict on test data
y_pred = model.predict(X_test)
y_pred_argmax = np.argmax(y_pred, axis=3)
y_test_argmax = np.argmax(y_test_cat, axis=3)

In [None]:
len(y_pred_argmax)

In [None]:
len(y_test_argmax)

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score
import seaborn as sns

# Flatten the predicted and ground truth labels
y_pred_flat = y_pred_argmax.flatten()
y_test_flat = y_test_argmax.flatten()

# Define class labels
class_labels = ['afforestation', 'deforestation', 'degradation', 'forest', 'other_area', 'Unlabeled']

# Compute the confusion matrix
conf_matrix = confusion_matrix(y_test_flat, y_pred_flat)

# Compute class-wise accuracy
class_accuracy = [conf_matrix[i, i] / np.sum(conf_matrix[i, :]) for i in range(conf_matrix.shape[0])]

# Plot the confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False, xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Print class-wise accuracy
for i, label in enumerate(class_labels):
    print(f'Accuracy for {label}: {class_accuracy[i]:.2%}')

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

# Compute precision, recall, and F1 score for each class
precision = precision_score(y_test_flat, y_pred_flat, average=None)
recall = recall_score(y_test_flat, y_pred_flat, average=None)
f1 = f1_score(y_test_flat, y_pred_flat, average=None)

# Print precision, recall, and F1 score for each class
for i, label in enumerate(class_labels):
    print(f'Class: {label}')
    print(f'  Precision: {precision[i]*100:.2f}%')
    print(f'  Recall: {recall[i]*100:.2f}%')
    print(f'  F1 Score: {f1[i]*100:.2f}%')
    print()

In [None]:
#Using built in keras function for IoU
from tensorflow.keras.metrics import MeanIoU, Accuracy
n_classes = 6
IOU_keras = MeanIoU(num_classes=n_classes)  
IOU_keras.update_state(y_test_argmax, y_pred_argmax)
print("Mean IoU =", IOU_keras.result().numpy())

In [None]:
from sklearn.metrics import confusion_matrix

# Define a function to calculate IoU for each class
def calculate_iou_per_class(conf_matrix):
    iou_per_class = []
    for i in range(conf_matrix.shape[0]):
        intersection = conf_matrix[i, i]
        union = np.sum(conf_matrix[i, :]) + np.sum(conf_matrix[:, i]) - intersection
        iou = intersection / union if union > 0 else 0
        iou_per_class.append(iou)
    return iou_per_class

# Compute confusion matrix
conf_matrix = confusion_matrix(y_test_flat, y_pred_flat)

# Calculate IoU for each class
iou_per_class = calculate_iou_per_class(conf_matrix)

# Print IoU for each class
for i, label in enumerate(class_labels):
    print(f'IoU for {label}: {iou_per_class[i]*100:.2f}%')

In [None]:
num_samples = 10
test_img_numbers = random.sample(range(X_test.shape[0]), num_samples)
for test_img_number in test_img_numbers:
    test_img = X_test[test_img_number]
    ground_truth=y_test_argmax[test_img_number]
    #test_img_norm=test_img[:,:,0][:,:,None]
    test_img_input=np.expand_dims(test_img, 0)
    prediction = (model.predict(test_img_input))
    predicted_img=np.argmax(prediction, axis=3)[0,:,:]
    plt.figure(figsize=(12, 8))
    plt.subplot(231)
    plt.title('Testing Image')
    plt.imshow(test_img)
    plt.subplot(232)
    plt.title('Testing Label')
    plt.imshow(ground_truth, vmin=0, vmax = n_classes)
    plt.subplot(233)
    plt.title('Prediction on test image')
    plt.imshow(predicted_img, vmin=0, vmax = n_classes)
    plt.show()