In [None]:
import tensorflow as tf
AUTOTUNE = tf.data.experimental.AUTOTUNE
import IPython.display as display
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import os
import cv2
import seaborn as sns
from tensorflow.keras import datasets, layers, models
os.environ["CUDA_VISIBLE_DEVICES"]="2"

### Retrieve the images

In [None]:
import pathlib
train_data_dir = 'data/data_itay_profile/patch_train_itay_binary_10000_350x350_fb'
train_data_dir = pathlib.Path(train_data_dir)
test_data_dir = 'data/data_itay_profile/patch_val_itay_binary_2000_350x350_fb'
test_data_dir = pathlib.Path(test_data_dir)
blind_test_data_dir = 'data/data_itay_profile/patch_test_itay_binary_2000_350x350_fb'
blind_test_data_dir = pathlib.Path(blind_test_data_dir)

In [None]:
train_image_count = len(list(train_data_dir.glob('*/*.png')))
train_image_count

In [None]:
test_image_count = len(list(test_data_dir.glob('*/*.png')))
test_image_count

In [None]:
blind_test_image_count = len(list(blind_test_data_dir.glob('*/*.png')))
blind_test_image_count

In [None]:
# Class names are the sub-directory names of the training set. They are sorted so
# that the class -> label-index mapping is deterministic (glob order depends on the
# filesystem), and non-directory entries are skipped so they are not counted as classes.
CLASS_NAMES = np.array(sorted(item.name for item in train_data_dir.glob('*') if item.is_dir()))
CLASS_NAMES

In [None]:
TEST_CLASS_NAMES = np.array([item.name for item in test_data_dir.glob('*')])
TEST_CLASS_NAMES

In [None]:
BLIND_CLASS_NAMES = np.array([item.name for item in blind_test_data_dir.glob('*')])
BLIND_CLASS_NAMES

In [None]:
NUMBER_OF_CLASSES = len(CLASS_NAMES)
BATCH_SIZE = 16
NUMBER_OF_EPOCHS = 15
# np.ceil returns a float; step counts are whole numbers, so cast explicitly.
TRAIN_STEPS_PER_EPOCH = int(np.ceil(train_image_count/BATCH_SIZE))
TEST_STEPS_PER_EPOCH = int(np.ceil(test_image_count/BATCH_SIZE))
IMG_HEIGHT = 350
IMG_WIDTH = 350

In [None]:
NUMBER_OF_CLASSES

### Visualisation functions

In [None]:
def show_logical_batch(image_batch, label_batch):
  """Plot up to nine images in a 3x3 grid, titled with their class names.

  Args:
    image_batch: Indexable collection of images accepted by `plt.imshow`.
    label_batch: Indexable collection of per-image label vectors aligned with
      `CLASS_NAMES`; the entry equal to 1 selects the class name.
  """
  plt.figure(figsize=(7,7))
  # Cap at the batch length so batches with fewer than nine images do not
  # raise IndexError.
  for n in range(min(9, len(image_batch))):
    plt.subplot(3,3,n+1)
    plt.imshow(image_batch[n])
    plt.title(CLASS_NAMES[label_batch[n]==1][0].title())
    plt.axis('off')

In [None]:
def show_numerical_batch(image_batch, label_batch):
  """Plot up to nine images in a 3x3 grid, titled with their class names.

  Args:
    image_batch: Indexable collection of images accepted by `plt.imshow`.
    label_batch: Indexable collection of integer class indices into
      `CLASS_NAMES`, aligned with `image_batch`.
  """
  plt.figure(figsize=(7,7))
  # Cap at the batch length so short batches (e.g. a handful of misclassified
  # samples) do not raise IndexError.
  for n in range(min(9, len(image_batch))):
    plt.subplot(3,3,n+1)
    plt.imshow(image_batch[n])
    plt.title(CLASS_NAMES[label_batch[n]])
    plt.axis('off')

### Load data

