# I. Explore Dataset

In [None]:
%%capture
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from PIL import Image

In [None]:
def show_images(images, titles=None):
    """Display a sequence of images side by side in a single row.

    Args:
        images: Sequence of array-likes accepted by ``Axes.imshow``. Each
            item must expose ``.shape`` when ``titles`` is not supplied.
        titles: Optional sequence holding one title per image. When it is
            omitted or empty, every image is titled with its shape.
    """
    if titles is None or len(titles) == 0:
        titles = [img.shape for img in images]
    # squeeze=False makes plt.subplots always return a 2-D array of axes, so
    # a single image (which would otherwise yield one bare Axes object that
    # cannot be enumerated) is handled like any other count.
    fig, axes = plt.subplots(
        nrows=1, ncols=len(images), figsize=(10, 30), squeeze=False
    )
    for i, ax in enumerate(axes[0]):
        ax.imshow(images[i], cmap="summer")
        ax.set_title(titles[i])
        ax.axis("off")
    plt.show()

The dataset is divided into several directories based on the location where each image was taken. The following code shows the number of files in each directory.

In [None]:
# Root folder of the PV03 data; every entry directly beneath it is treated as
# one category and stored as a full path.
root_path = "/kaggle/input/solar-panel-detection-and-identification/PV03"
categories_paths = [
    os.path.join(root_path, entry) for entry in os.listdir(root_path)
]

In [None]:
# Report how many files each category folder holds. os.walk also descends
# into sub-folders, so the per-folder counts are summed in order to print
# exactly one line per category rather than one line per visited directory.
for cat_path in categories_paths:
    n_files = sum(len(files) for _, _, files in os.walk(cat_path))
    print("{}: {}".format(cat_path, n_files))

Example of image and its mask.

In [None]:
# Point these two paths at any image / mask pair you want to inspect.
image_path = '/kaggle/input/solar-panel-detection-and-identification/PV03/PV03_Rooftop/PV03_314602_1199205.bmp'
mask_path = '/kaggle/input/solar-panel-detection-and-identification/PV03/PV03_Rooftop/PV03_314602_1199205_label.bmp'

image = plt.imread(image_path)
# The mask gets an explicit trailing channel axis appended.
mask = np.expand_dims(plt.imread(mask_path), axis=(-1))
image_shape, mask_shape = image.shape, mask.shape

fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(10, 20))

# Left panel: the photograph, titled with its array shape.
axes[0].imshow(image)
axes[0].set_title('Shape: ' + str(image_shape))

# Right panel: the label mask, titled with its array shape.
axes[1].imshow(mask, cmap="summer")
axes[1].set_title('Shape: ' + str(mask_shape))

for ax in axes:
    ax.axis("off")
plt.show()

All images are 1024x1024 pixels, which is too large for our model. The masks are in binary format: 1 for solar panels and 0 for everything else.
So we need to take these considerations into account:
* Down-sample the images to make training easier.
* The model needs just one neuron (channel) in its output layer, which gives the probability of a pixel being a solar panel.
* Due to the high resolution of the images, the whole dataset cannot fit in RAM, so the dataset has to be created from a generator.

# II. Dataset Preparation

In this part, we should take care of several things:
* Creating a generator that reads images from disk on the fly (during training).
* Splitting the dataset into train and test sets for later evaluation.
* Creating two datasets from the generators.

In [None]:
# Gather the full path of every file found under each category folder
# (os.walk recurses into sub-folders as well).
images_paths = []
for cat_path in categories_paths:
    for root, _, files in os.walk(cat_path):
        # extend() appends in place without building a throwaway list.
        images_paths.extend(os.path.join(root, file) for file in files)
# NOTE(review): later cells read images_paths[i] as an image and
# images_paths[i + 1] as its mask, so they depend on this sorted order.
images_paths = sorted(images_paths)
images_paths[:6]

In [None]:
# Total number of collected file paths (every file found, not image pairs).
len(images_paths)

We can see that each image sits at an even index (0, 2, 4, ...) and its mask at the index right after it. Knowing this structure, we split the image indices into `train_idx` and `test_idx` and then create these generators:

In [None]:
# Number of collected paths; this counts images and masks together.
n_images = len(images_paths)
# Spatial size every image and mask is resized to before reaching the model.
# (256, 256) is a lighter alternative.
new_size = (512, 512)
# Step of 2: index i is read as an image and i + 1 as its mask further down.
images_idx = range(0, n_images, 2)
# A fixed seed keeps the 85/15 split identical across runs, so the held-out
# test images stay the same whenever the notebook is re-executed.
train_idx, test_idx = train_test_split(
    images_idx, test_size=0.15, random_state=42
)

