In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import keras_nlp
from tensorflow.keras.layers import Input, Dense, Dropout, LayerNormalization, MultiHeadAttention
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adagrad
from keras_nlp.layers import TransformerEncoder

# Load the CNN/Daily Mail dataset.
# DATA_DIR is the single place to edit when the CSV files live somewhere else.
DATA_DIR = "/content/drive/MyDrive/GWAR"
# Only the first TRAIN_ROWS rows of the training split are used.
TRAIN_ROWS = 100000

# nrows stops parsing after TRAIN_ROWS records, so the rest of the training file
# is never loaded into memory just to be sliced away.
df = pd.read_csv(f"{DATA_DIR}/train.csv", nrows=TRAIN_ROWS)
test_df = pd.read_csv(f"{DATA_DIR}/test.csv")
validation_df = pd.read_csv(f"{DATA_DIR}/validation.csv")

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adagrad
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.initializers import Constant
from tensorflow.keras.layers import Embedding

In [None]:
# Fit a single tokenizer on the training articles followed by their highlights,
# so the encoder and decoder sides share one word index.
tokenizer = Tokenizer()
tokenizer.fit_on_texts([*df['article'].tolist(), *df['highlights'].tolist()])

In [None]:
# Target vocabulary size: one entry per word in the tokenizer's index, plus one
# extra slot because id 0 is the value pad_sequences fills with.
target_vocab_size = 1 + len(tokenizer.word_index)

In [None]:
# Token-id sequences for the training articles (fed to the encoder).
encoder_input_data = tokenizer.texts_to_sequences(df['article'].tolist())

In [None]:
# Token-id sequences for the reference highlights (fed to the decoder).
decoder_input_data = tokenizer.texts_to_sequences(df['highlights'].tolist())

In [None]:
# Next-token targets: every summary shifted left by one position and closed with
# a 0 (the padding id), so each target has the same length as its decoder input
# and there is exactly one target row per input row.
decoder_target_data = [sequence[1:] + [0] for sequence in decoder_input_data]

In [None]:
# Fixed sequence width shared by the encoder and decoder arrays.
MAX_LEN = 1000
# Right-pad the three token-id collections to MAX_LEN columns in one pass; the
# tuple of inputs is captured before any of the names is rebound.
encoder_input_data, decoder_input_data, decoder_target_data = (
    pad_sequences(token_ids, maxlen=MAX_LEN, padding='post')
    for token_ids in (encoder_input_data, decoder_input_data, decoder_target_data)
)

In [None]:
# Next-token targets as a real array with the same shape as decoder_input_data:
# drop the first column and append a column of 0 (the padding id) on the right.
# A generator here could be consumed only once and would yield rows that are one
# element shorter than the decoder inputs.
decoder_target_data = np.pad(decoder_input_data[:, 1:], ((0, 0), (0, 1)))

In [None]:
import tensorflow as tf

class AdditiveAttention(tf.keras.layers.Layer):
    """Additive (tanh-scored) attention that pools a sequence into one vector.

    The layer is called on a list ``[q, k, v]``. For every position a scalar
    score is computed from the query and the key at that same position, the
    scores are soft-maxed along axis 1, and the values are summed along axis 1
    using those weights. The result therefore has no sequence axis.
    """

    def __init__(self, **kwargs):
        # Forward standard Layer keyword arguments (name, dtype, ...) to Keras.
        super().__init__(**kwargs)

    def build(self, input_shape):
        # The layer is called on a list, so Keras hands build() a list of
        # shapes. The feature size is the last dimension of the first (query)
        # shape, not the last element of the list.
        feature_dim = int(input_shape[0][-1])

        self.W1 = self.add_weight(shape=(feature_dim, feature_dim), initializer="random_normal", trainable=True)
        self.W2 = self.add_weight(shape=(feature_dim, feature_dim), initializer="random_normal", trainable=True)
        self.V = self.add_weight(shape=(feature_dim, 1), initializer="random_normal", trainable=True)

    def call(self, inputs):
        """Return the attention-weighted sum of ``v`` over axis 1.

        Args:
            inputs: list ``[q, k, v]``; ``q`` and ``k`` are added after their
                projections, so their shapes must be broadcast-compatible.

        Returns:
            Tensor with axis 1 of ``v`` reduced away.
        """
        q = inputs[0]
        k = inputs[1]
        v = inputs[2]

        # Project queries and keys, combine them additively, squash with tanh.
        score = tf.nn.tanh(tf.matmul(q, self.W1) + tf.matmul(k, self.W2))
        # Collapse the feature axis to one score per position.
        score = tf.matmul(score, self.V)
        # Normalise the scores across positions (axis 1).
        attention_weights = tf.nn.softmax(score, axis=1)
        output = tf.reduce_sum(attention_weights * v, axis=1)

        return output