In [None]:
train_list_ds = tf.data.Dataset.list_files(str(train_data_dir/'*/*'))
test_list_ds = tf.data.Dataset.list_files(str(test_data_dir/'*/*'))
blind_test_list_ds = tf.data.Dataset.list_files(str(blind_test_data_dir/'*/*'))

In [None]:
def get_logical_label(file_path):
  """Return the element-wise comparison of the file's class directory with CLASS_NAMES."""
  # The class is the parent directory, i.e. the second-to-last path component.
  class_dir = tf.strings.split(file_path, os.path.sep)[-2]
  return class_dir == CLASS_NAMES

In [None]:
def get_numerical_label(file_path):
  """Return the index in CLASS_NAMES of the directory that contains `file_path`."""
  path_components = tf.strings.split(file_path, os.path.sep)
  # The second-to-last component is the class directory; comparing it with
  # CLASS_NAMES gives a mask whose argmax is the class index.
  class_mask = path_components[-2] == CLASS_NAMES
  return tf.argmax(tf.cast(class_mask, dtype=tf.uint8))

In [None]:
def get_onehot_label(file_path):
  """Return a uint8 vector over CLASS_NAMES marking the class directory of `file_path`."""
  path_components = tf.strings.split(file_path, os.path.sep)
  # The second-to-last component is the class directory.
  class_mask = path_components[-2] == CLASS_NAMES
  return tf.cast(class_mask, dtype=tf.uint8)

In [None]:
def decode_and_normalize_img(img):
  """Decode encoded image bytes into a 3-channel float32 tensor.

  The image is decoded to uint8 with three channels and then converted to
  float32 with `convert_image_dtype`, which rescales to the [0, 1] range.
  No resizing is performed here.
  """
  decoded = tf.image.decode_jpeg(img, channels=3)
  return tf.image.convert_image_dtype(decoded, tf.float32)

In [None]:
def augment(img):
  """Apply a chain of random augmentations to one image and clip it to [0, 1].

  Args:
    img: Image tensor with 3 channels (the random crop below requests a
      depth of 3). Presumably float values in [0, 1] as produced by
      `decode_and_normalize_img` -- TODO confirm against callers.

  Returns:
    The augmented image, cropped to (IMG_HEIGHT, IMG_WIDTH, 3) and clipped
    to the [0, 1] range.
  """
  # Crop/pad to 6 pixels larger than the target in each dimension, then take a
  # random target-sized crop; for inputs already at the target size this acts
  # as a small random translation.
  img = tf.image.resize_with_crop_or_pad(img, IMG_HEIGHT + 6, IMG_WIDTH + 6)
  img = tf.image.random_crop(img, size=[IMG_HEIGHT, IMG_WIDTH, 3])
  # Random brightness shift with max_delta 0.2.
  img = tf.image.random_brightness(img, 0.2)
  # Contrast factor drawn from [0.2, 0.5].
  # NOTE(review): factors below 1 always reduce contrast -- confirm intended.
  img = tf.image.random_contrast(img, 0.2,0.5) 
  # Saturation factor drawn from [5, 10].
  # NOTE(review): this is a strong saturation boost -- confirm intended.
  img = tf.image.random_saturation(img, 5,10) 
  img = tf.image.random_flip_left_right(img)
  img = tf.image.random_flip_up_down(img)
  # The colour adjustments above can push values outside [0, 1].
  img = tf.clip_by_value(img, 0, 1)
  return img

In [None]:
def process_augment_path(file_path):
  """Load, decode and augment the image at `file_path`; return (image, class index)."""
  # Read the raw encoded bytes, decode them, then apply the random augmentations.
  raw_bytes = tf.io.read_file(file_path)
  augmented = augment(decode_and_normalize_img(raw_bytes))
  return augmented, get_numerical_label(file_path)

In [None]:
def process_path(file_path):
  """Load and decode the image at `file_path`; return (image, class index)."""
  # Read the raw encoded bytes from disk and decode them to a float image.
  raw_bytes = tf.io.read_file(file_path)
  return decode_and_normalize_img(raw_bytes), get_numerical_label(file_path)

