In [30]:
import os
import os.path as op
import json
from pathlib import Path
import shutil
import logging
import numpy as np
from tqdm import tqdm
from skimage import io
import matplotlib.pyplot as plt

In [3]:
# Logging configuration
logging.basicConfig(level=logging.INFO,
                    datefmt='%H:%M:%S',
                    format='%(asctime)s | %(levelname)-5s | %(module)-15s | %(message)s')

IMAGE_SIZE = (299, 299)  # All images contained in this dataset are 299x299 (originally, to match Inception v3 input size)
SEED = 17

# Head directory containing all image subframes. Update with the relative path of your data directory
data_head_dir = Path('../data')

# Find all subframe directories
subdirs = [Path(subdir.stem) for subdir in data_head_dir.iterdir() if subdir.is_dir()]
src_image_ids = ['_'.join(a_path.name.split('_')[:3]) for a_path in subdirs]

In [6]:
# Load train/val/test subframe IDs
def load_text_ids(file_path):
    """Simple helper to load all lines from a text file"""
    with open(file_path, 'r') as f:
        lines = [line.strip() for line in f.readlines()]
    return lines

# Load the subframe names for the three data subsets
train_ids = load_text_ids('../train_source_images.txt')
validate_ids = load_text_ids('../val_source_images.txt')
test_ids = load_text_ids('../test_source_images.txt')

# Generate a list containing the dataset split for the matching subdirectory names
subdir_splits = []
for src_id in src_image_ids:
    if src_id in train_ids:
        subdir_splits.append('train')
    elif src_id in validate_ids:
        subdir_splits.append('validate')
    elif(src_id in test_ids):
        subdir_splits.append('test')
    else:
        logging.warning(f'{src_id}: Did not find designated split in train/validate/test list.')
        subdir_splits.append(None)

# Loading and pre processing the data
### Note that there are multiple ways to preprocess and load your data in order to train your model in tensorflow. We have provided one way to do it in the following cell. Feel free to use your own method and get better results.

In [10]:
import random
!pip install tensorflow
import tensorflow as tf
from PIL import Image 

def load_and_preprocess(img_loc, label):
    
    def _inner_function(img_loc, label):
        
        # Convert tensor to native type
        img_loc_str = img_loc.numpy().decode('utf-8')
        # label_str = label.numpy().decode('utf-8')
        
        img = Image.open(img_loc_str).convert('RGB')
        img = np.array(img)
        img = tf.image.resize(img, [299, 299])

        img = img / 255.0
        label = 1 if label.numpy().decode('utf-8') == 'frost' else 0

        return img, label

    # Wrap the Python function
    X, y = tf.py_function(_inner_function, [img_loc, label], [tf.float32, tf.int64])
    X.set_shape([299, 299, 3])
    y.set_shape([])
    
    return X, y

def load_subdir_data(dir_path, image_size, seed=None):
    
    """Helper to create a TF dataset from each image subdirectory"""
    
    # Grab only the classes that (1) we want to keep and (2) exist in this directory
    tile_dir = dir_path / Path('tiles')
    label_dir = dir_path /Path('labels')
    
    loc_list = []
    
    for folder in os.listdir(tile_dir):
        if os.path.isdir(os.path.join(tile_dir, folder)):
            for file in os.listdir(os.path.join(tile_dir, folder)):
                if file.endswith(".png"):
                    loc_list.append((os.path.join(os.path.join(tile_dir, folder), file), folder))

    return loc_list

# Loop over all subframes, loading each into a list
tf_data_train, tf_data_test, tf_data_val = [], [], []
tf_dataset_train, tf_dataset_test, tf_dataset_val = [], [], []

# Update the batch and buffer size as per your model requirements
buffer_size = 64
batch_size = 32

for subdir, split in zip(subdirs, subdir_splits):
    full_path = data_head_dir / subdir
    if split=='validate':
        tf_data_val.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
    elif split=='train':
        tf_data_train.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
    elif split=='test':
        tf_data_test.extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))
        
random.shuffle(tf_data_train)
img_list, label_list = zip(*tf_data_train)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_train = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_train = tf_dataset_train.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_train = tf_dataset_train.shuffle(buffer_size=buffer_size).batch(batch_size) 

random.shuffle(tf_data_val)
img_list, label_list = zip(*tf_data_val)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_val = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_val = tf_dataset_val.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_val = tf_dataset_val.shuffle(buffer_size=buffer_size).batch(batch_size) 

