# Import necessary libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, recall_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC

import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Load training, validation, and test data

In [None]:
# Training subset of the train/ directory. The split fraction and seed must
# match the validation cell so the two subsets are drawn from the same split.
training_data = image_dataset_from_directory(
    directory='../data/chest_xray/train',
    labels='inferred',
    subset="training",
    validation_split=0.2,
    seed=14,
)

In [None]:
# Validation subset of the train/ directory: the held-out 0.2 fraction,
# selected with the same seed used for the training subset.
validation_data = image_dataset_from_directory(
    directory='../data/chest_xray/train',
    labels='inferred',
    subset="validation",
    validation_split=0.2,
    seed=14,
)

In [None]:
# Test set, read in a fixed (unshuffled) order so predictions can later be
# lined up with labels.
# NOTE(review): batch_size=624 presumably equals the number of test images,
# so one batch holds the whole set -- confirm.
test_data = image_dataset_from_directory(
    directory='../data/chest_xray/test',
    labels='inferred',
    shuffle=False,
    batch_size=624,
)

# Visualize example images

In [None]:
# Show the first nine images of one training batch in a 3x3 grid, each
# titled with its class name.
class_names = training_data.class_names
plt.figure(figsize=(10, 10))
for batch_images, batch_labels in training_data.take(1):
    for idx in range(9):
        plt.subplot(3, 3, idx + 1)
        # Pixels arrive as floats; cast to uint8 so imshow renders 0-255.
        plt.imshow(batch_images[idx].numpy().astype("uint8"))
        plt.title(class_names[batch_labels[idx]])
        plt.axis("off")

# Augmentations to increase available training data

In [None]:
# Augmentation pipeline: random horizontal flip followed by a random rotation.
data_augmentation = tf.keras.Sequential()
data_augmentation.add(
    tf.keras.layers.experimental.preprocessing.RandomFlip("horizontal")
)
data_augmentation.add(
    tf.keras.layers.experimental.preprocessing.RandomRotation(0.2)
)

In [None]:
# Build an augmented copy of every training batch (training=True forces the
# random layers to be active), then append it to the original batches.
aug_ds = training_data.map(
    lambda batch, targets: (data_augmentation(batch, training=True), targets)
)
augmented_training_data = training_data.concatenate(aug_ds)

# Preprocess images as needed for pretrained Xception model

In [None]:
def preprocess(image, label):
    """Resize to 224x224 and apply Xception's input preprocessing.

    Returns the transformed image together with the unchanged label, so the
    function can be passed directly to ``Dataset.map``.
    """
    return (
        keras.applications.xception.preprocess_input(
            tf.image.resize(image, [224, 224])
        ),
        label,
    )

In [None]:
# Training data: shuffle (buffer of 1000 dataset elements), preprocess, and
# prefetch one element so input preparation overlaps with training.
augmented_training_data = (
    augmented_training_data
    .shuffle(1000)
    .map(preprocess)
    .prefetch(1)
)

# Validation and test data get the same preprocessing but are not shuffled.
validation_data = (
    validation_data
    .map(preprocess)
    .prefetch(1)
)
test_data = (
    test_data
    .map(preprocess)
    .prefetch(1)
)

# Create and fit model

### This is a transfer-learning Xception model with weights pretrained on the ImageNet dataset. For the first few epochs, we freeze the pretrained base layers, train only the new classification head, and use a larger learning rate.

In [None]:
# Xception backbone with ImageNet weights and without its classification top.
base_model = keras.applications.xception.Xception(
    include_top=False,
    weights='imagenet',
)

# Freeze every backbone layer so that only the new head is trained at first.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

# New head: global average pooling followed by a single sigmoid unit
# (binary classification).
pooling = keras.layers.GlobalAveragePooling2D()
classifier = keras.layers.Dense(1, activation='sigmoid')
avg = pooling(base_model.output)
output = classifier(avg)
model = keras.Model(inputs=base_model.input, outputs=output)