In [None]:
# Set `num_parallel_calls` so multiple images are loaded/processed in parallel.
train_labeled_ds = train_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_labeled_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
blind_test_labeled_ds = blind_test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)

In [None]:
def prepare_for_training(ds, shuffle_buffer_size=96, repeat_count=None):
  """Shuffle, repeat, batch and prefetch a dataset of (image, label) pairs.

  Args:
    ds: A `tf.data.Dataset` of individual examples.
    shuffle_buffer_size: Size of the shuffle buffer (defaults to 96).
    repeat_count: Number of passes over the data; defaults to
      `NUMBER_OF_EPOCHS` when None.

  Returns:
    The dataset batched with `BATCH_SIZE` and prefetched with `AUTOTUNE`.
  """
  if repeat_count is None:
    repeat_count = NUMBER_OF_EPOCHS
  # Shuffle before repeating so every example of one pass is emitted before
  # the next pass starts; repeating first lets the shuffle buffer mix
  # examples across epoch boundaries.
  ds = ds.shuffle(buffer_size=shuffle_buffer_size)
  ds = ds.repeat(repeat_count)
  ds = ds.batch(BATCH_SIZE)
  ds = ds.prefetch(buffer_size=AUTOTUNE)
  return ds

In [None]:
train_ds = prepare_for_training(train_labeled_ds)
test_ds = prepare_for_training(test_labeled_ds)
blind_test_ds = prepare_for_training(blind_test_labeled_ds)

In [None]:
tf_image_batch, tf_label_batch= next(iter(train_ds))
show_numerical_batch(tf_image_batch.numpy(), tf_label_batch.numpy())

In [None]:
label_names=[ [CLASS_NAMES[item],item] for item in tf_label_batch]
label_names

In [None]:
tf_image_batch, tf_label_batch= next(iter(test_ds))
show_numerical_batch(tf_image_batch.numpy(), tf_label_batch.numpy())

In [None]:
label_names=[ [CLASS_NAMES[item],item] for item in tf_label_batch]
label_names

In [None]:
tf_image_batch, tf_label_batch= next(iter(blind_test_ds))
show_numerical_batch(tf_image_batch.numpy(), tf_label_batch.numpy())

In [None]:
label_names=[ [CLASS_NAMES[item],item] for item in tf_label_batch]
label_names

#### VGG16

In [None]:
#base_model = tf.keras.applications.VGG16(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3),
#                                               include_top=False,
#                                               weights='imagenet')

In [None]:
#global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
#prediction_layer = tf.keras.layers.Dense(NUMBER_OF_CLASSES)
#model = tf.keras.Sequential([
#  base_model,
#  global_average_layer,
#  prediction_layer
#])

In [None]:
def base_model():
  """Build the classifier: an ImageNet-initialised VGG16 backbone plus a linear head.

  The backbone output is globally average-pooled and fed to a Dense layer
  with NUMBER_OF_CLASSES units and no activation, so the model outputs logits.
  """
  input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)
  inputs = tf.keras.layers.Input(shape=input_shape)
  backbone = tf.keras.applications.VGG16(include_top=False, weights='imagenet',
                                         input_tensor=inputs, input_shape=input_shape)
  pooled = tf.keras.layers.GlobalAveragePooling2D()(backbone.output)
  logits = tf.keras.layers.Dense(NUMBER_OF_CLASSES)(pooled)
  return tf.keras.models.Model(inputs=inputs, outputs=logits)

#### Compile and train the model

In [None]:
opt=tf.keras.optimizers.Adam(0.0001)

In [None]:
early_stop_callback=tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=1)

In [None]:
checkpoint_callback=tf.keras.callbacks.ModelCheckpoint('vgg.h5', monitor='val_accuracy',save_best_only=True,verbose=1)

