#  Wildfire Fire Detection from Satellite Imagery  (UNET)

## Import libraries

In [None]:
#Import Libraries
import os
import sys
import time
import pandas as pd
import numpy as np
import cv2 
import matplotlib.pyplot as plt

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

In [None]:
# Add the path to the functions directory
sys.path.append('../functions')  # Add the path to the functions directory

## A. Load Dataset

### A Load tiles from ../dataset/tiles

In [None]:
# import user defined function for loading tiles from datasets/tiles using functions/load_dataset.py
from load_dataset import load_images_from_directory

In [None]:
# specify path and call function 
directory = "../dataset/tiles"
tiles = load_images_from_directory(directory)

- ### Visualize tile after loading 

In [None]:
# confirm if tiles [] holds images, print length and plot 

# Confirm if tiles[] holds images
if len(tiles) > 0:
    # Display the number of images loaded
    print(f"Number of images loaded: {len(tiles)}")

    # Display a sample image
    sample_tile = tiles[1]

    print(f"Shape of sample image: {sample_tile.shape}")

    # Plot the sample image
    plt.imshow(cv2.cvtColor(sample_tile, cv2.COLOR_BGR2RGB))
    plt.title("Sample Image")
    plt.axis('off')
    plt.show()
else:
    print("No images loaded from the directory.")


- ### Print tile image matrix 

In [None]:
## View Image matrix to see values
print(tiles[1])

- ### Print tile data type

In [None]:
tiles[1].dtype

- ### Normalize Tiles to values [0,1]

In [None]:
# import user defined function for normalizing tiles from functions/tile_normalize
from tile_normalize import normalize_tiles

In [None]:
# call function to normalize
normalized_tiles = normalize_tiles(tiles)

- ### Print normalized tile data type

In [None]:
normalized_tiles[0].dtype

- ### Visualize Normalized Tiles

In [None]:
# confirm if tiles [] holds images, print length and plot 

# Confirm if tiles[] holds images
if len(normalized_tiles) > 0:
    # Display the number of images loaded
    print(f"Number of images loaded: {len(normalized_tiles)}")

    # Display a sample image
    sample_tile = normalized_tiles[1]

    print(f"Shape of sample image: {sample_tile.shape}")

    uint8_image = (sample_tile * 255).astype(np.uint8)
    # Plot the uint8 image
    plt.imshow(cv2.cvtColor(uint8_image, cv2.COLOR_BGR2RGB))
    plt.title("Normalized Image (uint8)")
    plt.axis('off')
    plt.show()
else:
    print("No images loaded from the directory.")


- #### print image matrix of tile vs normalized tile 

In [None]:
# Print the values side by side using string formatting
from tabulate import tabulate

# Define the data as a list of lists
data = [
    ["Tile Image Matrix", tiles[1]],
    ["Normalized Tile Image Matrix", normalized_tiles[1]]
]

# Print the table
print(tabulate(data, headers=["Variable", "Value"]))


In [None]:
### minium and max values for tiles
print("Normalized tiles Min:", normalized_tiles[0].min())
print("Normalized tiles Max:", normalized_tiles[0].max())
print("Normalized tiles Data type:", normalized_tiles[0].dtype)

### Load tile masks ../dataset/masks

In [None]:
from load_binary_masks import load_binary_masks

In [None]:
# specify path and call function 
mask_dir = "../dataset/masks"
masks = load_binary_masks(mask_dir)

In [None]:
print(f"masks has type {masks[0].dtype}, image is {type(masks[0])}, dimensions are {masks[0].shape}")

- ### Plot Mask

In [None]:
# Confirm if masks[] holds masks
if len(masks) > 0:
    # Display the number of masks loaded
    print(f"Number of masks loaded: {len(masks)}")

    # Display a sample image
    sample_mask = masks[1]

    print(f"Shape of sample mask: {sample_mask.shape}")

    # Plot the sample image as grayscale
    plt.imshow(sample_mask, cmap='binary')
    plt.title("Sample Mask")
    plt.axis('off')
    plt.show()
else:
    print("No images loaded from the directory.")

In [None]:
print("Masks  Min:", masks[0].min())
print("Masks  Max:", masks[0].max())
print("Masks Data type", masks[0].dtype)

- ### Normalize Masks

In [None]:
# call function to normalize masks for consistency 
normalized_masks = normalize_tiles(masks)

In [None]:
# check the normalized masks
print("Normalized Masks  Min:", normalized_masks[0].min())
print("Normalized Masks  Max:", normalized_masks[0].max())
print("Normalized Masks Data type", normalized_masks[0].dtype)

In [None]:
num_channels = normalized_masks[0].shape[-1]
print("Number of channels in the mask array:", num_channels)

- ### Plot a Normalized Mask

