# Import Libraries

In [1]:
import numpy as np 
import pandas as pd
import os, sys, json, cv2, time, glob, gc
import matplotlib.pyplot as plt
import tensorflow as tf

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D
from sklearn.model_selection import train_test_split
from kerastuner import HyperModel
from kerastuner.tuners import RandomSearch

  from kerastuner import HyperModel


# Configurations

In [2]:
# Setup the paths to train and test images
TRAIN_DIR = './global-wheat-detection/train/'
TEST_DIR = './global-wheat-detection/test/'
TRAIN_CSV_PATH = './global-wheat-detection/train.csv'
AUG_SAVE_DIR = './global-wheat-detection/augmented_images/'

# Data Loading

In [3]:
# df = pd.read_csv('./df_augment.csv')
df = pd.read_csv('./df_full.csv')
df.head()

Unnamed: 0,image_id,x,y,w,h,source
0,75d62c43e,863.0,741.0,132.0,67.0,arvalis_3
1,75d62c43e,844.0,980.0,133.0,44.0,arvalis_3
2,75d62c43e,736.0,798.0,68.0,63.0,arvalis_3
3,75d62c43e,825.0,630.0,137.0,68.0,arvalis_3
4,75d62c43e,853.0,223.0,152.0,142.0,arvalis_3


In [4]:
def load_image_and_bbox(image_id, bbox):
    image = tf.io.read_file(image_id)
    image = tf.image.decode_jpeg(image, channels=3)
    # image = tf.image.resize(image, [224, 224])
    return image, bbox

def parse_dataframe(df, TRAIN_DIR=TRAIN_DIR, AUG_SAVE_DIR=AUG_SAVE_DIR):
    image_ids = []
    for i, row in df.iterrows():
        image_id = row['image_id']
        ori_image_path = f'{TRAIN_DIR}/{image_id}.jpg'
        augmented_image_path = f'{AUG_SAVE_DIR}/{image_id}.jpg'
        if os.path.exists(ori_image_path):
            image_path = ori_image_path
        else:
            image_path = augmented_image_path
        image_ids.append(image_path)
    bboxes = df[['x', 'y', 'w', 'h']].values
    return image_ids, bboxes

