<a href="https://colab.research.google.com/github/esraa-abdelmaksoud/Shai-Training-Notebooks/blob/main/X_ray_COVID_classification_EffNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive; every dataset path used below lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import shutil
import random
import cv2
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy import ndimage
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.applications import InceptionV3
# Record the TensorFlow version in the cell output; a bare expression that is
# not the last line of a cell displays nothing.
print('TensorFlow version:', tf.__version__)
# Seed every RNG the notebook draws from: Python's `random` picks the files
# moved to test/val, NumPy's global RNG is used by the Keras image utilities,
# and TensorFlow's seed covers weight initialisation and dropout.
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)
# NOTE(review): TensorFlow is already imported at this point, so this may be
# too late to silence its start-up logs - consider setting it before the import.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [None]:
# Create the working folder tree: data/<split>/<class>
parent_dir = r'/content/drive/MyDrive/SHAI/covid dataset'
# makedirs(exist_ok=True) is idempotent: it creates missing parents, repairs a
# partially created tree and is a no-op when everything already exists, so no
# "does data/ exist" guard is needed.
for split in ('train', 'test', 'val'):
  for class_name in ('covid', 'normal', 'virus'):
    os.makedirs(os.path.join(parent_dir, 'data', split, class_name), exist_ok=True)

In [None]:
# Report how many images each class has in the original (untouched) dataset
org_covid = r'/content/drive/MyDrive/SHAI/covid dataset/train/covid'
org_normal = r'/content/drive/MyDrive/SHAI/covid dataset/train/normal'
org_virus = r'/content/drive/MyDrive/SHAI/covid dataset/train/virus'
for label, folder in (('COVID: ', org_covid),
                      ('NORMAL: ', org_normal),
                      ('VIRUS: ', org_virus)):
  print(label, len(os.listdir(folder)))

COVID:  295
NORMAL:  468
VIRUS:  433


In [None]:
# Per-class training folders inside the working copy of the dataset
covid_dir, normal_dir, virus_dir = (
    os.path.join(parent_dir, f'data/train/{label}')
    for label in ('covid', 'normal', 'virus')
)

In [None]:
# Copy the original images into the working train folders.
# Each class is checked on its own, so a copy that was interrupted after the
# first class is completed on the next run instead of being skipped because
# the covid folder is already populated.
for src_dir, dst_dir in ((org_covid, covid_dir),
                         (org_normal, normal_dir),
                         (org_virus, virus_dir)):
  if not os.listdir(dst_dir):
    shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)

In [None]:
# Work out how many images each class is short of the target size
target = 500
to_move = target // 10  # number of images split_files moves to test and to val
covid_diff, normal_diff, virus_diff = (
    target - len(os.listdir(class_dir))
    for class_dir in (covid_dir, normal_dir, virus_dir)
)
print(f'COVID: {covid_diff}, NORMAL: {normal_diff}, VIRUS: {virus_diff}')

COVID: 100, NORMAL: 100, VIRUS: 100


In [None]:
# Top up each under-sized class with rotated copies of its own images
def augment_data(diff: int, dir: str) -> None:
  """Write up to `diff` rotated copies of the images found in `dir`.

  Each selected image is rotated by 5 degrees and saved next to the original
  as `<stem>-aug.jpg`.

  Args:
    diff: Number of augmented images to create. Values <= 0 create nothing.
      If it exceeds the number of source images, one copy per source image
      is written and a message is printed.
    dir: Folder holding the images of one class; copies are written here too.
  """
  # Sort for a reproducible selection, and never use an earlier augmentation
  # output as a source so re-runs do not stack rotations.
  files = sorted(name for name in os.listdir(dir)
                 if not os.path.splitext(name)[0].endswith('-aug'))
  if diff > len(files):
    print(f'{dir}: only {len(files)} source images, cannot create {diff} augmented copies')
  for file_name in files[:max(diff, 0)]:
    img_path = os.path.join(dir, file_name)
    img = cv2.imread(img_path)
    if img is None:
      # cv2.imread signals an unreadable file by returning None, not by raising
      print(f'Skipping unreadable image: {img_path}')
      continue
    rotated = ndimage.rotate(img, 5)
    # splitext copes with any extension length (.jpg, .jpeg, .png); slicing
    # off a fixed 5 characters truncated the stem of names such as "scan.jpg".
    stem, _ = os.path.splitext(img_path)
    cv2.imwrite(f'{stem}-aug.jpg', rotated)
