In [1]:
import numpy as np 
import os
import tensorflow as tf
from tensorflow.keras.models import *
from tqdm import tqdm
from random import Random
from model import *

In [None]:
model = get_model(32)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=static_weighted_binary_crossentropy,
    metrics=["accuracy", tf.keras.metrics.MeanIoU(num_classes=2)],
)
model.summary()

In [None]:
model.load_weights('weights')

In [4]:
# paths to preprocessed training data created from trainingData.ipynb
X_PATH = 'E:/SUBDIVS/img'
Y_PATH = 'E:/SUBDIVS/mask'

In [4]:
# used to calculate weight_zero for the staticWeightedBinaryCrossEntropy function
# value = fraction of one pixels / total pixels in training dataset

all_y = os.listdir(Y_PATH)

# shuffling data before train/validation split
Random(333).shuffle(all_y)

SPLIT_train_val = int(len(all_y) * 0.7)
rolling_sum = 0
for num, img in tqdm(enumerate(all_y[:SPLIT_train_val])):
    tempY = np.load(os.path.join(Y_PATH, img))
    rolling_sum += tempY.sum()

print(rolling_sum / (SPLIT_train_val * 484 * 484))

In [6]:
# trains the model

BATCH_SIZE = 6

all_x, all_y = os.listdir(X_PATH), os.listdir(Y_PATH)
all_x.sort()
all_y.sort()

# seeded shuffling data before train/validation split
Random(333).shuffle(all_x)
Random(333).shuffle(all_y)

# 70% training, 15% validation, 15% testing
SPLIT_train_val = int(len(all_x) * 0.7)
SPLIT_val_test = int(len(all_x) * 0.85)

# iterable function that returns (img, mask) tuples from the training data. Performs augmentation on the data (normal, horizontal flip, vertical flip, horizontal & vertical flip)
def iter_train():
    for num in range(0, SPLIT_train_val * 4):
        index = int(num / 4)
        rem = num % 4
        img = np.load(os.path.join(X_PATH, all_x[index])).astype(np.uint8)
        mask = np.load(os.path.join(Y_PATH, all_y[index])).astype(bool)
        if rem == 0:
            pass
        elif rem == 1:
            img = np.flip(img, axis=0)
            mask = np.flip(mask, axis=0)
        elif rem == 2:
            img = np.flip(img, axis=1)
            mask = np.flip(mask, axis=1)
        else:
            img = np.flip(np.flip(img, axis=0), axis=1)
            mask = np.flip(np.flip(mask, axis=0), axis=1)
        yield img, mask


# iterable function that returns (img, mask) tuples from the validation data
def iter_validate():
    for num in range(SPLIT_train_val, SPLIT_val_test):
        img = np.load(os.path.join(X_PATH, all_x[num])).astype(np.uint8)
        mask = np.load(os.path.join(Y_PATH, all_y[num])).astype(bool)
        yield img, mask


gen_train = iter_train()
gen_validate = iter_validate()

data_train = (
    tf.data.Dataset.from_generator(
        iter_train,
        output_signature=(
            tf.TensorSpec(shape=(668, 668, 3), dtype=tf.uint8),
            tf.TensorSpec(shape=(484, 484, 1), dtype=tf.bool),
        ),
    )
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

data_validate = (
    tf.data.Dataset.from_generator(
        iter_validate,
        output_signature=(
            tf.TensorSpec(shape=(668, 668, 3), dtype=tf.uint8),
            tf.TensorSpec(shape=(484, 484, 1), dtype=tf.bool),
        ),
    )
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# optional
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, monitor="val_loss"),
    tf.keras.callbacks.TensorBoard(
        log_dir="cloud_model", histogram_freq=1, write_images=True, update_freq=200
    ),
    tf.keras.callbacks.ModelCheckpoint(
        "checkpoints/cloud_model_weights.ckpt", verbose=1, save_best_only=True
    ),
]

results = model.fit(
    data_train, validation_data=data_validate, verbose=1, epochs=250, callbacks=callbacks
)

# for tensorboard run command in activated python environment: python -m tensorboard.main --logdir="path\to\CloudMask Final\cloud_model" --host=127.0.0.1

In [None]:
# saves the model
model.load_weights("checkpoints/cloud_model_weights.ckpt/")
model.save("save_cloud_model")

In [8]:
# tests the model

all_x = os.listdir(X_PATH)
all_y = os.listdir(Y_PATH)

model = tf.keras.models.load_model(
    "save_cloud_model",
    custom_objects={"static_weighted_binary_crossentropy": static_weighted_binary_crossentropy},
)

# shuffling data before train/validation split
Random(333).shuffle(all_x)
Random(333).shuffle(all_y)

SPLIT_train_val = int(len(all_x) * 0.7)
SPLIT_val_test = int(len(all_x) * 0.85)


def iterTest():
    for num in range(SPLIT_val_test, int(len(all_x))):
        img = np.load(os.path.join(X_PATH, all_x[num])).astype(np.uint8)
        mask = np.load(os.path.join(Y_PATH, all_y[num])).astype(bool)
        yield img, mask


gen_test = iterTest()

BATCH_SIZE = 1
data_test = (
    tf.data.Dataset.from_generator(
        iterTest,
        output_signature=(
            tf.TensorSpec(shape=(668, 668, 3), dtype=tf.uint8),
            tf.TensorSpec(shape=(484, 484, 1), dtype=tf.bool),
        ),
    )
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

results = model.evaluate(data_test, verbose=1)
print(results)