In [None]:
# Count how many test images belong to each category. A path is assigned to
# a category when the keyword occurs anywhere in it.
# Each entry is (label printed, keyword searched for in the path).
category_keywords = [
    ('Ground Cropland:', 'Cropland'),
    ('Ground Grassland:', 'Grassland'),
    ('Ground SalineAlkali:', 'SalineAlkali'),
    ('Ground Shrubwood:', 'Shrubwood'),
    ('Ground WaterSurface:', 'WaterSurface'),
    ('Rooftop:', 'Rooftop'),
]

total_test = 0
for label, keyword in category_keywords:
    total = sum(1 for i in test_idx if keyword in images_paths[i])
    print(label, total)
    total_test += total

print('Total used for testing:', total_test)

Downsample the PV03 images (and scale them back to their original size) so that they look more similar to lower-resolution satellite images.

In [None]:
import torchvision.transforms as transforms 

def downsample_image(input_path, scale_factor):
    """Lower an image's effective resolution while keeping its pixel size.

    The image is shrunk by ``scale_factor`` and then enlarged back to its
    original dimensions, both times with Lanczos resampling. The result has
    the same width and height as the input file.

    Args:
        input_path: Path of the image file to open.
        scale_factor: Multiplier applied to the width and height to get the
            intermediate, smaller image (for example ``0.375``).

    Returns:
        A NumPy array holding the re-enlarged image.
    """
    # The context manager closes the underlying file as soon as the pixel
    # data has been consumed by the first resize.
    with Image.open(input_path) as img:
        original_resolution = img.size

        # Clamp each side to at least 1 px so that a very small scale factor
        # cannot request a zero-sized intermediate image.
        new_resolution = tuple(
            max(1, int(dim * scale_factor)) for dim in original_resolution
        )

        # Shrink: this is the step that discards detail.
        resized_img = img.resize(new_resolution, Image.LANCZOS)

    # Enlarge back to the original pixel dimensions.
    upscaled_img = resized_img.resize(original_resolution, Image.LANCZOS)

    return np.array(upscaled_img)

Example of downsampled image

In [None]:
image_path = '/kaggle/input/solar-panel-detection-and-identification/PV03/PV03_Rooftop/PV03_314602_1199205.bmp'

# Untouched image next to its degraded version (scale factor 0.375).
og, downsamp = plt.imread(image_path), downsample_image(image_path, 0.375)

show_images((og, downsamp), ['Original', 'Down sampled'])

Create generator for train and test

In [None]:
def _load_image_mask_pair(index, scale_factor=0.375):
    """Load one (image, mask) pair as float32 tensors resized to ``new_size``.

    ``images_paths[index]`` is read as the image, after being degraded with
    ``downsample_image``. ``images_paths[index + 1]`` is read as its mask.

    Args:
        index: Position of the image in ``images_paths``.
        scale_factor: Scale factor forwarded to ``downsample_image``.

    Returns:
        Tuple ``(image, mask)``. The image keeps its raw pixel scale, and the
        mask is divided by 255 and carries a trailing channel axis.
    """
    image = tf.convert_to_tensor(
        downsample_image(images_paths[index], scale_factor), dtype=tf.float32
    )
    mask = (
        tf.convert_to_tensor(
            np.expand_dims(plt.imread(images_paths[index + 1]), axis=-1),
            dtype=tf.float32,
        )
        / 255.0
    )

    image = tf.image.resize(image, new_size)
    # Nearest-neighbour resizing only copies existing mask values. An
    # interpolating method would blend labels along panel borders into
    # fractional values that are neither background nor panel.
    mask = tf.image.resize(mask, new_size, method="nearest")

    return image, mask


def train_dataset_generator():
    """Yield an (image, mask) tensor pair for every index in ``train_idx``."""
    for i in train_idx:
        yield _load_image_mask_pair(i)


def test_dataset_generator():
    """Yield an (image, mask) tensor pair for every index in ``test_idx``."""
    for i in test_idx:
        yield _load_image_mask_pair(i)

Create tensorflow datasets:

In [None]:
# Both generators yield the same structure: a 3-channel float32 image and a
# 1-channel float32 mask, each with spatial size new_size.
_pair_signature = (
    tf.TensorSpec(shape=(*new_size, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(*new_size, 1), dtype=tf.float32),
)

train_dataset = tf.data.Dataset.from_generator(
    train_dataset_generator, output_signature=_pair_signature
)

test_dataset = tf.data.Dataset.from_generator(
    test_dataset_generator, output_signature=_pair_signature
)


Next, we will check whether the dataset is working properly.

In [None]:
# Draw one sample from a 20-element shuffle buffer and show the image next to
# its mask. The image is divided by 255 for display; the mask is shown as is.
for item in train_dataset.shuffle(20).take(1):
    show_images((item[0]/255,item[1]))

# III. UNet Model

We will use the segmentation-models library to obtain a U-Net model with an InceptionResNetV2 backbone.
The encoder (backbone) of the U-Net is initialised with weights pretrained on the ImageNet dataset: https://www.image-net.org/

In [None]:
%%capture
!pip install -U -q segmentation-models
os.environ["SM_FRAMEWORK"] = "tf.keras"

import segmentation_models as sm

In [None]:
# NOTE(review): load_model is not read by any visible cell — confirm it is still needed.
load_model = False
# Encoder name handed to sm.get_preprocessing and sm.Unet.
backbone = 'inceptionresnetv2'
# Number of samples per batch for the train, validation and test datasets.
batch_size = 16

Before going into training we need to make sure that:
* The data is preprocessed in the way the backbone model expects.
* The dataset is split into training and validation datasets.
* The encoder part of the U-Net is frozen so the model can be trained within a reasonable time frame.

In [None]:
backbone_preprocess = sm.get_preprocessing(backbone)


def preprocess_fn(x, y):
    """Apply the backbone's input preprocessing to the image only.

    Args:
        x: Image tensor with raw RGB values.
        y: Mask tensor, returned unchanged.

    Returns:
        Tuple ``(preprocessed_image, mask)``. For this backbone the image
        values end up in the range -1 to 1; later cells undo this with
        ``(image + 1) / 2`` before plotting.
    """
    return backbone_preprocess(x), y


train_dataset = train_dataset.map(preprocess_fn)

In [None]:
# Every _VALIDATION_STRIDE-th enumerated sample goes to the validation split.
_VALIDATION_STRIDE = 4


def is_test(x, _):
    """Return whether enumerate index ``x`` belongs to the validation split.

    Despite its name, this predicate selects the validation samples
    (indices 0, 4, 8, ...). The second argument, the dataset element, is
    ignored.
    """
    return x % _VALIDATION_STRIDE == 0


def is_train(x, y):
    """Return whether enumerate index ``x`` belongs to the training split.

    This is the exact complement of ``is_test``. It is written as a direct
    comparison so it gives a boolean for plain integers and a boolean tensor
    for tensors, without needing Python's ``not`` to be translated for tensor
    inputs.
    """
    return x % _VALIDATION_STRIDE != 0


def recover(x, y):
    """Drop the enumerate index ``x`` and return the dataset element ``y``."""
    return y

# The validation split is built first because train_dataset is rebound below.
# Each pipeline numbers the samples, keeps those its predicate accepts,
# strips the number again and batches the result.
valid_dataset = (
    train_dataset.enumerate()
    .filter(is_test)
    .map(recover)
    .batch(batch_size)
)

train_dataset = (
    train_dataset.enumerate()
    .filter(is_train)
    .map(recover)
    .batch(batch_size)
)

In [None]:
# Build the U-Net from segmentation_models on top of the selected backbone.
model = sm.Unet(
    backbone,
    # One output class: solar panel versus everything else.
    classes=1,
    # Start the encoder from ImageNet-pretrained weights.
    encoder_weights="imagenet",
    # Keep the encoder frozen during training (see the notes above this cell).
    encoder_freeze=True,
)

**Loss function used**: Dice loss <br> **Metrics used**: IoU score, binary accuracy, precision, and recall

In [None]:
# Dice loss drives the optimisation; the metrics are tracked alongside it.
loss = sm.losses.DiceLoss()
metrics = [
    sm.metrics.IOUScore(),
    'binary_accuracy',
    keras.metrics.Precision(),
    keras.metrics.Recall(),
]
model.compile(optimizer="Adam", loss=loss, metrics=metrics)

In [None]:
#keras.utils.plot_model(model, show_shapes=True) # Uncomment for seeing the model graph

# IV. Training Model

In this part we will train the model.
First we define a callback to visualize the learning process on one instance of the training dataset.

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
    """Plot the model's prediction for one sample every few epochs.

    Args:
        dataset: Batched dataset of (image, mask) pairs from which the
            preview sample is taken. When left as ``None``, the
            notebook-level ``train_dataset`` is looked up each time a
            preview is drawn.
        every_n_epochs: A preview is drawn at the start of every epoch whose
            1-based number is a multiple of this value.
    """

    def __init__(self, dataset=None, every_n_epochs=5):
        super().__init__()
        self.dataset = dataset
        self.every_n_epochs = every_n_epochs

    def on_epoch_begin(self, epoch, logs=None):
        """Show the first sample and its predicted mask on preview epochs."""
        # `epoch` is 0-based, hence the + 1.
        if (epoch + 1) % self.every_n_epochs != 0:
            return

        dataset = train_dataset if self.dataset is None else self.dataset
        # The first element of the unbatched stream is used. A shuffle with
        # buffer size 1 would leave the order unchanged, so none is applied.
        for item in dataset.unbatch().take(1):
            image = item[0]
            # predict() expects a batch, so add and then remove a batch axis.
            mask_4d = self.model.predict(np.expand_dims(image, axis=0))
            mask = np.squeeze(mask_4d, axis=0)
            # Map the preprocessed image from [-1, 1] back to [0, 1] for
            # plotting. NOTE(review): assumes the dataset was preprocessed
            # for this backbone.
            image_converted = (image + 1) / 2
            show_images((image_converted, mask))

display_cb = DisplayCallback()
# restore_best_weights is a boolean flag. A string such as 'True' merely
# happens to be truthy, and 'False' would be truthy as well.
early_stopping_cb = keras.callbacks.EarlyStopping(
    patience=10, restore_best_weights=True
)
# Save a checkpoint whenever the monitored quantity improves.
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    'model_unet-inceptionresnetv2_cp.keras', verbose=1, save_best_only=True
)