In [None]:
# Stage 1 (frozen backbone): SGD with momentum and a relatively large
# learning rate. `learning_rate` is the supported argument name; `lr` is a
# deprecated alias that newer Keras versions reject.
# NOTE(review): `decay` is itself legacy in recent Keras releases -- if the
# environment is upgraded, replace it with a learning-rate schedule.
optimizer = keras.optimizers.SGD(learning_rate=0.2, momentum=0.9, decay=0.01)
model.compile(
    loss='binary_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy', tf.keras.metrics.Recall()],
)

In [None]:
# Stage 1 training. Validation uses `validation_data`, the preprocessed
# validation split; the previously referenced `augmented_validation_data`
# is never defined in this notebook.
# class_weight scales the loss of class 0 by 2.88 relative to class 1
# (presumably to offset class imbalance -- confirm against the class counts).
results = model.fit(
    augmented_training_data,
    epochs=5,
    validation_data=validation_data,
    class_weight={0: 2.88, 1: 1},
)


### After a few epochs, the new top layers are trained and we can begin fine-tuning the pretrained base layers. We set a smaller learning rate to avoid damaging the pretrained weights.

In [None]:
# Make every backbone layer trainable again for fine-tuning. The model must
# be compiled again afterwards for the change to take effect.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = True

In [None]:
# Stage 2 (fine-tuning): a fresh SGD optimizer with a much smaller learning
# rate. `learning_rate` is the supported argument name; `lr` is a deprecated
# alias that newer Keras versions reject.
# NOTE(review): `decay` is itself legacy in recent Keras releases -- if the
# environment is upgraded, replace it with a learning-rate schedule.
optimizer = keras.optimizers.SGD(learning_rate=0.01, momentum=0.9, decay=0.001)
model.compile(
    loss='binary_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy', tf.keras.metrics.Recall()],
)

In [None]:
# Stage 2 training (all layers trainable). Validation uses `validation_data`,
# the preprocessed validation split; the previously referenced
# `augmented_validation_data` is never defined in this notebook.
# Note that `results` now holds only this stage's history.
results = model.fit(
    augmented_training_data,
    epochs=5,
    validation_data=validation_data,
    class_weight={0: 2.88, 1: 1},
)

# Evaluate model and plot results

In [None]:
# Loss and compiled metrics on the test set; the returned values are shown
# as the cell output.
model.evaluate(x=test_data)

In [None]:
# Plot per-epoch training/validation accuracy and loss for the most recent
# model.fit call (the history stored in `results`).
acc = results.history['accuracy']
val_acc = results.history['val_accuracy']

loss = results.history['loss']
val_loss = results.history['val_loss']

# One x value per recorded epoch. A fixed range(1) does not match the length
# of the history lists and makes plt.plot raise a shape-mismatch error.
epochs_range = range(len(acc))

plt.figure(figsize=(15, 8))

# Left panel: accuracy.
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

# Right panel: loss.
plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()

In [None]:
# Threshold the sigmoid outputs at 0.5 to get hard 0/1 predictions. Uses the
# trained `model`; the previously referenced `reconstructed_model` is never
# defined in this notebook.
predictions = (model.predict(test_data) > 0.5).astype('int32')

# Grab the first batch to obtain the true labels.
# NOTE(review): this only lines up with `predictions` if the whole test set
# fits in that single, unshuffled batch -- confirm against the test_data cell.
image_batch, labels_batch = next(iter(test_data))

In [None]:
# Confusion matrix of the test-set predictions against the true labels.
fig, ax = plt.subplots(figsize=(15, 7))
ax.set_title('Test Data Confusion Matrix')

# Class names in label-index order, used as tick labels so the axes show
# class names instead of bare 0/1.
labels = training_data.class_names

cm = confusion_matrix(labels_batch, predictions)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

