In [1]:
import subprocess 
from subprocess import PIPE
import rasterio
import json
import glob 
import pandas as pd
import os
import numpy as np
import cv2

In [57]:
# The local folder path where the labels are stored
path_labels = '/workspaces/Flood-Risk-Analysis-Spatial-Computing/Flood-Detection---Satellite-Images-main/data/SEN12-FLOOD/Sentinel_2/sen12floods_s2_labels'

# The local folder where the training images are stored
path_training_images = '/workspaces/Flood-Risk-Analysis-Spatial-Computing/Flood-Detection---Satellite-Images-main/data/SEN12-FLOOD/Sentinel_2/Images/sen12floods_s2_source>3'

# The local folder where the testing images are stored
path_testing_images = '/workspaces/Flood-Risk-Analysis-Spatial-Computing/Flood-Detection---Satellite-Images-main/data/SEN12-FLOOD/Sentinel_2/Images/sen12floods_s2_source'

In [65]:
import json
import os

def image_label(product_id, s1_json_path, s2_json_path):
    """
    Determines if the given image depicts a flooded area based on labels in S1list and S2list JSON files.

    Args:
        product_id (str): The name of the image file.
        s1_json_path (str): Path to the S1list JSON file.
        s2_json_path (str): Path to the S2list JSON file.

    Returns:
        int: 1 if the image depicts a flooded area, 0 otherwise.
    """
    def search_label_in_json(json_path, product_id):
        """
        Searches for the product_id in the given JSON file and returns the FLOODING label if found.

        Args:
            json_path (str): Path to the JSON file.
            product_id (str): The image file name to search for.

        Returns:
            int or None: 1 for flooding, 0 for no flooding, None if not found.
        """
        if not os.path.exists(json_path):
            raise FileNotFoundError(f"JSON file not found: {json_path}")
        
        with open(json_path, "r") as json_file:
            data = json.load(json_file)
        
        # Iterate through entries in the JSON file to match the product_id
        for entry in data.values():
            if isinstance(entry, dict) and entry.get("filename") == product_id:
                flooding = entry.get("FLOODING")
                return 1 if flooding else 0
        
        return None  # Return None if the product_id is not found
    
    # Search in S1list JSON
    label = search_label_in_json(s1_json_path, product_id)
    if label is not None:
        return label
    
    # Search in S2list JSON
    label = search_label_in_json(s2_json_path, product_id)
    if label is not None:
        return label
    
    # If not found in either file, raise an exception or handle as needed
    raise ValueError(f"Product ID '{product_id}' not found in either S1list or S2list.")


In [44]:
# This functions accepts as argument the name of the image and it searches for the coresponding label. 
# If the image depicts a flooded area then it returns the number one (1) otherwise it returns the number zero (0)

def image_label(product_id):
    
    rootdir = "/workspaces/Flood-Risk-Analysis-Spatial-Computing/Flood-Detection---Satellite-Images-main/data/SEN12-FLOOD/Sentinel_2/Images/sen12floods_s2_source"
    pd = product_id.split("_")
    pd = pd[3] + "_" + pd[4] + "_" + pd[5] + "_" + pd[6]
    
    json_data=open(rootdir + pd +"/stac.json", "rb")
    jdata = json.load(json_data)
    flood = jdata["properties"]["FLOODING"]
    

    if (flood == "False"):
        image_label = 0
    else:
        image_label = 1
    
    return image_label


In [45]:
# This function it accepts as argument the path of the folder where the image is stored.
# Inside this folder there is a json file containing the product id as a property
# It return the product id

def product_name(path):
    
    json_data=open(path+"/stac.json", "rb")
    jdata = json.load(json_data)
    
    return  jdata['id']

In [46]:
# This function is responsible for constructing the dataset in a way compatible with keras.
# It iterates through folders and searches for images along with their coresponding label.

def load_data():
    
    data = [] 
    images = []
    labels = []
    
    for folder in imagelist:
        try:
            product_id = product_name(folder)
            print(f"Processing {product_id} image product")
            label = image_label(product_id)
        
            # Open the img
            image = cv2.imread(folder + "/stack.tif")
            # Append the image and its corresponding label to the output
            images.append(image)
            labels.append(label)
        except:
            pass
        
        
    images = np.array(images, dtype = 'float32')
    labels = np.array(labels, dtype = 'int32')
        
        
    data.append([images, labels])     

    return images, labels

## Load the Training Dataset

In [63]:
# Create a list with all the folders containing spectral bands


imagelist = []
rootdir = path_training_images
for file in os.listdir(rootdir):
    d = os.path.join(rootdir, file)
    if os.path.isdir(d):
        imagelist.append(d)
        
        
print(f"The number of training samples is currently = {len(imagelist)}")

The number of training samples is currently = 3


In [48]:
train_images, train_labels = load_data()

## Load the Test Dataset

In [62]:
# Create a list with all the folders containing spectral bands


imagelist = []
rootdir = path_testing_images # define the path for the folder
for file in os.listdir(rootdir):
    d = os.path.join(rootdir, file)
    if os.path.isdir(d):
        imagelist.append(d)
        
        
print(f"The number of testing samples is = {len(imagelist)}")