In [None]:
model = base_model()
model.compile(optimizer=opt,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

history = model.fit(train_ds, epochs=NUMBER_OF_EPOCHS, 
                    validation_data=test_ds,
                    steps_per_epoch=TRAIN_STEPS_PER_EPOCH,
                    validation_steps=TEST_STEPS_PER_EPOCH,
                    callbacks=[early_stop_callback,checkpoint_callback])

In [None]:
model=tf.keras.models.load_model('vgg.h5')

#### Evaluate on test set

In [None]:
# Evaluate on one epoch's worth of batches rather than a fixed 500 steps, which
# at BATCH_SIZE images per step would revisit the repeated dataset several times.
results= model.evaluate(test_ds,steps=int(TEST_STEPS_PER_EPOCH))

#### Evaluate on blind test set

In [None]:
# Evaluate on one epoch's worth of batches rather than a fixed 500 steps, which
# at BATCH_SIZE images per step would revisit the repeated dataset several times.
results= model.evaluate(blind_test_ds,steps=int(np.ceil(blind_test_image_count/BATCH_SIZE)))

#### Confusion matrix

In [None]:
conf_ds=blind_test_ds.take(20)

In [None]:
conf_images=[]
conf_labels=[]
for batch in list(conf_ds.as_numpy_iterator()):
    for image in batch[0]:
        conf_images.append(image)
    for label in batch[1]:
        conf_labels.append(label)
conf_images=np.asarray(conf_images)
conf_labels=np.asarray(conf_labels)

In [None]:
predictions=model.predict(conf_images)

In [None]:
predictions=tf.argmax(predictions,-1)
predictions

In [None]:
conf_labels

In [None]:
cm = tf.math.confusion_matrix(conf_labels, predictions)

In [None]:
plt.figure(figsize=(20,20))
sns.set(font_scale=2)
sns.heatmap(
    cm, annot=True,
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES)
plt.xlabel("Predicted")
plt.ylabel("True")

In [None]:
indices_of_wrong_predictions=np.nonzero(predictions!=conf_labels)

In [None]:
wrong_predicted_images=conf_images[indices_of_wrong_predictions]

In [None]:
wrong_predicted_labels=conf_labels[indices_of_wrong_predictions]
wrong_predicted_labels

In [None]:
sns.set(font_scale=1)
show_numerical_batch(wrong_predicted_images, wrong_predicted_labels)

In [None]:
plt.figure(figsize=(20,80))

columns=5
# plt.subplot requires integer grid dimensions; true division would produce a float.
rows=len(wrong_predicted_images)//columns+1
for i, image in enumerate(wrong_predicted_images):
    plt.subplot(rows,columns,i+1)
    plt.imshow(image)
    plt.grid(None)
    # Stop once the first 52 misclassified images have been drawn.
    if i>50:
        break

### Example predictions

In [None]:
def plot_image(i, predictions_array, true_label, img):
  """Draw image `i` with its predicted and true class names as the x-label.

  Args:
    i: Index of the example inside `true_label` and `img`.
    predictions_array: Per-class scores for this single example.
    true_label: Indexable collection of true class indices.
    img: Indexable collection of images.

  The label is coloured blue when the prediction is correct and red otherwise.
  """
  true_label, img = true_label[i], img[i]

  plt.grid(False)
  plt.xticks([])
  plt.yticks([])
  plt.imshow(img, cmap=plt.cm.binary)

  predicted_label = np.argmax(predictions_array)
  color = 'blue' if predicted_label == true_label else 'red'
  caption = "{} \n ({})".format(CLASS_NAMES[predicted_label],
                                CLASS_NAMES[true_label])
  plt.xlabel(caption, color=color)

def plot_value_array(i, predictions_array, true_label):
  """Draw a bar chart of the per-class scores for example `i`.

  Args:
    i: Index of the example inside `true_label`.
    predictions_array: Per-class scores for this single example.
    true_label: Indexable collection of true class indices.

  The predicted class bar is coloured red and the true class bar blue; when
  they coincide the bar ends up blue.
  """
  true_label = true_label[i]
  class_positions = range(NUMBER_OF_CLASSES)

  plt.grid(False)
  plt.xticks(class_positions)
  plt.yticks([])
  bars = plt.bar(class_positions, predictions_array, color="#777777")
  plt.ylim([0, 1])

  # Colour the prediction first so the true-label colour wins on a correct guess.
  bars[np.argmax(predictions_array)].set_color('red')
  bars[true_label].set_color('blue')

In [None]:
probability_model = tf.keras.Sequential([model, tf.keras.layers.Softmax()])

#### Test predictions

In [None]:
test_images,test_labels=next(iter(test_ds))
predictions=probability_model.predict(test_images)

In [None]:
# Plot the first X test images, their predicted labels, and the true labels.
# Color correct predictions in blue and incorrect predictions in red.
num_rows = 5
num_cols = 1
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, 2*num_cols, 2*i+1)
  plot_image(i, predictions[i], test_labels, test_images)
  plt.subplot(num_rows, 2*num_cols, 2*i+2)
  plot_value_array(i, predictions[i], test_labels)
