<a href="https://colab.research.google.com/github/agiagoulas/page-stream-segmentation/blob/master/model_training/additional_models/Xception_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
import tensorflow as tf
# Show the installed TensorFlow version as the cell output.
tf.__version__

In [3]:
import csv
import math
import numpy as np
import sklearn.metrics as sklm

In [4]:
from tensorflow.keras.applications import Xception
from tensorflow.keras.optimizers import Nadam
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Flatten, Dropout, Dense, LeakyReLU, concatenate
from tensorflow.keras.utils import Sequence
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.applications.xception import preprocess_input

In [5]:
# Base directory for the CSV splits, the page images and the saved model files.
# NOTE(review): this is an absolute path at the filesystem root, whereas Drive is
# mounted at /content/gdrive — confirm it points at the intended location.
working_dir =  "/Tobacco800/"

# Functions

In [6]:
# printf-style template for a page image path; "%s" is replaced by an image id
# (the documentName column of the CSV) when an image is loaded.
img_path_template = working_dir + "Tobacco800_Small/%s.png"
# Size every page image is resized to on load; also used as the model's input size.
img_dim = (224,224)

In [7]:
def read_csv_data(csvfile):
    """Read a page-stream CSV file into a list of per-page records.

    The file is parsed as ';'-delimited with '"' as the quote character. The
    first row is treated as a header and skipped. Expected columns:
    "counter";"documentText";"label";"documentName".

    Args:
        csvfile: Path of the CSV file to read.

    Returns:
        A list with one entry per data row, each of the form
        [counter, documentName, previous documentName, label], where
        "previous documentName" is the documentName of the preceding data row
        ("" for the first row). An empty file yields an empty list.
    """
    data_instances = []
    prev_image_file = ""

    # newline='' hands newline handling to the csv module, which is required
    # for quoted fields that contain line breaks to be parsed correctly.
    with open(csvfile, 'r', encoding='UTF-8', newline='') as f:
        datareader = csv.reader(f, delimiter=';', quotechar='"')
        next(datareader, None)  # skip the header row; tolerate an empty file
        for csv_row in datareader:
            if not csv_row:
                # csv.reader yields [] for blank lines (e.g. a trailing newline).
                continue
            data_instances.append([csv_row[0], csv_row[3], prev_image_file, csv_row[2]])
            prev_image_file = csv_row[3]
    return data_instances

In [8]:
# Binary label encoding: 1 marks the first page of a document, 0 any other page.
LABEL2IDX = {'FirstPage' : 1, 'NextPage' : 0}
class ImageFeatureGenerator(Sequence):
    """Keras Sequence yielding batches of preprocessed page images and labels.

    Every record in image_data is indexed as
    [0] counter, [1] image id, [2] previous image id ("" if none), [3] label,
    which is the layout produced by read_csv_data.

    Args:
        image_data: List of page records as described above.
        img_dim: Two-element size; images are loaded with
            target_size=(img_dim[0], img_dim[1]).
        prevpage: If True, each batch input is [pages, previous pages];
            otherwise it is [pages].
        batch_size: Number of records per batch (the last batch may be smaller).
    """

    def __init__(self, image_data, img_dim, prevpage=False, batch_size=32):
        # Let the Keras base class initialise its own state before ours.
        super().__init__()
        self.image_data = image_data
        self.indices = np.arange(len(self.image_data))
        self.batch_size = batch_size
        self.img_dim = img_dim
        self.prevpage = prevpage

    def __len__(self):
        """Return the number of batches per epoch."""
        return math.ceil(len(self.image_data) / self.batch_size)

    def on_epoch_end(self):
        """Shuffle the record order in place for the next epoch."""
        np.random.shuffle(self.indices)

    def __getitem__(self, idx):
        """Return the (inputs, labels) pair for batch number idx."""
        inds = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        if self.prevpage:
            batch_x = self.process_image_data_prevpage(inds)
        else:
            batch_x = self.process_image_data(inds)
        batch_y = self.generate_output_labels(inds)
        return batch_x, batch_y

    def get_image_data_from_file(self, image_id):
        """Load one page image, resize it and apply Xception preprocessing.

        The file path is built from the module-level img_path_template.
        """
        image_file = img_path_template % image_id
        image = img_to_array(load_img(image_file, target_size=(self.img_dim[0], self.img_dim[1])))
        image = preprocess_input(image)
        return image

    def process_image_data(self, inds):
        """Return a one-element list holding the stacked page images for inds."""
        image_array = [
            self.get_image_data_from_file(self.image_data[index][1])
            for index in inds
        ]
        return [np.array(image_array)]

    def process_image_data_prevpage(self, inds):
        """Return [pages, previous pages] as two stacked arrays for inds.

        A record without a previous image id gets an all-zero placeholder.
        """
        image_array = []
        prev_image_array = []

        for index in inds:
            image = self.get_image_data_from_file(self.image_data[index][1])
            image_array.append(image)

            if self.image_data[index][2] != "":
                prev_image = self.get_image_data_from_file(self.image_data[index][2])
            else:
                # Match the dtype of the loaded images so the stacked batch
                # keeps one dtype whether or not it contains a placeholder.
                prev_image = np.zeros((self.img_dim[0], self.img_dim[1], 3), dtype=image.dtype)
            prev_image_array.append(prev_image)

        return [np.array(image_array), np.array(prev_image_array)]

    def generate_output_labels(self, inds):
        """Return the LABEL2IDX-encoded labels for inds as a 1-D array.

        Raises:
            KeyError: If a record's label is not a key of LABEL2IDX.
        """
        return np.array([LABEL2IDX[self.image_data[index][3]] for index in inds])