# Trailing semicolon suppresses the notebook's repr of the returned object.
disp.plot(cmap=plt.cm.Blues, ax=ax);

# Testing other models
### These models cannot take TensorFlow Datasets as input, so the data is read in as NumPy arrays.

In [None]:
# Iterator over the train/ directory with pixel values rescaled by 1/255.
# NOTE(review): batch_size=5216 presumably equals the number of training
# images, so a single batch yields the whole set -- confirm.
train_imgs = ImageDataGenerator(rescale=1./255).flow_from_directory(
    directory='../data/chest_xray/train',
    batch_size=5216,
)

In [None]:
# Iterator over the test/ directory with pixel values rescaled by 1/255.
# NOTE(review): batch_size=624 presumably equals the number of test images,
# so a single batch yields the whole set -- confirm.
test_imgs = ImageDataGenerator(rescale=1./255).flow_from_directory(
    directory='../data/chest_xray/test',
    batch_size=624,
)

In [None]:
# Pull one batch from each directory iterator to get (images, labels) arrays.
# The iterators are stateful: re-running this cell advances them and yields
# a different batch.
X_initial, y_initial = next(train_imgs)
X_test, y_test = next(test_imgs)

In [None]:
# Hold out 20% of the loaded training images for validation; random_state
# makes the split reproducible.
X_train, X_val, y_train, y_val = train_test_split(
    X_initial,
    y_initial,
    random_state=14,
    train_size=0.8,
)

In [None]:
# Shapes of the image arrays before flattening, one per line.
print(X_train.shape, X_val.shape, X_test.shape, sep='\n')

In [None]:
# Flatten each image into a single feature vector for the scikit-learn
# models. The row count is taken from the array itself rather than being
# hard-coded, so the cell keeps working if the dataset size or the split
# fraction changes.
X_train = X_train.reshape(len(X_train), -1)
X_val = X_val.reshape(len(X_val), -1)
X_test = X_test.reshape(len(X_test), -1)

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)

In [None]:
# Reduce the 2-D label arrays to a 1-D target by keeping column 1.
# NOTE(review): presumably the labels are one-hot and column 1 marks the
# second class in directory order -- confirm. Re-running this cell on the
# already-reduced arrays will fail.
y_train, y_val, y_test = (
    targets[:, 1] for targets in (y_train, y_val, y_test)
)

print(y_train.shape, y_val.shape, y_test.shape, sep='\n')

# Logistic Regression

In [None]:
# Baseline 1: logistic regression with default settings on the flattened
# pixels.
lr = LogisticRegression()
lr.fit(X=X_train, y=y_train)

In [None]:
# Test-set recall of the logistic regression baseline (shown as cell output).
recall_score(y_true=y_test, y_pred=lr.predict(X_test))

In [None]:
# Test-set accuracy of the logistic regression baseline (shown as cell output).
accuracy_score(y_true=y_test, y_pred=lr.predict(X_test))

# Random Forest

In [None]:
# Baseline 2: random forest with default settings on the flattened pixels.
rf = RandomForestClassifier()
rf.fit(X=X_train, y=y_train)

In [None]:
# Test-set recall of the random forest baseline (shown as cell output).
recall_score(y_true=y_test, y_pred=rf.predict(X_test))

In [None]:
# Test-set accuracy of the random forest baseline (shown as cell output).
accuracy_score(y_true=y_test, y_pred=rf.predict(X_test))

# Support Vector Machine

In [None]:
# Baseline 3: linear support vector classifier with default settings on the
# flattened pixels.
svm = LinearSVC()
svm.fit(X=X_train, y=y_train)

In [None]:
# Test-set recall of the linear SVM baseline (shown as cell output).
recall_score(y_true=y_test, y_pred=svm.predict(X_test))

In [None]:
# Test-set accuracy of the linear SVM baseline (shown as cell output).
accuracy_score(y_true=y_test, y_pred=svm.predict(X_test))