plt.tight_layout(pad=4.0)
plt.show()

#### Blind test predictions

In [None]:
test_images,test_labels=next(iter(blind_test_ds))
predictions=probability_model.predict(test_images)

In [None]:
# Plot the first X test images, their predicted labels, and the true labels.
# Color correct predictions in blue and incorrect predictions in red.
num_rows = 5
num_cols = 1
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, 2*num_cols, 2*i+1)
  plot_image(i, predictions[i], test_labels, test_images)
  plt.subplot(num_rows, 2*num_cols, 2*i+2)
  plot_value_array(i, predictions[i], test_labels)
plt.tight_layout(pad=4.0)
plt.show()

## Activation map visualization

In [None]:
model.summary()

In [None]:
IMAGE_PATH = 'data/rescaled_filtered_new_split/patch_test_2000_350x350/italiansquare/10.png'
LAYER_NAME = 'block5_conv3'
CLASS_INDEX=tf.argmax(tf.cast(('italiansquare' == CLASS_NAMES),dtype=tf.uint8))
img = tf.keras.preprocessing.image.load_img(IMAGE_PATH, target_size=(350, 350))
img = tf.keras.preprocessing.image.img_to_array(img)
img=img/255
grad_model = tf.keras.models.Model(inputs=[model.inputs], 
                                        outputs=[model.get_layer(LAYER_NAME).output, model.output])

In [None]:
with tf.GradientTape() as tape:
    conv_outputs, predictions = grad_model(np.array([img]))
    loss = predictions[:, CLASS_INDEX]

output = conv_outputs[0]
grads = tape.gradient(loss, conv_outputs)[0]

gate_f = tf.cast(output > 0, 'float32')
gate_r = tf.cast(grads > 0, 'float32')
guided_grads = tf.cast(output > 0, 'float32') * tf.cast(grads > 0, 'float32') * grads

weights = tf.reduce_mean(guided_grads, axis=(0, 1))

cam = np.ones(output.shape[0: 2], dtype = np.float32)

for i, w in enumerate(weights):
    cam += w * output[:, :, i]

cam = cv2.resize(cam.numpy(), (350, 350))
cam = np.maximum(cam, 0)
heatmap = (cam - cam.min()) / (cam.max() - cam.min())

cam = cv2.applyColorMap(np.uint8(255*heatmap), cv2.COLORMAP_JET)
img=(img*255)
output_image = cv2.addWeighted(img.astype('uint8'), 0.5,cv2.cvtColor(cam,cv2.COLOR_BGR2RGB), 0.5, 0)
plt.imshow(output_image)
plt.grid(None)

In [None]:
plt.imshow(img.astype('uint8'))

