In [2]:
import os
import pandas as pd
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, CSVLogger, LearningRateScheduler, ReduceLROnPlateau
from tensorflow.keras.mixed_precision import set_global_policy
import math
import multiprocessing

try:
    import psutil
    monitor_memory = True
except ImportError:
    monitor_memory = False
    print("psutil not installed; memory monitoring disabled. Install with: pip install psutil")


set_global_policy('mixed_float16')

gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

data_dir = 'D:\Major Project\Dataset\Resized, Augmented\Colored_Augmented'
batch_size = 64
target_size = (224, 224)
image_paths = []
labels = []
classes = sorted(os.listdir(data_dir))
num_classes = len(classes)

for class_name in classes:
    class_dir = os.path.join(data_dir, class_name)
    for img_name in os.listdir(class_dir):
        if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
            img_path = os.path.join(class_dir, img_name)
            image_paths.append(img_path)
            labels.append(class_name)


df = pd.DataFrame({'image_path': image_paths, 'label': labels})
train_df, temp_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)


class_weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
class_weight_dict = dict(enumerate(class_weights))

train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    channel_shift_range=20,
    fill_mode='nearest'
)
val_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

workers = min(multiprocessing.cpu_count(), 4)

train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_df,
    x_col='image_path',
    y_col='label',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=True,
    workers=workers,
    use_multiprocessing=True,
    max_queue_size=10
)

val_generator = val_datagen.flow_from_dataframe(
    dataframe=val_df,
    x_col='image_path',
    y_col='label',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False,
    workers=workers,
    use_multiprocessing=True,
    max_queue_size=10
)

test_generator = test_datagen.flow_from_dataframe(
    dataframe=test_df,
    x_col='image_path',
    y_col='label',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False,
    workers=workers,
    use_multiprocessing=True,
    max_queue_size=10
)

def generator_to_tfdata(generator, cache=False):
    dataset = tf.data.Dataset.from_generator(
        lambda: generator,
        output_types=(tf.float32, tf.float32),
        output_shapes=([None, 224, 224, 3], [None, num_classes])
    )
    if cache:
        dataset = dataset.cache()
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

train_dataset = generator_to_tfdata(train_generator, cache=False)
val_dataset = generator_to_tfdata(val_generator, cache=False)
test_dataset = generator_to_tfdata(test_generator, cache=False)

steps_per_epoch = len(train_df) // batch_size
validation_steps = len(val_df) // batch_size
print(f"Steps per epoch: {steps_per_epoch}")
print(f"Validation steps: {validation_steps}")

INFO:tensorflow:Mixed precision compatibility check (mixed_float16): OK
Your GPU will likely run quickly with dtype policy mixed_float16 as it has compute capability of at least 7.0. Your GPU: NVIDIA GeForce RTX 3050 Laptop GPU, compute capability 8.6
Found 72800 validated image filenames belonging to 91 classes.
Found 9100 validated image filenames belonging to 91 classes.
Found 9100 validated image filenames belonging to 91 classes.
Steps per epoch: 1137
Validation steps: 142


In [3]:
def build_model(num_classes):
    base_model = ResNet50(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
    for layer in base_model.layers:
        layer.trainable = False
    x = base_model.output
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)
    predictions = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)
    model = models.Model(inputs=base_model.input, outputs=predictions)
    return model, base_model

model, base_model = build_model(num_classes)
model.compile(optimizer=Adam(learning_rate=0.001, clipnorm=1.0),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

print("Initial Model (Base Layers Frozen):")
print(f"Total parameters: {model.count_params()}")
print(f"Trainable parameters: {sum([w.shape.num_elements() for w in model.trainable_weights])}")
print(f"Non-trainable parameters: {model.count_params() - sum([w.shape.num_elements() for w in model.trainable_weights])}")

class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_df['label']),
    y=train_df['label']
)
class_weights_dict = dict(enumerate(class_weights))

csv_logger_initial = CSVLogger('D:/Major Project/resnet/training_history_initial_resnet50.csv', separator=',', append=False)
early_stopping_initial = EarlyStopping(monitor='val_accuracy', patience=5, mode='max', restore_best_weights=True)
model_checkpoint_initial = ModelCheckpoint('D:/Major Project/resnet/best_initial_resnet50.keras', monitor='val_accuracy', mode='max', save_best_only=True)

history_initial = model.fit(
    train_dataset,
    steps_per_epoch=steps_per_epoch,
    epochs=50,
    validation_data=val_dataset,
    validation_steps=validation_steps,
    class_weight=class_weights_dict,
    callbacks=[early_stopping_initial, model_checkpoint_initial, csv_logger_initial],
    verbose=1
)
model.save_weights('D:/Major Project/resnet/initial_weights_resnet50.keras')

Initial Model (Base Layers Frozen):
Total parameters: 23862235
Trainable parameters: 274267
Non-trainable parameters: 23587968
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50


In [None]:
# Fine-Tuning: Step 1 (Last 10 Layers)
for layer in base_model.layers[-10:]:
    layer.trainable = True
model.compile(optimizer=Adam(learning_rate=1e-5, clipnorm=1.0),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

csv_logger_fine_1 = CSVLogger('D:/Major Project/resnet/training_history_fine_resnet50_1.csv', separator=',', append=False)
early_stopping_fine = EarlyStopping(monitor='val_accuracy', patience=5, mode='max', restore_best_weights=True)
model_checkpoint_fine_1 = ModelCheckpoint('D:/Major Project/resnet/best_fine_tuned_resnet50_1.keras', monitor='val_accuracy', mode='max', save_best_only=True)

def cosine_decay(epoch, initial_lr=1e-5):
    epochs = 15
    lr = initial_lr * (1 + math.cos(epoch * math.pi / epochs)) / 2
    return lr

lr_scheduler = LearningRateScheduler(cosine_decay)

history_fine_1 = model.fit(
    train_dataset,
    steps_per_epoch=steps_per_epoch,
    epochs=15,
    validation_data=val_dataset,
    validation_steps=validation_steps,
    class_weight=class_weights_dict,
    callbacks=[early_stopping_fine, model_checkpoint_fine_1, csv_logger_fine_1, lr_scheduler],
    verbose=1
)

Epoch 1/15
 104/1137 [=>............................] - ETA: 12:18 - loss: 0.3340 - accuracy: 0.8858

In [None]:
# Fine-Tuning: Step 2 (Last 30 Layers)
for layer in base_model.layers[-30:]:
    layer.trainable = True
model.compile(optimizer=Adam(learning_rate=1e-6, clipnorm=1.0),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

csv_logger_fine_2 = CSVLogger('D:/Major Project/resnet/training_history_fine_resnet50_2.csv', separator=',', append=False)
model_checkpoint_fine_2 = ModelCheckpoint('D:/Major Project/resnet/best_fine_tuned_resnet50_2.keras', monitor='val_accuracy', mode='max', save_best_only=True)

history_fine_2 = model.fit(
    train_dataset,
    steps_per_epoch=steps_per_epoch,
    epochs=15,
    validation_data=val_dataset,
    validation_steps=validation_steps,
    class_weight=class_weights_dict,
    callbacks=[early_stopping_fine, model_checkpoint_fine_2, csv_logger_fine_2, lr_scheduler],
    verbose=1
)
model.save_weights('D:/Major Project/resnet/all_weights_resnet50.keras')

In [None]:
test_loss, test_accuracy = model.evaluate(test_dataset, steps=len(test_df) // batch_size)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")