The number of testing samples is = 3


In [59]:
test_images, test_labels = load_data()

## Explore the Datasets

In [60]:
# Calculate the number of images in the test set 
# containing Flooded areas and the number of images that does not contain flooded areas

(unique, counts) = np.unique(test_labels, return_counts=True)

# print(unique, counts)
print(f"The number of images in the test dataset containing flooded areas is {counts[1]}\n")
print(f"While the number of images clean from floods is {counts[0]}")


IndexError: index 1 is out of bounds for axis 0 with size 0

In [56]:
# Calculate the number of images in the training set 
# containing Flooded areas and the number of images that does not contain flooded areas

(unique, counts) = np.unique(train_labels, return_counts=True)

# print(unique, counts)
print(f"The number of images in the train dataset containing flooded areas is {counts[1]}\n")
print(f"While the number of images clean from floods is {counts[0]}")


IndexError: index 1 is out of bounds for axis 0 with size 0

## Model Creation

In [20]:
# Import the Deep Learing modules
import matplotlib.pyplot as plt
import seaborn as sns

import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout 
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import Adam
from keras.utils.vis_utils import plot_model
import pydot

# from sklearn.metrics import classification_report,confusion_matrix

import tensorflow as tf

import cv2
import os

import numpy as np

In [36]:
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, (3, 3), activation = 'relu', input_shape = (512, 512, 3)), 
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Conv2D(32, (3, 3), activation = 'relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation=tf.nn.relu),
    tf.keras.layers.Dense(2, activation=tf.nn.softmax)

])

In [37]:
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_4 (Conv2D)           (None, 510, 510, 32)      896       
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 255, 255, 32)     0         
 2D)                                                             
                                                                 
 conv2d_5 (Conv2D)           (None, 253, 253, 32)      9248      
                                                                 
 max_pooling2d_5 (MaxPooling  (None, 126, 126, 32)     0         
 2D)                                                             
                                                                 
 flatten_2 (Flatten)         (None, 508032)            0         
                                                                 
 dense_4 (Dense)             (None, 128)              

## Compile the Model

In [38]:
model.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy', metrics=['accuracy'])

## Fit the model on the available training data

In [61]:
print(train_images.shape)  # Should match the input shape defined in the first layer of the model
print(train_labels.shape)  # Should match the number of samples


(0,)
(0,)


In [33]:
history = model.fit(train_images, train_labels, batch_size=1, epochs=20)
#history = model.fit(train_images, train_labels, batch_size=1, epochs=20, validation_split = 0.2)

Epoch 1/20


ValueError: Unexpected result of `train_function` (Empty logs). Please use `Model.compile(..., run_eagerly=True)`, or `tf.config.run_functions_eagerly(True)` for more information of where went wrong, or file a issue/bug to `tf.keras`.

In [51]:
# Save the trained model
model.save("my_model")
# model = keras.models.load_model('path/to/location')

## Evaluating the result

In [46]:
def plot_accuracy_loss(history):
    """
        Plot the accuracy and the loss during the training of the nn.
    """
    fig = plt.figure(figsize=(10,5))

    # Plot accuracy
    plt.subplot(221)
    plt.plot(history.history['acc'],'bo--', label = "acc")
    plt.plot(history.history['val_acc'], 'ro--', label = "val_acc")
    plt.title("train_acc vs val_acc")
    plt.ylabel("accuracy")
    plt.xlabel("epochs")
    plt.legend()

    # Plot loss function
    plt.subplot(222)
    plt.plot(history.history['loss'],'bo--', label = "loss")
    plt.plot(history.history['val_loss'], 'ro--', label = "val_loss")
    plt.title("train_loss vs val_loss")
    plt.ylabel("loss")
    plt.xlabel("epochs")

    plt.legend()
    plt.show()

In [None]:
plot_accuracy_loss(history)

In [None]:
model.summary()

In [None]:
test_loss = model.evaluate(test_images, test_labels)

## Feature extraction with VGG ImageNet

In [None]:
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input

model = VGG16(weights='imagenet', include_top=False)

### Get the features directly from VGG16

In [None]:
train_features = model.predict(train_images)
test_features = model.predict(test_images)

In [36]:
# Save the features calculated earlier
np.save("train_features.npy", train_features)
np.save("test_features", test_features)

# Load the trained weights
# loaded_array = np.load("train_features.npy")
# loaded_array = np.load("test_features.npy")

In [None]:
type(train_features)

In [22]:
n_train, x, y, z = train_features.shape
n_test, x, y, z = test_features.shape
numFeatures = x * y * z

## Training on top of VGG

Let's train a simple one-layer Neural Network on the features extracted from VGG.

In [None]:
model2 = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape = (x, y, z)),
    tf.keras.layers.Dense(128, activation=tf.nn.relu),
    tf.keras.layers.Dense(2, activation=tf.nn.softmax)
])

model2.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy', metrics=['accuracy'])

history2 = model2.fit(train_features, train_labels, batch_size=28, epochs=30, validation_split = 0.2)

In [None]:
plot_accuracy_loss(history)

In [None]:
test_loss = model2.evaluate(test_features, test_labels)