**<span style="color:#0638b2; font-size:32px">
Phase II - Model Fine Tuning and Saving
</span>** <br>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
import os
from collections import Counter
from sklearn.utils.class_weight import compute_class_weight

from tensorflow import keras
from tensorflow.keras import layers, regularizers   ### Adding 'regularizers'
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Rescaling
import pathlib
# import zipfile

In [None]:
# Display the kernel's current working directory (shown as the cell output).
os.getcwd()

**<span style="color:#180842; font-size:26px">
Train/Validation Data Pull
</span>** <br>

In [None]:
# Download URL of the zipped train/validation image archive.
# Earlier dataset versions ("three", "four") are kept commented out for reference.
# NOTE(review): the query string ends in `dl=0?raw=1`, which looks malformed — confirm the link still resolves.
# dataset_url = "https://dl.dropboxusercontent.com/scl/fi/hgz1prxm1kx14w5riy9du/ls_model_data_three.zip?rlkey=7cv22qxmmaeofr7jn6z7iayjy&dl=0?raw=1"
# dataset_url = "https://dl.dropboxusercontent.com/scl/fi/vmox93cwsx2vzrish3l63/land_ls_model_data_four.zip?rlkey=6uykz7bb0aoqbl8hgfkpt1t4f&dl=0?raw=1"
dataset_url = "https://dl.dropboxusercontent.com/scl/fi/mrsbiu74dh3h3i0j2xi8b/ls_model_data_five.zip?rlkey=in4z2twssi71adfi1exknm67w&dl=0?raw=1"

In [None]:
# Download (or reuse the cached copy of) the train/validation archive and extract it.
# Dropping the '.zip' suffix is meant to point at the extracted folder — this assumes
# get_file returns the archive path and extracts next to it; TODO confirm for the installed Keras version.
data_dir = pathlib.Path(
    tf.keras.utils.get_file(
        'ls_model_data_five.zip',
        origin=dataset_url,
        extract=True,
    )
).with_suffix('')

In [None]:
# Total number of .jpg files found one level below the dataset root (i.e. inside the class folders).
image_count = sum(1 for _ in data_dir.glob('*/*.jpg'))
print(image_count)

In [None]:
# Report how many .jpg images each class folder of the train/validation archive holds.
folders = list(filter(lambda entry: entry.is_dir(), data_dir.glob('*')))
for folder in folders:
    count = sum(1 for _ in folder.glob('*.jpg'))
    print(f"\nContents of folder '{folder.name}':")
    print(f"{folder.name}: {count} .jpg images")

**<span style="color:#180842; font-size:26px">
Test Data Pull
</span>** <br>

In [None]:
# Download URL of the zipped hold-out TEST image archive (Google Earth images).
# Earlier test-set versions are kept commented out for reference.
# NOTE(review): the query string ends in `dl=0?raw=1`, which looks malformed — confirm the link still resolves.
#https://www.dropbox.com/scl/fi/nyv6xg7u0lj5uv5jtglto/test_google_images_landslide_three.zip?rlkey=b7xnjid2s7zvfjbp0oqtji084&dl=0
# test_dataset_url = "https://dl.dropboxusercontent.com/scl/fi/nyv6xg7u0lj5uv5jtglto/test_google_images_landslide_three.zip?rlkey=b7xnjid2s7zvfjbp0oqtji084&dl=0?raw=1"
# test_dataset_url = "https://dl.dropboxusercontent.com/s/0en4k86c1r1uzs0/test_google_images_landslides_four.zip?st=pdg6yhzj&dl=0?raw=1"
# test_dataset_url = "https://dl.dropboxusercontent.com/scl/fi/kl3e9wxoibqu0ldogwxg0/test_google_images_landslides_five.zip?rlkey=zuhh6xvjy3oi9wuroy4ogtdi3&dl=0?raw=1"
test_dataset_url = "https://dl.dropboxusercontent.com/scl/fi/a8evtoop10e9cadnxorqw/google_earth_unique_test.zip?rlkey=g463zbw32hn5ez80wb1qogw4g&dl=0?raw=1"

