**Creating 2000 fake X-ray images (1000 normal and 1000 COVID) using a Deep Convolutional Generative Adversarial Network (DCGAN)**

To see how the sequential model performs with DCGAN-generated augmented images, go to:
https://github.com/etcho00700/ML_fin_project.git

The repo contains the datasets we used for training and the final results.

In [None]:
from skimage import img_as_float
from skimage import exposure
from skimage import io # To preprocess the images

In [None]:

import matplotlib.pyplot as plt 
import matplotlib.animation as animation
from IPython.display import HTML


import sys, os, glob, time, imageio 
import numpy as np, pandas as pd  
from PIL import Image 


import torch 
import torchvision.utils as vutils 
import torchvision.transforms as transforms 
import tensorflow as tf 
from tensorflow.keras import models, layers, optimizers 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.preprocessing.image import array_to_img, img_to_array, load_img 

In [None]:
# Load the Google Colab Drive helper.
from google.colab import drive

# Mount Google Drive at /content/drive. This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Extract the dataset archive stored on the mounted Drive into the current directory.
!unzip drive/MyDrive/Covid-dataset.zip

In [None]:
# Explicit %cd magic: does not rely on automagic being enabled, and the absolute
# path keeps the cell re-runnable (a relative cd fails once already inside).
%cd /content/COVID-19_Radiography_Dataset/

/content/COVID-19_Radiography_Dataset


In [None]:
# Create the folders that will hold the 1500-image training subsets.
# -p keeps the cell re-runnable: no error if a folder already exists.
!mkdir -p ./COVID1500/
!mkdir -p ./NORMAL1500/

In [None]:
# Set dataset path
DATASET_PATH = '/content/COVID-19_Radiography_Dataset'

# There are two classes of images that we will deal with
cls = ['COVID', 'Normal']

# Glob-style patterns for the two class folders
covid_path = os.path.join(DATASET_PATH, cls[0], '*')
normal_path = os.path.join(DATASET_PATH, cls[1], '*')


def list_files_recursive(folder):
    """Return the sorted paths of every file found under *folder*, recursively.

    os.walk yields entries in an arbitrary, filesystem-dependent order, so the
    result is sorted to make any later "take the first N files" selection
    reproducible between runs. A folder that does not exist yields an empty list.
    """
    return sorted(
        os.path.join(root, name)
        for root, _dirs, files in os.walk(folder)
        for name in files
    )


# Lists of file paths per class; os.path.dirname strips the trailing '*'
# from the patterns above to recover the class directory.
listCovidPaths = list_files_recursive(os.path.dirname(covid_path))
listNormalPaths = list_files_recursive(os.path.dirname(normal_path))



In [None]:
import shutil

# Paths to the covid1500 and normal1500 image folders
pathCovid1500 = './COVID1500/'
pathNormal1500 = './NORMAL1500/'

# Make sure the destination folders exist so this cell can run on its own.
os.makedirs(pathCovid1500, exist_ok=True)
os.makedirs(pathNormal1500, exist_ok=True)

# Copy (not move) at most 1500 images per class into the new folders.
# shutil.copy2 replaces distutils' copy_file, because distutils was removed in
# Python 3.12; like copy_file's defaults it keeps mode and timestamps.
# Slicing instead of range(1500) avoids an IndexError when a class has fewer
# than 1500 files.
for src in listCovidPaths[:1500]:
    shutil.copy2(src, pathCovid1500)
for src in listNormalPaths[:1500]:
    shutil.copy2(src, pathNormal1500)


In [None]:

# Paths of the PNG files inside the two 1500-image training folders.
# os.path.join avoids the doubled separator produced by string concatenation;
# recursive=True was dropped because it only affects patterns containing '**';
# sorting gives a stable order (glob returns files in arbitrary order).
XRay_covid1500 = sorted(glob.glob(os.path.join(pathCovid1500, '*.png')))
XRay_normal1500 = sorted(glob.glob(os.path.join(pathNormal1500, '*.png')))

In [None]:
# Time Computing 
def _time(start, end):
    """Format the span between two timestamps as a short human-readable string.

    Args:
        start: Start timestamp, in seconds.
        end: End timestamp, in seconds.

    Returns:
        '<s>sec' for spans under a minute, '<m>min <s>sec' for spans under an
        hour and '<h>h <m>min <s>sec' otherwise. Seconds are rounded to two
        decimals.
    """
    elapsed = end - start

    # Under a minute: seconds only.
    if elapsed < 60:
        return f'{round(elapsed, 2)}sec'

    leftover_sec = round(elapsed % 60, 2)

    # An hour or more: hours, minutes and seconds.
    if elapsed >= 3600:
        hours = int(elapsed / 3600)
        minutes = int((elapsed % 3600) / 60)
        return f'{hours}h {minutes}min {leftover_sec}sec'

    # Between a minute and an hour: minutes and seconds.
    return f'{int(elapsed / 60)}min {leftover_sec}sec'


