In [None]:
import os
import glob
import matplotlib.pyplot as plt
from PIL import Image
#
import tensorflow as tf

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D,Conv2D, MaxPooling2D,BatchNormalization, Dropout, Input, LeakyReLU, UpSampling2D, Conv2DTranspose
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model


In [None]:

# Kaggle API Token
from google.colab import files
files.upload()

# Setup Kaggle API
!pip install kaggle --quiet
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the Dataset
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia
 # Extract the Dataset
!unzip -q chest-xray-pneumonia.zip

In [None]:
# Set up the directory path for training and testing images
base_dir = './chest_xray'
train_dir = os.path.join(base_dir, 'train')
val_dir = os.path.join(base_dir, 'val')
test_dir = os.path.join(base_dir, 'test')

# Preprocessing the images - setting up image data generators

batch_size = 16
image_size = 178

# Limit data augmentation
train_datagen = ImageDataGenerator(rescale=1./255, rotation_range=20,
                           width_shift_range=0.2, height_shift_range=0.2)
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(train_dir, target_size=(image_size, image_size), 
                                       batch_size=batch_size, class_mode='binary')
val_generator = val_datagen.flow_from_directory(val_dir, target_size=(image_size, image_size), 
                                      batch_size=batch_size, class_mode='binary')
test_generator = test_datagen.flow_from_directory(test_dir, target_size=(image_size, image_size), 
                                      batch_size=batch_size, class_mode='binary')

# Use tf.data.Dataset for efficient data loading
def generator_wrapper(generator):
   for x_batch, y_batch in generator:
      yield (x_batch, x_batch)

train_dataset = tf.data.Dataset.from_generator(
   lambda: generator_wrapper(train_generator),
   output_signature=(
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32),
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32))
).prefetch(tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_generator(
   lambda: generator_wrapper(val_generator),
   output_signature=(
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32),
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32))
).prefetch(tf.data.AUTOTUNE)

test_dataset = tf.data.Dataset.from_generator(
   lambda: generator_wrapper(test_generator),
   output_signature=(
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32),
      tf.TensorSpec(shape=(None, image_size, image_size, 3), dtype=tf.float32))
).prefetch(tf.data.AUTOTUNE)

In [None]:
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D,Conv2D, MaxPooling2D,BatchNormalization, Dropout, Input, LeakyReLU, UpSampling2D, Conv2DTranspose


# Encoder
conv_encoder = tf.keras.Sequential([
    tf.keras.layers.Reshape([image_size, image_size, 3], name="input"),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(16, 3, padding="same", activation="relu"), 
    tf.keras.layers.MaxPool2D(pool_size=2), # output: 112x112x16
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"), 
    tf.keras.layers.MaxPool2D(pool_size=2), # output: 56x56x32
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(64, 3, padding="same", activation="relu"), 
    tf.keras.layers.MaxPool2D(pool_size=2), # output: 28x28x64
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(128, 3, padding="same", activation="relu"), 
    tf.keras.layers.MaxPool2D(pool_size=2), # output: 14x14x128
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(256, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPool2D(pool_size=2), # output: 7x7x256
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(60, 3, padding="same", activation="relu"),
    tf.keras.layers.GlobalAvgPool2D() # output: 60
])

# Decoder
conv_decoder = tf.keras.Sequential([
    tf.keras.layers.Dense(28 * 28 * 256),
    tf.keras.layers.Reshape((28, 28, 256)),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(256, 3, strides=2, padding="same", activation="relu"), # output: 56x56x256
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(128, 3, strides=2, padding="same", activation="relu"), # output: 112x112x128
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(64, 3, strides=2, padding="same", activation="relu"), # output: 224x224x64
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(32, 3, strides=1, padding="same", activation="relu"), # output: 224x224x32
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(16, 3, strides=1, padding="same", activation="relu"), # output: 224x224x16
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Conv2DTranspose(3, 3, strides=1, padding="same", activation="sigmoid"), # output: 224x224x3
    tf.keras.layers.Reshape([image_size, image_size, 3], name="output")
])

# Autoencoder
conv_ae = tf.keras.Sequential([conv_encoder, conv_decoder])


conv_ae.compile(
    loss="mean_squared_error", 
    # Use nadam with a 0.01 learning rate
    optimizer=tf.keras.optimizers.Nadam(learning_rate=0.01),
     metrics=[tf.keras.metrics.MeanSquaredError(),'accuracy'])

conv_ae.build(input_shape=(None, image_size, image_size, 3))
conv_ae.summary()


In [None]:
# Define the callbacks

early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1, mode='auto', restore_best_weights=True)
model_checkpoint = ModelCheckpoint('conv_ae.keras', monitor='val_loss', verbose=1, save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=2, verbose=1, mode='auto')


In [None]:

history = conv_ae.fit(
   train_dataset, 
   epochs=10, 
   validation_data=test_dataset,
   steps_per_epoch=len(train_generator),
   validation_steps=len(val_generator),
   callbacks=[
      early_stopping,
      model_checkpoint,
      reduce_lr
   ])

In [None]:
# Function to visualize the encoded images
def visualize(img, encoder, decoder):
   encoded = encoder.predict(img[None])[0]
   decoded = decoder.predict(encoded[None])[0]
   plt.figure(figsize=(10, 5))
   plt.subplot(1, 3, 1)
   plt.title("Original")
   plt.imshow(img)
   plt.axis('off')
   plt.subplot(1, 3, 2)
   plt.title("Encoded")
   plt.imshow(encoded)
   plt.axis('off')
   plt.subplot(1, 3, 3)
   plt.title("Decoded")
   plt.imshow(decoded)
   plt.axis('off')
   plt.show()

# Visualize the images
for img, _ in test_dataset.take(5):
   visualize(img[0], conv_encoder, conv_decoder)


In [None]:
import plotly.graph_objects as go

# Create traces
trace0 = go.Scatter(
   x = list(range(len(history.history['accuracy']))),
   y = history.history['accuracy'],
   mode = 'lines',
   name = 'Train'
)

trace1 = go.Scatter(
   x = list(range(len(history.history['val_accuracy']))),
   y = history.history['val_accuracy'],
   mode = 'lines',
   name = 'Test'
)

data = [trace0, trace1]

# Edit the layout
layout = dict(title = 'Model Accuracy',
           xaxis = dict(title = 'Epoch'),
           yaxis = dict(title = 'Accuracy'),
           )

fig = dict(data=data, layout=layout)
go.Figure(fig).show()