random.shuffle(tf_data_test)
img_list, label_list = zip(*tf_data_test)
img_list_t = tf.convert_to_tensor(img_list)
lb_list_t = tf.convert_to_tensor(label_list)

tf_dataset_test = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
tf_dataset_test = tf_dataset_test.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
tf_dataset_test = tf_dataset_test.shuffle(buffer_size=buffer_size).batch(batch_size) 



In [12]:
!pip install tensorflow opencv-python

Collecting opencv-python
  Obtaining dependency information for opencv-python from https://files.pythonhosted.org/packages/05/58/7ee92b21cb98689cbe28c69e3cf8ee51f261bfb6bc904ae578736d22d2e7/opencv_python-4.8.1.78-cp37-abi3-macosx_10_16_x86_64.whl.metadata
  Downloading opencv_python-4.8.1.78-cp37-abi3-macosx_10_16_x86_64.whl.metadata (19 kB)
Downloading opencv_python-4.8.1.78-cp37-abi3-macosx_10_16_x86_64.whl (54.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.7/54.7 MB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: opencv-python
Successfully installed opencv-python-4.8.1.78


<h3>Empirical Regularization</h3>
    
Randomly zoom, rotate, flip, contrast, and translate images in your training set for image augmentation. 

Based on this dataset, I believe *random contrast adjustment, rotation, and cropping/zooming* would be the best. The model will learn based on different orientations of features, which could be helpful for satellite images. It will learn from different parts of the image at different scales, which could identify terrain features better. The model will learn to be more robust to changes in lighting, as well. 

In [27]:
# Helper methods for image augmentation

def random_contrast(image, lower=0.5, upper=1.5):
    factor = np.random.uniform(lower, upper)
    return tf.image.adjust_contrast(image, factor)

def random_flip(image, label): 
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    return image, label
    
def random_zoom(image, label):
    # Extract the height and width from the shape of the image
    shape = tf.shape(image)
    height, width = tf.cast(shape[-3], tf.float32), tf.cast(shape[-2], tf.float32)

    zoom_factor = tf.random.uniform([], 0.8, 1.2)
    new_height, new_width = tf.cast(height * zoom_factor, tf.int32), tf.cast(width * zoom_factor, tf.int32)

    # Resize and then crop the image
    image = tf.image.resize(image, [new_height, new_width])
    image = tf.image.random_crop(image, size=[tf.cast(height, tf.int32), tf.cast(width, tf.int32), 3])

    return image, label


def random_rotation(image, label):
    image = tf.image.rot90(image, k=np.random.randint(4))
    return image, label

def random_translate(image):
    image = tf.image.random_translation(image, 0.2, 0.2)
    return image
    
    
def image_augmentation(image, label):
    image = tf.convert_to_tensor(image, dtype=tf.float32)
    # Selectively augment data to prevent all augmentations on every image
    choice = tf.random.uniform(shape=[], minval=0, maxval=2, dtype=tf.int32)
    if choice == 0:
        # Random Contrast for different image sharpness/different lighting
        image = random_contrast(image)
    elif choice == 1:
        # Random Rotation of terrain features
        image, label = random_rotation(image, label)

    # Image normalization
    image /= 255.0

    return image, label

In [28]:
augmented_train_dataset = tf_dataset_train.map(image_augmentation, num_parallel_calls=tf.data.experimental.AUTOTUNE)

<h3> Train a three-layer CNN followed by a dense layer on the data </h3>

In [35]:
from tensorflow.keras import models, layers, regularizers
from tensorflow.keras import metrics
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(299, 299, 3)),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(64, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.01)),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(128, (3, 3), activation='relu', kernel_regularizer=regularizers.l2(0.01)),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2)),

    layers.Flatten(),
    layers.Dropout(0.3),
    layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
    layers.BatchNormalization(),
    layers.Dense(1, activation='sigmoid')  # Use sigmoid for binary classification
])

# Use cross entropy loss
model.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy', metrics.Precision(name='precision'),
                       metrics.Recall(name='recall')])

# Perform early stopping using the validation set
early_stopping = EarlyStopping(monitor='val_loss', patience=3)
model_checkpoint = ModelCheckpoint('best_CNN_MLP_model.tf', monitor='val_loss', save_best_only=True, save_format ='tf')

In [37]:
# Train the CNN & MLP model for at least 20 epochs with the train data set
history = model.fit(augmented_train_dataset, epochs = 20, validation_data = tf_dataset_val,
                    callbacks=[early_stopping, model_checkpoint])

Epoch 1/20


KeyboardInterrupt: 

In [None]:
# Plot training and validation errors vs. epochs
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()