In [1]:
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import pandas as pd
from rank_bm25 import BM25Okapi
import pickle
from gtts import gTTS
from playsound import playsound
import time
import warnings
from sklearn.feature_extraction.text import CountVectorizer

# Suppress the specific FutureWarning from transformers
warnings.filterwarnings(
    "ignore",
    message=(
        "`clean_up_tokenization_spaces` was not set. It will be set to `True` by default."
    ),
    category=FutureWarning,
)

# Helper function for progress updates
def update_checkpoint(message):
    print(f"[{time.strftime('%H:%M:%S')}] {message}")

def build_or_load_bm25(dataset_path, cache_path="bm25_cache.pkl"):
    try:
        # Try loading cached BM25 model and tokenized titles
        with open(cache_path, "rb") as f:
            bm25, df = pickle.load(f)
        update_checkpoint("Loaded BM25 model from cache.")
    except FileNotFoundError:
        # If no cache is found, build BM25 from scratch and save it
        update_checkpoint("Building BM25 model from scratch...")
        df = pd.read_csv(dataset_path, low_memory=False)
        df['Title'] = df['Title'].astype(str).fillna("")
        tokenized_titles = [title.split(" ") for title in df['Title'] if isinstance(title, str)]
        bm25 = BM25Okapi(tokenized_titles)
        # Cache the BM25 model and tokenized titles for future use
        with open(cache_path, "wb") as f:
            pickle.dump((bm25, df), f)
        update_checkpoint("BM25 model built and saved to cache.")
    return bm25, df

def retrieve_answer(query, bm25, df):
    try:
        update_checkpoint("Searching for matching title...")
        query_tokens = query.split(" ")
        top_n_titles = bm25.get_top_n(query_tokens, df['Title'], n=1)  # Find the closest matching title

        if len(top_n_titles) > 0:
            # Retrieve the answer corresponding to the top title match
            matched_title = top_n_titles[0]
            context = df[df['Title'] == matched_title]['Answer'].values[0]
            update_checkpoint(f"Match found: {matched_title}. Returning context.")
            return context
        else:
            update_checkpoint("No matching title found.")
            return None
    except Exception as e:
        update_checkpoint(f"Error during retrieval: {str(e)}")
        return None

# Load the model and tokenizer once at the start
def load_model_and_tokenizer():
    try:
        update_checkpoint("Loading GPT-2 model and tokenizer...")
        tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
        model = GPT2LMHeadModel.from_pretrained("gpt2")

        # Assign eos_token as pad_token
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Resize model embeddings if pad_token is added
        model.resize_token_embeddings(len(tokenizer))

        update_checkpoint("GPT-2 model and tokenizer loaded successfully.")
        return model, tokenizer
    except Exception as e:
        update_checkpoint(f"Error loading model/tokenizer: {str(e)}")
        return None, None

# Load the model and tokenizer globally to avoid reloading every time
MODEL, TOKENIZER = load_model_and_tokenizer()

def generate_answer_gpt2(query, context, max_length=150):
    try:
        update_checkpoint("Generating new answer using GPT-2...")

        if MODEL is None or TOKENIZER is None:
            update_checkpoint("Model or tokenizer not loaded. Cannot generate answer.")
            return None

        # Combine query and context for input
        combined_input = f"Question: {query}\nContext: {context}\nAnswer:"

        # Encode the combined input with padding and truncation
        inputs = TOKENIZER(
            combined_input,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512  # GPT-2's maximum context size
        )

        # Generate text based on the query with advanced parameters
        outputs = MODEL.generate(
            inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_length,
            do_sample=True,
            temperature=0.7,  # Adjusted temperature for more control
            top_k=50,  # Restrict model to the top 50 likely words
            top_p=0.9,  # Nucleus sampling for diverse output
            repetition_penalty=1.2,  # Discourage repetition
            pad_token_id=TOKENIZER.eos_token_id  # Ensure pad_token_id is set
        )

        # Decode the generated text with clean_up_tokenization_spaces explicitly set
        generated_answer = TOKENIZER.decode(
            outputs[0],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True  # Set to True or False based on preference
        )
        update_checkpoint("New answer generated successfully.")
        return generated_answer
    except Exception as e:
        update_checkpoint(f"Error during answer generation: {str(e)}")
        return None

def calculate_relevance(query, generated_answer):
    # Calculate token overlap as a simple relevance metric
    vectorizer = CountVectorizer().fit([query, generated_answer])
    vectors = vectorizer.transform([query, generated_answer]).toarray()

    # Calculate cosine similarity between the query and generated answer vectors
    cosine_similarity = (vectors[0] @ vectors[1]) / (torch.norm(torch.tensor(vectors[0])) * torch.norm(torch.tensor(vectors[1])))

    relevance_percentage = round(cosine_similarity.item() * 100, 2)
    print(f"Relevance of the answer: {relevance_percentage}%")
    return relevance_percentage

def text_to_speech(text, filename="output.mp3"):
    try:
        update_checkpoint("Converting answer to speech...")
        speech = gTTS(text=text, lang='en', slow=False)
        speech.save(filename)
        update_checkpoint(f"Speech saved as '{filename}'.")

        # Play the mp3 file
        playsound(filename)
    except Exception as e:
        update_checkpoint(f"TTS error: {str(e)}")

def main_pipeline():
    # Step 1: Capture Query Input
    query_text = input("Enter your query (title): ")

    if query_text:
        # Step 2: Retrieve Relevant Context from Dataset
        bm25, df = build_or_load_bm25("data\\new_csv_file.csv")
        context = retrieve_answer(query_text, bm25, df)

        # Step 3: If no context is found, skip generation
        if context:
            # Step 4: Generate a new answer based on context and query
            generated_answer = generate_answer_gpt2(query_text, context, max_length=300)
            update_checkpoint(f"Generated answer: {generated_answer}")

            # Step 5: Calculate relevance of the generated answer
            calculate_relevance(query_text, generated_answer)

            # Step 6: Optionally convert the answer to speech
            if generated_answer:
                text_to_speech(generated_answer)

# Run the full pipeline
if __name__ == "__main__":
    main_pipeline()


[01:33:46] Loading GPT-2 model and tokenizer...
[01:33:50] GPT-2 model and tokenizer loaded successfully.
[01:34:11] Loaded BM25 model from cache.
[01:34:11] Searching for matching title...
[01:34:15] Match found: Political implications of Manmohan Singh's ill-health. Returning context.
[01:34:15] Generating new answer using GPT-2...
[01:34:15] Error during answer generation: Input length of input_ids is 512, but `max_length` is set to 300. This can lead to unexpected behavior. You should consider increasing `max_length` or, better yet, setting `max_new_tokens`.
[01:34:15] Generated answer: None


AttributeError: 'NoneType' object has no attribute 'lower'