In [2]:
import keras
import tensorflow as tf
import numpy as np
import pandas as pd
from tensorflow.keras import layers as L
from tensorflow.keras import models as M
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
class BahdanauAttention(L.Layer):
    """Additive attention over a sequence of value vectors.

    Each timestep of ``values`` is scored as ``V(tanh(W1(query) + W2(values)))``.
    The scores are softmax-normalised along the timestep axis and used as
    weights to sum ``values`` into a single context vector.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = L.Dense(units)   # projection applied to the query
        self.W2 = L.Dense(units)   # projection applied to every value timestep
        self.V = L.Dense(1)        # reduces each combined projection to one score

    def call(self, query, values):
        """Return ``(context_vector, attention_weight)``.

        Shapes are not checked here; the annotations assume
        query: (batch_size, hidden_size) and
        values: (batch_size, timesteps, hidden_size) -- TODO confirm with callers.
        """
        # Insert a length-1 time axis so the query broadcasts against values.
        query_over_time = tf.expand_dims(query, axis=1)             # (batch_size, 1, hidden_size)
        combined = self.W1(query_over_time) + self.W2(values)       # (batch_size, timesteps, units)
        score = self.V(tf.nn.tanh(combined))                        # (batch_size, timesteps, 1)
        # Normalise across timesteps (axis 1), not across the trailing size-1 axis.
        attention_weight = tf.nn.softmax(score, axis=1)             # (batch_size, timesteps, 1)
        context_vector = tf.reduce_sum(attention_weight * values, axis=1)  # (batch_size, hidden_size)
        return context_vector, attention_weight
class Encoder(L.Layer):
    """Embeds source token ids and encodes them with a single LSTM layer."""

    def __init__(self, in_vocab, embedding_dim, hidden_units):
        super().__init__()
        # (batch_size, seq_length) -> (batch_size, seq_length, embedding_dim)
        self.embed = L.Embedding(in_vocab, embedding_dim)
        # With return_sequences and return_state both set, the LSTM yields the
        # per-timestep outputs plus the final hidden and cell states.
        self.lstm = L.LSTM(hidden_units, return_sequences=True, return_state=True)

    def call(self, inputs):
        """Encode ``inputs`` (assumed (batch_size, seq_length) token ids).

        Returns:
            (enc_out, hidden_state, cell_state) exactly as produced by the LSTM:
            the full output sequence followed by the two final states.
        """
        embedded = self.embed(inputs)                                # (batch_size, seq_length, embedding_dim)
        sequence_out, final_hidden, final_cell = self.lstm(embedded)
        return sequence_out, final_hidden, final_cell
class Decoder(L.Layer):
    """Single-step attention decoder: embedding -> LSTM -> softmax over the vocabulary.

    Args:
        out_vocab: size of the target vocabulary (also the width of the output layer).
        embedding_dim: width of the target token embedding.
        hidden_units: number of LSTM units.
        attention_units: width of the additive-attention projections. Defaults
            to 64, the value that was previously hard-coded, so existing
            callers and previously saved weights keep the same layer shapes.
    """

    def __init__(self, out_vocab, embedding_dim, hidden_units, attention_units=64):
        super().__init__()
        self.embed = L.Embedding(out_vocab, embedding_dim)     # (batch_size, steps) -> (batch_size, steps, embedding_dim)
        self.lstm = L.LSTM(hidden_units, return_sequences=True, return_state=True)
        # softmax activation: the layer emits probabilities, not raw logits.
        self.dense = L.Dense(out_vocab, activation='softmax')
        self.attention = BahdanauAttention(attention_units)

    def call(self, inputs, hidden_state, cell_state, enc_output):
        """Run one decoding step.

        Args:
            inputs: token ids for the current step; the squeeze on axis 1 below
                requires a time axis of length 1, i.e. (batch_size, 1).
            hidden_state, cell_state: LSTM states carried over from the previous
                step (or from the encoder for the first step).
            enc_output: encoder outputs for all timesteps, attended over.

        Returns:
            (out, hidden_state, cell_state) where ``out`` is the softmax
            distribution over the target vocabulary, (batch_size, out_vocab),
            and the states are the LSTM states after this step.
        """
        x = self.embed(inputs)                                 # (batch_size, 1, embedding_dim)
        # The attention query is the state from *before* this LSTM step.
        context, _ = self.attention(query=hidden_state, values=enc_output)
        dec_out, hidden_state, cell_state = self.lstm(
            x, initial_state=[hidden_state, cell_state]
        )                                                      # dec_out: (batch_size, 1, hidden_units)
        dec_out = tf.squeeze(dec_out, axis=1)                  # (batch_size, hidden_units)
        # Context and LSTM output are joined along the feature axis.
        features = tf.concat([context, dec_out], axis=-1)
        out = self.dense(features)                             # (batch_size, out_vocab)
        return out, hidden_state, cell_state

@keras.saving.register_keras_serializable(package="Custom", name="Seq2Seq")
class Seq2Seq(M.Model):
    """Encoder/decoder model trained with teacher forcing and decoded greedily.

    Args:
        in_vocab: source vocabulary size (passed to the encoder embedding).
        out_vocab: target vocabulary size (passed to the decoder).
        embedding_dim: embedding width shared by encoder and decoder.
        hidden_units: LSTM width shared by encoder and decoder.
        end_token: target id whose positions receive zero loss weight in
            ``train_step``.
    """

    def __init__(self, in_vocab, out_vocab, embedding_dim, hidden_units, end_token):
        super().__init__()

        # Constructor arguments are kept so get_config() can reproduce them.
        self.in_vocab = in_vocab
        self.out_vocab = out_vocab
        self.embedding_dim = embedding_dim
        self.hidden_units = hidden_units

        self.encoder = Encoder(in_vocab, embedding_dim, hidden_units)
        self.decoder = Decoder(out_vocab, embedding_dim, hidden_units)
        self.end_token = end_token

    def _teacher_forced_decode(self, enc_inputs, dec_inputs):
        """Decode every column of ``dec_inputs`` in order, one step at a time.

        Shared by ``train_step`` and ``call`` so both use the same loop. The
        sequence length is read with ``tf.shape`` so it also works when the
        time dimension is not known statically.

        Returns:
            Per-step decoder outputs stacked as (batch_size, seq_len, out_vocab).
        """
        enc_out, hidden_state, cell_state = self.encoder(enc_inputs)
        seq_len = tf.shape(dec_inputs)[1]
        dec_out = tf.TensorArray(tf.float32, seq_len)
        for timestep in tf.range(seq_len):
            timestep_input = dec_inputs[:, timestep:timestep + 1]   # (batch_size, 1)
            timestep_output, hidden_state, cell_state = self.decoder(
                timestep_input, hidden_state, cell_state, enc_out
            )                                                       # timestep_output: (batch_size, out_vocab)
            dec_out = dec_out.write(timestep, timestep_output)
        # stack() is time-major (seq_len, batch_size, out_vocab); make it batch-major.
        return tf.transpose(dec_out.stack(), [1, 0, 2])

    @tf.function
    def train_step(self, inputs):
        """Run one optimisation step.

        Args:
            inputs: ``((enc_inputs, dec_inputs), targets)``.

        Returns:
            Dict mapping each metric name to its current result.
        """
        (enc_inputs, dec_inputs), targets = inputs

        with tf.GradientTape() as tape:
            dec_out = self._teacher_forced_decode(enc_inputs, dec_inputs)
            # Positions whose target equals end_token get weight 0, all others 1.
            # Same values the former per-timestep mask produced, computed at once.
            sequence_mask = tf.logical_not(tf.equal(targets, self.end_token))
            loss = self.compiled_loss(
                targets, dec_out, sample_weight=tf.cast(sequence_mask, tf.float32)
            )
        variables = self.trainable_variables
        gradients = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))
        self.compiled_metrics.update_state(targets, dec_out)  # Update metrics
        return {m.name: m.result() for m in self.metrics}

    @tf.function
    def call(self, inputs, training=False):
        """Teacher-forced forward pass.

        Args:
            inputs: ``(enc_inputs, dec_inputs)``.
            training: accepted for the Keras call signature; not used here.

        Returns:
            Decoder outputs of shape (batch_size, seq_len, out_vocab).
        """
        enc_inputs, dec_inputs = inputs
        return self._teacher_forced_decode(enc_inputs, dec_inputs)

    def generate(self, enc_inputs, max_len, start, end):
        """Greedily decode up to ``max_len`` tokens, starting from ``start``.

        Decoding stops early when the predicted id equals ``end``; that id is
        not added to the result.

        NOTE(review): ``dec_in`` is built as a (1, 1) tensor and the
        ``prediction == end`` test is used as a Python bool, so this appears
        to expect a single input sequence -- confirm before batching.

        Returns:
            List of ``prediction.numpy()`` arrays, one per generated token.
        """
        enc_out, hidden_state, cell_state = self.encoder(enc_inputs)
        dec_in = tf.expand_dims([start], 0)              # int -> (1, 1) tensor
        result = []
        for _ in range(max_len):
            # The decoder ends in a softmax layer, so these are probabilities.
            probabilities, hidden_state, cell_state = self.decoder(
                dec_in, hidden_state, cell_state, enc_out
            )
            prediction = tf.argmax(probabilities, axis=-1)   # most likely token id
            if prediction == end:
                break
            result.append(prediction.numpy())
            dec_in = tf.expand_dims(prediction, 0)           # feed the prediction back in
        return result

    def get_config(self):
        """Return the base config extended with this model's constructor arguments."""
        config = super().get_config()
        config.update({
            'in_vocab': self.in_vocab,
            'out_vocab': self.out_vocab,
            'embedding_dim': self.embedding_dim,
            'hidden_units': self.hidden_units,
            'end_token': self.end_token,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the model from ``get_config()`` output.

        Only the five constructor arguments are read; ``end_token`` falls back
        to 0 when the config does not contain it.
        """
        end_token = config.get('end_token', 0)
        return cls(
            in_vocab=config['in_vocab'],
            out_vocab=config['out_vocab'],
            embedding_dim=config['embedding_dim'],
            hidden_units=config['hidden_units'],
            end_token=end_token,
        )

In [9]:
import os
import zipfile
import tempfile
import shutil
import pickle
import numpy as np
from keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Resolve absolute path of the .keras model archive
BASE_DIR = os.getcwd()
keras_archive = os.path.join(BASE_DIR, "Attention_Model_(teacher_forcing).keras")

# Manually extract the .keras archive to a temporary directory to avoid internal cleanup issues.
# The directory is created first and everything that can fail afterwards
# (extraction as well as loading) runs inside the try block, so the directory
# is removed even when the archive is missing or corrupt.
tmp_dir = tempfile.mkdtemp()
try:
    with zipfile.ZipFile(keras_archive, 'r') as zip_ref:
        zip_ref.extractall(tmp_dir)

    # Load the model from the extracted directory.
    # NOTE(review): this relies on load_model accepting the extracted folder;
    # confirm against the installed Keras version.
    model = load_model(tmp_dir, custom_objects={"Seq2Seq": Seq2Seq})
finally:
    # Ensure the temporary directory is removed after loading
    shutil.rmtree(tmp_dir)

# Load tokenizers
# NOTE(review): pickle.load executes code embedded in the file; only load
# .pkl files produced by a trusted training run.
e_tk_path = os.path.join(BASE_DIR, "e_tk.pkl")
d_tk_path = os.path.join(BASE_DIR, "d_tk.pkl")
with open(e_tk_path, "rb") as f:
    e_tk = pickle.load(f)
with open(d_tk_path, "rb") as f:
    d_tk = pickle.load(f)

# Load metadata
metadata_path = os.path.join(BASE_DIR, "metadata.pkl")
with open(metadata_path, "rb") as f:
    metadata = pickle.load(f)

# Values read from the metadata dict and used by generate_summary below.
word_dict = metadata["word_dict"]
start_id = metadata["start_id"]
end_id = metadata["end_id"]
input_seq_len = metadata["input_seq_len"]
output_seq_len = metadata["output_seq_len"]

def generate_summary(text):
    """Summarise ``text`` with the loaded model and return the summary string.

    The text is tokenised with ``e_tk``, post-padded to ``input_seq_len``, and
    decoded with ``model.generate``. Generated ids are mapped back to words
    through ``word_dict``; ids with no entry (or an empty entry) are skipped,
    and decoding of the id list stops at ``end_id``.

    Args:
        text: the article to summarise.

    Returns:
        The generated words joined by single spaces ('' if nothing was produced).
    """
    # Preprocess input text to a padded id sequence
    seq = e_tk.texts_to_sequences([text])
    seq = pad_sequences(seq, maxlen=input_seq_len, padding='post')

    # Generate summary tokens
    model_output = model.generate(seq, output_seq_len, start_id, end_id)

    # Convert token IDs back to words
    output_words = []
    for raw_token in model_output:
        # Flatten before indexing so plain ints, 0-d arrays and length-1
        # arrays are all reduced to one Python int. A 0-d array has __len__
        # as an attribute but cannot be indexed with [0].
        token_id = int(np.asarray(raw_token).reshape(-1)[0])
        if token_id == end_id:
            break
        word = word_dict.get(token_id, '')
        if word:
            output_words.append(word)
    return ' '.join(output_words)

import pandas as pd
import random

# Read the dataset the demo article is drawn from
df = pd.read_csv(
    r"C:\Users\mitta\OneDrive - iiit-b\Documents\NLP_Project\Backend\summariser-api\filtered_train.csv"
)

# Take one random row and pull its text out of the 'article' column
# (change the column name here if the dataset uses a different one)
random_row = df.sample(n=1).iloc[0]
article = random_row['article']

# Show the source text first (optional)
print("Original Article:\n", article, "\n")

# Run the summariser on it
summary = generate_summary(article)

# Show the result
print("Generated Summary:\n", summary)


Original Article:
 (CNN) -- A typhoon slinging fierce winds moved north Saturday toward the Japanese island of Okinawa, on a track to hit the Korean Peninsula, where dozens of people were killed by a big storm last month. Tropical cyclone Sanba had winds of 232 kilometers per hour (144 mph), said CNNI Weather Anchor Jenny Harrison. "One expects and assumes that people are beginning to already take serious precautions as to the arrival of this very strong typhoon," she said. She predicted that storm surge could prove to be a problem for islanders. "It's a large storm and it's going to have a fairly wide-reaching effect," she said. "Okinawa is pretty much in the path of this storm." The storm had been, "for a very short time," classified as a "super typhoon," with winds of more than 241 mph (150 mph), she said. Typhoon tourism: One week in North Korea . Sanba is expected to approach Okinawa late Saturday or early Sunday local time before trudging on toward South Korea, according to proje