In [None]:
# Download (or reuse the cached copy of) the test archive and extract it.
# Dropping the '.zip' suffix is meant to point at the extracted folder — this assumes
# get_file returns the archive path and extracts next to it; TODO confirm for the installed Keras version.
data_dir_test = pathlib.Path(
    tf.keras.utils.get_file(
        'google_earth_unique_test.zip',
        origin=test_dataset_url,
        extract=True,
    )
).with_suffix('')

In [None]:
## List the class folders found in the extracted test archive.
folders_test = list(filter(lambda entry: entry.is_dir(), data_dir_test.glob('*')))
print("Folders in the test directory:")
for folder in folders_test:
    print(folder.name)

In [None]:
# Report how many .jpg images each class folder of the TEST archive holds.
# The list is stored under `folders_test` (not `folders`) so the train/validation
# folder list is not overwritten: `folders` is later used to derive the class count.
folders_test = [folder for folder in data_dir_test.glob('*') if folder.is_dir()]
for folder in folders_test:
    print(f"\nContents of folder '{folder.name}':")
    count = len(list(folder.glob('*.jpg')))
    print(f"{folder.name}: {count} .jpg images")

In [None]:
# # Drill into each subdirectory and list files
# total_files = 0
# for folder in folders_test:
#     print(f"\nContents of folder '{folder.name}':")
#     for subitem in folder.iterdir():
#         if subitem.is_file():
#             print(f"  File: {subitem.name}")
#         elif subitem.is_dir():
#             print(f"  Subfolder: {subitem.name}")
#             count = len(list(subitem.glob('*.jpg')))
#             print(f"{subitem.name}: {count} .jpg images")
#             total_files += count  # Add the count of .jpg images to the total
            
# # Print the total number of files
# print(f"\nTotal number of files: {total_files}")

**<span style="color:#180842; font-size:26px">
Set Training/Validation Data & Split
</span>** <br>

In [None]:
## Batch size and image size shared by the train, validation and test dataset loaders.
batch_size = 32
# Images are resized to 300x300 when loaded.
img_height = 300
img_width = 300

#### Create Training / Validation Sets

In [None]:
## Training subset: 80% of the images under data_dir.
# validation_split and seed must match the validation loader so the two subsets do not overlap.
train_dataset = tf.keras.utils.image_dataset_from_directory(
  data_dir,
  validation_split=0.20,
  subset="training",
  seed=634,
  image_size=(img_height, img_width),
  batch_size=batch_size)

In [None]:
## Validation subset: the remaining 20% of the images under data_dir.
# Uses the same validation_split and seed as the training loader so the split is complementary.
valid_dataset = tf.keras.utils.image_dataset_from_directory(
  data_dir,
  validation_split=0.20,
  subset="validation",
  seed=634,
  image_size=(img_height, img_width),
  batch_size=batch_size)

In [None]:
def count_classes(dataset, class_counts):
    """Tally samples per integer class label.

    Args:
        dataset: batched dataset yielding (images, labels) pairs with integer labels.
        class_counts: dict mapping label -> count; updated in place.
    """
    # Iterate batch-by-batch and only read the label tensor of each batch.
    for _, labels in dataset:
        for label in labels.numpy().tolist():
            label = int(label)
            class_counts[label] = class_counts.get(label, 0) + 1


# Per-class tallies for the training and validation subsets
train_class_counts = {}
valid_class_counts = {}
count_classes(train_dataset, train_class_counts)
count_classes(valid_dataset, valid_class_counts)

# Totals come from the tallies, so each dataset is read once instead of twice.
train_count = sum(train_class_counts.values())
valid_count = sum(valid_class_counts.values())

# Display counts
print(f"Training set size: {train_count}")
print(f"Validation set size: {valid_count}")
print("\nClass distribution in Training set:")
for class_label, count in sorted(train_class_counts.items()):
    print(f"  Class {class_label}: {count} samples")