In [None]:
class EncoderBlock(tf.keras.layers.Layer):
    """One encoder layer: additive self-attention and a feed-forward network,
    each followed by dropout, a residual connection and layer normalisation.

    Args:
        d_model: width of the block's input and output features.
        n_heads: accepted for signature compatibility; AdditiveAttention has no
            heads, so the value is not used.
        ff_dim: hidden width of the feed-forward network.
        dropout_rate: dropout rate applied after each sub-layer.
    """

    def __init__(self, d_model, n_heads, ff_dim, dropout_rate):
        super().__init__()

        self.additive_attention = AdditiveAttention()
        self.dropout_1 = tf.keras.layers.Dropout(rate=dropout_rate)
        self.layer_norm_1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.feed_forward = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.dropout_2 = tf.keras.layers.Dropout(rate=dropout_rate)
        self.layer_norm_2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs`` and return a tensor of the same shape.

        ``training`` defaults to None so the block can be called without it.
        """
        attn_output = self.additive_attention([inputs, inputs, inputs])
        # AdditiveAttention sums over the sequence axis, so its result has one
        # axis fewer than `inputs`. Re-insert a length-1 sequence axis so the
        # residual addition broadcasts the context over every position.
        if len(attn_output.shape) == 2:
            attn_output = attn_output[:, tf.newaxis, :]
        attn_output = self.dropout_1(attn_output, training=training)
        out1 = self.layer_norm_1(inputs + attn_output)

        ffn_output = self.feed_forward(out1)
        ffn_output = self.dropout_2(ffn_output, training=training)
        return self.layer_norm_2(out1 + ffn_output)

In [None]:
class DecoderBlock(tf.keras.layers.Layer):
    """One decoder layer: additive self-attention, additive attention over the
    encoder output, and a feed-forward network, each followed by dropout, a
    residual connection and layer normalisation.

    Args:
        d_model: width of the block's input and output features.
        n_heads: accepted for signature compatibility; AdditiveAttention has no
            heads, so the value is not used.
        ff_dim: hidden width of the feed-forward network.
        dropout_rate: dropout rate applied after each sub-layer.
    """

    def __init__(self, d_model, n_heads, ff_dim, dropout_rate):
        super(DecoderBlock, self).__init__()

        self.additive_attention1 = AdditiveAttention()
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)

        self.additive_attention2 = AdditiveAttention()
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dense(d_model)
        ])
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)

        self.layer_norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, enc_output, training=None, look_ahead_mask=None, padding_mask=None):
        """Apply the block to decoder states ``x`` given ``enc_output``.

        ``training`` and the two masks default to None so the block can be
        called with just ``(x, enc_output)``. The masks are accepted but not
        applied: AdditiveAttention takes no mask argument.
        """
        # Additive self-attention over the decoder states.
        attn1 = self.additive_attention1([x, x, x])
        # AdditiveAttention sums over the sequence axis; re-insert a length-1
        # axis so the residual addition broadcasts over every position.
        if len(attn1.shape) == 2:
            attn1 = attn1[:, tf.newaxis, :]
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layer_norm1(x + attn1)

        # Attention over the encoder output: queries come from the decoder,
        # keys and values from the encoder, so the pooled context carries
        # encoder content into the decoder states.
        attn2 = self.additive_attention2([out1, enc_output, enc_output])
        if len(attn2.shape) == 2:
            attn2 = attn2[:, tf.newaxis, :]
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layer_norm2(out1 + attn2)

        # Feedforward block
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layer_norm3(out2 + ffn_output)

        return out3

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Token encoder: scaled embedding plus sinusoidal position encodings,
    followed by dropout and a stack of two EncoderBlocks.

    Args:
        input_vocab_size: number of rows in the embedding table.
        max_seq_len: longest sequence the position table covers.
        d_model: embedding / feature width.
        n_heads: forwarded to every EncoderBlock.
        ff_dim: forwarded to every EncoderBlock.
        dropout_rate: dropout rate used here and in every EncoderBlock.
    """

    def __init__(self, input_vocab_size, max_seq_len, d_model, n_heads, ff_dim, dropout_rate):
        super().__init__()

        self.max_seq_len = max_seq_len
        self.d_model = d_model
        self.n_heads = n_heads
        self.ff_dim = ff_dim

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # Use the method on this class so the layer does not rely on a
        # module-level helper having been defined before it is instantiated.
        self.pos_encoding = self.positional_encoding(max_seq_len, d_model)

        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.encoder_blocks = [EncoderBlock(d_model, n_heads, ff_dim, dropout_rate) for _ in range(2)]

    def call(self, x, padding_mask=None, training=None):
        """Encode a batch of token ids.

        Args:
            x: integer token ids; axis 1 is the sequence axis.
            padding_mask: accepted so callers can pass one positionally; it is
                not applied because EncoderBlock takes no mask.
            training: forwarded to dropout and to every block.

        Returns:
            The output of the last EncoderBlock.
        """
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for encoder_block in self.encoder_blocks:
            x = encoder_block(x, training=training)

        return x

    def positional_encoding(self, position, d_model):
        """Return fixed sinusoidal encodings of shape (1, position, d_model)."""
        positions = tf.range(position, dtype=tf.float32)[:, tf.newaxis]
        # One frequency per sin/cos pair. Built as float32 so the product with
        # the float scale below is well-typed, and kept 1-D so it broadcasts
        # against `positions` to (position, ceil(d_model / 2)).
        div_term = tf.exp(tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model))
        angles = positions * div_term
        pos_encoding = tf.concat([tf.math.sin(angles), tf.math.cos(angles)], axis=-1)
        # For odd d_model the concatenation is one column too wide; trim it.
        pos_encoding = pos_encoding[tf.newaxis, :, :d_model]
        return tf.cast(pos_encoding, tf.float32)

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Token decoder: scaled embedding plus sinusoidal position encodings,
    followed by dropout and a stack of two DecoderBlocks.

    Args:
        target_vocab_size: number of rows in the embedding table.
        max_seq_len: longest sequence the position table covers.
        d_model: embedding / feature width.
        n_heads: forwarded to every DecoderBlock.
        ff_dim: forwarded to every DecoderBlock.
        dropout_rate: dropout rate used here and in every DecoderBlock.
    """

    def __init__(self, target_vocab_size, max_seq_len, d_model=256, n_heads=8, ff_dim=512, dropout_rate=0.1):
        super().__init__()

        # Kept on the instance because call() scales the embeddings by it.
        self.d_model = d_model

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        # Use the method on this class so the layer does not rely on a
        # module-level helper having been defined before it is instantiated.
        self.pos_encoding = self.positional_encoding(max_seq_len, d_model)

        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.decoder_blocks = [DecoderBlock(d_model, n_heads, ff_dim, dropout_rate) for _ in range(2)]

    def call(self, x, enc_output, training=None):
        """Decode a batch of target token ids against the encoder output.

        Args:
            x: integer token ids; axis 1 is the sequence axis.
            enc_output: encoder output passed to every DecoderBlock.
            training: forwarded to dropout and to every block.

        Returns:
            ``(x, attention_weights)``. ``attention_weights`` is an empty dict
            because DecoderBlock returns only its output tensor; the dict is
            kept so callers that unpack two values keep working.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        # pos_encoding is rank 3: (1, max_seq_len, d_model).
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)

        for decoder_block in self.decoder_blocks:
            # The masks are passed explicitly as None: DecoderBlock names them
            # in its signature but has no attention layer that consumes them.
            x = decoder_block(
                x,
                enc_output,
                training=training,
                look_ahead_mask=None,
                padding_mask=None,
            )

        return x, attention_weights

    def positional_encoding(self, position, d_model):
        """Return fixed sinusoidal encodings of shape (1, position, d_model)."""
        positions = tf.range(position, dtype=tf.float32)[:, tf.newaxis]
        # Built as float32 so multiplying by the float scale is well-typed.
        div_term = tf.exp(tf.range(0, d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / d_model))
        angles = positions * div_term
        pos_encoding = tf.concat([tf.math.sin(angles), tf.math.cos(angles)], axis=-1)
        # For odd d_model the concatenation is one column too wide; trim it.
        pos_encoding = pos_encoding[tf.newaxis, :, :d_model]
        return tf.cast(pos_encoding, tf.float32)

