In [3]:
import numpy as np
from PIL import Image

In [6]:
Image.MAX_IMAGE_PIXELS = 933120000
flagged_img = np.asarray(Image.open("../data/flags_applied/2021_10_03/wq.tif"))

In [8]:
flagged_img.shape

(9410, 27602)

We aim to train the Unet model.

Testing Image = "unflagged"

Testing Label = "flagged"

#Categories and pixel value

Land=0

invalid=253 - 255

valid=others

We are going to classify pixels into valid and invalid.
Masks provides the value 0 to 255(which means RGB has already converted into 2D_label).
You can check these labels' value like this.

e.g.,
```python
import numpy as np
from PIL import Image
Image.MAX_IMAGE_PIXELS = 933120000
flagged_img = np.asarray(Image.open('../data/flags_applied/2021_10_03/wq.tif'))
labels = np.expand_dims(flagged_img, axis=0)
np.unique(labels)
```

On the other hand, input RGB images provide three values(each has 0 to 255 value).



## 1. Convert 256 (integral encoded) label into 2 labels(valid or invalid). 

In [36]:
def convert_to_binary_label(label):
    if label == 0 or label >= 253:
        label = 0
    else:
        label = 1
    return label

In [37]:
import numpy as np
from PIL import Image

Image.MAX_IMAGE_PIXELS = 933120000
flagged_img = np.asarray(Image.open("../data/flags_applied/2021_10_03/wq.tif"))
labels = np.expand_dims(flagged_img, axis=0)
# apply binary label function to each pixel
v_binary_label = np.vectorize(convert_to_binary_label)
labels = v_binary_label(labels)
# check unique labels (should be 0 and 1)
np.unique(labels)

array([0, 1])

## 2. Change the binary integral encode into [one-hot](https://en.wikipedia.org/wiki/One-hot) encode.

In [40]:
from keras.utils import to_categorical

labels_cat = to_categorical(labels, num_classes=2)

2023-03-03 16:22:44.424467: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [43]:
print(labels_cat.shape)

(1, 9410, 27602, 2)


## 3. Train a model with split images

In [59]:
# e.g., Once we create split images, we can save them in, for instance, a folder called "split_images"
import os

split_images_flags_applied_dir = "../data/split_images/flags_applied/"

mask_dataset = []
for path, subdirs, files in os.walk(split_images_flags_applied_dir):
    for name in files:
        if name.endswith("wq.tif"):
            image = np.asarray(Image.open(path + "/" + name))
            mask_dataset.append(image)

In [61]:
# apply binary label function to each pixel
if len(mask_dataset) > 0:
    v_binary_label = np.vectorize(convert_to_binary_label)
    mask_dataset = v_binary_label(mask_dataset)

In [63]:
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt

X_train, X_test, y_train, y_test = train_test_split(
    mask_dataset, labels_cat, test_size=0.2, random_state=42
)

#######################################
# Parameters for model
# Segmentation models losses can be combined together by '+' and scaled by integer or float factor
# set class weights for dice_loss
# from sklearn.utils.class_weight import compute_class_weight

# weights = compute_class_weight('balanced', np.unique(np.ravel(labels,order='C')),
#                               np.ravel(labels,order='C'))
# print(weights)


# It depends on the dataset, but in this case, the weights are:
weights = [0.1666, 0.1666, 0.1666, 0.1666, 0.1666, 0.1666]
dice_loss = sm.losses.DiceLoss(class_weights=weights)
focal_loss = sm.losses.CategoricalFocalLoss()
total_loss = dice_loss + (1 * focal_loss)  #
IMG_HEIGHT = X_train.shape[1]
IMG_WIDTH = X_train.shape[2]
IMG_CHANNELS = X_train.shape[3]

from simple_multi_unet_model import multi_unet_model, jacard_coef

metrics = ["accuracy", jacard_coef]


def get_model():
    return multi_unet_model(
        n_classes=2,
        IMG_HEIGHT=IMG_HEIGHT,
        IMG_WIDTH=IMG_WIDTH,
        IMG_CHANNELS=IMG_CHANNELS,
    )


model = get_model()
model.compile(optimizer="adam", loss=total_loss, metrics=metrics)
# model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=metrics)
model.summary()

history1 = model.fit(
    X_train,
    y_train,
    batch_size=16,
    verbose=1,
    epochs=100,
    validation_data=(X_test, y_test),
    shuffle=False,
)

# Resnet backbone
BACKBONE = "resnet34"
preprocess_input = sm.get_preprocessing(BACKBONE)

# preprocess input
X_train_prepr = preprocess_input(X_train)
X_test_prepr = preprocess_input(X_test)

# define model
model_resnet_backbone = sm.Unet(
    BACKBONE, encoder_weights="imagenet", classes=n_classes, activation="softmax"
)

# compile keras model with defined optimozer, loss and metrics
# model_resnet_backbone.compile(optimizer='adam', loss=focal_loss, metrics=metrics)
model_resnet_backbone.compile(
    optimizer="adam", loss="categorical_crossentropy", metrics=metrics
)

print(model_resnet_backbone.summary())


history2 = model_resnet_backbone.fit(
    X_train_prepr,
    y_train,
    batch_size=16,
    epochs=100,
    verbose=1,
    validation_data=(X_test_prepr, y_test),
)

###########################################################
# plot the training and validation accuracy and loss at each epoch
history = history1
loss = history.history["loss"]
val_loss = history.history["val_loss"]
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, "y", label="Training loss")
plt.plot(epochs, val_loss, "r", label="Validation loss")
plt.title("Training and validation loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

acc = history.history["jacard_coef"]
val_acc = history.history["val_jacard_coef"]

plt.plot(epochs, acc, "y", label="Training IoU")
plt.plot(epochs, val_acc, "r", label="Validation IoU")
plt.title("Training and validation IoU")
plt.xlabel("Epochs")
plt.ylabel("IoU")
plt.legend()
plt.show()

KeyboardInterrupt: 