print("\nClass distribution in Validation set:")
for class_label, count in sorted(valid_class_counts.items()):
    print(f"  Class {class_label}: {count} samples")

### Buffering/Shuffle

In [None]:
## Data pipeline performance settings
AUTOTUNE = tf.data.AUTOTUNE

# cache() stores the loaded batches after the first pass; prefetch() overlaps loading with training.
# The datasets are already batched here, so shuffle(3200) reorders whole batches, not single images.
# NOTE(review): the wrapped datasets presumably no longer expose `.class_names` — confirm before relying on it later.
train_dataset = train_dataset.cache().shuffle(3200).prefetch(buffer_size=AUTOTUNE)
valid_dataset = valid_dataset.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
## Checking images and labels: preview up to 8 samples from one training batch.
# `class_names` is only defined further down (from the test dataset), so on a fresh
# run it is not available here. The class names are rebuilt from the class folder
# names instead: image_dataset_from_directory assigns label indices in
# alphanumerical folder order.
train_class_names = sorted(
    entry.name for entry in data_dir.iterdir()
    if entry.is_dir() and not entry.name.startswith('.')
)

plt.figure(figsize=(10, 5))
for images, labels in train_dataset.take(1):  # Take one batch
    # Never index past the end of a short batch.
    for i in range(min(8, images.shape[0])):
        ax = plt.subplot(2, 4, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        label = int(labels[i].numpy())
        label_name = train_class_names[label] if label < len(train_class_names) else label
        plt.title(f"Label: {label} ({label_name})", fontsize=10)
        plt.axis("off")
plt.show()

**<span style="color:#180842; font-size:26px">
Set Test Data
</span>** <br>

In [None]:
## Root directory handed to the test dataset loader.
# The extra 'Google_Earth_Images' sub-level is commented out, so this is currently the same path as data_dir_test.
data_dir_test_lvltwo=pathlib.Path(data_dir_test) #/ 'Google_Earth_Images'
data_dir_test_lvltwo

In [None]:
# Hold-out test set: every image under the test directory, loaded in a fixed order.
test_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir_test_lvltwo,   # Base directory
    labels='inferred',           # Labels come from the class folder names
    subset=None,                 # No split; the whole directory is test data
    seed=614,                    # NOTE(review): presumably unused because shuffle=False — confirm
    image_size=(img_height, img_width),
    batch_size=batch_size,
    shuffle=False,  ### Keep file order fixed so predictions, labels and file_paths stay aligned.
    validation_split=None,       # No validation split; test data only
)

In [None]:
## Class names as inferred from the TEST directory's folder names; index i is the name of label i.
class_names = test_dataset.class_names
print(class_names)

