## --------- Importing dependencies ---------

In [None]:
import os

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tables
from google.colab import drive
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import (
    Activation,
    BatchNormalization,
    Dense,
    DepthwiseConv2D,
    Dropout,
    GlobalAveragePooling2D,
    SeparableConv2D,
)
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator

## --------- Importing dataset ---------

In [None]:
# Make Google Drive visible to the Colab runtime (re-mounting if already mounted)
drive.mount('/content/gdrive', force_remount=True)

# Root folder of the project data on the shared drive.
# The listing itself is discarded; the call fails right away if the folder
# is not reachable after mounting.
INPUT_FOLDER = '/content/gdrive/Shareddrives/OurTeam/Source/data'
os.listdir(INPUT_FOLDER)

# Read both metadata tables into DataFrames
annotations = pd.read_csv(f'{INPUT_FOLDER}/metadata/annotations.csv')
candidates = pd.read_csv(f'{INPUT_FOLDER}/metadata/candidates.csv')

Mounted at /content/gdrive


## --------- Preprocessing data ---------

In [None]:
# Seed shared by the negative sampling and both splits, so a fresh
# Restart & Run All reproduces exactly the same train/val/test partition.
rand_state = 1

# Row labels of the positive (class == 1) and negative (class == 0) candidates
positives = candidates[candidates['class'] == 1].index
negatives = candidates[candidates['class'] == 0].index

# Down-sample the negatives to (at most) a 10:1 negative-to-positive ratio.
# A seeded generator makes the draw reproducible; the count is capped at the
# number of available negatives because sampling without replacement raises
# when asked for more items than exist.
rng = np.random.default_rng(rand_state)
n_negatives = min(len(negatives), len(positives) * 10)
negIndexes = rng.choice(negatives.to_numpy(), n_negatives, replace=False)

# Combine positive and sampled negative candidates.
# `positives` / `negIndexes` hold index *labels*, so select with .loc
# (.iloc only gave the same rows while the index was a default RangeIndex).
candidatesDf = candidates.loc[list(positives) + list(negIndexes)]

# Split data into features (X: every column but the last) and the target
# (y: the last column -- assumed to be `class`, TODO confirm column order)
X = candidatesDf.iloc[:, :-1]
y = candidatesDf.iloc[:, -1]

# Hold out 20% for testing, then 20% of the remainder for validation
# (64% / 16% / 20% of the sampled candidates overall)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=rand_state)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.20, random_state=rand_state)

# Persist the feature tables of each split.
# NOTE(review): these files contain neither the target column nor the row
# index -- confirm that downstream consumers do not need them.
X_train.to_csv(INPUT_FOLDER + '/preprocessed_data/traindata.csv', index=False)
X_test.to_csv(INPUT_FOLDER + '/preprocessed_data/testdata.csv', index=False)
X_val.to_csv(INPUT_FOLDER + '/preprocessed_data/valdata.csv', index=False)

# Augment data by duplicating positive instances.
# pd.concat replaces DataFrame.append / Series.append, which were deprecated
# in pandas 1.4 and removed in pandas 2.0.
positive_indexes = X_train[y_train == 1].index
X_train_new = pd.concat([X_train, X_train.loc[positive_indexes]])
y_train_new = pd.concat([y_train, y_train.loc[positive_indexes]])

# Generate train filenames for the dataset file (one per row, duplicates included)
train_filenames = X_train_new.index.map(lambda x: f"{INPUT_FOLDER}/data/train/image_{x}.jpg").astype(str)

# Set the filename for the dataset file
dataset_file = 'traindatalabels.txt'

# Create a structured array to store filenames and labels.
# The filename field takes its width from the data: the previous fixed 'S36'
# field was shorter than INPUT_FOLDER alone and silently truncated every path,
# and being a bytes field it was written out as a b'...' repr.
filename_values = train_filenames.values.astype(str)
traindata = np.zeros(filename_values.size, dtype=[('filename', filename_values.dtype), ('label', int)])
traindata['filename'] = filename_values
traindata['label'] = y_train_new.values.astype(int)

# Save the structured array as a text file: "<filename> <label>" per line
np.savetxt(dataset_file, traindata, fmt="%10s %d")

def load_h5_split(h5_path):
    """Read the image and label arrays stored in one HDF5 split file.

    Args:
        h5_path: Path of an HDF5 file exposing root-level nodes ``X`` and ``Y``.

    Returns:
        Tuple ``(images, labels)``: the results of ``root.X.read()`` and
        ``root.Y.read()``, fully loaded into memory. The file is closed
        before returning.
    """
    with tables.open_file(h5_path, mode='r') as h5_file:
        return h5_file.root.X.read(), h5_file.root.Y.read()


# Read image and label data from HDF5 files
X_train_images, Y_train_labels = load_h5_split(INPUT_FOLDER + '/hdf5_data/train_dataset.h5')
X_val_images, Y_val_labels = load_h5_split(INPUT_FOLDER + '/hdf5_data/val_dataset.h5')
X_test_images, Y_test_labels = load_h5_split(INPUT_FOLDER + '/hdf5_data/test_dataset.h5')

