In [None]:
# Imports 
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
from keras import backend as K
import matplotlib.pyplot as plt 
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dense 
from tensorflow.keras.models import clone_model

In [None]:
# Import data
data = pd.read_csv("../../../data/side_information.csv",encoding='unicode_escape')
checkpoint_path = "C:/Users/jack-/Documents/University/Project/src/deep_learning/bert_tests/checkpoints"
feature_names = ['Sentence',
                 'Length in Words', 'Length in Characters', 'F-score', 'I-score',
                 'Lexical Density','FK Reading Ease', 'FOG Scale', 'SMOG Index', 'ARI',
                 'CL Index', 'LW Formula', 'DC Score', 'Readability Consensus',
                 'Spache Formula']

samples = data[feature_names]
labels = data["Formality"]
train_samples, test_samples, train_labels,test_labels = train_test_split(samples, labels, test_size=0.2,random_state=5)

bert_train_samples = np.array(train_samples["Sentence"])
bert_test_samples = np.array(test_samples["Sentence"])
side_train_samples = np.array(train_samples[feature_names[1:]])
side_test_samples = np.array(test_samples[feature_names[1:]])

train_samples = np.array(train_samples)
test_samples = np.array(test_samples)
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

In [None]:
# Model setup

# Attention layer
class peel_the_layer(tf.keras.layers.Layer): 

    def __init__(self,units=1):    
        ##Nothing special to be done here
        super(peel_the_layer, self).__init__()
        
    def build(self, input_shape):
        ##Define the shape of the weights and bias in this layer
        ##This is a 1 unit layer. 
        units=1
        ##last index of the input_shape is the number of dimensions of the prev
        ##RNN layer. last but 1 index is the num of timesteps
        self.w=self.add_weight(name="att_weights", shape=(input_shape[-1], units), initializer="normal") #name property is useful for avoiding RuntimeError: Unable to create link.
        self.b=self.add_weight(name="att_bias", shape=(input_shape[-2], units), initializer="zeros")
        super(peel_the_layer,self).build(input_shape)

    def call(self, x):
        ##x is the input tensor..each word that needs to be attended to
        ##Below is the main processing done during training
        ##K is the Keras Backend import
        e = K.tanh(K.dot(x,self.w)+self.b)
        a = K.softmax(e, axis=1)
        output = x*a

        ##return the ouputs. 'a' is the set of attention weights
        ##the second variable is the 'attention adjusted o/p state' or context
        return a, K.sum(output, axis=1)


bert_model_name = 'small_bert/bert_en_uncased_L-4_H-512_A-8' 



tfhub_handle_encoder = 'https://tfhub.dev/tensorflow/bert_en_cased_L-12_H-768_A-12/4'
tfhub_handle_preprocess = 'https://tfhub.dev/tensorflow/bert_en_cased_preprocess/3'
bert_preprocess_model = hub.KerasLayer(tfhub_handle_preprocess)
bert_model = hub.KerasLayer(tfhub_handle_encoder)

In [None]:
normaliser = tf.keras.layers.Normalization()
normaliser.adapt(side_train_samples)

In [None]:
# -- NORMALISED SIDE INFORMATION MODEL --

# Bert model
text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess, name='preprocessing')
encoder_inputs = preprocessing_layer(text_input)
encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True, name='BERT_encoder')
outputs = encoder(encoder_inputs)
net = outputs['pooled_output']
reshaped = tf.reshape(net,[-1, 768, 1])
lstm = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
normalised_bert = tf.keras.Model(text_input, lstm)

# Side information model
side_input = tf.keras.layers.Input(shape=(14))
normalised = normaliser(side_input)
reshaped = tf.reshape(normalised,[-1, 1, 14])
lstm_1 = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
lstm_2 = tf.keras.layers.LSTM(512,return_sequences=True)(lstm_1)
normalised_side = tf.keras.Model(side_input, lstm_2)

# Combine models and predict
combined = tf.keras.layers.concatenate([normalised_bert.output, normalised_side.output],axis=1)
a, context = peel_the_layer()(combined)
dense = tf.keras.layers.Dense(1)(context)
normalised_model = tf.keras.Model(inputs=[normalised_bert.input, normalised_side.input], outputs=dense)

In [None]:
# -- PRETRAINED SIDE MODEL -- 

# Side information model
side_input = tf.keras.layers.Input(shape=(14),name="Side Information")
reshaped = tf.reshape(side_input,[-1, 1, 14])
lstm_1 = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
lstm_2 = tf.keras.layers.LSTM(512,return_sequences=True)(lstm_1)
pretrained_side = tf.keras.Model(side_input, lstm_2)

# Pre-train side model
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3,min_delta=0.01)
pretrained_side.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
pretrained_side.fit(x=side_train_samples,y=train_labels,batch_size=5,epochs=100,verbose=0,callbacks=[callback])