In [None]:
## Checking the labels: preview up to 18 samples from the first test batch.
test_class_names = test_dataset.class_names
plt.figure(figsize=(12, 7))
for images, labels in test_dataset.take(1):  # Take one batch
    # Never index past the end of a batch that holds fewer than 18 images.
    for i in range(min(18, images.shape[0])):
        ax = plt.subplot(3, 6, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        label = labels[i].numpy()
        label_name = test_class_names[label]
        plt.title(f"Label: {label} ({label_name})", fontsize=10)
        plt.axis("off")
plt.show()

<hr style="border:8px solid #301b66">

### **Class Weights for use during Model Fit**
- Added code to compute class weights, which are passed to `model.fit` via `class_weight=class_weights_dict` to compensate for class imbalance during training.
- https://keras.io/examples/structured_data/imbalanced_classification/

In [None]:
# Collect every training label (one pass over the training batches).
class_labels = np.concatenate([y.numpy() for _, y in train_dataset])

# Class indices that actually occur in the training data
present_classes = np.unique(class_labels)

# 'balanced' weights are inversely proportional to class frequency
class_weights = compute_class_weight(
    class_weight='balanced',  # Option to use 'balanced' or specify manual weights
    classes=present_classes,
    y=class_labels
)

# Keras expects {class_index: weight}. Each weight is keyed by its real class index
# rather than its position, so the mapping stays correct if an index is missing.
class_weights_dict = {
    int(class_index): float(weight)
    for class_index, weight in zip(present_classes, class_weights)
}

print("Class weights:", class_weights_dict)

<hr style="border:8px solid #301b66">

**<span style="color:#0638b2; font-size:30px">
Model Design w/ Transfer Learning
</span>** <br>


Pre-Training with EfficientNet
- Strengths: EfficientNet models scale well in terms of parameters and efficiency, balancing accuracy with computational cost, which is especially useful for large datasets.
- Suitability: EfficientNet performs exceptionally well on high-resolution images, which is ideal for detecting fine details in satellite images.
- https://viso.ai/deep-learning/efficientnet/
- https://keras.io/examples/vision/image_classification_efficientnet_fine_tuning/

**Setting up EfficientNet**
- Note: EfficientNetB3 is used because its native input resolution is 300x300, which matches the image size set above.

In [None]:
## Import EFB3 Model Weights
from tensorflow.keras.applications import EfficientNetB3

In [None]:
## Number of classes = number of class folders in the TRAINING archive.
# Counted from data_dir directly: the `folders` variable is reassigned to the test
# directories earlier in the notebook, so it cannot be trusted here. Dot-prefixed
# folders are skipped.
land_class_num = len([
    entry for entry in data_dir.glob('*')
    if entry.is_dir() and not entry.name.startswith('.')
])
land_class_num

In [None]:
# Setting up parameters for model.
# NOTE: the "Build Model" cell further down assigns IMG_SIZE and learn_rate_land again
# (learn_rate_land = 1e-4) before the model is built, so the 1e-3 set here is not the value used.
IMG_SIZE = 300
learn_rate_land = 1e-3  ## For optimizer.

print(land_class_num) ## Check class number AND use as argument below.

<hr style="border:3px solid #301b66">

### **Helper Functions**

In [None]:
### Plot Helper
def visualize_model_results(model_input):
    """Plot training vs. validation accuracy and loss curves.

    Args:
        model_input: object with a `.history` dict holding the 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' lists (as returned by `model.fit`).
    """
    acc = model_input.history['accuracy']
    val_acc = model_input.history['val_accuracy']

    loss = model_input.history['loss']
    val_loss = model_input.history['val_loss']

    # The x-axis comes from the history itself, not the global `epochs`, so the
    # plot stays correct if `epochs` changes after the fit or training stops early.
    epochs_range = range(len(acc))

    plt.figure(figsize=(8, 8))
    plt.subplot(2, 1, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(2, 1, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.show()

Saturation
- The saturation factor is sampled randomly from a uniform distribution in the range [lower, upper].
- A saturation factor of 1.0 means no change in saturation.
- A saturation factor below 1.0 decreases the saturation (the image will appear less vibrant or more grayscale).
- A saturation factor above 1.0 increases the saturation (the image will appear more vibrant).

#### **Augmentation**

In [None]:
### Data Augmentation Helper
# data_augmentation = keras.Sequential(
#   [
#     keras.layers.RandomRotation(factor=0.10),
#     keras.layers.RandomTranslation(height_factor=0.1, width_factor=0.1),
#     keras.layers.RandomFlip("horizontal"),
#     keras.layers.RandomContrast(factor=0.5),
#     keras.layers.RandomBrightness(factor=.6),
      
#     # layers.RandomSaturation(.7, seed=516) ## Error
#     keras.layers.Lambda(lambda x: tf.image.random_saturation(x, lower=0.3, upper=1.2))
#   ]
# )

def _random_saturation(x, training=None):
    """Randomly scale image saturation by a factor in [0.7, 1.3], in training only.

    Keras passes the `training` flag to a Lambda function that accepts it. Without
    this guard the jitter would also run during evaluate()/predict(), making test
    metrics non-deterministic.
    """
    if training:
        return tf.image.random_saturation(x, lower=0.7, upper=1.3)
    return x


# Augmentation pipeline applied to the model inputs inside build_model.
data_augmentation = keras.Sequential(
  [
    # keras.layers.Input((350, 350, 3)),
    keras.layers.RandomRotation(factor=0.40),
    keras.layers.RandomTranslation(height_factor=0.4, width_factor=0.4, fill_mode="reflect"),
    keras.layers.RandomFlip("horizontal_and_vertical"),
    # keras.layers.RandomCrop(300, 300),
    keras.layers.RandomContrast(factor=0.7), ## Values closer to 0 = minimal adjustment, higher to 1.0 allow larger contrast variations.
    keras.layers.RandomBrightness(factor=.7),

    # layers.RandomSaturation(.7, seed=516) ## Error
    keras.layers.Lambda(_random_saturation)
  ]
)

<hr style="border:3px solid #301b66">

## **Model**

### **Define Model Design**

In [None]:
# Transfer-learning classifier: frozen EfficientNetB3 backbone plus a small trainable head.
def build_model(num_classes, learning_r):
    """Build and compile the EfficientNetB3-based classifier.

    Args:
        num_classes: number of output units of the softmax head.
        learning_r: learning rate for the Adam optimizer.

    Returns:
        A compiled keras.Model named "EfficientNet" that takes
        (IMG_SIZE, IMG_SIZE, 3) images and outputs class probabilities.
    """
    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))

    # Augmentation is part of the model graph and sits directly on the raw inputs.
    augmented_inputs = data_augmentation(inputs)

    # No explicit Rescaling(1/255) layer: the author observed a large drop in
    # performance when one was added here.
    backbone = EfficientNetB3(
        include_top=False,
        input_tensor=augmented_inputs,
        weights="imagenet",
    )
    # Keep the ImageNet weights fixed for the first training stage.
    backbone.trainable = False

    # Classification head
    head = layers.GlobalAveragePooling2D(name="avg_pool")(backbone.output)
    head = layers.BatchNormalization()(head)
    head = layers.Dropout(0.20, name="top_dropout")(head)
    # For a binary-cross-entropy setup, use Dense(1, activation="sigmoid") instead.
    outputs = layers.Dense(num_classes, activation="softmax", name="pred")(head)

    classifier = keras.Model(inputs, outputs, name="EfficientNet")
    # Sparse categorical cross-entropy: labels are integer class indices.
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_r),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return classifier