In [None]:
# Confirm if normalized_masks[] holds masks
if len(normalized_masks) > 0:
    # Display the number of masks loaded
    print(f"Number of Normalized masks loaded: {len(normalized_masks)}")

    # Display a sample image
    sample_mask = normalized_masks[1]

    print(f"Shape of sample mask: {sample_mask.shape}")

    # Plot the sample image as grayscale
    plt.imshow(sample_mask, cmap='binary')
    plt.title("Sample Normalized Mask")
    plt.axis('off')
    plt.show()
else:
    print("No images loaded from the directory.")

- ### Plot Tiles, Masks, Normalized Tiles, Normalized Masks

In [None]:
# Assuming you have your data loaded in variables:
# tiles, normalized_tiles, masks, normalized_masks

# Sample index to display 
index = 1 

# Create a figure with 2 rows and 2 columns of subplots
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(12, 8))

# Plot the 'tiles' image
axes[0, 0].imshow(tiles[index])  # Adjust cmap if not grayscale
axes[0, 0].set_title("Tiles\nMin: {:.4f}, Max: {:.4f}".format(tiles[index].min(), tiles[index].max()))

# Plot the 'normalized_tiles' image
axes[0, 1].imshow(normalized_tiles[index])
axes[0, 1].set_title("Normalized Tiles\nMin: {:.4f}, Max: {:.4f}".format(normalized_tiles[index].min(), normalized_tiles[index].max()))

# Plot the 'masks' image
axes[1, 0].imshow(masks[index], cmap='gray')
axes[1, 0].set_title("Masks\nMin: {:.4f}, Max: {:.4f}".format(masks[index].min(), masks[index].max()))

# Plot the 'normalized_masks' image
axes[1, 1].imshow(normalized_masks[index], cmap='gray')
axes[1, 1].set_title("Normalized Masks\nMin: {:.4f}, Max: {:.4f}".format(normalized_masks[index].min(), normalized_masks[index].max()))

#  Adjust spacing for aesthetics
fig.tight_layout()
plt.show()


## Split into Training and Validation sets

In [None]:
# Import library
from sklearn.model_selection import train_test_split

- ### confirm details and length of the Masks and tiles np arrays

In [None]:
# import user defined function to plot tiles and masks from functions/iplot_masks_tiles.py 
from plot_masks_tiles import plot_masks_vs_tiles

In [None]:
# Define the number of tiles to plot
num_tiles_to_plot = 5

# Plot the tiles and masks
fig, axs = plt.subplots(num_tiles_to_plot, 2, figsize=(15, 30))  # Adjust the figure size here

for i in range(num_tiles_to_plot):
    # Plot the tile (colored)
    tile_index = i  # Adjust this index based on how your tiles are loaded
    mask_index = i  # Adjust this index based on how your masks are loaded
    
    #temporarily convert normalized_tiles to unit8 to plot
    uint8_image = (normalized_tiles[tile_index] * 255).astype(np.uint8)
    # Plot the uint8 image
    axs[i, 0].imshow(cv2.cvtColor(uint8_image, cv2.COLOR_BGR2RGB))
    axs[i, 0].set_title(f'Tile {tile_index}')
    axs[i, 0].axis('off')
    
    # Plot the mask (black and white)
    axs[i, 1].imshow(normalized_masks[mask_index], cmap='binary')
    axs[i, 1].set_title(f'Mask {mask_index}')
    axs[i, 1].axis('off')

plt.tight_layout(h_pad=0.1)  # Adjust the spacing between subplots here
plt.show()



In [None]:
print(f'type of Tiles {type(normalized_tiles)}')
print(f'Tiles dtype {normalized_tiles[0].dtype} ')

In [None]:
print(f'type of Mask {type(normalized_masks)}')
print(f'Mask dtype {normalized_masks[0].dtype} ')

- ### Convert the Normalized tiles and masks to NParrays

In [None]:
## convert normalized_tiles to np arrays
np_normalized_tiles = np.asarray(normalized_tiles)

In [None]:
## convert normalized_masks to np arrays
np_normalized_masks = np.asarray(normalized_masks)

In [None]:
print(f'type of Tiles {type(np_normalized_tiles)}')
print(f'Tiles dtype {np_normalized_tiles.dtype} ')
print(f"Shape of normalized tiles: {np_normalized_tiles.shape}")
print("----------------------------------------------------------------")
print(f'type of Mask {type(np_normalized_masks)}')
print(f'Mask dtype {np_normalized_masks.dtype} ')
print(f"Shape of normalized masks: {np_normalized_masks.shape}")

- ### Split Dataset

In [None]:
# import library
from sklearn.model_selection import train_test_split

In [None]:
# split 80/20
X_train, X_val, y_train, y_val = train_test_split(
    normalized_tiles, normalized_masks, test_size=0.2, random_state=42
)

