# **Notebook for Google Colab**

# This notebook is for use in Google Colab only, compiled in a single workflow.

## Objectives
Answer Business requirement 2: Binary Classification using Convolutional Neural Networks

* predict if a given leaf is infected or not judging by the presence of powdery mildew.
* use the CNN to map relationships between features and labels.
* build a binary classifier and generate reports.

## Inputs

* inputs/cherry-leaves-dataset/cherry-leaves/train
* inputs/cherry-leaves-dataset/cherry-leaves/test
* inputs/cherry-leaves-dataset/cherry-leaves/validation
* image shape embeddings pickle file

## Outputs
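* outputs/{version}/class_distribution.png — bar plot of the class distribution per set
* outputs/{version}/class_indices.pkl — mapping of class names to indices
* outputs/{version}/cherry-tree-model.h5 — trained model
* outputs/{version}/training_loss.png and training_accuracy.png — learning curves
* evaluation.pkl — evaluation of the model on the test set
* model_{version}.png — diagram of the model architecture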





---

### ANNOTATE MODEL VERSION

In [1]:
# Version tag for this run: it names the output folder (outputs/<version>),
# the saved model path and the architecture diagram file.
version = 'v3'  # change as needed

### Import regular packages

In [2]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.image import imread

### Change working directory

We need to change the working directory from its current folder to its parent folder
* We access the current directory with os.getcwd()

In [None]:
# Remember the directory the kernel is currently in; the next cell moves to its parent.
import os
current_dir = os.getcwd()
current_dir

We want to make the parent of the current directory the new current directory
* os.path.dirname() gets the parent directory
* os.chdir() defines the new current directory

In [None]:
# Step up one level from the directory recorded in `current_dir`.
# NOTE(review): once the following cell refreshes `current_dir`, running this
# cell again climbs a further level - run it only once per kernel session.
os.chdir(os.path.dirname(current_dir))
print("You set a new current directory")

Confirm the new current directory

In [None]:
# Refresh `current_dir` so that it reflects the directory set by os.chdir() above.
current_dir = os.getcwd()
current_dir

In [6]:
# Install a pinned version of the Kaggle CLI; it is used below to download the dataset.
!pip install kaggle==1.5.12

In [7]:
# Manually upload kaggle.json through the file picker.
# NOTE(review): `google.colab` is presumably only importable inside Colab, and
# kaggle.json is a credential - keep it out of version control.
from google.colab import files
files.upload()

In [8]:
# Allow kaggle.json access: point KAGGLE_CONFIG_DIR at the current working
# directory and restrict the file's permissions to owner read/write.
os.environ['KAGGLE_CONFIG_DIR'] = os.getcwd()
! chmod 600 kaggle.json

In [9]:
# Kaggle dataset identifier and the local folder the archive is downloaded into.
KaggleDatasetPath = "codeinstitute/cherry-leaves"
DestinationFolder = "inputs/cherry-leaves-dataset"  # creates new dir/dir
# The {...} placeholders interpolate the Python variables above into the shell command.
! kaggle datasets download -d {KaggleDatasetPath} -p {DestinationFolder}

In [10]:
import zipfile

# Unpack the downloaded archive into the destination folder.
# Any failure is reported on screen instead of stopping the notebook.
try:
    archive_location = f'{DestinationFolder}/cherry-leaves.zip'
    with zipfile.ZipFile(archive_location, mode='r') as archive:
        archive.extractall(path=DestinationFolder)
except Exception as error:
    print(error)


### Set input directory

In [12]:
# Root of the extracted dataset and the three split folders beneath it.
my_data_dir = 'inputs/cherry-leaves-dataset/cherry-leaves'
train_path, val_path, test_path = [
    f'{my_data_dir}/{split_name}' for split_name in ('train', 'validation', 'test')
]

### Set output directory

In [None]:

# Versioned output folder; the plots and pickles produced by this run are saved in it.
file_path = f'outputs/{version}'

# Check the exact path that is about to be created (relative to the live working
# directory) rather than listing the cached `current_dir`, which may be stale or
# undefined when cells are run out of order.
if os.path.isdir(file_path):
    print('Old version is already available, create a new version.')
else:
    os.makedirs(name=file_path, exist_ok=True)

### Gather labels

In [None]:
# Class labels are the sub-folder names of the training set. The names are
# sorted so that the order of `labels` does not depend on the file system.
try:
    labels = sorted(os.listdir(train_path))
except OSError:
    # The train folder does not exist until the split cell further down has run
    # (e.g. on a fresh download), so fall back to the known class names.
    labels = ['healthy', 'powdery_mildew']

print(f"Project Labels: {labels}")

### Load image shape embeddings

In [None]:
import joblib
version_im = 'v1'  # original (should remain unchanged)

try:
    # Import saved image shape embedding pickle file
    image_shape = joblib.load(filename=f"outputs/{version_im}/image_shape.pkl")
except Exception as e:
    # The pickle is not available (e.g. on Google Colab): report why and use the
    # default shape. Catching Exception rather than using a bare `except` keeps
    # Ctrl-C / kernel interrupts working.
    print(f'Could not load the saved image shape ({e}); using the default.')
    image_shape = (256, 256, 3)

print(image_shape)



## Validate image files:

In [16]:
# Needed after a fresh dataset download (e.g. on Google Colab).

def remove_non_image_files(my_data_dir):
    """
    Delete every file without an image extension from the sub-folders of my_data_dir.

    Parameters:
    - my_data_dir: folder whose immediate sub-folders contain the image files

    For each sub-folder, prints how many image files were kept and how many
    non-image files were removed. Entries of my_data_dir that are not folders
    are left untouched. Errors raised for an individual file are printed and
    the scan continues.
    """
    print('Removing non image files...\n')
    # Every extension carries its dot, so a name that merely ends in "jpeg"
    # (without being a .jpeg file) is not mistaken for an image.
    image_extension = ('.png', '.jpg', '.jpeg')

    for folder in os.listdir(my_data_dir):
        folder_path = f'{my_data_dir}/{folder}'
        if not os.path.isdir(folder_path):
            # os.listdir() would raise on a plain file sitting next to the class folders
            continue

        image_count = 0
        non_image_count = 0
        for given_file in os.listdir(folder_path):
            try:
                if given_file.lower().endswith(image_extension):
                    image_count += 1
                else:
                    os.remove(f'{folder_path}/{given_file}')  # remove non image file
                    non_image_count += 1
            except Exception as e:
                print(e)

        print(f'Folder: {folder} has - {image_count} image files')
        print(f'Folder: {folder} has - {non_image_count} non image files, which have been removed')

In [17]:
# Clean the freshly extracted dataset folder before it is split into sets.
remove_non_image_files('inputs/cherry-leaves-dataset/cherry-leaves')

# Split train, val, test sets with dirs

In [18]:
import os
import shutil
import random
import joblib

def split_train_validation_test_images(my_data_dir, train_set_ratio, validation_set_ratio, test_set_ratio):
    """
    Move the images of every class folder into train / validation / test sub-folders.

    Parameters:
    - my_data_dir: folder that contains one sub-folder per class label
    - train_set_ratio, validation_set_ratio, test_set_ratio: fractions of each
      class assigned to the respective set; they must add up to 1.0

    The result is my_data_dir/<set>/<label>/<file>; the emptied class folders
    are removed. Nothing is moved when a 'test' folder already exists. Errors
    raised while moving files are printed rather than raised. Returns None.
    """
    import math  # local import keeps this cell self-contained

    # Compare with a tolerance: in floating point 0.7 + 0.1 + 0.2 evaluates to
    # 0.9999999999999999, so an exact `!= 1.0` test rejects valid ratios.
    ratio_total = train_set_ratio + validation_set_ratio + test_set_ratio
    if not math.isclose(ratio_total, 1.0, rel_tol=0.0, abs_tol=1e-9):
        print('Ratios should total 1.0.')
        print('You entered:\n')
        print(f'Train ratio: {train_set_ratio}')
        print(f'Validation ratio: {validation_set_ratio}')
        print(f'Test ratio: {test_set_ratio}')
        return

    # Class labels are the folder names; stray files next to them are ignored.
    labels = sorted(
        entry for entry in os.listdir(my_data_dir)
        if os.path.isdir(f'{my_data_dir}/{entry}')
    )

    if 'test' not in labels:
        try:
            # create train, validation, test folders with class label sub-folders
            for folder in ('train', 'validation', 'test'):
                for label in labels:
                    os.makedirs(name=f'{my_data_dir}/{folder}/{label}')

            for label in labels:
                # Sort before the seeded shuffle: os.listdir() order is arbitrary,
                # so without sorting the seed alone does not make the split repeatable.
                files = sorted(os.listdir(f'{my_data_dir}/{label}'))
                random.seed(42)
                random.shuffle(files)

                train_end = int(len(files) * train_set_ratio)
                validation_end = train_end + int(len(files) * validation_set_ratio)

                for position, file_name in enumerate(files):
                    if position < train_end:
                        target_set = 'train'
                    elif position < validation_end:
                        target_set = 'validation'
                    else:
                        target_set = 'test'
                    shutil.move(f'{my_data_dir}/{label}/{file_name}',
                                f'{my_data_dir}/{target_set}/{label}/{file_name}')

                os.rmdir(f'{my_data_dir}/{label}')

        except Exception as e:
            print(e)

    print('Done!')


In [19]:
# Split every class folder 70 % / 10 % / 20 % into train / validation / test.
split_train_validation_test_images(
    my_data_dir='inputs/cherry-leaves-dataset/cherry-leaves',
    train_set_ratio=0.7,
    validation_set_ratio=0.1,
    test_set_ratio=0.2
)

***

# Review class distribution

* across whole dataset
* per train, test, and validation

In [None]:
# Count the images per set and label. The rows are collected in a list and the
# DataFrame is built once: DataFrame.append was removed in pandas 2.0, and a
# DataFrame has no `concat` method to fall back on.
frequency_records = []
total_images_count = 0

# gather info
for folder in ['train', 'validation', 'test']:
    for label in labels:
        path = my_data_dir + '/' + folder + '/' + label

        image_count = len(os.listdir(path))
        total_images_count += image_count

        frequency_records.append({'Set': folder, 'Label': label, 'Frequency': image_count})
        print(f"* {folder}- {label}: {image_count} images\n")

df_freq = pd.DataFrame(frequency_records, columns=['Set', 'Label', 'Frequency'])

print(f'{total_images_count} images total')
print('--------')

# plot class distribution
plt.figure(figsize=(10, 6))
sns.barplot(x='Set', y='Frequency', hue='Label', data=df_freq)
plt.title('Class Distribution')
plt.savefig(f'{file_path}/class_distribution.png', bbox_inches='tight', dpi=600)
plt.show()
print('\n')

print('--------')

# confirm percentages of dataset (share of all images, rounded to whole percent)
df_freq = df_freq.set_index('Label')
df_freq['Percent of DataSet'] = (df_freq['Frequency'] / total_images_count * 100).round()

print(df_freq)

We can confirm that train, validation and test set percentages of dataset are split as expected, and that there are equal amounts of both classes (healthy and powdery_mildew) in each set.

***

In [None]:
import tensorflow as tf


# Image Augmentation

### Define image data generator, initialize


In [22]:
# This function generates batches of image data with real-time data augmentation.
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Initialize
# Augmentation settings applied on the fly to the training images;
# `rescale` additionally divides every pixel value by 255.
augmentation_settings = {
    'rotation_range': 30,
    'width_shift_range': 0.15,
    'height_shift_range': 0.15,
    'brightness_range': [0.8, 1.2],
    'shear_range': 0.1,
    'zoom_range': 0.2,
    'horizontal_flip': True,
    'vertical_flip': True,
    'fill_mode': 'nearest',
    'rescale': 1./255,
}
augmented_image_data = ImageDataGenerator(**augmentation_settings)

In [23]:
# Number of images per batch; shared by the train, validation and test iterators below.
batch_size = 16

### Augment TRAINING image dataset


In [None]:
# Training iterator: augmented, shuffled batches read from the train folder.
# (The comma after `seed=42` was missing, which made this cell a SyntaxError.)
train_set = augmented_image_data.flow_from_directory(train_path,
                                                     target_size=image_shape[:2],
                                                     color_mode='rgb',
                                                     batch_size=batch_size,
                                                     class_mode='binary',
                                                     shuffle=True,
                                                     seed=42,
                                                     follow_links=False
                                                     )


train_set.class_indices

### Rescale validation image dataset


In [None]:
# Validation images are only rescaled (no augmentation) and are read in a fixed order.
validation_generator = ImageDataGenerator(rescale=1./255)
validation_set = validation_generator.flow_from_directory(
    val_path,
    target_size=image_shape[:2],
    color_mode='rgb',
    batch_size=batch_size,
    class_mode='binary',
    shuffle=False,
)

validation_set.class_indices

### Rescale test image dataset

In [None]:
# Test images are only rescaled (no augmentation) and are read in a fixed order.
test_generator = ImageDataGenerator(rescale=1./255)
test_set = test_generator.flow_from_directory(
    test_path,
    target_size=image_shape[:2],
    color_mode='rgb',
    batch_size=batch_size,
    class_mode='binary',
    shuffle=False,
)

test_set.class_indices

### Plot augmented training images

In [None]:
# Draw five batches from the training iterator and show the first image of each.
for _ in range(5):
    # The built-in next() replaces the former `.next()` call and its bare
    # `except:` fallback, so unrelated errors are no longer hidden.
    img, label = next(train_set)

    print(f'{img.shape}\n')  # expect: (batch_size, height, width, 3), e.g. (16, 256, 256, 3)
    plt.imshow(img[0])
    print('--------------')
    plt.show()
    

### Plot augmented validation and test images

In [None]:
# Draw five batches from the validation iterator and show the first image of each.
for _ in range(5):
    # The built-in next() replaces the former `.next()` call and its bare
    # `except:` fallback, so unrelated errors are no longer hidden.
    img, label = next(validation_set)
    print(f'{img.shape}\n')
    plt.imshow(img[0])
    print('--------------')
    plt.show()


###  Observations
Validation and test images are only rescaled: their pixel values are divided by 255, so they lie between 0 and 1, and they are not augmented. The training images are augmented as well as rescaled. As you can see, the images are ready to be used for developing and training a CNN model.

### Save class indices

In [None]:
# Persist the class-name -> index mapping of the training iterator in the versioned output folder.
joblib.dump(value=train_set.class_indices,
            filename=f"{file_path}/class_indices.pkl")

---

# Model Creation

---

### ML Model

* Import model packages

In [30]:
# TODO remove? for google collab functionality
# !pip install tensorflow
# !pip install keras_tuner 

# TODO v3:
# TODO import keras_tuner as kt 
# TODO import Adam?
# TODO add check for tf version for continuity between collab and other environments
# TODO uncomment to find collab dependencies
# TODO remove? !pip freeze > collab_requirements.txt

In [None]:
import tensorflow as tf
print(f'tf version: {tf.__version__}')

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense, Conv2D, MaxPooling2D, BatchNormalization


* ### Model

In [32]:
# TODO v3 add hp as param if using hypertuner
def create_tf_model():
    """
    Build and compile the CNN used for binary classification of leaf images.

    Architecture: three Conv2D + MaxPooling2D stages (32, 64 and 128 filters),
    then Flatten, Dense(128), BatchNormalization, Dropout(0.4) and a single
    sigmoid output unit. Compiled with binary cross-entropy, the 'adam'
    optimizer and the accuracy metric. Reads the notebook-level `image_shape`.

    Returns:
    - the compiled Sequential model

    Development notes (TODO: move to the readme):

    v1:
    - 4 convolution layers, c.7m trainable params, early stopping, no batch
      normalisation, batch size 20.
    - Bizarre results with 100% accuracy: concerns of data leakage.

    v2:
    - No early stopping, to observe the model over a longer run (25 epochs).
    - Removed one convolution layer (4 may have been too complex for the small
      dataset); convolution layer 3 now has the largest number of filters.
    - Batch normalisation added before the final dense layer.
    - Results were unreadable: the input ran out of data and interrupted
      training; steps per epoch needed revision.

    v3:
    - Findings showed that the augmented training data was only used for the
      first batch and was not recalled, hence the input ran out of data on
      subsequent epochs.
    - No early stopping yet, to allow the full 25 epochs for evaluation first.
    - Steps per epoch are handled directly by keras when fitting (which should
      equal train_set.samples // batch_size) and augmentation should be called
      each time.
    - TODO findings: no input data running out; keep early stop.

    v4 plans:
    - TODO decide whether to add hyperparameter optimisation.
    - TODO convolution layers in descending order instead (128, 64, 32 filters).
    """
    convolution_stages = [
        # Input layer: CONV1
        Conv2D(filters=32, kernel_size=(3, 3),
               input_shape=image_shape,  # average image shape
               activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),

        # CONV2
        Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),

        # CONV3 (largest number of filters)
        Conv2D(filters=128, kernel_size=(3, 3), activation='relu'),
        # TODO v3: consider adding BatchNormalization() here too
        MaxPooling2D(pool_size=(2, 2)),
    ]

    classifier_head = [
        Flatten(),
        # TODO v3: tune the number of units of this Dense layer (32-512), e.g.
        # hp_units = hp.Int('units', min_value=32, max_value=512, step=32)
        Dense(128, activation='relu'),
        BatchNormalization(),
        Dropout(0.4),
        Dense(1, activation='sigmoid'),
    ]

    model = Sequential(convolution_stages + classifier_head)

    # TODO v3: tune the learning rate (0.01, 0.001 or 0.0001), e.g.
    # hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    # and compile with optimizer=Adam(learning_rate=hp_learning_rate) - import beforehand
    model.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    return model

## Model summary (the hyperparameter tuner is not instantiated yet — see the TODO notes)

In [None]:
# Build a fresh model only to print its layer table.
# NOTE(review): summary() presumably prints and returns None, in which case
# `summary` holds no reusable value - confirm before relying on it.
summary = create_tf_model().summary()

* Early Stopping

    * Avoid overfitting

In [34]:
# Stop training once the validation loss has not improved for `patience` epochs.
# NOTE(review): verbose=3 is presumably treated like 1 (messages on) - confirm.
from tensorflow.keras.callbacks import EarlyStopping
early_stop = EarlyStopping(monitor='val_loss', patience=10, verbose=3)  # increase patience from 3 to 10

### Run hypertune search

In [35]:
# TODO v3
# SOURCE https://www.tensorflow.org/tutorials/keras/keras_tuner

# tuner.search(img_train, label_train, epochs=50, validation_split=0.2, callbacks=[stop_early])

# # Get the optimal hyperparameters
# best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

# print(f"""
# The hyperparameter search is complete. The optimal number of units in the first densely-connected
# layer is {best_hps.get('units')} and the optimal learning rate for the optimizer
# is {best_hps.get('learning_rate')}.
# """)
# _________
# # then reinstantiate hyermodel and train it with optimal number of epochs from above
# hypermodel = tuner.hypermodel.build(best_hps)

# # Retrain the model
# hypermodel.fit(img_train, label_train, epochs=best_epoch, validation_split=0.2)
# _________

# eval_result = hypermodel.evaluate(img_test, label_test)
# print("[test loss, test accuracy]:", eval_result)


### Visualise Model

In [None]:
from tensorflow.keras.utils import plot_model

# Render the architecture diagram to model_<version>.png; a failure is only
# printed so that the rest of the notebook can still run.
try:
    model = create_tf_model()
    diagram_file = f'model_{version}.png'
    plot_model(model, show_shapes=True, to_file=diagram_file)
except Exception as error:
    print(error)


***

In [None]:
# TODO remove before submit
# Diagnostic output for the steps-per-epoch investigation.
print(f'Train set object: {train_set}')
print(f"Number of samples in training set: {train_set.samples}")
# class_indices holds one entry per class; `train_set.classes` was counted here
# before, which is presumably one entry per sample (see Keras docs) - confirm.
print(f"Number of classes: {len(train_set.class_indices)}")
print(f"Batch size: {batch_size}")
print(f"Current steps calculation (len(classes)/batch_size): {len(train_set.classes) // batch_size}")
print(f"Correct steps calculation (samples/batch_size): {train_set.samples // batch_size}")
print(f"Number of validation samples: {validation_set.samples}")
print(f"Validation steps per epoch: {validation_set.samples // batch_size}")

# Create Model

In [None]:
# # TODO remove? Build the model with the optimal hyperparameters and train it on the data for 50 epochs
# model = tuner.hypermodel.build(best_hps)
# history = model.fit(img_train, label_train, epochs=50, validation_split=0.2)

# val_acc_per_epoch = history.history['val_accuracy']
# best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
# print('Best epoch: %d' % (best_epoch,))

model = create_tf_model()

# Keep the object returned by fit(): the analysis cell further down reads
# `history`, which was previously never assigned.
history = model.fit(train_set,
                    epochs=25,
                    steps_per_epoch=None,  # None is equal to the number of samples in your dataset divided by the batch size
                    validation_data=validation_set,
                    validation_steps=None,  # validation will run until the validation_data dataset is exhausted
                    callbacks=[early_stop],  # stops early when val_loss stops improving
                    verbose=1
                    )

### Save model

In [None]:
# Save the trained model under the versioned output folder.
model.save(f'outputs/{version}/cherry-tree-model.h5')

*** 

# Evaluate Model Performance 

* Load model

In [None]:
# # TODO remove if required
# import joblib
# file = f'/workspace/cherry-ML/outputs/{version}/cherry-tree-model.h5'
# model = joblib.load(file)

In [72]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import confusion_matrix, classification_report

def extract_performance_from_history(model, history):
    """
    Extract and analyze performance metrics from model training history

    Parameters:
    - model: Trained Keras model (not used by this function; kept for
      backward compatibility)
    - history: object with a `.history` dict of per-epoch metric lists, or
      that dict itself

    Returns:
    - Dictionary with two entries:
        'training': per-epoch lists for loss, accuracy, val_loss, val_accuracy
                    (an empty list for a metric that was not recorded)
        'best_metrics': highest accuracy and lowest loss of each list;
                        float('nan') for a metric that was not recorded
    """
    # Accept both the object returned by fit() and its plain `.history` dict.
    recorded = getattr(history, 'history', history) or {}

    training = {
        name: recorded.get(name, [])
        for name in ('loss', 'accuracy', 'val_loss', 'val_accuracy')
    }

    # nan instead of an exception for missing metrics: max()/min() of an empty
    # list would raise, and nan can still be formatted with ':.4f' by callers.
    missing = float('nan')

    return {
        'training': training,
        'best_metrics': {
            'best_train_accuracy': max(training['accuracy'], default=missing),
            'best_val_accuracy': max(training['val_accuracy'], default=missing),
            'best_train_loss': min(training['loss'], default=missing),
            'best_val_loss': min(training['val_loss'], default=missing),
        },
    }

def plot_training_history(history):
    """
    Create plots for training and validation metrics

    Parameters:
    - history: object with a `.history` dict of per-epoch metric lists, or
      that dict itself. The keys 'accuracy', 'val_accuracy', 'loss' and
      'val_loss' are required.

    Returns:
    - the matplotlib.pyplot module, with a two-panel figure (accuracy, loss)
      as the current figure, so that the caller can call `.show()` on it
    """
    import matplotlib.pyplot as plt

    # Accept both the object returned by fit() and its plain `.history` dict.
    recorded = getattr(history, 'history', history)

    plt.figure(figsize=(12, 4))

    # (train key, validation key, title, y label, legend position) per panel
    panels = (
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy', 'lower right'),
        ('loss', 'val_loss', 'Model Loss', 'Loss', 'upper right'),
    )
    for position, (train_key, val_key, title, y_label, legend_loc) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(recorded[train_key])
        plt.plot(recorded[val_key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc=legend_loc)

    plt.tight_layout()
    return plt

def print_model_summary(model):
    """
    Print a detailed summary of the model architecture

    Parameters:
    - model: Keras model; must provide `summary(print_fn=...)` as well as
      `trainable_weights` and `non_trainable_weights` whose items have `.shape`

    Prints the layer summary followed by the total, trainable and
    non-trainable parameter counts. Returns None.
    """
    # Collect the summary through print_fn rather than swapping sys.stdout,
    # which would stay redirected if summary() raised.
    summary_lines = []

    def _collect(text, *args, **kwargs):
        # Extra arguments (e.g. a line_break flag) may be passed by some Keras versions.
        summary_lines.append(str(text))

    model.summary(print_fn=_collect)

    print("Model Architecture Summary:")
    print("\n".join(summary_lines))

    def _count_params(weights):
        # Number of elements over all weights, derived from their shapes; this
        # removes the dependency on the backend alias `K`, which was never imported.
        return sum(int(np.prod(tuple(weight.shape))) for weight in weights)

    trainable_params = _count_params(model.trainable_weights)
    non_trainable_params = _count_params(model.non_trainable_weights)
    # The total covers both groups (it used to count the trainable weights only).
    total_params = trainable_params + non_trainable_params

    print(f"\nTotal Parameters: {total_params:,}")
    print(f"Trainable Parameters: {trainable_params:,}")
    print(f"Non-Trainable Parameters: {non_trainable_params:,}")

# Example usage:
def comprehensive_model_analysis(model, history, train_set, validation_set, test_set, labels):
    """
    Run the full reporting sequence for a trained model.

    Parameters:
    - model: Trained Keras model
    - history: Model training history
    - train_set, validation_set, test_set: Data generators
    - labels: List of class labels

    Steps: model summary, best metrics, training curves, confusion matrices.
    """
    # 1. Architecture summary
    print_model_summary(model)

    # 2. + 3. Best metrics found in the training history, one per line
    best_metrics = extract_performance_from_history(model, history)['best_metrics']
    print("\n--- Performance Metrics ---")
    for metric_name in best_metrics:
        readable_name = metric_name.replace('_', ' ').title()
        print(f"{readable_name}: {best_metrics[metric_name]:.4f}")

    # 4. Training curves
    plot_training_history(history).show()

    # 5. Confusion matrices
    # NOTE(review): clf_performance is not defined in the visible part of this
    # notebook - confirm that it is defined before this function is called.
    print("\n--- Confusion Matrices ---")
    clf_performance(model, train_set, validation_set, test_set, labels)




### Plot model training loss and accuracy 

In [None]:
# Run analysis
# `model.history` is the object whose `.history` dict holds the per-epoch
# metrics. The helper functions read `.history` themselves, so they are given
# the object rather than the dict.
history = model.history

# comprehensive_model_analysis already prints the model summary, the best
# metrics and the training curves, so those steps are not repeated separately.
comprehensive_model_analysis(model, history, train_set, validation_set, test_set, labels=labels)

In [None]:
# TODO do I need this as well as the functions above?
# Per-epoch metrics as a DataFrame; one figure per metric pair is saved to the output folder.
losses = pd.DataFrame(model.history.history)

curve_specs = (
    (['loss', 'val_loss'], 'Loss', 'training_loss.png'),
    (['accuracy', 'val_accuracy'], 'Accuracy', 'training_accuracy.png'),
)
for spec_number, (metric_columns, figure_title, figure_file) in enumerate(curve_specs):
    if spec_number > 0:
        print('\n')
    losses[metric_columns].plot(style='.-')
    plt.title(figure_title)
    plt.savefig(f'{file_path}/{figure_file}', bbox_inches='tight', dpi=600)

### Evaluate and save

In [None]:
# The test-set evaluation is the result that is saved below.
evaluation = model.evaluate(test_set)

# Train and validation evaluations are informative only; a failure is printed
# and does not stop the notebook.
try:
    evaluation_train = model.evaluate(train_set)
    evaluation_val = model.evaluate(validation_set)
except Exception as error:
    print(error)

In [None]:
# Save the test-set evaluation next to the other outputs of this version.
# (The path was hard-coded to outputs/v1, which overwrote another version's result.)
joblib.dump(value=evaluation,
            filename=f"{file_path}/evaluation.pkl")

# Run Live Prediction 

In [None]:
from tensorflow.keras.preprocessing import image

pointer = 66  # index of the sample image; TODO change to random within length of dir
label = labels[1]  # class folder to sample from; TODO change to random (0 or 1) and print image class too

# Sort the listing so that `pointer` selects the same file on every run
# (os.listdir returns entries in arbitrary order).
sample_files = sorted(os.listdir(test_path + '/' + label))

# target_size takes (height, width) only, hence image_shape[:2].
pil_image = image.load_img(test_path + '/' + label + '/' + sample_files[pointer],
                           target_size=image_shape[:2], color_mode='rgb')
print(f'Image shape: {pil_image.size}, Image mode: {pil_image.mode}')
pil_image

In [None]:
# Convert the image to an array, add a leading batch axis and scale by 1/255
# (the same rescaling the data generators apply).
pred_img = image.img_to_array(pil_image)[np.newaxis, ...] / 255
print(pred_img.shape)
pred_img

In [None]:
# Predict the class probability of the sample image.
# [0, 0] selects the single output value: one image in the batch, one sigmoid unit.
pred_proba = model.predict(pred_img)[0, 0]

# Invert class_indices (name -> index) so that an index maps back to its class name.
target_map = {class_index: class_name
              for class_name, class_index in train_set.class_indices.items()}
pred_class = target_map[pred_proba > 0.5]

# The sigmoid output is the probability of index 1; for index 0 the confidence is its complement.
pred_proba = 1 - pred_proba if pred_class == target_map[0] else pred_proba

print(f'Prediction: {pred_class}\nConfidence: {pred_proba*100:.1f}%')

*** 