### **Build Model**

In [None]:
# Parameters used for the model build in the next cell.
# IMG_SIZE is read as a global inside build_model; learn_rate_land is passed as its learning rate.
IMG_SIZE = 300
learn_rate_land = 1e-4  ## For optimizer.

print(land_class_num) ## Check class number AND use as argument below.

In [None]:
### Create the model with a frozen backbone, using the class count and learning rate set above.
landslide_model_ENB3 = build_model(num_classes=land_class_num, learning_r=learn_rate_land)
# landslide_model_ENB3 = build_model(num_classes=1) ## for when we use Sigmoid/Binary Cross Entropy

In [None]:
### Review of the model structure:   Confirm that no layers are open after set.
# landslide_model_ENB3.summary(show_trainable=True)

### **Fit**

In [None]:
# Stage 1: train with the backbone frozen; class weights compensate for class imbalance.
epochs = 15  # @param {type: "slider", min:8, max:80}
landslide_ENB3_hist = landslide_model_ENB3.fit(train_dataset, epochs=epochs, validation_data=valid_dataset, class_weight=class_weights_dict)

In [None]:
## Plot accuracy/loss curves for the frozen-backbone training stage
visualize_model_results(landslide_ENB3_hist)

### **Testing**

In [None]:
# Evaluate on the hold-out test set; `results` holds [loss, accuracy] per the compiled metrics.
print("Evaluate on test landslide images")
results = landslide_model_ENB3.evaluate(test_dataset)
print("test loss, test acc:", results)