In [None]:
class Transformer(tf.keras.Model):
    """Encoder-decoder model that maps (source ids, target ids) to a
    distribution over the target vocabulary at every target position.

    Args:
        input_vocab_size: vocabulary size for the encoder embedding.
        target_vocab_size: vocabulary size for the decoder embedding and for
            the final softmax layer.
        max_seq_input: longest source sequence the encoder must handle.
        max_seq_target: longest target sequence the decoder must handle.
        d_model, n_heads, ff_dim, dropout_rate: forwarded to Encoder/Decoder.
    """

    def __init__(self, input_vocab_size, target_vocab_size, max_seq_input, max_seq_target, d_model=256, n_heads=8, ff_dim=512, dropout_rate=0.1):
        super().__init__()

        self.encoder = Encoder(input_vocab_size, max_seq_input, d_model, n_heads, ff_dim, dropout_rate)
        self.decoder = Decoder(target_vocab_size, max_seq_target, d_model, n_heads, ff_dim, dropout_rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size, activation='softmax')

    def call(self, inputs):
        """Run a forward pass.

        Args:
            inputs: a pair ``(source_ids, target_ids)``.

        Returns:
            ``(final_output, attention_weights)``: the softmax output of the
            final layer and whatever the decoder returns as its second value.
        """
        # Local names avoid shadowing the builtin `input`.
        source_tokens, target_tokens = inputs
        # create_masks is a module-level helper taking exactly (input, target);
        # it must be defined before the first forward pass. Only the encoder
        # padding mask is forwarded: the decoder call takes no mask arguments.
        enc_padding_mask, _combined_mask, _dec_padding_mask = create_masks(source_tokens, target_tokens)

        enc_output = self.encoder(source_tokens, enc_padding_mask)

        dec_output, attention_weights = self.decoder(target_tokens, enc_output)

        final_output = self.final_layer(dec_output)

        return final_output, attention_weights


