<a href="https://colab.research.google.com/github/retico/cmepda_medphys/blob/master/L11_code/Lecture11_CNN_pythonized.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [0]:
import matplotlib.pyplot as plt

# Importing the dataset from Google Drive

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
!ls /content/gdrive/My\ Drive/cmepda_medphys_dataset/IMAGES/Mammography_micro/

In [0]:
!unzip -q /content/gdrive/My\ Drive/cmepda_medphys_dataset/IMAGES/Mammography_micro/Train.zip  -d /content/
!unzip -q /content/gdrive/My\ Drive/cmepda_medphys_dataset/IMAGES/Mammography_micro/Test.zip  -d /content/

In [0]:
!ls /content/{Test,Train}

# Reading the images

In [0]:
from skimage.io import imread
import os
import glob
import numpy as np

In [0]:
def read_imgs(dataset_path, classes):
  """Load the ``.pgm`` images of the requested classes into NumPy arrays.

  Images are looked up in ``<dataset_path>/<class>/*.pgm``, one sub-folder
  per entry of ``classes``; the label of an image is the class entry whose
  folder it was found in.

  Args:
    dataset_path: Root folder that contains one sub-folder per class.
    classes: Iterable of class identifiers; ``str(cls)`` is used as the
      sub-folder name and ``cls`` itself as the label.

  Returns:
    A tuple ``(images, labels)``. ``images`` is a float32 array with a
    trailing axis of length 1 appended, with pixel values divided by 255
    (which maps to [0, 1] only for 8-bit input -- not checked here).
    ``labels`` holds one entry per image, in the same order. Samples are
    grouped by class in the order given by ``classes`` and sorted by file
    name within each class.
  """
  images = []
  labels = []
  for cls in classes:
    # glob returns files in arbitrary, filesystem-dependent order; sorting
    # makes the sample order (and any split derived from it) reproducible.
    fnames = sorted(glob.glob(os.path.join(dataset_path, str(cls), '*.pgm')))
    images.extend(imread(fname) for fname in fnames)
    labels.extend(len(fnames) * [cls])
  return np.array(images, dtype='float32')[..., np.newaxis] / 255, np.array(labels)

In [0]:
train_dataset_path = '/content/Train'
x_train, y_train = read_imgs(train_dataset_path, [0, 1])

test_dataset_path = '/content/Test'
x_test, y_test = read_imgs(test_dataset_path, [0, 1])

In [0]:
print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)


# Defining a CNN model

In [0]:
from keras.layers import Conv2D, BatchNormalization, MaxPool2D, Dense, Flatten, InputLayer, Activation
from keras.models import Sequential

In [0]:
shape = (60, 60, 1)
def make_model(input_shape=shape):
  """Build the (uncompiled) CNN used for the binary classification task.

  The network stacks three 3x3, 'same'-padded convolutions with 8, 16 and 32
  filters, each followed by batch normalisation and a ReLU activation. A 2x2
  max pooling follows the first and the second stage. The final feature maps
  are flattened and fed to a single sigmoid unit.

  Args:
    input_shape: Shape of one input sample, passed to the first Conv2D as
      ``input_shape``. Defaults to the module-level ``shape`` (60, 60, 1).

  Returns:
    A ``Sequential`` model. It is not compiled; the caller chooses the
    optimizer, loss and metrics.
  """
  model = Sequential([
      # Stage 1: 8 filters
      Conv2D(8, (3,3), padding='same', input_shape=input_shape),
      BatchNormalization(),
      Activation('relu'),

      MaxPool2D((2,2)),

      # Stage 2: 16 filters
      Conv2D(16, (3,3), padding='same'),
      BatchNormalization(),
      Activation('relu'),

      MaxPool2D((2,2)),

      # Stage 3: 32 filters (no pooling afterwards)
      Conv2D(32, (3,3), padding='same'),
      BatchNormalization(),
      Activation('relu'),

      # Classifier head: one sigmoid output
      Flatten(),
      Dense(1, activation='sigmoid')
  ])

  return model

In [0]:
model = make_model()
model.summary()

In [0]:
from keras.optimizers import SGD
# SGD with momentum; binary cross-entropy pairs with the single sigmoid output
# unit that make_model puts at the end of the network.
model.compile(optimizer=SGD(lr=0.001, momentum=0.9), loss='binary_crossentropy', metrics=['accuracy'])

In [0]:
from keras.callbacks import ModelCheckpoint
# Checkpoint for the run without augmentation: the complete model (not only
# the weights) is written out, and only when the monitored 'val_acc' improves.
# The epoch number and the val_acc value are embedded in the file name.
checkpoint = ModelCheckpoint(
    filepath="no_augmented.{epoch:02d}-{val_acc:.2f}.h5",
    mode='auto',
    monitor='val_acc',
    period=1,
    save_best_only=True,
    save_weights_only=False,
    verbose=1)