In [None]:
# Class probabilities for every test image; the test set is unshuffled, so rows follow file order.
y_pred_probs = landslide_model_ENB3.predict(test_dataset)
# Predicted class = index of the highest probability
y_pred = y_pred_probs.argmax(axis=1)
# True labels, gathered batch by batch in the same order
y_true = np.concatenate([batch_labels.numpy() for _, batch_labels in test_dataset], axis=0)

In [None]:
### Predict probabilities when using a Binary Optimizer (sigmoid output)

# y_pred_probs = landslide_model_ENB3.predict(test_dataset)  # Predict probabilities

# # Convert probabilities to binary predictions (threshold at 0.5)
# y_pred = (y_pred_probs > 0.5).astype(int).flatten()  # Convert to binary class (0 or 1)

# # Extract true labels from the dataset
# y_true = np.concatenate([y for x, y in test_dataset], axis=0)  # True labels

# # Print results
# print(f"Predicted probabilities: {y_pred_probs[:5]}")
# print(f"Binary predictions: {y_pred[:5]}")
# print(f"True labels: {y_true[:5]}")


In [None]:
## Confusion Matrix
# NOTE(review): this import sits mid-notebook; consider moving it to the top import cell.
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Generate confusion matrix from true vs. predicted test labels
cm = confusion_matrix(y_true, y_pred)

# Display the confusion matrix, labelled with the test dataset's class names
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=test_dataset.class_names)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()


In [None]:
### Extract TP, FN, FP, TN - HH - class index 0 is treated as the positive class.
# Presumably index 0 is the landslide folder — confirm against test_dataset.class_names.
# cm rows are true labels and columns are predictions, so ravel() yields
# [true0/pred0, true0/pred1, true1/pred0, true1/pred1] = TP, FN, FP, TN.
# (With class 1 as positive it would be: TN, FP, FN, TP = cm.ravel())
TP, FN, FP, TN = cm.ravel()

# Unrounded ratios; rounding is applied for display only.
precision_raw = TP / (TP + FP)
recall_raw = TP / (TP + FN)

print(f"True Positives (TP): {TP}  False Negatives (FN): {FN}")
print(f"False Positives (FP): {FP} True Negatives (TN): {TN}")
print('\n')
print(f'Accuracy: {round((TP+TN)/(TP+TN+FP+FN),3)}')
precision_val = round(precision_raw, 3)
print(f'Precision: TP / (TP + FP) | Positive predictions are correct. {precision_val}')
recall_val = round(recall_raw, 3)
print(f'Recall: TP / (TP + FN) | Actual positive cases that are identified. {recall_val}')
specificity_val = round(TN/(TN+FP), 3)
print(f'Specificity: TN / (TN + FP) | Actual negative cases that are identified. {specificity_val}')

# F1 uses the unrounded precision/recall so rounding error is not compounded.
F1_val = round(2*(precision_raw*recall_raw)/(precision_raw+recall_raw), 3)
print(f'F1 Score: 2 * (Precision * Recall) / (Precision + Recall) | Balance Measurement. {F1_val}')

### Identify Image File to Prediction|True Labels

In [None]:
# File paths of the test images, in the same (unshuffled) order as the dataset yields them
file_paths = test_dataset.file_paths

# Get predictions
y_pred_probs = landslide_model_ENB3.predict(test_dataset)  # Predict probabilities
y_pred = np.argmax(y_pred_probs, axis=1)  # Predicted class

# Extract true labels
y_true = np.concatenate([y for _, y in test_dataset], axis=0)

# Zip paths, predictions, and true labels
file_class_map = zip(file_paths, y_pred, y_true)

print("\nImages with predicted and true labels:")
for file_path, pred, true in file_class_map:
    # Show only misclassified images whose TRUE label is class 0 (the missed positives).
    # Logical `and` is required: `&` binds tighter than `!=` and `==`, so the earlier
    # `pred != true & true == 0` only gave this result through chained comparison.
    if pred != true and true == 0:
        print(f"File: {file_path}, Predicted: {pred}, True Label: {true}")