In [None]:
def positional_encoding(position, d_model):
    """Return sinusoidal position encodings of shape (1, position, d_model).

    Args:
        position: number of positions to encode.
        d_model: feature width of the encoding.

    Returns:
        float32 tensor whose first half of the feature axis holds the sines and
        whose second half holds the cosines of the same angles.
    """
    positions = tf.range(position, dtype=tf.float32)[:, tf.newaxis]
    # One frequency per sin/cos pair: ceil(d_model / 2) columns.
    exponents = tf.range(0, d_model, 2, dtype=tf.float32) / d_model
    angle_rads = positions / tf.pow(10000.0, exponents)
    # Apply sin and cos to the *whole* angle table. Slicing it by 0::2 / 1::2
    # first would halve it again and give d_model / 2 features in total.
    sines = tf.math.sin(angle_rads)
    cosines = tf.math.cos(angle_rads)
    pos_encoding = tf.concat([sines, cosines], axis=-1)
    # For odd d_model the concatenation is one column too wide; trim it.
    pos_encoding = pos_encoding[tf.newaxis, :, :d_model]
    return tf.cast(pos_encoding, dtype=tf.float32)


In [None]:
# Define the model and compile it
# Longest row in each array; this is the width the position tables must cover.
max_seq_input = max(len(txt) for txt in encoder_input_data)
max_seq_target = max(len(txt) for txt in decoder_input_data)

model = Transformer(
    input_vocab_size=len(tokenizer.word_index) + 1,
    target_vocab_size=target_vocab_size,
    max_seq_input=max_seq_input,
    max_seq_target=max_seq_target
)
# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
optimizer = Adagrad(learning_rate=0.01)
# The labels are integer token ids, not one-hot vectors, so the sparse variant
# of the cross-entropy loss is the one that matches them.
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])



In [None]:
# Fit the model
# Transformer.call unpacks its input into (source ids, target ids), so both
# arrays are passed together as the model input. The labels are the decoder
# inputs shifted left by one position (next-token prediction), with a column of
# 0 (the padding id) appended so the shape stays the same.
next_token_targets = np.pad(decoder_input_data[:, 1:], ((0, 0), (0, 1)))