In [None]:
def get_data(data_path, dim=(128, 128), rand_shuffle=True):
    """Load images from disk into a single float32 array scaled to [0, 1].

    Args:
        data_path: Sequence of image file paths.
        dim: Target size passed to ``load_img``; every image is resized to it.
        rand_shuffle: If True, shuffle the samples along the first axis using
            NumPy's global random state.

    Returns:
        A float32 array with one entry per path along the first axis. For an
        empty ``data_path`` an empty array of shape ``(0, dim[0], dim[1], 3)``
        is returned (3 channels, matching ``load_img``'s default RGB mode).
    """
    start = time.time()
    sample_size = len(data_path)

    if sample_size == 0:
        # np.array([]) would be 1-D and break the 4-D indexing used for the
        # shuffle below, so hand back a correctly shaped empty batch instead.
        print('No image paths supplied; returning an empty batch.')
        return np.empty((0, dim[0], dim[1], 3), dtype='float32')

    # Report progress roughly every 10%. max(1, ...) avoids a modulo-by-zero
    # when there are fewer than 10 images.
    report_every = max(1, sample_size // 10)

    imgs_data = []
    for idx, im_path in enumerate(data_path):
        if idx % report_every == 0:
            print('Processing index {:05d} of {:05d} ==> {:03d}%'\
                  .format(idx, sample_size, round(100*idx/sample_size)))
        imgs_data.append(img_to_array(load_img(im_path, target_size=dim)))

    # to float
    imgs_data = np.array(imgs_data).astype('float32')
    # scale to [0,1]; for [-1,1] use (imgs_data - 127.5)/127.5 instead
    imgs_data = imgs_data/255.

    # shuffle the data
    if rand_shuffle:
        order = np.arange(imgs_data.shape[0])
        np.random.shuffle(order)
        imgs_data = imgs_data[order, :, :, :]

    print(f"Hey! the calculations are done in {_time(start, time.time())}")
    return imgs_data

In [None]:
## Obtain Data from 1500 training images for covid and normal xray
# get_data is called with its defaults, so images are resized to 128x128,
# scaled to [0, 1] and shuffled.
X_covid_1500 = get_data(XRay_covid1500)
X_normal_1500 = get_data(XRay_normal1500)

In [None]:
##Parameters for DCGAN taken from Keras DCGAN: https://keras.io/examples/generative/dcgan_overriding_train_step/

# Number of training epochs
n_epoch = 180
# Samples per training step (train() uses half of it for each discriminator update)
batch_size = 128 
# Length of the random noise vector fed to the generator
latent_dim = 100  
# NOTE(review): 299x299 does not match the 128x128 size used by get_data(),
# define_discriminator() and define_generator(); cols/rows/channels/dim/in_shape
# are not referenced elsewhere in the visible code -- confirm before relying on them.
cols, rows = 299, 299 
channels = 3 
dim = cols, rows  
in_shape = (cols, rows, channels) 
# Adam settings. NOTE(review): define_discriminator() and define_gan() hard-code
# the same values instead of reading lr/beta1.
lr = 0.0002
beta1 = 0.5
# NOTE(review): ngpu, nrows and ncols are not referenced in the visible code.
ngpu = 1 
nrows, ncols = 3, 4

In [None]:
#discriminator model taken from Keras DCGAN

def define_discriminator(in_shape=(128,128,3)):
    """Build and compile the discriminator (real-vs-fake image classifier).

    Args:
        in_shape: Shape of one input image as (height, width, channels).

    Returns:
        A compiled Sequential model: one full-resolution convolution, four
        stride-2 convolutions, then dropout and a single sigmoid unit. It is
        compiled with binary cross-entropy, Adam(0.0002, beta_1=0.5) and an
        accuracy metric.
    """
    model = models.Sequential()
    # full-resolution convolution
    model.add(layers.Conv2D(128, (5,5), padding='same', input_shape=in_shape))
    model.add(layers.LeakyReLU(alpha=0.2))
    # four stride-2 convolutions, each halving the spatial size
    # (128 -> 64 -> 32 -> 16 -> 8 for the default input shape)
    for _ in range(4):
        model.add(layers.Conv2D(128, (5,5), strides=(2,2), padding='same'))
        model.add(layers.LeakyReLU(alpha=0.2))
    # classifier
    model.add(layers.Flatten())
    model.add(layers.Dropout(0.4))
    model.add(layers.Dense(1, activation='sigmoid'))
    # compile model; `learning_rate` is the supported keyword, the old `lr`
    # alias is deprecated and rejected by newer Keras releases
    opt = optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

In [None]:
#generator model taken from Keras DCGAN


def define_generator(latent_dim):
    """Build the (uncompiled) generator that maps noise vectors to images.

    Args:
        latent_dim: Length of the input noise vector.

    Returns:
        A Sequential model that projects the noise to an 8x8x128 tensor,
        doubles the spatial size four times (8 -> 16 -> 32 -> 64 -> 128) and
        ends with a 3-channel tanh convolution.
    """
    # size of the dense projection that seeds the 8x8 feature maps
    n_nodes = 128*8*8

    model = models.Sequential()
    model.add(layers.Dense(n_nodes, input_dim=latent_dim))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Reshape((8, 8, 128)))

    # four stride-2 transposed convolutions, each doubling the spatial size
    for _ in range(4):
        model.add(layers.Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
        model.add(layers.LeakyReLU(alpha=0.2))

    # output layer 128x128x3
    # NOTE(review): tanh emits values in [-1, 1], whereas get_data() scales the
    # real images to [0, 1] -- confirm this mismatch is intended.
    model.add(layers.Conv2D(3, (5,5), activation='tanh', padding='same'))
    return model

# input for Generator (random noise)
def generate_latent_points(latent_dim, n_samples):
    """Draw a batch of standard-normal noise vectors for the generator.

    Args:
        latent_dim: Length of each noise vector.
        n_samples: Number of vectors to draw.

    Returns:
        Array of shape (n_samples, latent_dim) sampled with np.random.randn.
    """
    # draw every value in one flat call, then fold it into one row per sample
    flat_noise = np.random.randn(n_samples * latent_dim)
    return flat_noise.reshape((n_samples, latent_dim))

# Generate n_samples fake images by running random latent points through the generator
def generate_fake_samples(g_model, latent_dim, n_samples):
    """Generate a batch of fake images together with their 'fake' labels.

    Args:
        g_model: Generator model exposing ``predict``.
        latent_dim: Length of each noise vector fed to the generator.
        n_samples: Number of images to generate.

    Returns:
        Tuple ``(X, y)``: the generator output for ``n_samples`` random latent
        points, and an ``(n_samples, 1)`` array of zeros (class 0 = fake).
    """
    # generate points in latent space
    x_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs; verbose=0 keeps Keras from printing a progress bar on
    # every call, which floods the output when called once per training batch
    X = g_model.predict(x_input, verbose=0)
    # create 'fake' class labels (0)
    y = np.zeros((n_samples, 1))
    return X, y

In [None]:
#Gan Model taken from Keras DCGAN
def define_gan(g_model, d_model):
    """Stack generator and discriminator into the model that trains the generator.

    Args:
        g_model: Generator model.
        d_model: Discriminator model; its ``trainable`` flag is set to False
            here as a side effect.

    Returns:
        A Sequential model (generator followed by discriminator) compiled with
        binary cross-entropy and Adam(0.0002, beta_1=0.5).
    """
    # make weights in the discriminator not trainable
    # NOTE(review): d_model is expected to keep training through its own
    # train_on_batch calls after this; whether that holds depends on how the
    # installed Keras version treats `trainable` changes made after compile()
    # -- confirm.
    d_model.trainable = False
    # connect them: generator first, discriminator second
    model = models.Sequential()
    model.add(g_model)
    model.add(d_model)
    # compile model; `learning_rate` is the supported keyword, the old `lr`
    # alias is deprecated and rejected by newer Keras releases
    opt = optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    return model

# parse and get real samples (training data)
def get_real_samples(dataset, n_samples):
    """Pick a random batch of real images together with their 'real' labels.

    Args:
        dataset: Array of training images, indexed along the first axis.
        n_samples: Number of images to draw (with replacement).

    Returns:
        Tuple ``(X, y)``: the selected images and an ``(n_samples, 1)`` array
        of ones (class 1 = real).
    """
    # random row indices; the same image may be picked more than once
    picks = np.random.randint(0, dataset.shape[0], size=n_samples)
    real_labels = np.ones(shape=(n_samples, 1))
    return dataset[picks], real_labels

# create and save a plot of generated images 
def show_generated(generated, epoch, covid, nrows=4, ncols=5, num_gan = 1000):
    """Plot a grid of generated images and, on the last epoch, save each one.

    Args:
        generated: Batch of generated images, indexed along the first axis.
        epoch: Zero-based epoch index; ``epoch+1`` is used in the file name.
        covid: If truthy, files go to the ./DCGAN_Covid* folders, otherwise to
            the ./DCGAN_Normal* folders. The folders must already exist.
        nrows: Rows of the preview grid.
        ncols: Columns of the preview grid.
        num_gan: Maximum number of individual images saved on the last epoch.

    The per-image export only runs when ``epoch`` equals the module-level
    ``n_epoch`` minus one.
    """
    #[-1,1] -> [0,1]
    #generated = (generated+1)/2
    #generated = (generated[:ncols*nrows]*127.5)+127.5
    #generated = generated*255

    # never index past the end of the batch
    available = len(generated)

    grid_fig = plt.figure(figsize=(10,10))
    for idx in range(min(nrows*ncols, available)):
        plt.subplot(nrows, ncols, idx+1)
        plt.imshow(generated[idx])
        plt.axis('off')
    if (covid):
      plt.savefig('./DCGAN_Covid_gif/image_at_epoch_{:04d}.png'.format(epoch+1), bbox_inches = 'tight', pad_inches = 0)
    else:
      plt.savefig('./DCGAN_Normal_gif/image_at_epoch_{:04d}.png'.format(epoch+1), bbox_inches = 'tight', pad_inches = 0 )
    plt.show()
    plt.close(grid_fig)

    if (epoch == n_epoch - 1):
      for i in range(min(num_gan, available)):
        single_fig = plt.figure(figsize = (6,6))
        plt.imshow(generated[i])
        plt.axis('off')
        if (covid):
          plt.savefig('./DCGAN_Covid/DCGAN_image{:04d}.png'.format(i), bbox_inches = 'tight', pad_inches = 0)
        else:
          plt.savefig('./DCGAN_Normal/DCGAN_image{:04d}.png'.format(i), bbox_inches = 'tight', pad_inches = 0)
        # close each figure once it is on disk; otherwise up to num_gan figures
        # stay open in memory (and get rendered into the notebook output)
        plt.close(single_fig)

# evaluate the discriminator and plot generated images 
def summarize_performance(epoch, g_model, d_model, dataset, latent_dim, covid = True, n_samples=1000):
    """Print discriminator accuracy on real and fake batches, then plot the fakes.

    Args:
        epoch: Zero-based epoch index (reported as ``epoch+1``).
        g_model: Generator used to produce the fake batch.
        d_model: Discriminator being evaluated.
        dataset: Array of real training images.
        latent_dim: Length of the generator's noise vector.
        covid: Forwarded to ``show_generated`` to select the output folders.
        n_samples: Size of both the real and the fake evaluation batch.
    """
    # accuracy on a random batch of real images
    real_images, real_labels = get_real_samples(dataset, n_samples)
    _, real_accuracy = d_model.evaluate(real_images, real_labels, verbose=0)

    # accuracy on a freshly generated batch of fake images
    fake_images, fake_labels = generate_fake_samples(g_model, latent_dim, n_samples)
    _, fake_accuracy = d_model.evaluate(fake_images, fake_labels, verbose=0)

    # report both accuracies as percentages
    report = '> Accuracy at epoch %d [real: %.0f%%, fake: %.0f%%]'
    print(report%(epoch+1, real_accuracy*100, fake_accuracy*100))

    # plot (and save) the fake batch
    show_generated(fake_images, epoch, covid)

    
def plot_loss(loss):
    """Plot the three loss curves recorded during training on one figure.

    Args:
        loss: Indexable whose items 0, 1 and 2 are the sequences plotted as
            "D_real", "D_fake" and "G" respectively.
    """
    plt.figure(figsize=(10,5))
    plt.title("Generator and Discriminator Loss During Training", fontsize=20)
    # one curve per recorded loss, in the order they are stored
    for position, curve_label in enumerate(("D_real", "D_fake", "G")):
        plt.plot(loss[position], label=curve_label)
    plt.xlabel("Iteration", fontsize=20)
    plt.ylabel("Loss", fontsize=20)
    plt.legend()
    plt.show()

In [None]:
# Output folders for the generated images and the per-epoch preview grids.
# -p keeps the cell re-runnable: no error if a folder already exists.
!mkdir -p ./DCGAN_Covid/
!mkdir -p ./DCGAN_Normal/
!mkdir -p ./DCGAN_Covid_gif/
!mkdir -p ./DCGAN_Normal_gif/

In [None]:

def train(g_model, d_model, gan_model, dataset, covid, latent_dim=100, n_epochs=100, n_batch=128):
    """Run the alternating discriminator/generator training loop.

    Args:
        g_model: Generator model.
        d_model: Compiled discriminator; its ``train_on_batch`` must return
            ``(loss, accuracy)``.
        gan_model: Compiled generator+discriminator stack used to update the
            generator.
        dataset: Array of real training images.
        covid: Forwarded to ``summarize_performance`` to select output folders.
        latent_dim: Length of the generator's noise vector.
        n_epochs: Number of passes over the data.
        n_batch: Batch size; the discriminator sees half of it real and half
            of it fake per step.

    Every 10th epoch the discriminator is evaluated and sample images are
    plotted. After training, the per-step losses are plotted.
    """
    start = time.time()
    # at least one step per epoch, even when the dataset is smaller than a
    # batch (real samples are drawn with replacement, so this is safe)
    bat_per_epo = max(1, dataset.shape[0] // n_batch)
    half_batch = int(n_batch/2)
    loss1, loss2, loss3 = [], [], []

    # manually enumerate epochs
    print('Training Start...')
    for i in range(n_epochs):
        start1 = time.time()
        # enumerate batches over the training set
        for j in range(bat_per_epo):
            # get randomly selected 'real' samples
            X_real, y_real = get_real_samples(dataset, half_batch)
            # update discriminator model weights
            d_loss1, _ = d_model.train_on_batch(X_real, y_real)
            # generate 'fake' examples
            X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            # update discriminator model weights
            d_loss2, _ = d_model.train_on_batch(X_fake, y_fake)
            # prepare points in latent space as input for the generator
            X_gan = generate_latent_points(latent_dim, n_batch)
            # create inverted labels for the fake samples
            y_gan = np.ones((n_batch, 1))
            # update the generator via the discriminator's error
            g_loss = gan_model.train_on_batch(X_gan, y_gan)
            # record the losses of this step
            loss1.append(d_loss1)
            loss2.append(d_loss2)
            loss3.append(g_loss)

        print('Epoch: {:03d}/{:03d}, time: {:s}'\
              .format(i+1,n_epochs, _time(start1,time.time())))
        # evaluate the model performance
        if ((i+1) % 10 == 0):
            # Save and show generated images
            summarize_performance(i, g_model, d_model, dataset, latent_dim, covid)

    # _time() already appends the unit, so no extra "sec" suffix here
    print('Total time for training {} epochs is {}'.format(n_epochs, _time(start, time.time())))

    # Show loss curves
    loss = (loss1, loss2, loss3)
    plot_loss(loss)


In [None]:

## create 1000 dcgan images for covid xray
# Build fresh models and train them on the COVID images; covid=True routes the
# saved figures to the ./DCGAN_Covid and ./DCGAN_Covid_gif folders.
discriminator = define_discriminator() 
generator = define_generator(latent_dim) 
gan = define_gan(generator, discriminator)
train(generator, discriminator, gan, X_covid_1500, True, latent_dim, n_epoch, n_batch=batch_size)

In [None]:

## create 1000 dcgan images for normal xray
# Rebuild all three models from scratch and train them on the normal images;
# covid=False routes the saved figures to the ./DCGAN_Normal and
# ./DCGAN_Normal_gif folders.
discriminator = define_discriminator() 
generator = define_generator(latent_dim) 
gan = define_gan(generator, discriminator)
train(generator, discriminator, gan, X_normal_1500, False, latent_dim, n_epoch, n_batch=batch_size)

In [None]:
# Zip the folder of generated normal X-ray images into /content/DCGAN_Normal.zip.
!zip -r /content/DCGAN_Normal.zip /content/COVID-19_Radiography_Dataset/DCGAN_Normal/

In [None]:
# Zip the folder of generated COVID X-ray images into /content/DCGAN_Covid.zip.
!zip -r /content/DCGAN_Covid.zip /content/COVID-19_Radiography_Dataset/DCGAN_Covid/

In [None]:
# Zip the per-epoch preview grids of both classes.
!zip -r /content/DCGAN_Covid_gif.zip /content/COVID-19_Radiography_Dataset/DCGAN_Covid_gif/
!zip -r /content/DCGAN_Normal_gif.zip /content/COVID-19_Radiography_Dataset/DCGAN_Normal_gif/