In [None]:
def gradient_cam(cam_model,class_index, img):
    """Compute a Grad-CAM style heatmap for one image and blend it over the image.

    Args:
        cam_model: Model returning `(conv_feature_maps, predictions)` for a
            batch of images. This argument is now actually used; previously the
            global `grad_model` was called regardless of what was passed in.
        class_index: Index of the class whose score is differentiated.
        img: Single image as a NumPy array of shape (height, width, 3);
            presumably float values in [0, 1], since it is multiplied by 255
            before being cast to uint8 -- TODO confirm against callers.

    Returns:
        A uint8 image: the input blended 50/50 with the JET-coloured heatmap.
    """
    with tf.GradientTape() as tape:
        conv_outputs, predictions = cam_model(np.array([img]))
        loss = predictions[:, class_index]
    output = conv_outputs[0]
    grads = tape.gradient(loss, conv_outputs)[0]
    # Guided gradients: keep only positive gradients at positively activated units.
    guided_grads = tf.cast(output > 0, 'float32') * tf.cast(grads > 0, 'float32') * grads
    # One weight per feature-map channel: the spatial mean of its guided gradients.
    weights = tf.reduce_mean(guided_grads, axis=(0, 1))
    # Weighted sum of the feature maps over channels. The map starts from ones
    # (not zeros), matching the original accumulation loop.
    cam = 1.0 + tf.reduce_sum(weights * output, axis=-1)
    # cv2.resize takes (width, height); use the image's own size instead of a
    # hard-coded 350x350 so the overlay always matches the input.
    height, width = img.shape[0], img.shape[1]
    cam = cv2.resize(cam.numpy(), (width, height))
    cam = np.maximum(cam, 0)
    # Normalise to [0, 1]; a constant map would otherwise divide by zero.
    value_range = cam.max() - cam.min()
    if value_range > 0:
        heatmap = (cam - cam.min()) / value_range
    else:
        heatmap = np.zeros_like(cam)
    cam = cv2.applyColorMap(np.uint8(255*heatmap), cv2.COLORMAP_JET)
    img=(img*255)
    # applyColorMap returns BGR; convert to RGB before blending with the RGB image.
    output_image = cv2.addWeighted(img.astype('uint8'), 0.5,cv2.cvtColor(cam,cv2.COLOR_BGR2RGB), 0.5, 0)
    return output_image

In [None]:
test_images, test_labels=next(iter(blind_test_ds))
raw_predictions=model.predict(test_images)

In [None]:
num_rows = 6
num_cols = 1
num_images = num_rows*num_cols
plt.figure(figsize=(2*2*num_cols, 2*num_rows))
for i in range(num_images):
  plt.subplot(num_rows, 2*num_cols, 2*i+1)
  predictions, true_label, img = raw_predictions[i], test_labels[i].numpy(), test_images[i]
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])
  plt.imshow(img, cmap=plt.cm.binary)
  predicted_label = np.argmax(predictions)
  if predicted_label == true_label:
    color = 'blue'
  else:
    color = 'red'

  plt.xlabel("{} \n ({})".format(CLASS_NAMES[predicted_label],
                                CLASS_NAMES[true_label]),
                                color=color)  
  plt.subplot(num_rows, 2*num_cols, 2*i+2)
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])
  img = tf.keras.preprocessing.image.img_to_array(img)
  cam=gradient_cam(grad_model,true_label, img)
  plt.imshow(cam)

plt.tight_layout(pad=1.0)


### Page prediction accuracy

In [None]:
def read_pages(test_page_dir):
    """List the page images under `test_page_dir` together with their class indices.

    Args:
        test_page_dir: Directory laid out as `<test_page_dir>/<class>/<page>.png`.

    Returns:
        A tuple `(test_pages, test_labels)`: the sorted list of page paths and,
        for each page, the index of its parent directory name in `CLASS_NAMES`
        (0 when the name is not present, because `np.argmax` of an all-False
        mask is 0).
    """
    # glob returns paths in filesystem order, which is not guaranteed to be the
    # same for two directory trees. Sorting makes the result deterministic so
    # that lists read from parallel trees with identical relative file names
    # line up index by index.
    test_pages = sorted(glob.glob(test_page_dir+'/*/*.png'))
    test_labels = []
    for page_path in test_pages:
        class_dir = os.path.basename(os.path.dirname(page_path))
        test_labels.append(np.argmax(class_dir == CLASS_NAMES))
    return test_pages, test_labels