In [None]:
print(f"type of X_train {type(X_train)}") 
print(f"type of X_val {type(X_val)}") 
print(f"type of y_train {type(y_train)}") 
print(f"type of y_val {type(y_val)}")

- ### Convert Split to NP arrays

In [None]:
X_train = np.array(X_train)
X_val = np.array(X_val)
y_train = np.array(y_train)
y_val = np.array(y_val)

In [None]:
proportion_fire_train = np.count_nonzero(y_train.flatten() == 1)  / len(y_train.flatten())
proportion_fire_val = np.count_nonzero(y_val.flatten() == 1) / len(y_val.flatten())
 
print("Proportion of fire pixels in y_train:", proportion_fire_train)
print("Proportion of fire pixels in y_val:", proportion_fire_val)


In [None]:
print(f"type of X_train {type(X_train)}") 
print(f"type of X_val {type(X_val)}") 
print(f"type of y_train {type(y_train)}") 
print(f"type of y_val {type(y_val)}")

In [None]:
print(f"X_train shape: {X_train.shape}")
print(f"X_val shape: {X_val.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"y_val shape: {y_val.shape}")

- ### plot Split values before model training 

In [None]:
import matplotlib.pyplot as plt

# Choose an index for the sample to plot
index = 1  # You can change this index to visualize different samples

# Plot the training sample
#temporarily convert X_train to unit8 to plot
uint8_image_train = (X_train[index] * 255).astype(np.uint8)
plt.subplot(1, 2, 1)
plt.imshow(uint8_image_train )
plt.title('Training Sample')
plt.axis('off')

# Plot the corresponding ground truth mask
plt.subplot(1, 2, 2)
plt.imshow(y_train[index], cmap='gray')  # Assuming the mask is grayscale
plt.title('Ground Truth Mask')
plt.axis('off')

plt.show()

# Plot the validation sample
plt.subplot(1, 2, 1)
#temporarily convert X_Val to unit8 to plot
uint8_image_val = (X_val[index] * 255).astype(np.uint8)
plt.imshow(uint8_image_val)
plt.title('Validation Sample')
plt.axis('off')

# Plot the corresponding ground truth mask
plt.subplot(1, 2, 2)
plt.imshow(y_val[index], cmap='gray')  # Assuming the mask is grayscale
plt.title('Ground Truth Mask')
plt.axis('off')

plt.show()


#### Note:
<p> 
    - X_train and y_train contains the training data <br> 
    - X_val and y_val contains the validation data
</p>

## UNET Model design

- ### Setup Early  Earlystopping and Checkpoints

In [None]:
# Define directory to save the checkpoints
checkpoint_dir = '../checkpoints'

# directory check
os.makedirs(checkpoint_dir, exist_ok=True)

# checkpoint callback
checkpoint_callback = ModelCheckpoint(
    filepath=os.path.join(checkpoint_dir, 'unet_checkpoint.keras'),
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=False,
    mode='min',
    verbose=1
)

# Define the early stopping callback
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=1
)

- ### import cross entropy loss

In [None]:
from tensorflow.keras.losses import BinaryCrossentropy

# Create an instance of BinaryCrossentropy
bce_loss = BinaryCrossentropy()

In [None]:
#import adam
from tensorflow.keras.optimizers.legacy import Adam as AdamLegacy


- ### Import Unet script

In [None]:
# import unet model function from functions/unet.py
from unet import unet
from unet_dual_input import unet_dual

### Build Model

In [None]:
unet_model = unet()

In [None]:
# trying dice loss function
from dice_loss import dice_loss as dls

### Compile Model

In [None]:
# Compile the model with an appropriate loss function for multiclass segmentation
unet_model = unet()
unet_model.compile(optimizer=AdamLegacy(learning_rate=0.0001), loss=bce_loss, metrics=['accuracy'])

### Model Summary

In [None]:
unet_model.summary()

In [None]:
print(X_train.shape, y_train.shape)
print(X_val.shape, y_val.shape)  

### Fit Model

In [None]:
start_time = time.time()

unet_model.fit(
    X_train, y_train,
    epochs=100,
    validation_data=(X_val, y_val),
    callbacks=[checkpoint_callback, early_stopping_callback],
    verbose=1
)

stop_time = time.time() - start_time
print(f"UNET training time: {stop_time / 3600:.2f} hrs")

In [None]:
''' 
    Note to self: reduce lr , increase drop out, make drop out an argument to that we can update its value 
    without editing the models script 
    After the first 3 epochs, the models training became faster 
    Val_acccuracy never changed, but val_loss changes 
    accuracy has remained the same
'''

### Training split 

In [None]:
type(normalized_tiles)

In [None]:
type(masks_float64)

