In [3]:
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from keras.preprocessing import image 
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os

## NN for Normal vs NonNormal

In [4]:
def normal_nonnormal(x): 
    if x == 'Normal': 
        return x 
    else: 
        return 'Non-Normal'

df = pd.read_csv('../CombinedImages/CombinedUpdated.csv')
na_fill = {'VirusCategory1': 'Normal'}
df = df.fillna(value = na_fill)

df.VirusCategory1 = df.VirusCategory1.map(normal_nonnormal)
df = df.join(pd.get_dummies(df.VirusCategory1.values, prefix = 'type'))
columns_include = ['Normal', 'Non-Normal']
df = df[['ImagePath', 'VirusCategory1'] + [f'type_{i}' for i in columns_include]]


In [5]:
X = df[['ImagePath', 'VirusCategory1']]
y = df[[f'type_{i}' for i in columns_include]]

x_train, x_test, y_train, y_test = train_test_split(X,y, random_state = 10, stratify = X.VirusCategory1.values,
                                                   train_size = .9)

x_train = x_train.drop('VirusCategory1', axis = 1)
x_test = x_test.drop('VirusCategory1', axis = 1)

In [6]:
def get_image_value(path): 
    img = image.load_img(path, target_size = (28,28,1))
    img = image.img_to_array(img)/255
    return img 


def get_data(df): 
    from tqdm import tqdm
    img_list = [] 
    for path in tqdm(df.ImagePath.values):
        path = f'../CombinedImages/all/{path}'
        img_list.append(get_image_value(path)) 
    return np.array(img_list).squeeze()

x_test = get_data(x_test)
x_train = get_data(x_train)


100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████| 648/648 [00:07<00:00, 86.98it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 5829/5829 [01:11<00:00, 81.69it/s]


In [7]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten 
from keras.layers import Conv2D, MaxPooling2D 

In [15]:
def get_conv_model_normal(x, y): 
    drop = .4 
    
    model = Sequential() 
    
    model.add(Conv2D(32, kernel_size=(3, 3),activation='relu', input_shape = (28,28,3)))
    model.add(Conv2D(64, (3, 3), activation='relu', dilation_rate =1))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(128, (3, 3), activation='relu', dilation_rate =1))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Dropout(drop))
    model.add(Flatten())
    

#     model.add(Dropout(drop))
#     model.add(Flatten())
    
    model.add(Dense(128, activation='sigmoid'))
    model.add(Dropout(drop))

    model.add(Dense(2, activation='softmax'))
    
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['accuracy'])
    return model 

In [16]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
# y_train = y_train.values
# y_test = y_test.values
early_stopping = EarlyStopping(monitor='val_loss', verbose = 1, patience=5)
model_checkpoint = ModelCheckpoint('models/NormalModelCheckpointWeights.h5', verbose = 1, save_best_only=True)

epochs = 50
batch_size = 1
normal_model = get_conv_model_normal(x_train, y_train)
normal_history = normal_model.fit(x_train, y_train, epochs = epochs, batch_size = batch_size, 
         callbacks = [early_stopping, model_checkpoint], validation_data = (x_test, y_test), verbose= 2)

Epoch 1/50

Epoch 00001: val_loss improved from inf to 0.10505, saving model to models/NormalModelCheckpointWeights.h5
5829/5829 - 19s - loss: 0.1478 - accuracy: 0.9581 - val_loss: 0.1051 - val_accuracy: 0.9630
Epoch 2/50

Epoch 00002: val_loss improved from 0.10505 to 0.09255, saving model to models/NormalModelCheckpointWeights.h5
5829/5829 - 20s - loss: 0.1149 - accuracy: 0.9640 - val_loss: 0.0926 - val_accuracy: 0.9722
Epoch 3/50

Epoch 00003: val_loss did not improve from 0.09255
5829/5829 - 19s - loss: 0.1076 - accuracy: 0.9667 - val_loss: 0.1672 - val_accuracy: 0.9352
Epoch 4/50

Epoch 00004: val_loss improved from 0.09255 to 0.07620, saving model to models/NormalModelCheckpointWeights.h5
5829/5829 - 19s - loss: 0.1215 - accuracy: 0.9605 - val_loss: 0.0762 - val_accuracy: 0.9738
Epoch 5/50

Epoch 00005: val_loss did not improve from 0.07620
5829/5829 - 18s - loss: 0.1218 - accuracy: 0.9633 - val_loss: 0.1112 - val_accuracy: 0.9707
Epoch 6/50

Epoch 00006: val_loss did not improve