In [None]:
import glob
def sample_page_patches(page_image, crop_height, crop_width, number_of_samples):
    """Cut `number_of_samples` randomly positioned crops out of a page image.

    Args:
        page_image: Array whose first two dimensions are (rows, columns).
        crop_height: Number of rows in each crop.
        crop_width: Number of columns in each crop.
        number_of_samples: How many crops to draw.

    Returns:
        A list of `number_of_samples` array slices of `page_image`, each with
        `crop_height` rows and `crop_width` columns.

    Raises:
        ValueError: If the page is smaller than the requested crop.
    """
    max_row = page_image.shape[0] - crop_height
    max_col = page_image.shape[1] - crop_width
    if max_row < 0 or max_col < 0:
        raise ValueError(
            "page of shape {} is smaller than the requested {}x{} crop".format(
                page_image.shape[:2], crop_height, crop_width))
    samples = []
    for _ in range(number_of_samples):
        # randint's upper bound is exclusive; +1 makes the last valid offset
        # reachable and allows pages that are exactly the crop size.
        row = np.random.randint(0, max_row + 1)
        col = np.random.randint(0, max_col + 1)
        samples.append(page_image[row: row + crop_height, col: col + crop_width])
    return samples

In [None]:
from utils import *
from collections import Counter
def predict_pages_raw(test_page_images,binary_test_page_images,number_of_samples):
    """Predict one class per page by majority vote over sampled patches.

    Args:
        test_page_images: Paths of the page images.
        binary_test_page_images: Paths of the matching binary page images,
            aligned index by index with `test_page_images`.
        number_of_samples: Number of patches requested per page.

    Returns:
        A list with the most frequently predicted class index for each page.

    Prints the page path, each patch's predicted class name, the raw votes,
    their counts and the winning class name.
    """
    page_level_predictions = []
    for page_index, page_path in enumerate(test_page_images):
        print(page_path)
        # Flag 0 loads both images as single-channel grayscale.
        gray_page = cv2.imread(page_path, 0)
        binary_page = cv2.imread(binary_test_page_images[page_index], 0)
        # Patch sampler comes from the project's `utils` module.
        patches = sample_patches_from_page_w_binary(gray_page, binary_page, number_of_patches=number_of_samples)
        patch_votes = []
        for gray_patch in patches:
            # Replicate the gray channel three times to match the model's 3-channel input.
            rgb_patch = cv2.merge((gray_patch, gray_patch, gray_patch))
            rgb_patch = tf.image.convert_image_dtype(rgb_patch, tf.float32)
            single_item_batch = np.expand_dims(rgb_patch, axis=0)
            vote = np.argmax(model.predict(single_item_batch))
            patch_votes.append(vote)
            print(CLASS_NAMES[vote])
        vote_counts = Counter(patch_votes)
        print(patch_votes)
        print(vote_counts)
        winner = vote_counts.most_common()[0][0]
        page_level_predictions.append(winner)
        print(CLASS_NAMES[winner])
    return page_level_predictions
        