# Augment every class that is still short of the target. A shortfall of exactly
# 2 * to_move is skipped: that is the gap left once the test/val images have
# been moved out of train.
for shortfall, class_dir in ((covid_diff, covid_dir),
                             (normal_diff, normal_dir),
                             (virus_diff, virus_dir)):
  if shortfall > 0 and shortfall != to_move * 2:
    augment_data(shortfall, class_dir)

In [None]:
# Move 10% to test folder and validation folder
def split_files(parent_dir: str, class_name: str, to_move: int, target: int) -> None:
  """Move random training images of one class into the test and val folders.

  `to_move` distinct files go from `data/train/<class_name>` to
  `data/test/<class_name>` and another `to_move` distinct files go to
  `data/val/<class_name>`.

  Args:
    parent_dir: Folder that contains the `data/` tree.
    class_name: Class sub-folder to split (e.g. 'covid').
    to_move: Number of files moved to test, and again to val.
    target: Nominal class size. Kept for backward compatibility; the selection
      now uses the files actually present instead of this number.

  Raises:
    ValueError: If the train folder holds fewer than `2 * to_move` files.
  """
  train_class_dir = os.path.join(parent_dir,'data/train', class_name)
  test_class_dir = os.path.join(parent_dir,'data/test', class_name)
  val_class_dir = os.path.join(parent_dir,'data/val', class_name)
  # Sorted so that a seeded `random` always selects the same files.
  files = sorted(os.listdir(train_class_dir))
  needed = 2 * to_move
  if len(files) < needed:
    raise ValueError(
        f'{train_class_dir} holds {len(files)} files but {needed} are needed for the split')
  # One draw without replacement over the real listing: every file is equally
  # likely and the index can never run past the end of the list, unlike an
  # index bound derived from `target`.
  chosen = random.sample(files, needed)
  for dest_dir, batch in ((test_class_dir, chosen[:to_move]),
                          (val_class_dir, chosen[to_move:])):
    for file_name in batch:
      shutil.move(os.path.join(train_class_dir, file_name),
                  os.path.join(dest_dir, file_name))

# Split only while train still holds more than 80% of the target for covid,
# i.e. before any images have been moved out.
covid_train_count = len(os.listdir(os.path.join(parent_dir,'data/train/covid')))
if covid_train_count > int(target*0.8):
  for class_label in ('covid', 'normal', 'virus'):
    split_files(parent_dir, class_label, to_move, target)

In [None]:
# Root folder of each split, as consumed by flow_from_directory
train_dir, test_dir, val_dir = (
    os.path.join(parent_dir, f'data/{split_name}')
    for split_name in ('train', 'test', 'val')
)

### The data is now balanced!

In [None]:
# Input image size fed to the network
img_width = img_height = 300

# Batch size shared by the train, validation and test generators
batch_size = 32

# Random transforms applied to training images only
augmentation = dict(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)

# All three generators rescale pixels to [0, 1]; only train is augmented
train_datagen = ImageDataGenerator(rescale=1./255, **augmentation)
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

In [None]:

# Options shared by the three directory iterators. The explicit seed fixes the
# shuffle order, so a fresh run sees the same batches (and, while the number of
# steps is truncated, the same validation/test subset) every time.
flow_options = dict(
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='categorical',
    seed=123,
)

train_generator = train_datagen.flow_from_directory(train_dir, **flow_options)

val_generator = val_datagen.flow_from_directory(val_dir, **flow_options)

test_generator = test_datagen.flow_from_directory(test_dir, **flow_options)

Found 1200 images belonging to 3 classes.
Found 150 images belonging to 3 classes.
Found 150 images belonging to 3 classes.


In [None]:

# Load the pre-trained EfficientNetB7 backbone (ImageNet weights, no classifier head)
# NOTE(review): the next cell assigns `base_model` again with InceptionV3, so
# this backbone is replaced before the model is built - confirm which is intended.
from tensorflow.keras.applications.efficientnet import EfficientNetB7

base_model = EfficientNetB7(
    weights='imagenet',
    include_top=False,
    input_shape=(img_width, img_height, 3),
)

In [None]:

# Load the pre-trained InceptionV3 backbone (ImageNet weights, no classifier head)
base_model = InceptionV3(
    weights='imagenet',
    include_top=False,
    input_shape=(img_width, img_height, 3),
)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# Keep the pre-trained backbone weights fixed; only the new head is trained
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

In [None]:
# Stack a small fully connected classifier on top of the backbone
model = Sequential()
model.add(base_model)
model.add(Flatten())
model.add(Dense(256, activation='relu'))
model.add(Dropout(0.5))
# Three-way softmax output, one unit per class
model.add(Dense(3, activation='softmax'))