In [0]:
# Keras carves the validation_split fraction off the *end* of the arrays,
# before any shuffling takes place. read_imgs returns the samples grouped by
# class, so without this permutation the validation set would be drawn mostly
# or entirely from the last class. A fixed seed keeps the split reproducible.
shuffle_idx = np.random.RandomState(0).permutation(len(x_train))
history = model.fit(x_train[shuffle_idx], y_train[shuffle_idx], validation_split=0.3, epochs=50, shuffle=True, callbacks=[checkpoint])

In [0]:
model.save('/content/noaug.h5')

# Convert to PNG

In [0]:
import PIL
import os

In [0]:
def convert_to_png(fname, dest_folder):
  """Convert one image file to a grayscale PNG stored in ``dest_folder``.

  The output keeps the base name of ``fname`` with its extension replaced by
  ``.png``. ``dest_folder`` is created (including parents) if missing.

  Args:
    fname: Path of the image to convert (e.g. a ``.pgm`` file).
    dest_folder: Folder in which the PNG is written.
  """
  # `import PIL` on its own does not load the Image submodule; import it
  # explicitly instead of relying on another library having done so.
  from PIL import Image

  # exist_ok avoids the check-then-create race of exists() + makedirs().
  os.makedirs(dest_folder, exist_ok=True)
  # splitext only touches the real extension, unlike str.replace which would
  # also rewrite a '.pgm' occurring in the middle of the name.
  stem, _ = os.path.splitext(os.path.basename(fname))
  dest_fname = os.path.join(dest_folder, stem + '.png')
  # The context manager closes the source file once the PNG is written.
  with Image.open(fname) as img:
    img.convert('L').save(dest_fname)


In [0]:
#pgm_list = []
#for path, folders, fnames in os.walk('/content/Train/'):
#  for fname in fnames:
#    pgm_list.append(os.path.join(path, fname))

In [0]:
# Mirror the Train/Test directory trees into Train_png/Test_png, converting
# every .pgm file found along the way.
for data_path in ['/content/Train', '/content/Test']:
  png_root = data_path + '_png'
  for path, folders, fnames in os.walk(data_path):
    # Build the destination from the path relative to the dataset root rather
    # than by substring replacement, which would also rewrite a 'Train' or
    # 'Test' appearing anywhere else in the path.
    dest_folder = os.path.normpath(
        os.path.join(png_root, os.path.relpath(path, data_path)))
    for fname in fnames:
      # Only .pgm images are part of the dataset (same pattern as read_imgs);
      # any other file in the tree is skipped instead of being handed to PIL.
      if not fname.endswith('.pgm'):
        continue
      abs_path = os.path.join(path, fname)
      convert_to_png(abs_path, dest_folder)


In [0]:
!ls /content/Test_png/

# Data augmentation

In [0]:
from keras.preprocessing.image import ImageDataGenerator

In [0]:
train_dataset_path = '/content/Train_png'
batch_size = 32
img_width, img_height = (60, 60)
validation_split = 0.3

# Random augmentation is applied to the training subset only.
train_datagen = ImageDataGenerator(
        rotation_range=40,
        width_shift_range=0.2,
        height_shift_range=0.2,
        rescale=1./255,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='reflect',
        validation_split=validation_split)

# Validation images are only rescaled. Drawing them from the augmenting
# generator would distort them randomly and make val_acc (which drives the
# checkpoint) noisy. The same validation_split value is used so that both
# generators partition the directory with the same fraction.
val_datagen = ImageDataGenerator(
        rescale=1./255,
        validation_split=validation_split)

train_gen = train_datagen.flow_from_directory(
    train_dataset_path,
    # target_size is (height, width)
    target_size=(img_height, img_width),
    batch_size=batch_size,
    color_mode='grayscale',
    class_mode='binary',
    subset='training')

val_gen = val_datagen.flow_from_directory(
    train_dataset_path,
    target_size=(img_height, img_width),
    batch_size=batch_size,
    color_mode='grayscale',
    class_mode='binary',
    subset='validation')


In [0]:
train_gen.next()[0].shape

In [0]:
plt.imshow(train_gen.next()[0][1].squeeze())

In [0]:
from keras.callbacks import ModelCheckpoint
# Checkpoint for the augmented run; same policy as the non-augmented one
# (full model, written only when 'val_acc' improves), different file prefix.
checkpoint = ModelCheckpoint(
    filepath="augmented.{epoch:02d}-{val_acc:.2f}.h5",
    mode='auto',
    monitor='val_acc',
    period=1,
    save_best_only=True,
    save_weights_only=False,
    verbose=1)

In [0]:
from keras.optimizers import SGD

In [0]:
# Build a new network for the augmented run instead of continuing to train
# the model fitted above.
model = make_model()
# Same optimizer, loss and metric as the non-augmented run.
model.compile(optimizer=SGD(lr=0.001, momentum=0.9), loss='binary_crossentropy', metrics=['accuracy'])