# Append a trailing channel axis to the image arrays.
# np.asarray (unlike np.array) does not copy data that is already an ndarray,
# so the images are not held in memory twice.
# Assumes the stored images are 3-D (samples, height, width) -- TODO confirm.
X_train_images_np = np.expand_dims(np.asarray(X_train_images), axis=3)
X_val_images_np = np.expand_dims(np.asarray(X_val_images), axis=3)
X_test_images_np = np.expand_dims(np.asarray(X_test_images), axis=3)

# Expose the labels under the *_np names used by the training/evaluation cells
y_train_labels_np = np.asarray(Y_train_labels)
y_val_labels_np = np.asarray(Y_val_labels)
y_test_labels_np = np.asarray(Y_test_labels)

  X_train_new = X_train.append(X_train.loc[positive_indexes])
  y_train_new = y_train.append(y_train.reindex(positive_indexes))


## --------- Defining the model architecture ---------

In [None]:
def my_model(input_shape=(50, 50, 1), num_classes=2):
    """Build the (uncompiled) convolutional classifier.

    Architecture: nine 3x3 depthwise-separable convolution blocks
    (conv -> batch norm -> ReLU) in three stages of 64, 128 and 256 filters,
    global average pooling, dropout, two dense blocks (512 and 256 units) and
    a softmax output layer.

    The convolutions use ``SeparableConv2D(filters, (3, 3))``. The earlier
    ``DepthwiseConv2D(64, (3, 3))`` calls did not do what they read as:
    ``DepthwiseConv2D`` has no ``filters`` argument, so ``64`` was taken as the
    kernel size and ``(3, 3)`` as the strides, and the feature maps never
    gained channels.

    Args:
        input_shape: Shape of one input image as (height, width, channels).
        num_classes: Number of output units; targets are expected one-hot
            encoded to match the ``categorical_crossentropy`` loss used at
            compile time.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    model = Sequential()

    # Feature extractor: three stages of three identical conv blocks each
    is_first_layer = True
    for filters in (64, 128, 256):
        for _ in range(3):
            # Only the very first layer declares the input shape
            conv_kwargs = {'input_shape': input_shape} if is_first_layer else {}
            model.add(SeparableConv2D(filters, (3, 3), padding='same', **conv_kwargs))
            model.add(BatchNormalization())
            model.add(Activation('relu'))
            is_first_layer = False

    # Collapse each feature map to a single value, then regularise
    model.add(GlobalAveragePooling2D())
    model.add(Dropout(0.5))

    # Classifier head
    for units in (512, 256):
        model.add(Dense(units))
        model.add(BatchNormalization())
        model.add(Activation('relu'))

    # Softmax (not sigmoid) so the outputs form a probability distribution
    # over mutually exclusive classes, as categorical cross-entropy expects
    model.add(Dense(num_classes, activation='softmax'))

    return model

model = my_model()

## --------- Training the model ---------

In [None]:
# Define optimizer (`learning_rate` is the supported argument name; the old
# `lr` alias is deprecated and rejected by recent Keras versions)
opt = SGD(learning_rate=0.001, momentum=0.9)

# Compile the model
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

# Define callbacks (all three monitor the validation loss by default).
# NOTE(review): with epochs=5 below, neither EarlyStopping(patience=10) nor
# ReduceLROnPlateau(patience=5) can ever trigger -- raise the epoch count or
# lower the patience values if they are meant to be active.
# NOTE(review): ModelCheckpoint stores the whole model here (weights-only
# saving is not enabled), despite the "weights" in the file name.
callbacks = [
    keras.callbacks.EarlyStopping(patience=10),
    keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=5),
    keras.callbacks.ModelCheckpoint(filepath='classification_model_best_weights.h5', save_best_only=True),
]

# Create data generator: random shifts, horizontal flips and zooms applied
# on the fly to the training images
datagen = ImageDataGenerator(
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True,
    zoom_range=0.1
)

# Prepare iterator for training data
batch_size = 64
it_train = datagen.flow(X_train_images_np, y_train_labels_np, batch_size=batch_size)

# Batches per epoch. len() of the iterator rounds up, so the final partial
# batch is used as well and the value is never 0 for a non-empty dataset
# (N // batch_size dropped the remainder and was 0 when N < batch_size).
steps = len(it_train)

# Fit the model to the training data.
# Model.fit accepts generators directly; fit_generator is deprecated and has
# been removed from recent Keras versions.
hist = model.fit(
    it_train,
    steps_per_epoch=steps,
    epochs=5,
    validation_data=(X_val_images_np, y_val_labels_np),
    verbose=1,
    callbacks=callbacks
)




## --------- Testing the model ---------

In [None]:
# Evaluate our model on the held-out test split.
# The validation split already steered training (checkpoint selection and
# learning-rate scheduling), so scoring on it again would overstate accuracy.
# NOTE(review): this scores the weights from the last epoch, not the best
# checkpoint written by ModelCheckpoint.
_, acc = model.evaluate(X_test_images_np, y_test_labels_np, verbose=1)



In [None]:
# Report the accuracy returned by model.evaluate as a percentage, 3 decimals
accuracy_pct = acc * 100.0
print('*** Model Accuracy %.3f ***' % accuracy_pct)

*** Model Accuracy 82.729 ***