<hr style="border:15px solid #301b66">

**<span style="color:#0638b2; font-size:30px">
Fine Tuning - Opening Up Feature Layers
</span>** <br>

### **Number of Layers**

In [None]:
# Freeze every layer of the model (backbone and head) so that the unfreeze step
# below decides exactly which layers become trainable.
for layer in landslide_model_ENB3.layers:
    layer.trainable = False

In [None]:
## Number of layers, counted from the end of the model, to open for fine tuning.
layer_open = 12
layer_open

In [None]:
## Update Model
def unfreeze_model(model, layer_num, learning_r=1e-4):
    """Make the last `layer_num` layers trainable (except BatchNorm) and recompile.

    Args:
        model: the Keras model to fine-tune; modified in place.
        layer_num: how many layers, counted from the end, to unfreeze.
            0 unfreezes nothing.
        learning_r: Adam learning rate for the fine-tuning stage.

    Raises:
        ValueError: if `layer_num` is negative.
    """
    if layer_num < 0:
        raise ValueError(f"layer_num must be >= 0, got {layer_num}")

    # `model.layers[-0:]` is the WHOLE list, so zero must be handled explicitly
    # or layer_num=0 would unfreeze every layer.
    layers_to_open = model.layers[-layer_num:] if layer_num > 0 else []

    # BatchNorm layers stay frozen while the rest of the top layers are opened.
    for layer in layers_to_open:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True

    optimizer = keras.optimizers.Adam(learning_rate=learning_r)

    # Recompile so the changed `trainable` flags take effect.
    model.compile(
        optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"] )

<hr style="border:3px solid #301b66">

### **Rebuild**

In [None]:
## Open the last `layer_open` layers (BatchNorm excluded) and recompile the model for fine tuning.
unfreeze_model(landslide_model_ENB3, layer_open)

In [None]:
### Review of the model structure:   Confirm that only the intended top layers are trainable after unfreezing.
# landslide_model_ENB3.summary(show_trainable=True)

## **Fit Model** 

In [None]:
# NOTE(review): unfreeze_model was already called in the "Rebuild" cell above; this second
# call recompiles the model (fresh optimizer) before fitting — confirm it is intended.
unfreeze_model(landslide_model_ENB3, layer_open)

# epochs = 10  # @param {type: "slider", min:4, max:10}
# EFB3_hist = model.fit(ds_train, epochs=epochs, validation_data=ds_test)
# plot_hist(hist)

# Stage 2: fine-tune with the top layers open, using the same data and class weights as stage 1.
epochs = 10  # @param {type: "slider", min:8, max:80}
ENB3_hist_fine = landslide_model_ENB3.fit(train_dataset, epochs=epochs, validation_data=valid_dataset, 
                                          class_weight=class_weights_dict)


In [None]:
## Plot accuracy/loss curves for the fine-tuning stage
visualize_model_results(ENB3_hist_fine)

## **Testing**

In [None]:
# Re-evaluate the fine-tuned model on the hold-out test set; `results` holds [loss, accuracy].
print("Evaluate on test landslide images")
results = landslide_model_ENB3.evaluate(test_dataset)
print("test loss, test acc:", results)

In [None]:
# Class probabilities from the fine-tuned model; the test set is unshuffled, so rows follow file order.
y_pred_probs = landslide_model_ENB3.predict(test_dataset)
# Predicted class = index of the highest probability
y_pred = y_pred_probs.argmax(axis=1)
# True labels, gathered batch by batch in the same order
y_true = np.concatenate([batch_labels.numpy() for _, batch_labels in test_dataset], axis=0)

In [None]:
## Confusion Matrix (fine-tuned model)
# Relies on confusion_matrix / ConfusionMatrixDisplay imported in the earlier testing section.
# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Generate confusion matrix from true vs. predicted test labels
cm = confusion_matrix(y_true, y_pred)

# Display the confusion matrix, labelled with the test dataset's class names
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=test_dataset.class_names)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()

