Import modules

In [1]:
import os
import pickle

import cv2
import numpy as np
import pandas as pd
import sklearn.preprocessing
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, Model
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import img_to_array, load_img

In [None]:
data_dir = 'train_cluster_cleaned'

cluster_numbers = list(range(1, 29))
cluster_folders = [os.path.join(data_dir, f'cluster_{i}') for i in cluster_numbers]

# Data preprocessing loop: load every cluster folder and split it 80/20 on its own,
# so each cluster contributes to both the training and the validation set.
train_images, val_images = [], []
train_labels, val_labels = [], []

for folder in cluster_folders:
    # The folder name (e.g. 'cluster_7') is used as the class label.
    cluster_label = os.path.basename(folder)
    cluster_images = []

    # os.listdir() returns entries in arbitrary order; sorting keeps the seeded
    # split below reproducible across runs and machines.
    for file_name in sorted(os.listdir(folder)):
        image_path = os.path.join(folder, file_name)
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread() returns None (it does not raise) for files it cannot decode;
            # keeping that None would break the array conversion further down.
            print(f'Skipping unreadable file: {image_path}')
            continue
        cluster_images.append(img)

    cluster_labels = [cluster_label] * len(cluster_images)

    # Perform train-validation split for each cluster
    cluster_train_images, cluster_val_images, cluster_train_labels, cluster_val_labels = train_test_split(
        cluster_images, cluster_labels, test_size=0.2, random_state=42
    )  # 80% train, 20% validation

    # Merge train-validation sets of each cluster
    train_images.extend(cluster_train_images)
    val_images.extend(cluster_val_images)
    train_labels.extend(cluster_train_labels)
    val_labels.extend(cluster_val_labels)

# Convert to numpy arrays
# NOTE(review): this only produces a regular (N, H, W, 3) array if every image on disk
# has the same size; no resizing is done here -- confirm against the dataset.
train_images = np.array(train_images)
val_images = np.array(val_images)
train_labels = np.array(train_labels)
val_labels = np.array(val_labels)

# LabelEncoder orders the string labels lexicographically ('cluster_1', 'cluster_10', ...),
# so an encoded index is NOT "cluster number - 1"; use lb.classes_ to map indices back.
lb = sklearn.preprocessing.LabelEncoder()
train_labels = lb.fit_transform(train_labels)
val_labels = lb.transform(val_labels)

# One-hot encode the labels for categorical cross-entropy.
# Pixel values are deliberately left in [0, 255]: the ImageDataGenerators created later
# in this notebook already apply rescale=1/255, and dividing here as well would scale
# every image twice.
val_labels = tf.keras.utils.to_categorical(val_labels, num_classes=len(cluster_numbers))
val_images = val_images.astype('float32')

train_labels = tf.keras.utils.to_categorical(train_labels, num_classes=len(cluster_numbers))
train_images = train_images.astype('float32')

Model settings

In [None]:
# Number of training epochs for the first training run.
EPOCHS = 10

# Destination of the checkpoint written during training.
checkpoint_filepath = '/tmp/checkpoint'

# Keep only the weights (not the full model) of the epoch with the highest
# validation accuracy seen so far.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
)

In [None]:
# Edge length in pixels of the square input; equals the 224x224 input the ResNet50 base is built with.
image_size = 224
# One output class per entry in cluster_numbers.
num_classes = len(cluster_numbers)