In [None]:
# Show two samples from the preprocessed training set (drawn through a
# 10-element shuffle buffer) as a visual check after preprocessing.
for item in train_dataset.unbatch().shuffle(10).take(2):
    image = item[0]
    image_converted = (image + 1) / 2 # Converts RGB values back to the range of 0 to 1
    mask = item[1]
    show_images((image_converted, mask))

The following code trains the model. Training on the PV03 dataset takes around 2 hours on a P100 GPU on Kaggle.

In [None]:
with tf.device("/device:GPU:0"):
    # train_dataset and valid_dataset are already batched with batch_size,
    # so no batch_size argument is passed to fit(); Keras does not accept
    # one for dataset inputs.
    history = model.fit(
        train_dataset,
        epochs=100,
        validation_data=valid_dataset,
        callbacks=[display_cb, early_stopping_cb, checkpoint_cb],
    )
# Keep only the per-epoch metric table from the returned History object.
history = pd.DataFrame.from_dict(history.history)

# Persist the training curves and the final model.
history.to_csv("history_unet-inceptionresnetv2.csv", index=False)
model.save("model_unet-inceptionresnetv2.keras")


Let's take a look at the training and validation loss and IoU score:

In [None]:
plt.figure(figsize=(15, 5))

# Left panel: training versus validation loss per epoch.
plt.subplot(1, 2, 1)
plt.plot(history['loss'])
plt.plot(history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')

# Right panel: training versus validation IoU score per epoch.
plt.subplot(1, 2, 2)
plt.plot(history['iou_score'])
plt.plot(history['val_iou_score'])
plt.title('model IoU score')
# This axis shows the IoU score, not the loss.
plt.ylabel('IoU score')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

# V. Evaluate Model

We will now evaluate the model. To do this, we take 10 instances from the test set and look at the results. Note that the model has never seen these images.

Apply backbone preprocessing to test dataset

In [None]:
# Run the test images through the same backbone preprocessing as the training
# images. NOTE(review): test_dataset is rebound to its own mapped version, so
# re-running this cell applies the preprocessing a second time.
test_dataset = test_dataset.map(preprocess_fn)

Evaluate Model

In [None]:
# Probability above which a pixel is shown as "solar panel" in the last panel.
threshold = 0.9

# For ten test samples, show the image, its true mask, the raw model output
# and the thresholded prediction side by side.
for item in test_dataset.shuffle(100).take(10):
    image, true_mask = item[0], item[1]

    # predict() works on batches: add a batch axis, then strip it again.
    mask_4d = model.predict(np.expand_dims(image, axis=0))
    pred_mask_proba = np.squeeze(mask_4d, axis=0)
    pred_mask = np.where(pred_mask_proba > threshold, 1, 0)

    # Bring the preprocessed image back to the 0 to 1 range for plotting.
    image_converted = (image + 1) / 2

    panels = (image_converted, true_mask, pred_mask_proba, pred_mask)
    captions = [
        "Image",
        "True Mask",
        "Model Probability",
        "Model Prediciton @" + str(threshold),
    ]
    show_images(panels, captions)

In [None]:
# Compute the compiled loss and metrics over the batched test dataset.
model.evaluate(test_dataset.batch(batch_size))