In [None]:
### Extract TP, FN, FP, TN - HH - class index 0 is treated as the positive class.
# Presumably index 0 is the landslide folder — confirm against test_dataset.class_names.
# cm rows are true labels and columns are predictions, so ravel() yields
# [true0/pred0, true0/pred1, true1/pred0, true1/pred1] = TP, FN, FP, TN.
# (With class 1 as positive it would be: TN, FP, FN, TP = cm.ravel())
TP, FN, FP, TN = cm.ravel()

# Unrounded ratios; rounding is applied for display only.
precision_raw = TP / (TP + FP)
recall_raw = TP / (TP + FN)

print(f"True Positives (TP): {TP}  False Negatives (FN): {FN}")
print(f"False Positives (FP): {FP} True Negatives (TN): {TN}")
print('\n')
print(f'Accuracy: {round((TP+TN)/(TP+TN+FP+FN),3)}')
precision_val = round(precision_raw, 3)
print(f'Precision: TP / (TP + FP) | Positive predictions are correct. {precision_val}')
recall_val = round(recall_raw, 3)
print(f'Recall: TP / (TP + FN) | Actual positive cases that are identified. {recall_val}')
specificity_val = round(TN/(TN+FP), 3)
print(f'Specificity: TN / (TN + FP) | Actual negative cases that are identified. {specificity_val}')

# F1 uses the unrounded precision/recall so rounding error is not compounded.
F1_val = round(2*(precision_raw*recall_raw)/(precision_raw+recall_raw), 3)
print(f'F1 Score: 2 * (Precision * Recall) / (Precision + Recall) | Balance Measurement. {F1_val}')

### Identify Image File to Prediction|True Labels

In [None]:
# File paths of the test images, in the same (unshuffled) order as the dataset yields them
file_paths = test_dataset.file_paths

# Get predictions
y_pred_probs = landslide_model_ENB3.predict(test_dataset)  # Predict probabilities
y_pred = np.argmax(y_pred_probs, axis=1)  # Predicted class

# Extract true labels
y_true = np.concatenate([y for _, y in test_dataset], axis=0)

# Zip paths, predictions, and true labels
file_class_map = zip(file_paths, y_pred, y_true)

print("\nImages with predicted and true labels:")
for file_path, pred, true in file_class_map:
    # Show only misclassified images whose TRUE label is class 0 (the missed positives).
    # Logical `and` is required: `&` binds tighter than `!=` and `==`, so the earlier
    # `pred != true & true == 0` only gave this result through chained comparison.
    if pred != true and true == 0:
        print(f"File: {file_path}, Predicted: {pred}, True Label: {true}")


<hr style="border:8px solid #301b66">

**<span style="color:#0638b2; font-size:30px">
Train Model on Full Population
</span>** <br>

<hr style="border:8px solid #301b66">

**<span style="color:#0638b2; font-size:30px">
Saving Model for Future Use
</span>** <br>

In [None]:
# Calling `save('my_model.keras')` creates a zip archive `my_model.keras`.
# The target directory can be overridden with the LANDSLIDE_MODEL_DIR environment
# variable, so the notebook also runs outside the original author's home directory.
# The default is the original location.
MODEL_SAVE_DIR = pathlib.Path(os.environ.get(
    "LANDSLIDE_MODEL_DIR",
    "/sfs/gpfs/tardis/home/waa4bq/Documents/MSDS/6050 Deep Learning/DS6050_Project",
))
MODEL_SAVE_DIR.mkdir(parents=True, exist_ok=True)

# NOTE(review): the file name says "40Lyr" but `layer_open` is set to 12 in this notebook — confirm the name.
model_save_path = MODEL_SAVE_DIR / "ENB3_One_40Lyr_hh.keras"
landslide_model_ENB3.save(str(model_save_path))