# Custom Keras layer: ClassToken
class ClassToken(layers.Layer):
    """Layer that prepends a trainable class token to its input along axis 1."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # A single learnable vector whose width equals the size of input axis 3.
        channels = input_shape[3]
        self.class_token = self.add_weight(
            name="class_token",
            shape=(1, 1, channels),
            initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02),
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        # Replicate the token once per sample in the batch, then put it in front of x.
        n_samples = tf.shape(x)[0]
        token_width = tf.shape(self.class_token)[2]
        tiled_token = tf.broadcast_to(self.class_token, [n_samples, 1, 1, token_width])
        return tf.concat([tiled_token, x], axis=1)

# Create ImageDataGenerators for training and validation.
# Training images are randomly rotated, shifted, sheared, zoomed and flipped on the fly;
# rescale multiplies every pixel value by 1/255.
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=30,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest'
)

# Validation images are only rescaled, never augmented.
val_datagen = ImageDataGenerator(rescale=1./255)

# Generate train and validation data from the ImageDataGenerators
train_generator = train_datagen.flow(
    x=train_images,
    y=train_labels,
    batch_size=128,
    shuffle=True
)

# Validation is not shuffled: with a fixed number of validation steps, shuffling would
# score a different random subset every epoch and add noise to val_accuracy, which is
# the metric the checkpoint callback monitors.
validation_generator = val_datagen.flow(
    x=val_images,
    y=val_labels,
    batch_size=128,
    shuffle=False
)

In [None]:
# Load the pre-trained ResNet50 model without the top (classification) layer
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(image_size, image_size, 3))

# Freeze the entire pre-trained base; only the new head added below is trained
for layer in base_model.layers:
    layer.trainable = False

# Add a new classification head to the base model
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(1024, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# train_generator comes from ImageDataGenerator.flow(), whose iterator has no
# `num_classes` attribute, so the output width is taken from the notebook-level
# num_classes (one unit per cluster).
predictions = layers.Dense(num_classes, activation='softmax')(x)
# Create the final model
model = Model(inputs=base_model.input, outputs=predictions)
# Compile the model (labels are one-hot encoded, hence categorical cross-entropy)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Train the model.
# The iterators returned by ImageDataGenerator.flow() expose their sample count as `.n`
# (`.samples` is only set by the flow_from_* iterators). max(1, ...) prevents a split
# smaller than one batch from yielding zero steps.
model_history = model.fit(
    train_generator,
    steps_per_epoch=max(1, train_generator.n // train_generator.batch_size),
    validation_data=validation_generator,
    validation_steps=max(1, validation_generator.n // validation_generator.batch_size),
    epochs=EPOCHS,
    callbacks=[model_checkpoint_callback]
)
# Save the fine-tuned model.
# NOTE(review): this stores the weights of the LAST epoch; the best-val_accuracy weights
# are only in the checkpoint file. The file name says "100_epoch" although the run
# length is EPOCHS -- confirm the intended name before relying on it.
model.save("resnet_50_cells_100_epoch.h5")

In [None]:
# Create the output folder if it doesn't exist (exist_ok avoids a check-then-create race)
output_folder = "output_Image4_csv"
os.makedirs(output_folder, exist_ok=True)

# Save the per-epoch training metrics to a CSV file.
# The object returned by model.fit() is stored in `model_history`; its `.history`
# attribute maps each metric name to a list with one value per epoch.
metrics = model_history.history
training_data = pd.DataFrame({
    'epoch': np.arange(1, len(metrics['accuracy']) + 1),
    'train_accuracy': metrics['accuracy'],
    'validation_accuracy': metrics['val_accuracy'],
    'loss': metrics['loss'],
    'validation_loss': metrics['val_loss']
})
training_data.to_csv(os.path.join(output_folder, 'history_cleaned_cluster_cnn.csv'), index=False)

<h1>RUN TO HERE</h1>

Initial setup

In [None]:
# Root directory of the dataset; the cells below treat every entry inside it as one class folder.
dataset_dir = 'data/data_processed/cell_images_cleaned'

Preprocess an image into a batched array

In [None]:
def preprocess_image(image_path, target_size):
    """Load one image and return it as a single-item batch scaled by 1/255.

    Args:
        image_path: Path of the image file to load.
        target_size: Size passed through to ``load_img`` as ``target_size``.

    Returns:
        The image array with a new leading axis of length 1, divided by 255.
    """
    pixels = img_to_array(load_img(image_path, target_size=target_size))
    # Add the batch axis first, then scale in place.
    batch = np.expand_dims(pixels, axis=0)
    batch /= 255.0
    return batch

Data stratification

In [None]:
# Every sub-directory of dataset_dir is one class. Sorting makes the label indices
# deterministic (os.listdir() order is arbitrary) and stray non-directory entries
# are ignored instead of crashing the inner listing.
class_names = sorted(
    name for name in os.listdir(dataset_dir)
    if os.path.isdir(os.path.join(dataset_dir, name))
)
class_indices = {name: i for i, name in enumerate(class_names)}

# Parallel lists: image file path and integer class index.
images = []
labels = []

for class_name in class_names:
    class_directory = os.path.join(dataset_dir, class_name)
    for image_name in sorted(os.listdir(class_directory)):
        images.append(os.path.join(class_directory, image_name))
        labels.append(class_indices[class_name])

# Train validation split (80/20), stratified so each class keeps its proportion
train_images, val_images, train_labels, val_labels = train_test_split(
    images, labels, test_size=0.2, stratify=labels, random_state=42
)

# DataFrames with the 'filename' / 'class' columns that the flow_from_dataframe
# generators read. Class values are stored as strings (the class folder names) because
# class_mode='categorical' does not accept integer labels.
train_df = pd.DataFrame({
    'filename': train_images,
    'class': [class_names[label] for label in train_labels],
})
val_df = pd.DataFrame({
    'filename': val_images,
    'class': [class_names[label] for label in val_labels],
})

Data generator

In [None]:
# Arguments shared by both generators: image paths come from the 'filename' column,
# labels from the 'class' column, images are resized to 32x32 and labels are one-hot.
# directory=None means the 'filename' values are used as complete paths.
flow_kwargs = dict(
    directory=None,
    x_col='filename',
    y_col='class',
    target_size=(32, 32),
    batch_size=64,
    class_mode='categorical',
)

train_generator = train_datagen.flow_from_dataframe(train_df, **flow_kwargs)

# Validation is not shuffled (flow_from_dataframe shuffles by default): with a fixed
# number of validation steps, shuffling would evaluate a different random subset every
# epoch and make the validation metrics noisy.
validation_generator = val_datagen.flow_from_dataframe(val_df, shuffle=False, **flow_kwargs)

Load the pre-trained ResNet50 model, without the top layer

In [None]:
# Load the pre-trained ResNet50 model without the top (classification) layer.
# The input shape is taken from the generator so the model always matches the image
# size the generator actually yields (a hard-coded 224x224 input does not accept the
# 32x32 batches produced by the flow_from_dataframe generators).
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=train_generator.image_shape)

# Freeze the entire pre-trained base; only the new head added below is trained
for layer in base_model.layers:
    layer.trainable = False

# Add a new classification head to the base model
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(1024, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# class_indices maps every class name found by the generator to its output index,
# so its length is the number of classes.
predictions = layers.Dense(len(train_generator.class_indices), activation='softmax')(x)
# Create the final model
model = Model(inputs=base_model.input, outputs=predictions)
# Compile the model (one-hot labels, hence categorical cross-entropy)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

Train the model

In [None]:
# Train the model.
# Steps are the number of full batches per split; max(1, ...) prevents a split smaller
# than one batch from yielding zero steps.
model_history = model.fit(
    train_generator,
    steps_per_epoch=max(1, train_generator.samples // train_generator.batch_size),
    validation_data=validation_generator,
    validation_steps=max(1, validation_generator.samples // validation_generator.batch_size),
    epochs=100
)
# Save the fine-tuned model.
# NOTE(review): the earlier training cell saves to this same file name, so running both
# overwrites the first model.
model.save("resnet_50_cells_100_epoch.h5")

In [None]:
# Rows of [image_id, is_correct], one per validation image.
# NOTE(review): the loop that would fill this list is commented out below, so the CSV
# written at the end of this cell currently contains only the header row.
results = []

# Disabled per-image evaluation loop, kept for reference.
# NOTE(review): it compares row['class'] with an argmax index; confirm both sides use the
# same label representation before re-enabling it.
# for index, row in val_df.iterrows():
#     image_path = row['filename']
#     true_label = row['class']
#     image_id = os.path.basename(image_path).split('.')[0]  # Assuming the ID is the filename without the extension

#     img_array = preprocess_image(image_path, target_size=(32, 32))
#     predictions = model.predict(img_array)
#     predicted_label = np.argmax(predictions, axis=1)[0]

#     is_correct = true_label == predicted_label
#     results.append([image_id, is_correct])


# Write the collected rows to validation_results.csv in the working directory.
results_df = pd.DataFrame(results, columns=['Image_ID', 'Prediction_Correct'])
results_df.to_csv('validation_results.csv', index=False)


In [None]:
# Persist the metric history recorded by model.fit() as a pickled dict.
# NOTE(review): the target path is at the filesystem root -- confirm it is writable.
with open('/trainHistoryDict', mode='wb') as history_file:
    pickle.dump(model_history.history, file=history_file)