In [None]:
# Compile the model
# The metric is named explicitly: when this cell is re-run Keras may number a
# fresh default-named metric ('precision_1', ...), which would leave the
# callbacks below monitoring a 'val_precision' key that no longer exists.
model.compile(optimizer='adam', loss='categorical_crossentropy',
              metrics=[keras.metrics.Precision(name='precision')])

# Define early stopping and model checkpoint callbacks
# restore_best_weights rolls the in-memory model back to the best epoch
# tracked by EarlyStopping, so later cells do not evaluate the last epoch.
early_stop = EarlyStopping(monitor='val_precision', patience=5, verbose=1, mode='max',
                           min_delta=.01, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('/content/drive/MyDrive/SHAI/covid dataset/covid_imgnet_effnet_pres.h5', monitor='val_precision', save_best_only=True, verbose=1, mode='max')

# Train the model
# No steps_per_epoch / validation_steps: the generators know their own length,
# so every image is used each epoch. `samples // batch_size` silently dropped
# the final partial batch of both the training and the validation set.
epochs = 20
history = model.fit(
    train_generator,
    epochs=epochs,
    validation_data=val_generator,
    callbacks=[early_stop, model_checkpoint]
)

Epoch 1/20
Epoch 1: val_precision improved from -inf to 0.85039, saving model to /content/drive/MyDrive/SHAI/covid dataset/covid_imgnet_effnet_pres.h5
Epoch 2/20
Epoch 2: val_precision did not improve from 0.85039
Epoch 3/20
Epoch 3: val_precision did not improve from 0.85039
Epoch 4/20
Epoch 4: val_precision did not improve from 0.85039
Epoch 5/20
Epoch 5: val_precision improved from 0.85039 to 0.85714, saving model to /content/drive/MyDrive/SHAI/covid dataset/covid_imgnet_effnet_pres.h5
Epoch 6/20
Epoch 6: val_precision did not improve from 0.85714
Epoch 6: early stopping


In [None]:
# Load the model
# model = keras.models.load_model("/content/drive/MyDrive/SHAI/covid dataset/covid_imgnet_effnet_pres.h5")

In [None]:
# Evaluate the model on the test set
# The generator is consumed in full; `steps=samples // batch_size` skipped the
# images in the last partial batch.
test_loss, test_prec = model.evaluate(test_generator)
# The model is compiled with Precision as its only metric, so label it as such.
print(f'Test precision: {test_prec:.2f}')

Test accuracy: 0.83


In [None]:
# Load competition test data
pred_path = '/content/drive/MyDrive/SHAI/covid dataset/test'
# os.listdir returns entries in arbitrary order; sort so that the rows of the
# submission file come out in the same order on every run.
files = sorted(os.listdir(pred_path))

In [None]:
# Empty submission table: image file name and predicted class label
submission_columns = ['Image', 'Label']
df = pd.DataFrame(columns=submission_columns)

In [None]:
# Invert the generator's {label: index} mapping into {index: label}
class_names = {
    index: label
    for label, index in train_generator.class_indices.items()
}

In [None]:
# Predict a label for every competition image
# Rows are collected in a list and turned into the data frame once, instead of
# growing the frame one `.loc` assignment at a time.
records = []
for file_name in files:
  img_path = os.path.join(pred_path,file_name)
  try:
    img = image.load_img(img_path, target_size=(img_width, img_height))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    # Same [0, 1] rescaling as the data generators
    x = x / 255.0
    # verbose=0 avoids printing a progress bar for every single image
    predictions = model.predict(x, verbose=0)
  except Exception as err:
    # Still best-effort (an unreadable entry is skipped), but the failure is
    # reported instead of being swallowed by a bare `except: pass`, and
    # KeyboardInterrupt is no longer caught.
    print(f'Skipping {file_name}: {err}')
    continue
  pred_idx = int(np.argmax(predictions[0]))
  records.append({'Image': file_name, 'Label': class_names[pred_idx]})

df = pd.DataFrame(records, columns=['Image', 'Label'])



In [None]:
# Save the predictions as a CSV file, without the index column
output_dir = '/content/drive/MyDrive/SHAI/covid dataset/'
df_path = os.path.join(output_dir, 'incept_imgnet_effnet_prec.csv')
df.to_csv(df_path, index=False)

In [None]:
# Save model
# model.save(f"{parent_dir}/covid_{epochs}e_imgnet_inception.h5")