In [None]:
#Transformer

import tensorflow as tf
from tensorflow.keras.preprocessing import sequence,pad_sequences
from tensorflow.keras.datasets import imdb
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, f1_score
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import time
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GlobalAveragePooling1D, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

# Load IMDb dataset
max_features = 20000
maxlen = 80
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)

class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        # x.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)  # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(
            query, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        key = self.separate_heads(
            key, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(
            value, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(
            attention, perm=[0, 2, 1, 3]
        )  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(
            concat_attention
        )  # (batch_size, seq_len, embed_dim)
        return output

class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions
    
# Define the Transformer model
class TransformerModel(tf.keras.Model):
    def __init__(self, num_classes, embed_dim=32, num_heads=2, ff_dim=32, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embedding_layer = tf.keras.layers.Embedding(input_dim=max_features, output_dim=embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim, rate=dropout)
        self.global_average_pooling = GlobalAveragePooling1D()
        self.dropout = Dropout(dropout)
        self.dense = Dense(num_classes, activation='softmax')

    def call(self, inputs, training):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x, training=training)
        x = self.global_average_pooling(x)
        x = self.dropout(x, training=training)
        return self.dense(x)

# Instantiate the Transformer model
num_classes = 2  # Positive or negative sentiment
transformer_model = TransformerModel(num_classes)

# Compile the model
transformer_model.compile(optimizer=Adam(), loss=SparseCategoricalCrossentropy(), metrics=['accuracy'])



def train_model(model, X_train, y_train, X_test, y_test, model_name='Transformer', dataset_name="IMDB", total_epoch=50, ea=False, eamonitor="val_loss"):
    tensorboard_callback = TensorBoard(log_dir='./../../Observation/'+model_name+'/'+model_name+dataset_name+'logs')
    # Define EarlyStopping callback
    early_stopping = EarlyStopping(monitor=eamonitor, patience=3, min_delta=0.001, restore_best_weights=True, verbose=1)

    start_time = time.time()
    if ea:
        history = model.fit(X_train, y_train, epochs=total_epoch, batch_size=64, validation_data=(X_test, y_test), callbacks=[tensorboard_callback,early_stopping])
    else:
        history = model.fit(X_train, y_train, epochs=total_epoch, batch_size=64, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
    training_time = time.time() - start_time

    predictions = model.predict(X_test)

    return model, history, predictions, training_time

def test_model(model, X_test, y_test, model_name='Transformer', dataset_name="IMDB", threshold=0.5):
    start_time = time.time()
    predictions = model.predict(X_test)
    inference_time = time.time() - start_time

    metrics = calculate_metrics(predictions, y_test, threshold)
    metrics['Training Time'] = training_time
    metrics['Inference Time'] = inference_time
    metrics['Model type'] = model_name
    metrics['dataset'] = dataset_name

    print("Metrics:")
    for metric, value in metrics.items():
        print(f"{metric}: {value}")

    metrics_list = list(metrics.values())
    pd.DataFrame([metrics_list], columns=metrics.keys()).to_csv('./../../Observation/'+model_name+'/'+model_name+dataset_name+'metrics.csv', index=False)

    return metrics

def calculate_metrics(predictions, true_labels, threshold=0.5):
    binary_predictions = (predictions > threshold).astype('int32')  # Convert probabilities to binary predictions
    true_labels = true_labels.astype('int32')  # Ensure true labels are in integer format
    
    # Calculate accuracy
    accuracy = np.mean(binary_predictions == true_labels)
    
    # Calculate other metrics
    mse = mean_squared_error(true_labels, predictions)
    mae = mean_absolute_error(true_labels, predictions)
    rmse = np.sqrt(mse)
    f1 = f1_score(true_labels, binary_predictions)

    return {
        'Accuracy': accuracy,
        'MSE': mse,
        'MAE': mae,
        'RMSE': rmse,
        'F1 Score': f1
    }


# Train the model
model, history, predictions, training_time = train_model(transformer_model, x_train, y_train, x_test, y_test, model_name='Transformer', dataset_name='IMDB', total_epoch=10, ea=True)

# Test the model
test_metrics = test_model(model, x_test, y_test, model_name='Transformer', dataset_name='IMDB', threshold=0.5)


In [1]:
#Transformer

import tensorflow as tf
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.datasets import imdb
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, f1_score
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import time
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GlobalAveragePooling1D, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

# Load IMDb dataset
max_features = 20000
maxlen = 80
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)

class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        # x.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)  # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(
            query, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        key = self.separate_heads(
            key, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(
            value, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(
            attention, perm=[0, 2, 1, 3]
        )  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(
            concat_attention
        )  # (batch_size, seq_len, embed_dim)
        return output

class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions
    
# Define the Transformer model
class TransformerModel(tf.keras.Model):
    def __init__(self, num_classes, embed_dim=32, num_heads=2, ff_dim=32, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embedding_layer = tf.keras.layers.Embedding(input_dim=max_features, output_dim=embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim, rate=dropout)
        self.global_average_pooling = GlobalAveragePooling1D()
        self.dropout = Dropout(dropout)
        self.dense = Dense(num_classes, activation='softmax')

    def call(self, inputs, training):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x, training=training)
        x = self.global_average_pooling(x)
        x = self.dropout(x, training=training)
        return self.dense(x)

# Instantiate the Transformer model
num_classes = 2  # Positive or negative sentiment
transformer_model = TransformerModel(num_classes)

# Compile the model
transformer_model.compile(optimizer=Adam(), loss=SparseCategoricalCrossentropy(), metrics=['accuracy'])



def train_model(model, X_train, y_train, X_test, y_test, model_name='Transformer', dataset_name="IMDB", total_epoch=50, ea=False, eamonitor="val_loss"):
    tensorboard_callback = TensorBoard(log_dir='./../../Observation/'+model_name+'/'+model_name+dataset_name+'logs')
    # Define EarlyStopping callback
    early_stopping = EarlyStopping(monitor=eamonitor, patience=3, min_delta=0.001, restore_best_weights=True, verbose=1)

    start_time = time.time()
    if ea:
        history = model.fit(X_train, y_train, epochs=total_epoch, batch_size=64, validation_data=(X_test, y_test), callbacks=[tensorboard_callback,early_stopping])
    else:
        history = model.fit(X_train, y_train, epochs=total_epoch, batch_size=64, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
    training_time = time.time() - start_time

    predictions = model.predict(X_test)

    return model, history, predictions, training_time

def test_model(model, X_test, y_test, model_name='Transformer', dataset_name="IMDB"):
    start_time = time.time()
    predictions = model.predict(X_test)
    inference_time = time.time() - start_time

    metrics = calculate_metrics(predictions, y_test)
    metrics['Training Time'] = training_time
    metrics['Inference Time'] = inference_time
    metrics['Model type'] = model_name
    metrics['dataset'] = dataset_name

    print("Metrics:")
    for metric, value in metrics.items():
        print(f"{metric}: {value}")

    metrics_list = list(metrics.values())
    pd.DataFrame([metrics_list], columns=metrics.keys()).to_csv('./../../Observation/'+model_name+'/'+model_name+dataset_name+'metrics.csv', index=False)

    return metrics

def calculate_metrics(predictions, true_labels):
    # Convert predictions to class labels
    predicted_labels = np.argmax(predictions, axis=1)

    # Calculate accuracy
    accuracy = np.mean(predicted_labels == true_labels)

    # Calculate other metrics
    mse = mean_squared_error(true_labels, predicted_labels)
    mae = mean_absolute_error(true_labels, predicted_labels)
    rmse = np.sqrt(mse)
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    return {
        'Accuracy': accuracy,
        'MSE': mse,
        'MAE': mae,
        'RMSE': rmse,
        'F1 Score': f1
    }

# Train the model
model, history, predictions, training_time = train_model(transformer_model, x_train, y_train, x_test, y_test, model_name='Transformer', dataset_name='IMDB', total_epoch=10, ea=True)

# Test the model
test_metrics = test_model(model, x_test, y_test, model_name='Transformer', dataset_name='IMDB')


Epoch 1/10
[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 12ms/step - accuracy: 0.7367 - loss: 0.5112 - val_accuracy: 0.8354 - val_loss: 0.3679
Epoch 2/10
[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 11ms/step - accuracy: 0.9001 - loss: 0.2451 - val_accuracy: 0.8289 - val_loss: 0.3924
Epoch 3/10
[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 11ms/step - accuracy: 0.9299 - loss: 0.1813 - val_accuracy: 0.8198 - val_loss: 0.4648
Epoch 4/10
[1m391/391[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 11ms/step - accuracy: 0.9548 - loss: 0.1271 - val_accuracy: 0.8130 - val_loss: 0.5447
Epoch 4: early stopping
Restoring model weights from the end of the best epoch: 1.
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step
Metrics:
Accuracy: 0.8354
MSE: 0.1646
MAE: 0.1646
RMSE: 0.40570925550201586
F1 Score: 0.83521084604456
Training Tim