In [None]:
from utils import *
from collections import Counter
def predict_pages(test_page_images,binary_test_page_images,number_of_samples):
    """Predict one class per page by majority vote and plot Grad-CAM overlays.

    For every page a figure is shown with the page itself (titled with its
    true class) followed by one Grad-CAM overlay per sampled patch (titled
    with that patch's predicted class).

    Args:
        test_page_images: Paths of the page images, laid out as
            `.../<class>/<page>.png`.
        binary_test_page_images: Paths of the matching binary page images,
            aligned index by index with `test_page_images`.
        number_of_samples: Number of patches requested per page.

    Returns:
        A list with the most frequently predicted class index for each page.
    """
    test_predicts=[]

    for i in range(len(test_page_images)):
        # The class name is the directory that directly contains the page;
        # taking the parent directory avoids depending on a fixed path depth.
        true_text_label=os.path.basename(os.path.dirname(test_page_images[i]))
        true_label=tf.argmax(tf.cast((true_text_label == CLASS_NAMES),dtype=tf.uint8))
        # Flag 0 loads both images as single-channel grayscale.
        page_image = cv2.imread(test_page_images[i],0)
        binary_page_image = cv2.imread(binary_test_page_images[i],0)
        # Patch sampler comes from the project's `utils` module.
        page_patches = list(sample_patches_from_page_w_binary(page_image,binary_page_image, number_of_patches=number_of_samples))
        # One axis for the page plus one per patch, instead of a fixed count of 4.
        fig,ax=plt.subplots(1,len(page_patches)+1, figsize=(30,15),dpi=150,squeeze=False)
        ax=ax[0]
        # The page is single-channel, so convert with GRAY2RGB; BGR2RGB
        # requires a 3- or 4-channel input.
        ax[0].imshow(cv2.cvtColor(page_image,cv2.COLOR_GRAY2RGB))
        ax[0].set_title(true_text_label,size=25)
        patch_predicts=[]
        for n, patch in enumerate(page_patches, start=1):
            # Replicate the gray channel three times to match the model's 3-channel input.
            patch=cv2.merge((patch,patch,patch))
            patch = tf.image.convert_image_dtype(patch, tf.float32)
            patch_label=np.argmax(model.predict(np.expand_dims(patch,axis=0)))
            patch = tf.keras.preprocessing.image.img_to_array(patch)
            # The heatmap is computed for the page's true class, not the predicted one.
            cam=gradient_cam(grad_model,true_label, patch)
            patch_predicts.append(patch_label)
            predicted_text_label=CLASS_NAMES[patch_label]
            ax[n].imshow(cam)
            ax[n].set_title(predicted_text_label, size=25)

        plt.show()
        # Release the figure so long page lists do not accumulate open figures.
        plt.close(fig)
        c=Counter(patch_predicts)
        major_label=c.most_common()[0][0]
        test_predicts.append(major_label)
        print(CLASS_NAMES[major_label])
    return test_predicts
        

In [None]:
test_page_dir = 'data/data_itay_profile/dataset_pages/test'
test_page_images, test_page_labels=read_pages(test_page_dir)

In [None]:
binary_test_page_dir = 'data/data_itay_profile/dataset_binary_pages/test'
binary_test_page_images, binary_test_page_labels=read_pages(binary_test_page_dir)

In [None]:
%%time
import time
time.sleep(1)
test_page_predicts=predict_pages(test_page_images,binary_test_page_images,3)

In [None]:
array_test_page_labels=np.array(test_page_labels)
array_test_page_predicts=np.array(test_page_predicts)
correct=(array_test_page_labels==array_test_page_predicts)
page_accuracy=correct.sum()/correct.size
page_accuracy

### Page confusion matrix

In [None]:
cm = tf.math.confusion_matrix( test_page_labels,  test_page_predicts)
plt.figure(figsize=(20,20))
sns.set(font_scale=2)
sns.heatmap(
    cm, annot=True,
    xticklabels=CLASS_NAMES,
    yticklabels=CLASS_NAMES)
plt.xlabel("Predicted")
plt.ylabel("True")

### Wrong predicted page images

In [None]:
test_page_images=np.asarray(test_page_images)
test_page_labels=np.asarray(test_page_labels)
indices_of_wrong_predicted_pages=np.nonzero(test_page_labels!= test_page_predicts)
wrong_predicted_pages=test_page_images[indices_of_wrong_predicted_pages]
wrong_predicted_labels=test_page_labels[indices_of_wrong_predicted_pages]

In [None]:
wrong_predicted_page_images=[]
for path in wrong_predicted_pages:
    page_image=cv2.imread(path,1)
    wrong_predicted_page_images.append(page_image)

In [None]:
sns.set(font_scale=1)
show_numerical_batch(wrong_predicted_page_images, wrong_predicted_labels)

In [None]:
for i, image in enumerate(wrong_predicted_page_images):  
    plt.figure(figsize=(5,7))
    plt.grid(None)
    plt.imshow(cv2.cvtColor(image,cv2.COLOR_BGR2RGB))
    plt.show()
    
    if i>5:
        break