# Change working directory

In [1]:
%cd ..

/home/minhhieu/Desktop/Hieu/COVIDNet-Implementation


# Import libraries

In [21]:
import os
import tqdm
import pickle
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model

from sklearn.decomposition import PCA
from sklearn.svm import SVC, LinearSVC
from sklearn.metrics import accuracy_score, confusion_matrix

from dataloader import DataLoader
from contrastive import ArcFace, CosFace, SphereFace
from models import get_small_covid_net

visual_dir = 'media'

checkpoint_dir = 'checkpoints'
model_name = 'covid_net_test_contrastive_arcface_2'

model_path = 'best_model.h5'
weights_path = 'best_model.weights.h5'

train_dir = 'data/covidx/train'
test_dir = 'data/covidx/test'
img_size = 480
batch_size = 16

# Load the model

In [7]:
# Load raw model from weights and model path
model = load_model(os.path.join(checkpoint_dir, model_name, 'models', model_path),
                  custom_objects={'ArcFace' : ArcFace})
model.load_weights(os.path.join(checkpoint_dir, model_name, 'weights', weights_path))

# Get the embedding output only
model = Model(inputs=model.inputs[0], outputs=model.layers[-3].output, 
              name=f'{model.name}_Embedding')

print(model.summary())

Model: "Contrastive_COVID_Net_Small_Embedding"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 480, 480, 3) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 240, 240, 56) 8288        input_1[0][0]                    
__________________________________________________________________________________________________
re_lu (ReLU)                    (None, 240, 240, 56) 0           conv2d[0][0]                     
__________________________________________________________________________________________________
max_pooling2d (MaxPooling2D)    (None, 120, 120, 56) 0           re_lu[0][0]                      
______________________________________________________________

# Train a classifier on top of embeddings

In [27]:
# Load testing data
train_loader = DataLoader(train_dir, labels_as_subdir=True, one_hot=True,
            img_size=img_size,test=False,
            batch_size=batch_size, random_noise=True)

# Get number of batches
train_steps_per_epoch = train_loader.get_train_size()

# Loop through the batches
all_embeddings = np.array([])
all_labels = np.array([])

with tqdm.tqdm(total = train_steps_per_epoch) as pbar:
    for batchidx in range(train_steps_per_epoch):
        images, labels = train_loader.get_train_batch()
        images = images.numpy()
        labels = labels.numpy()
        
        embeddings = model(images, training=False)
        
        if(len(all_embeddings) != 0):
            all_embeddings = np.concatenate([all_embeddings, embeddings])
            all_labels = np.concatenate([all_labels, labels])
        else:
            all_embeddings = embeddings
            all_labels = labels
        
        pbar.update(1)

  return py_builtins.overload_of(f)(*args)
100%|██████████| 1270/1270 [07:40<00:00,  2.76it/s]


In [None]:
# Normalize the train embeddings
all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1).reshape(-1, 1)

# Train an SVM model
categories = np.argmax(all_labels, axis=1)
clf = SVC(kernel='linear', probability=True)
clf.fit(all_embeddings, categories)

In [None]:
# Save the classification model
if(not os.path.exists(os.path.join(checkpoint_dir, model_name, 'classifier'))):
    os.mkdir(os.path.join(checkpoint_dir, model_name, 'classifier'))
    print('[INFO] Classification model directiory created.')
    
with open(os.path.join(checkpoint_dir, model_name, 'classifier', 'clf.pickle'), 'wb') as f:
    pickle.dump(clf, f)
    print('[INFO] Classification model saved.')

# Test the classification model

In [None]:
# Load testing data
test_loader = DataLoader(test_dir, labels_as_subdir=True, one_hot=True,
            img_size=img_size,test=True,
            batch_size=batch_size, random_noise=False)

# Get number of batches
test_steps_per_epoch = test_loader.get_train_size()

# Loop through the batches
all_embeddings = np.array([])
all_labels = np.array([])

with tqdm.tqdm(total = test_steps_per_epoch) as pbar:
    for batchidx in range(test_steps_per_epoch):
        images, labels = test_loader.get_train_batch()
        images = images.numpy()
        labels = labels.numpy()
        
        embeddings = model(images, training=False)
        
        if(len(all_embeddings) != 0):
            all_embeddings = np.concatenate([all_embeddings, embeddings])
            all_labels = np.concatenate([all_labels, labels])
        else:
            all_embeddings = embeddings
            all_labels = labels
        
        pbar.update(1)

In [None]:
# Normalize all the embeddings
all_embeddings = all_embeddings / np.linalg.norm(all_embeddings, axis=1).reshape(-1, 1)

# Make predictions on the embeddings
predictions = clf.predict(all_embeddings)