def create_dataset(image_paths, bboxes, batch_size=32):
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, bboxes))
    dataset = dataset.map(lambda x, y: load_image_and_bbox(x, y), num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

image_ids, bboxes = parse_dataframe(df[:3200])


In [5]:
# Split the dataset
train_paths, test_paths, train_bboxes, test_bboxes = train_test_split(image_ids, bboxes, test_size=0.4, random_state=42)
test_paths, val_paths, test_bboxes, val_bboxes = train_test_split(test_paths, test_bboxes, test_size=0.5, random_state=42)

# Create TensorFlow Datasets
train_dataset = create_dataset(train_paths, train_bboxes)
val_dataset = create_dataset(val_paths, val_bboxes)
test_dataset = create_dataset(test_paths, test_bboxes)

train_dataset = train_dataset.map(lambda x, y: (tf.cast(x, tf.float32), tf.cast(y, tf.float32)))
val_dataset = val_dataset.map(lambda x, y: (tf.cast(x, tf.float32), tf.cast(y, tf.float32)))
test_dataset = test_dataset.map(lambda x, y: (tf.cast(x, tf.float32), tf.cast(y, tf.float32)))


In [6]:
for images, bboxes in train_dataset.take(1):
    print(images.shape, bboxes.shape)
for images, bboxes in test_dataset.take(1):
    print(images.shape, bboxes.shape)
for images, bboxes in val_dataset.take(1):
    print(images.shape, bboxes.shape)

(32, 1024, 1024, 3) (32, 4)


2024-08-17 18:00:25.505803: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


(32, 1024, 1024, 3) (32, 4)


2024-08-17 18:00:25.783944: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


(32, 1024, 1024, 3) (32, 4)


In [7]:
class CustomCNNModel:
    def __init__(self, input_shape=(1024, 1024, 3)):
        self.input_shape = input_shape
        self.model = self.build_model()

    def build_model(self):
        # Check for GPU availability
        if tf.config.list_physical_devices('GPU'):
            device = '/GPU:0'
        else:
            device = '/CPU:0'
        
        with tf.device(device):
            model = Sequential()
            
            # First Convolutional Block
            model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(1024, 1024, 3)))
            model.add(MaxPooling2D((2, 2)))

            # Second Convolutional Block
            model.add(Conv2D(64, (3, 3), activation='relu'))
            model.add(MaxPooling2D((2, 2)))

            # Third Convolutional Block
            model.add(Conv2D(128, (3, 3), activation='relu'))
            model.add(MaxPooling2D((2, 2)))

            # Fourth Convolutional Block
            model.add(Conv2D(256, (3, 3), activation='relu'))
            model.add(MaxPooling2D((2, 2)))

            # Fifth Convolutional Block
            model.add(Conv2D(512, (3, 3), activation='relu'))
            model.add(MaxPooling2D((2, 2)))

            # Global Average Pooling instead of Flattening
            model.add(GlobalAveragePooling2D())

            # Fully Connected Layer with Dropout
            model.add(Dense(512, activation='relu'))
            model.add(Dropout(0.5))

            # Output Layer
            model.add(Dense(4, activation='softmax'))  # Output layer with 4 units for bounding box coordinates (whxy)
        return model

    def compile_model(self):
        self.model.compile(optimizer='adam', loss='mse', metrics=['accuracy', 'mae', 'mse', tf.keras.metrics.RootMeanSquaredError(name='rmse')])

    def train(self, train_dataset, val_dataset, epochs=10):
        history = self.model.fit(train_dataset, validation_data=val_dataset, epochs=epochs)
        return history

    def evaluate(self, test_dataset):
        test_loss = self.model.evaluate(test_dataset)
        print(f'Test Loss: {test_loss}')
        return test_loss

# Initialize the CustomCNNModel class
custom_cnn_model = CustomCNNModel(input_shape=(1024, 1024, 3))

# Compile the model
custom_cnn_model.compile_model()

custom_cnn_model.model.summary()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [8]:
# Train the model
history = custom_cnn_model.train(train_dataset, val_dataset, epochs=2)

# Evaluate the model on the test set
test_loss = custom_cnn_model.evaluate(test_dataset)