In [9]:
class ValidationCheckpoint(Callback):
    """Evaluate on test_data after every epoch and save the model on improvement.

    At the end of each epoch the model predicts on test_data, several
    scikit-learn metrics are computed, and the model is saved to filepath
    whenever the selected metric exceeds the best value seen so far.

    Args:
        model: Model to evaluate and save.
        filepath: Destination passed to model.save().
        test_data: Page records; index 3 of each record is the label name.
        img_dim: Image size forwarded to ImageFeatureGenerator.
        prev_page_generator: Forwarded as prevpage to ImageFeatureGenerator.
        metric: Name of the metric to optimise; one of SUPPORTED_METRICS.

    Attributes:
        history: Value of the selected metric after each epoch.
        max_metric: Best value of the selected metric so far.
        max_metrics: Dict of all metrics from the best epoch, or None if no
            epoch has improved on the initial value yet.

    Raises:
        ValueError: If metric is not one of SUPPORTED_METRICS.
    """

    SUPPORTED_METRICS = ('accuracy', 'f1_micro', 'f1_macro', 'f1_binary', 'kappa')

    def __init__(self, model, filepath, test_data, img_dim, prev_page_generator=False, metric='kappa'):
        super().__init__()
        # Fail before training starts instead of with a KeyError after epoch 1.
        if metric not in self.SUPPORTED_METRICS:
            raise ValueError("Unknown metric %r; expected one of %s" % (metric, ", ".join(self.SUPPORTED_METRICS)))
        self.test_data = test_data
        self.img_dim = img_dim
        self.metric = metric
        self.max_metric = float('-inf')
        self.max_metrics = None
        # Attach the model through the Callback API rather than by assigning
        # the attribute directly; self.model is readable afterwards.
        self.set_model(model)
        self.filepath = filepath
        self.history = []
        self.prev_page_generator = prev_page_generator

    def on_epoch_end(self, epoch, logs=None):
        """Compute the test metrics and checkpoint the model if it improved."""
        # A fresh generator is built each time, so its batches follow the
        # order of self.test_data and line up with true_labels below.
        test_generator = ImageFeatureGenerator(self.test_data, self.img_dim, prevpage=self.prev_page_generator)
        # Round the sigmoid outputs to 0/1 and flatten to a 1-D label vector.
        predicted_labels = np.round(self.model.predict(test_generator)).ravel()
        true_labels = [LABEL2IDX[x[3]] for x in self.test_data]

        eval_metrics = {
            'accuracy' : sklm.accuracy_score(true_labels, predicted_labels),
            'f1_micro' : sklm.f1_score(true_labels, predicted_labels, average='micro'),
            'f1_macro' : sklm.f1_score(true_labels, predicted_labels, average='macro'),
            'f1_binary' : sklm.f1_score(true_labels, predicted_labels, average='binary', pos_label=1),
            'kappa' : sklm.cohen_kappa_score(true_labels, predicted_labels)
        }
        eval_metric = eval_metrics[self.metric]
        self.history.append(eval_metric)

        if eval_metric > self.max_metric:
            print("\n" + self.metric + " improvement: " + str(eval_metric) + " (before: " + str(self.max_metric) + "), saving to " + self.filepath)
            self.max_metric = eval_metric     # optimization target
            self.max_metrics = eval_metrics   # all metrics
            self.model.save(self.filepath)