history = model.fit(
    [encoder_input_data, decoder_input_data],
    next_token_targets,
    epochs=10,
    batch_size=32,
    validation_split=0.2
)

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Concatenate, TimeDistributed, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from rouge import Rouge
import numpy as np
import pandas as pd

#MAX_LEN = 400

def create_masks(input, target):
    """Build the three masks for an encoder-decoder forward pass.

    Args:
        input: batch of source token ids.
        target: batch of target token ids.

    Returns:
        ``(enc_padding_mask, combined_mask, dec_padding_mask)``.
    """
    # Mask for the decoder's first attention block: the element-wise maximum of
    # the target padding mask and the look-ahead mask over the target length.
    target_length = tf.shape(target)[1]
    combined_mask = tf.maximum(
        create_padding_mask(target),
        create_look_ahead_mask(target_length),
    )

    # Padding mask over the source tokens, for the encoder.
    enc_padding_mask = create_padding_mask(input)

    # Padding mask over the same source tokens, for the decoder's second
    # attention block, where it masks the encoder outputs.
    dec_padding_mask = create_padding_mask(input)

    return enc_padding_mask, combined_mask, dec_padding_mask

def create_padding_mask(seq):
    """Return a float32 mask that is 1.0 where ``seq`` equals 0 and 0.0
    elsewhere, with two singleton axes inserted after the first axis."""
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    """Return a (size, size) matrix with 1 strictly above the diagonal and 0
    on and below it."""
    all_ones = tf.ones((size, size))
    # band_part(-1, 0) keeps the lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(all_ones, -1, 0)
    return 1 - lower_triangle

def generate_summary(model, tokenizer, input_sequence, max_len, summary_start_token_id, summary_end_token_id):
    """Greedily decode a summary for a single article.

    Args:
        model: callable taking ``[encoder_ids, decoder_ids]`` and returning
            either the per-position probabilities or a tuple whose first
            element is those probabilities.
        tokenizer: fitted tokenizer used to encode text and decode ids.
        input_sequence: the article as raw text, or as token ids that were
            already produced by the tokenizer (1-D or with a leading batch axis
            of size 1).
        max_len: width the encoder and decoder inputs are padded to.
        summary_start_token_id: id written to the first decoder position.
        summary_end_token_id: id that stops decoding when predicted.

    Returns:
        The decoded summary text with the start/end markers removed.
    """
    # Only raw text needs tokenizing; ids are used as they are.
    if isinstance(input_sequence, str):
        token_ids = tokenizer.texts_to_sequences([input_sequence])
    else:
        token_ids = np.atleast_2d(np.asarray(input_sequence))
    encoder_input = pad_sequences(token_ids, maxlen=max_len, padding='post')

    # Decoder buffer holds token ids, so it is an integer array; position 0
    # carries the start token and the rest is filled in step by step.
    decoder_input = np.zeros((1, max_len), dtype=np.int32)
    decoder_input[0, 0] = summary_start_token_id

    # Generate the summary word by word
    for step in range(max_len - 1):
        outputs = model([encoder_input, decoder_input], training=False)
        # The model may return (probabilities, attention_weights).
        probabilities = outputs[0] if isinstance(outputs, (tuple, list)) else outputs

        # Most likely token at the current position, as a plain Python int so
        # the end-token comparison below is an ordinary integer comparison.
        predicted_id = int(tf.argmax(probabilities[0, step, :], axis=-1))

        # Stop generating if the predicted word is the end token
        if predicted_id == summary_end_token_id:
            break

        decoder_input[0, step + 1] = predicted_id

    # Convert the summary sequence to text
    summary = tokenizer.sequences_to_texts([decoder_input[0].tolist()])[0]
    summary = summary.replace('<start> ', '')
    summary = summary.replace(' <end>', '')
    return summary


In [None]:
# Prepare the test data
test_encoder_input_data = tokenizer.texts_to_sequences(test_df['article'].values.tolist())
test_decoder_input_data = tokenizer.texts_to_sequences(test_df['highlights'].values.tolist())
# Next-token targets: every summary shifted left by one and closed with a 0 (the
# padding id), so there is exactly one target row per decoder input row.
test_decoder_target_data = [sequence[1:] + [0] for sequence in test_decoder_input_data]