Epoch 1/2
[1m47/60[0m [32m━━━━━━━━━━━━━━━[0m[37m━━━━━[0m [1m7:16[0m 34s/step - loss: 162751.2812

In [None]:
# Extract metrics from the history object
accuracy = history.history.get('accuracy', [])
val_accuracy = history.history.get('val_accuracy', [])
loss = history.history.get('loss', [])
val_loss = history.history.get('val_loss', [])
mae = history.history.get('mae', [])
val_mae = history.history.get('val_mae', [])
rmse = history.history.get('rmse', [])
val_rmse = history.history.get('val_rmse', [])
learning_rate = history.history.get('lr', [])

# Plot training & validation accuracy values
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(accuracy)
plt.plot(val_accuracy)
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(2, 2, 2)
plt.plot(loss)
plt.plot(val_loss)
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation MAE values
plt.subplot(2, 2, 3)
plt.plot(mae)
plt.plot(val_mae)
plt.title('Model MAE')
plt.ylabel('MAE')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation RMSE values
plt.subplot(2, 2, 4)
plt.plot(rmse)
plt.plot(val_rmse)
plt.title('Model RMSE')
plt.ylabel('RMSE')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot learning rate
if learning_rate:
    plt.figure(figsize=(6, 4))
    plt.plot(learning_rate)
    plt.title('Learning Rate')
    plt.ylabel('Learning Rate')
    plt.xlabel('Epoch')

plt.tight_layout()
plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dense, Dropout
from kerastuner import HyperModel, RandomSearch

class CustomCNNHyperModel(HyperModel):
    def __init__(self, input_shape=(1024, 1024, 3)):
        self.input_shape = input_shape

    def build(self, hp):
        model = Sequential()
        
        # First Convolutional Block
        model.add(Conv2D(hp.Int('conv_1_filters', min_value=32, max_value=128, step=32), 
                         (3, 3), activation='relu', input_shape=self.input_shape))
        model.add(MaxPooling2D((2, 2)))

        # Second Convolutional Block
        model.add(Conv2D(hp.Int('conv_2_filters', min_value=64, max_value=256, step=64), 
                         (3, 3), activation='relu'))
        model.add(MaxPooling2D((2, 2)))

        # Third Convolutional Block
        model.add(Conv2D(hp.Int('conv_3_filters', min_value=128, max_value=512, step=128), 
                         (3, 3), activation='relu'))
        model.add(MaxPooling2D((2, 2)))

        # Fourth Convolutional Block
        model.add(Conv2D(hp.Int('conv_4_filters', min_value=256, max_value=1024, step=256), 
                         (3, 3), activation='relu'))
        model.add(MaxPooling2D((2, 2)))

        # Fifth Convolutional Block
        model.add(Conv2D(hp.Int('conv_5_filters', min_value=512, max_value=2048, step=512), 
                         (3, 3), activation='relu'))
        model.add(MaxPooling2D((2, 2)))

        # Global Average Pooling instead of Flattening
        model.add(GlobalAveragePooling2D())

        # Fully Connected Layer with Dropout
        model.add(Dense(hp.Int('dense_units', min_value=256, max_value=1024, step=256), activation='relu'))
        model.add(Dropout(hp.Float('dropout_rate', min_value=0.2, max_value=0.5, step=0.1)))

        # Output Layer
        model.add(Dense(4, activation='softmax'))  # Output layer with 4 units for bounding box coordinates (whxy)

        # Compile the model
        model.compile(optimizer=tf.keras.optimizers.Adam(hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='LOG')),
                      loss='mse')
        return model

# Initialize the hypermodel
hypermodel = CustomCNNHyperModel(input_shape=(1024, 1024, 3))

# Initialize the tuner
tuner = RandomSearch(
    hypermodel,
    objective='val_loss',
    max_trials=10,
    executions_per_trial=1,
    directory='hyperparameter_tuning',
    project_name='custom_cnn_model'
)

# Display search space summary
tuner.search_space_summary()

# Run the hyperparameter search
tuner.search(train_dataset, validation_data=val_dataset, epochs=10)

# Get the best model
best_model = tuner.get_best_models(num_models=1)[0]

# Evaluate the best model
best_model.evaluate(test_dataset)

# Display the best hyperparameters
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]
print(best_hyperparameters.values)

In [None]:
best_model.compile(optimizer='adam', loss='mse', metrics=['accuracy', 'mae', 'mse', tf.keras.metrics.RootMeanSquaredError(name='rmse')])

# Train the best model
history = best_model.fit(train_dataset, validation_data=val_dataset, epochs=10)

# Plot training & validation accuracy values
plt.figure(figsize=(15, 10))

# Plot training & validation accuracy values
plt.subplot(3, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(3, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation MAE values
plt.subplot(3, 2, 3)
plt.plot(history.history['mae'])
plt.plot(history.history['val_mae'])
plt.title('Model MAE')
plt.ylabel('MAE')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation MSE values
plt.subplot(3, 2, 4)
plt.plot(history.history['mse'])
plt.plot(history.history['val_mse'])
plt.title('Model MSE')
plt.ylabel('MSE')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation RMSE values
plt.subplot(3, 2, 5)
plt.plot(history.history['rmse'])
plt.plot(history.history['val_rmse'])
plt.title('Model RMSE')
plt.ylabel('RMSE')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot learning rate
if 'lr' in history.history:
    plt.subplot(3, 2, 6)
    plt.plot(history.history['lr'])
    plt.title('Learning Rate')
    plt.ylabel('Learning Rate')
    plt.xlabel('Epoch')

plt.tight_layout()
plt.show()