In [None]:
train_data, test_data, train_labels, test_labels = train_test_split(normalized_tiles, masks_float64, test_size=0.2)

- ### Plot Test split before model evaluation

In [None]:
import matplotlib.pyplot as plt

# Choose an index for the sample to plot
index = 4  # You can change this index to visualize different samples

# Plot the training sample
#temporarily convert train_data to unit8 to plot
uint8_image_train = (train_data[index] * 255).astype(np.uint8)
plt.subplot(1, 2, 1)
plt.imshow(uint8_image_train )
plt.title('Testing Sample')
plt.axis('off')

# Plot the corresponding ground truth mask
plt.subplot(1, 2, 2)
plt.imshow(train_labels[index], cmap='gray')  # Assuming the mask is grayscale
plt.title('Ground Truth Mask')
plt.axis('off')

plt.show()

# Plot the validation sample
plt.subplot(1, 2, 1)
#temporarily convert test_data to unit8 to plot
uint8_image_val = (test_data[index] * 255).astype(np.uint8)
plt.imshow(uint8_image_val)
plt.title('Validation Sample (Test)')
plt.axis('off')

# Plot the corresponding ground truth mask
plt.subplot(1, 2, 2)
plt.imshow(test_labels[index], cmap='gray')  # Assuming the mask is grayscale
plt.title('Ground Truth Mask')
plt.axis('off')

plt.show()


In [None]:

# Method 1: Using boolean indexing and sum
count_of_ones = np.sum(test_labels == 1)
proportion_of_ones = count_of_ones / len(test_labels)  

# Method 2: Using np.where
count_of_ones = np.where(test_labels == 1)[0].size
proportion_of_ones = count_of_ones / len(test_labels)

print("Proportion of '1' values:", proportion_of_ones)


In [None]:
print(test_data.shape)

In [None]:
print(test_labels.shape)

In [None]:
print(train_labels.shape)

In [None]:
print(test_labels.shape)

### Run Model Evaluation

In [None]:
len(normalized_tiles) == len(masks_float64)

In [None]:
score = unet_model.evaluate(test_data, test_labels, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
print("Test_data min:", test_data.min())
print("test_Data max:", test_data.max())
print("test_labels min:", test_labels.min())
print("test_labels max:", test_labels.max())

### Run  Model predictions

In [None]:
predictions = unet_model.predict(test_data)

In [None]:
# Plot the 1 prediction
plt.imshow((predictions[1]>0.25), cmap='gray')  # Assuming the mask is grayscale
plt.title('Ground Truth Mask')
plt.axis('off')
plt.show()

In [None]:
'''
    View datatypes and information of Predictions  
'''
#test
print(predictions.shape)  # Check the shape
print(predictions.dtype)  # Check the data type

In [None]:
predictions.shape

In [None]:
print("Predictions min:", predictions.min())
print("Predictions max:", predictions.max())

In [None]:

# Debugging: (You can remove if no longer needed)
print("Predictions data type:", predictions.dtype)
print("Predictions shape:", predictions.shape)
print("Predictions min:", predictions.min())
print("Predictions max:", predictions.max())

# Define the number of images to plot
num_images = 5

# Plot the input images and their predictions
plt.figure(figsize=(12, 6))
for i in range(num_images):
    plt.subplot(2, num_images, i + 1)
    plt.imshow(test_data[i])
    plt.title('Input')
    plt.axis('off')

    plt.subplot(2, num_images, num_images + i + 1)
    plt.imshow(predictions[i], cmap='viridis')
    plt.title('Prediction')
    plt.axis('off')

plt.tight_layout()
plt.show()


### Peformance Evaluation

In [None]:

from sklearn.metrics import confusion_matrix

# Step 1: Apply thresholding to predictions
threshold = 0.005
predictions_binary = (predictions >= threshold).astype(np.int32)

# Step 2: Flatten true class labels and predictions for comparison
y_true = test_labels.flatten()
y_pred = predictions_binary.flatten()

# Step 3: Calculate the confusion matrix
cm = confusion_matrix(y_true, y_pred)

# Step 4: Calculate TPR, FPR, TNR, FNR directly from the confusion matrix
TN = cm[0][0]
FP = cm[0][1]
FN = cm[1][0]
TP = cm[1][1]

TPR = TP / (TP + FN)  # True Positive Rate (Sensitivity, Recall)
FPR = FP / (FP + TN)  # False Positive Rate 
TNR = TN / (TN + FP)  # True Negative Rate (Specificity)
FNR = FN / (TP + FN)  # False Negative Rate

# Step 5: Print the results
print("Confusion Matrix:\n", cm)
print("TPR:", TPR)
print("FPR:", FPR)
print("TNR:", TNR)
print("FNR:", FNR)