In [33]:
normal_model.load_weights('models/NormalModelCheckpointWeights.h5')
tester_img = get_image_value('TestImages/Normal6.jpg')
tester_img = np.reshape(tester_img, (1, 28,28,3))
tester_img.shape
 
normal_order = ['Normal', 'Non-Normal']
normal_predict = normal_model.predict(tester_img).squeeze()
print(normal_predict,normal_order[np.argmax(normal_predict)])



[0.01162955 0.9883704 ] Non-Normal


In [None]:
 assert False

### Classes of NonNormal

In [None]:
df = pd.read_csv('../CombinedImages/CombinedUpdated.csv')
na_fill = {'VirusCategory1': 'Normal'}
df = df.fillna(value = na_fill)
df = df.replace('E.Coli', 'Bacterial')
df = df.replace('COVID-19, ARDS', 'Viral')
df = df.replace('COVID-19', 'Viral')
df = df.replace('Mycoplasma Bacterial Pneumonia', 'Bacterial')
df = df.replace('Klebsiella', 'Bacterial')
df = df.replace('Legionella', 'Bacterial')
df = df.replace('Chlamydophila', 'Bacterial')
df = df.replace('Pneumocystis', 'Fungal')
df = df.replace('Streptococcus', 'Bacterial')
df = df.replace('Influenza', 'Viral')
df = df.replace('ARDS', 'Viral')
df = df.replace('SARS', 'Viral')
df = df.replace('Influenza', 'Viral')
df = df.replace('Varicella', 'Viral')


df.VirusCategory1.unique()
# df.VirusCategory1 = df.VirusCategory1.str.strip()
pneu_types = ['Viral', 'Bacterial', 'Fungal']
df = df.join(pd.get_dummies(df.VirusCategory1.values, prefix = 'type'))

df = df[['ImagePath', 'VirusCategory1']+[f'type_{i}' for i in pneu_types]]
df = df[df.VirusCategory1.isin(pneu_types)]
df.head(3)

In [None]:
df.VirusCategory1.value_counts()

In [None]:
random_state = 10
X = df[['ImagePath', 'VirusCategory1']]
y = df[[f'type_{i}' for i in pneu_types]]
x_train, x_test, y_train, y_test = train_test_split(X,y, train_size = .84, stratify = X.VirusCategory1.values)
x_train = x_train.drop('VirusCategory1', axis = 1)
x_test = x_test.drop('VirusCategory1', axis = 1)
x_test.shape


In [None]:
x_test = get_data(x_test)
x_train = get_data(x_train)

In [None]:
def get_conv_model_nonnormal(x, y): 
    drop = .2 
    
    model = Sequential() 
    
    model.add(Conv2D(32, kernel_size=(3, 3),activation='relu', input_shape = (28,28,3)))
    
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(Dropout(drop))

    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(Dropout(drop))

    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(drop))
    
    model.add(Dense(3, activation='softmax'))
    
    model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['accuracy'])
    return model 

In [None]:

# y_train = y_train.values
# y_test = y_test.values
early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=1)
model_checkpoint = ModelCheckpoint('models/Non-NormalModelCheckpointWeights.h5', verbose = 1, save_best_only=True)

epochs = 50
batch_size = 2
nonnormal_model = get_conv_model_nonnormal(x_train, y_train)
nonnormal_history = nonnormal_model.fit(x_train, y_train, epochs = epochs, batch_size = batch_size, verbose = 2, 
         callbacks = [early_stopping, model_checkpoint], validation_data = (x_test, y_test))

In [None]:
assert False

### Test on Random Images

['Normal', 'COVID-19', 'Bacterial', 'Fungal', 'SARS']
['Normal', 'COVID-19', 'Bacterial']

In [None]:
tester_img = get_image_value('TestImages/Normal2.jpg')
tester_img = np.reshape(tester_img, (1, 28,28,3))
tester_img.shape
def test_stacked(input_image): 
    normal_order = ['Normal', 'Non-Normal']
    nonnormal_order = ['Viral', 'Bacterial', 'Fungal']
    normal_predict = normal_model.predict(input_image).squeeze()
    if normal_order[np.argmax(normal_predict)] == 'Non-Normal': 
        nonnormal_predict = nonnormal_model.predict(input_image)
        return nonnormal_order[np.argmax(nonnormal_predict)]
    else: 
        return 'Normal'

tester_img = get_image_value('TestImages/Normal.jpg')
tester_img = np.reshape(tester_img, (1, 28,28,3))
test_stacked(tester_img)