In [1]:
import tensorflow as tf 

import tensorflow_datasets as tfds 
DATA_DIR = "./tensorflow-datasets"


In [5]:
test_ds = tfds.load('imdb_reviews', split="test", with_info=False,
                          as_supervised=True,data_dir=DATA_DIR)
for_vocab = tfds.load('imdb_reviews', split="train", with_info=False,
                          as_supervised=True,data_dir=DATA_DIR)

In [4]:
import re
import string
def custom_standardization(input_data):
    lowercase = tf.strings.lower(input_data)
    stripped_html = tf.strings.regex_replace(lowercase, '<br />', ' ')
    return tf.strings.regex_replace(stripped_html,
                                  '[%s]' % re.escape(string.punctuation),
                                  '')

def get_text_vectorizer(dataset, vocab_size=1000, sequence_length=500):
    vectorizer = tf.keras.layers.TextVectorization(standardize=custom_standardization, max_tokens=vocab_size,output_mode='int', output_sequence_length=sequence_length)
    vectorizer.adapt(dataset.map(lambda text,label: text))
    return vectorizer 

def get_bidirectional_lstm(vectorizer,kernel_regularizer=None, use_dropout=False):
    input = tf.keras.layers.Input(shape=[None],dtype=tf.string)
    vocab_len = len(vectorizer.get_vocabulary())
    x = vectorizer(input)
    x = tf.keras.layers.Embedding(input_dim=vocab_len, output_dim=64, mask_zero=True,name="embedding")(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64,name="lstm_cell"),name="bidirectional")(x)
    x = tf.keras.layers.Dense(64, activation='relu',kernel_regularizer=kernel_regularizer )(x)
    x = tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=kernel_regularizer)(x)
    if use_dropout:
        x = tf.keras.layers.Dropout(rate=0.2)(x)
    x = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model(inputs=input,outputs=x)


def train_model(model, train_ds, val_ds, loss_fn, batch_size=64, optimizer=tf.keras.optimizers.Adam(), callbacks=None, epochs=50):
    train_batched = train_ds.shuffle(1000).batch(batch_size)
    val_batched = val_ds.shuffle(1000).batch(batch_size)
    model.compile(loss=loss_fn, optimizer=optimizer,
              metrics=[tf.metrics.BinaryAccuracy(threshold=0.5)])
    history = model.fit(train_batched, epochs=epochs, validation_data=val_batched,callbacks=callbacks)
    return history

from sklearn.metrics import accuracy_score, ConfusionMatrixDisplay
import matplotlib.pyplot as plt 
def evaluate_model(model, val_ds, result_path=None):
    val_batched = val_ds.batch(64)
    predictions = (tf.nn.sigmoid(model.predict(val_batched)) > 0.5).numpy()
    truth = np.concatenate([label.numpy() for _,label in val_batched],axis=0)
    accuracy = accuracy_score(truth, predictions)
    ConfusionMatrixDisplay.from_predictions(y_pred=predictions, y_true=truth)
    if result_path:
        plt.savefig(result_path.joinpath('confusion_matrix.jpg'))
        plt.close()
    else:
        plt.show()
    return accuracy 
    
def plot_training_graphs(history, result_path=None):
    plt.plot(history.history['loss'],label="training")
    plt.plot(history.history['val_loss'],label="validation")
    plt.xlabel("epochs")
    if result_path:
        plt.savefig(result_path.joinpath('training_history.jpg'))
        plt.close()
    else:
        plt.show()
        
        
# Add attention layer to the deep learning network
import keras.backend as K
class Attention(tf.keras.layers.Layer):
    def __init__(self,**kwargs):
        super(Attention,self).__init__(**kwargs)
 
    def build(self,input_shape):
        self.W=self.add_weight(name='attention_weight', shape=(input_shape[-1],1), 
                               initializer='random_normal', trainable=True)
        self.b=self.add_weight(name='attention_bias', shape=(input_shape[1],1), 
                               initializer='zeros', trainable=True)        
        super(Attention, self).build(input_shape)
 
    def call(self,x):
        # Alignment scores. Pass them through tanh function
        e = K.tanh(K.dot(x,self.W)+self.b)
        # Remove dimension of size 1
        e = K.squeeze(e, axis=-1)   
        # Compute the weights
        alpha = K.softmax(e)
        # Reshape to tensorFlow format
        alpha = K.expand_dims(alpha, axis=-1)
        # Compute the context vector
        context = x * alpha
        context =K.sum(context, axis=1)
        return context
    
    
def get_bidirectional_lstm_attention(vectorizer,kernel_regularizer=None, use_dropout=False):
    input = tf.keras.layers.Input(shape=[None],dtype=tf.string)
    vocab_len = len(vectorizer.get_vocabulary())
    x = vectorizer(input)
    x = tf.keras.layers.Embedding(input_dim=vocab_len, output_dim=64, mask_zero=True,name="embedding")(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32, return_sequences=True), name="bi_lstm_0")(x)
    # x, forward_h, forward_c, backward_h, backward_c = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32,return_sequences=True, return_state=True), name="bi_lstm_0")(x)
    
    x = Attention()(x)
    
    # x = tf.keras.layers.Dense(64, activation='relu',kernel_regularizer=kernel_regularizer )(x)
    x = tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=kernel_regularizer)(x)
    if use_dropout:
        x = tf.keras.layers.Dropout(rate=0.2)(x)
    x = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model(inputs=input,outputs=x)


def get_bidirectional_gru_attention(vectorizer,kernel_regularizer=None, use_dropout=False):
    input = tf.keras.layers.Input(shape=[None],dtype=tf.string)
    vocab_len = len(vectorizer.get_vocabulary())
    x = vectorizer(input)
    x = tf.keras.layers.Embedding(input_dim=vocab_len, output_dim=64, mask_zero=True,name="embedding")(x)
    x = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(32, return_sequences=True), name="bi_lstm_0")(x)
    # x, forward_h, forward_c, backward_h, backward_c = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32,return_sequences=True, return_state=True), name="bi_lstm_0")(x)
    
    x = Attention()(x)
    
    # x = tf.keras.layers.Dense(64, activation='relu',kernel_regularizer=kernel_regularizer )(x)
    x = tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=kernel_regularizer)(x)
    if use_dropout:
        x = tf.keras.layers.Dropout(rate=0.2)(x)
    x = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model(inputs=input,outputs=x)

    
    

In [6]:
best_model = "/results_gru/64_0.01_True/best_weights.index"
vectorizer = get_text_vectorizer(for_vocab)
e_model = get_bidirectional_gru_attention(vectorizer)


2024-11-09 16:49:46.118242: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


In [11]:
best_model = "/results_gru/64_0.01_True/best_weights"
e_model.load_weights(best_model)

NotFoundError: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /results_gru/64_0.01_True/best_weights