In [0]:
batch_size=32
# Step counts are derived from the generators themselves rather than from
# hard-coded sample counts, so they stay correct if the dataset or the batch
# size changes. Floor division drops the final partial batch, as before;
# max(1, ...) guards against a subset smaller than one batch.
history = model.fit_generator(
        train_gen,
        steps_per_epoch=max(1, train_gen.samples // train_gen.batch_size),
        epochs=50,
        validation_data=val_gen,
        validation_steps=max(1, val_gen.samples // val_gen.batch_size),
        callbacks=[checkpoint])

In [0]:
import matplotlib.pyplot as plt
# One figure per recorded curve, each titled and labelled so it can be read
# without looking at the code that produced it.
for metric_key, metric_label in [('loss', 'training loss'),
                                 ('val_acc', 'validation accuracy'),
                                 ('acc', 'training accuracy')]:
  plt.figure()
  plt.plot(history.history[metric_key])
  plt.title(metric_label)
  plt.xlabel('epoch')
  plt.ylabel(metric_label)

# Saving the model

In [0]:
!ls /content

In [0]:
model.save('/content/augmented.h5')

In [0]:
from keras.models import load_model

In [0]:
imported_model = load_model('/content/augmented.h5')

In [0]:
imported_model.summary()

# Evaluate the performance of the two models

In [0]:
noaug_model = load_model('/content/noaug.h5')
aug_model = load_model('/content/augmented.h5')

In [0]:
x_test, y_test = read_imgs('/content/Test', [0,1])

In [0]:
noaug_model.evaluate(x_test, y_test)

In [0]:
aug_model.evaluate(x_test, y_test)

# Visualizing

In [0]:
model = load_model('/content/augmented.h5')

In [0]:
model.layers


In [0]:
from keras import models
# Collect the output tensor of every layer, then build a second model that
# shares the input of `model` but returns all of those outputs at once, so
# the intermediate activations can be inspected for a given image.
layer_outputs = [layer.output for layer in model.layers]
activation_model = models.Model(inputs=model.input, outputs=layer_outputs)

In [0]:
# Take the image at index 2 of the next validation batch and add a leading
# batch axis so it can be passed to predict().
# NOTE(review): the batch labels are ignored here, so the image is not
# guaranteed to belong to the class the variable name suggests -- confirm.
micro_calc = val_gen.next()[0][2][np.newaxis,...]


In [0]:
plt.imshow(micro_calc.squeeze())

In [0]:
activations = activation_model.predict(micro_calc)

In [0]:
len(activations) == len(model.layers)

In [0]:
activations[0].shape

In [0]:
# 2x4 grid with the 8 channels output by layer 0 (the first Conv2D).
for channel in range(8):
  plt.subplot(2, 4, channel + 1)
  channel_map = activations[0][..., channel]
  plt.imshow(channel_map.squeeze())
  plt.axis(False)

In [0]:
# 2x8 grid with 16 channels of the output of layer 6.
for channel in range(16):
  plt.subplot(2, 8, channel + 1)
  channel_map = activations[6][..., channel]
  plt.imshow(channel_map.squeeze())
  plt.axis(False)

In [0]:
# 4x8 grid with 32 channels of the output of layer 8.
for channel in range(32):
  plt.subplot(4, 8, channel + 1)
  channel_map = activations[8][..., channel]
  plt.imshow(channel_map.squeeze())
  plt.axis(False)

## Visualizing filters

In [0]:
# Retrieve the kernel and bias of model.layers[0], i.e. the first layer of
# the network (the first Conv2D in make_model), not the second one.
filters, biases = model.layers[0].get_weights()
# Min-max normalise the kernel values to [0, 1] so they can be shown as images.
filters = (filters - filters.min())/(filters.max() - filters.min())
# Drop axes of length 1 (presumably the single input channel -- confirm).
filters = filters.squeeze()

In [0]:
# Show the 8 normalised kernels of the first layer on a 2x4 grayscale grid.
for filter_idx in range(8):
  plt.subplot(2, 4, filter_idx + 1)
  kernel = filters[:, :, filter_idx]
  plt.imshow(kernel, cmap='gray')
  plt.axis(False)

In [0]:
# Retrieve the kernel and bias of model.layers[4] (the second Conv2D if the
# model was built by make_model).
filters, biases = model.layers[4].get_weights()
# Min-max normalise the kernel values to [0, 1] for display.
filters = (filters - filters.min())/(filters.max() - filters.min())
filters = filters.squeeze()
# For each of the 16 indices on the last axis, show only the slice at index 1
# of the third axis (presumably input channel 1 of each filter -- confirm);
# the other slices along that axis are not displayed.
for k in range(16):
  plt.subplot(2, 8, k+1)
  plt.imshow(filters[:,:,1 ,k], cmap='gray')
  plt.axis(False)

In [0]:
filters.shape