# Bert using pretrained side model
text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
side_input = tf.keras.layers.Input(shape=(14),)
preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess, name='preprocessing')
encoder_inputs = preprocessing_layer(text_input)
encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True, name='BERT_encoder')
outputs = encoder(encoder_inputs)
net = outputs['pooled_output']
reshaped = tf.reshape(net,[-1, 768, 1])
lstm = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
a, context = peel_the_layer()(lstm)

trained_side_input = pretrained_side(side_input)
trained_side_input = tf.reshape(trained_side_input,[-1, 512])

concat = tf.keras.layers.concatenate([context, trained_side_input]) 


dense = tf.keras.layers.Dense(1)(concat)

pretrained_model = tf.keras.Model([text_input,side_input],dense)

In [None]:
# -- AVERAGE LAYER INSTEAD OF CONCATENATION --

# Bert model
text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess)
encoder_inputs = preprocessing_layer(text_input)
encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True)
outputs = encoder(encoder_inputs)
net = outputs['pooled_output']
reshaped = tf.reshape(net,[-1, 768, 1])
lstm = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
average_bert = tf.keras.Model(text_input, lstm)

# Side information model
side_input = tf.keras.layers.Input(shape=(14))
reshaped = tf.reshape(side_input,[-1, 1, 14])
lstm_1 = tf.keras.layers.LSTM(512,return_sequences=True)(reshaped)
lstm_2 = tf.keras.layers.LSTM(512,return_sequences=True)(lstm_1)
average_side = tf.keras.Model(side_input, lstm_2)

# Combine models and predict
combined = tf.keras.layers.average([average_bert.output, average_side.output])
a, context = peel_the_layer()(combined)
dense = tf.keras.layers.Dense(1)(context)
average_model = tf.keras.Model(inputs=[average_bert.input, average_side.input], outputs=dense)

In [None]:
# -- NORMALISED TRAINING AND TESTING --
normalised_results = pd.DataFrame()
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=1,min_delta=0.01)

normalised_model.save_weights('normalised.h5')
normalised_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
normalised_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=8,verbose=2,callbacks=[callback])
scores = normalised_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
normalised_results["E8 B32"] = scores
print("Completed 8 Epochs")
print(scores)

normalised_model.load_weights("normalised.h5")
normalised_model = tf.keras.models.clone_model(normalised_model)
normalised_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
normalised_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=20,verbose=2,callbacks=[callback])
scores = normalised_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
normalised_results["E8 B32"] = scores
print("Completed 20 Epochs")
print(scores)

normalised_results.T

In [None]:
# -- SAVE NORMALISED RESULTS --
normalised_results = normalised_results.T
normalised_results.to_csv("/lstm_side_information_tests/normalised.csv")

In [None]:
# --PRETRAINED TRAINING AND TESTING --
pretrained_results = pd.DataFrame()
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=1,min_delta=0.01)

pretrained_model.save_weights('pretrained.h5')
pretrained_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
pretrained_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=8,verbose=2,callbacks=[callback])
scores = pretrained_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
pretrained_results["E8 B32"] = scores
print("Completed 8 Epochs")
print(scores)

pretrained_model.load_weights('pretrained.h5')
pretrained_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
pretrained_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=20,verbose=2,callbacks=[callback])
scores = pretrained_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
pretrained_results["E8 B32"] = scores
print("Completed 20 Epochs")
print(scores)

pretrained_results.T

In [None]:
# -- SAVE PRETRAINED RESULTS --
pretrained_results = pretrained_results.T
pretrained_results.to_csv("/lstm_side_information_tests/pretrained.csv")

In [None]:
# --AVERAGE LAYER TRAINING AND TESTING --
average_results = pd.DataFrame()
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=1,min_delta=0.01)


average_model.save_weights('average.h5')
average_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
average_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=8,verbose=2,callbacks=[callback])
scores = average_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
average_results["E8 B32"] = scores
print("Completed 8 Epochs")
print(scores)

average_model.load_weights('average.h5')
average_model.compile(optimizer=Adam(learning_rate=0.0001),loss='mean_squared_error',metrics=[tf.keras.losses.MeanAbsoluteError(),tf.keras.losses.MeanAbsolutePercentageError()])
average_model.fit(x=[bert_train_samples,side_train_samples],y=train_labels,batch_size=32,epochs=20,verbose=2,callbacks=[callback])
scores = average_model.evaluate(x=[bert_test_samples,side_test_samples],y=test_labels)
average_results["E8 B32"] = scores
print("Completed 20 Epochs")
print(scores)

average_results.T

In [None]:
# -- SAVE AVERAGE RESULTS --
average_results = average_results.T
average_results.to_csv("/lstm_side_information_tests/average.csv")