In [10]:
def compile_model(img_dim):
    """Build and compile the Xception-based binary page classifier.

    An ImageNet-initialised Xception without its classification top is
    followed by Flatten -> Dropout(0.5) -> Dense(512) -> LeakyReLU ->
    Dropout(0.5) -> Dense(256) -> LeakyReLU -> Dense(1, sigmoid). No layers
    are frozen here.

    Args:
        img_dim: Two-element image size; the model input shape is
            (img_dim[0], img_dim[1], 3).

    Returns:
        A Keras Model compiled with binary cross-entropy loss, the Nadam
        optimizer (learning rate 1e-5) and the accuracy metric.
    """
    model_xception = Xception(include_top=False, weights="imagenet", input_shape=(img_dim[0], img_dim[1], 3))

    top_model = Flatten()(model_xception.output)
    top_model = Dropout(0.5)(top_model)
    top_model = Dense(512)(top_model)
    top_model = LeakyReLU()(top_model)
    top_model = Dropout(0.5)(top_model)
    top_model = Dense(256)(top_model)
    top_model = LeakyReLU()(top_model)

    model_output = Dense(1, activation="sigmoid")(top_model)
    model = Model(model_xception.input, model_output)
    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    model.compile(loss='binary_crossentropy', optimizer=Nadam(learning_rate=0.00001), metrics=['accuracy'])

    return model

# Training

In [11]:
# Load the train and test page records from their CSV files in working_dir.
data_train = read_csv_data(working_dir + "tobacco800.train")
data_test = read_csv_data(working_dir + "tobacco800.test")

In [None]:
# Train the single-page image model n_repeats times from scratch. For each
# repeat, the checkpoint callback saves the epoch with the best test-set
# `optimize_for` score and that epoch's metrics are collected.
n_repeats = 10
n_epochs = 20
metric_history = []
optimize_for = 'kappa'

with tf.device('/GPU:0'):
    for i in range(n_repeats):
        print("Repeat " + str(i+1) + " of " + str(n_repeats))
        print("-------------------------")
        # Release the global Keras state left by the previous repeat's model
        # so memory does not pile up across repeats.
        tf.keras.backend.clear_session()
        model_image = compile_model(img_dim)
        model_file = working_dir + "xception-image_model-%02d.hdf5" % (i,)
        # Use the configured target metric instead of a hard-coded name.
        checkpoint = ValidationCheckpoint(model_image, model_file, data_test, img_dim, metric=optimize_for)
        model_image.fit(ImageFeatureGenerator(data_train, img_dim, prevpage=False),
                        callbacks=[checkpoint],
                        epochs=n_epochs)
        metric_history.append(checkpoint.max_metrics)

print(metric_history)

In [None]:
# One summary line per training repeat:
# index, kappa, accuracy, micro-F1, macro-F1 and the checkpoint file path.
for run_idx, run_metrics in enumerate(metric_history):
    model_file = working_dir + "xception-image_model-%02d.hdf5" % (run_idx,)
    summary_fields = [
        run_idx,
        run_metrics['kappa'],
        run_metrics['accuracy'],
        run_metrics['f1_micro'],
        run_metrics['f1_macro'],
        model_file,
    ]
    print(' '.join(str(field) for field in summary_fields))

In [12]:
# Evaluate one saved checkpoint on the test pages.
# Encode the gold labels with the same LABEL2IDX mapping the generator uses
# (index 3 of each record is the label name), without assigning to `_`.
y_true = [LABEL2IDX[row[3]] for row in data_test]
# Checkpoint of the repeat with index 9, as written by the training loop.
model_image = load_model(working_dir + "xception-image_model-09.hdf5")
# Round the sigmoid outputs to 0/1 and flatten to a 1-D label vector.
y_predict = np.round(model_image.predict(ImageFeatureGenerator(data_test, img_dim, prevpage=False))).ravel()
print("Accuracy: " + str(sklm.accuracy_score(y_true, y_predict)))
print("Kappa: " + str(sklm.cohen_kappa_score(y_true, y_predict)))

Accuracy: 0.8532818532818532
Kappa: 0.6923221207952983