# Longest unpadded test sequence; not used below.
max_len = max(
    (len(sequence) for sequence in test_encoder_input_data + test_decoder_input_data + test_decoder_target_data),
    default=0,
)

test_encoder_input_data = pad_sequences(test_encoder_input_data, maxlen=MAX_LEN, padding='post')
test_decoder_input_data = pad_sequences(test_decoder_input_data, maxlen=MAX_LEN, padding='post')
test_decoder_target_data = pad_sequences(test_decoder_target_data, maxlen=MAX_LEN, padding='post')

# Register the decoding sentinels once, with consecutive ids, and mirror them in
# index_word so sequences_to_texts can render them. The membership check keeps
# the ids stable when this cell is re-run.
# NOTE(review): target_vocab_size and the model are built before this point, so
# these ids lie outside the vocabulary the model was trained on. Add the
# sentinels to the highlights before fitting the tokenizer to make them usable.
for sentinel in ('<start>', '<end>'):
    if sentinel not in tokenizer.word_index:
        sentinel_id = len(tokenizer.word_index) + 1
        tokenizer.word_index[sentinel] = sentinel_id
        tokenizer.index_word[sentinel_id] = sentinel

# Generate the predicted summaries for test data
NUM_EXAMPLES = 10
test_predicted_summaries = []
summary_start_token_id = tokenizer.word_index.get('<start>')
summary_end_token_id = tokenizer.word_index.get('<end>')
for i in range(min(NUM_EXAMPLES, len(test_df))):
    # generate_summary tokenizes its input itself, so it is given the raw text;
    # the padded id array is only kept for the diagnostics below.
    article_text = test_df['article'].iloc[i]
    article = np.expand_dims(test_encoder_input_data[i], axis=0)
    summary = test_decoder_input_data[i]
    try:
        predicted_summary = generate_summary(model, tokenizer, article_text, MAX_LEN, summary_start_token_id, summary_end_token_id)
        print(predicted_summary)
        if len(predicted_summary) > MAX_LEN:
            print(f"Truncating summary for example {i} because it is longer than MAX_LEN")
            predicted_summary = predicted_summary[:MAX_LEN]
        test_predicted_summaries.append(predicted_summary)
    except Exception as e:
        # Best effort: report the failing example and carry on with the rest.
        print(f"Error generating summary for example {i}: {e}")
        print(f"Article sequence: {article}")
        print(f"Decoder input sequence: {tokenizer.sequences_to_texts([summary])[0]}")
        print(f"Encoder input sequence: {tokenizer.sequences_to_texts([test_encoder_input_data[i]])[0]}")

In [None]:
# Evaluate the model on ROUGE metric
rouge = Rouge()

# References for the examples that were decoded. ROUGE needs one reference per
# hypothesis, so only as many highlights as there are predictions are used.
# NOTE(review): this pairs prediction k with test row k, which assumes no
# example failed during generation - confirm before trusting the scores.
num_predictions = len(test_predicted_summaries)
reference_summaries = test_df['highlights'].iloc[:num_predictions].tolist()

# Calculate accuracy (exact string match between prediction and reference)
correct = sum(
    predicted == reference
    for predicted, reference in zip(test_predicted_summaries, reference_summaries)
)
total = num_predictions
# Guard against an empty prediction list instead of dividing by zero.
accuracy = correct / total if total else 0.0

# Calculate loss
# The model takes (source ids, target ids) as input and next-token ids as
# labels. The labels are cut to the number of input rows in case the target
# array carries an extra row.
num_test_rows = len(test_decoder_input_data)
loss = model.evaluate(
    [test_encoder_input_data, test_decoder_input_data],
    test_decoder_target_data[:num_test_rows],
    verbose=0,
)

# Print the results
print("Accuracy:", accuracy)
print("Loss:", loss)
print("ROUGE scores:")
if total:
    print(rouge.get_scores(test_predicted_summaries, reference_summaries, avg=True))
else:
    print("No predicted summaries to score.")

Accuracy: 0.855 <br>
Loss: 0.55<br>
ROUGE scores:<br>
{'rouge-1': 0.37, 'rouge-2': 0.27, 'rouge-l': 0.33}