In [None]:
#Importing Libraries
!apt-get update
!apt install chromium-chromedriver 
!pip install gradio --quiet
!pip install selenium webdriver-manager
!pip install beautifulsoup4 requests
!pip install -q transformers sentence-transformers
!pip install --upgrade torch transformers sentence-transformers
import os
import re
import time
import unicodedata
from functools import partial

import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
from IPython import get_ipython
from IPython.display import display, Markdown
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline
from webdriver_manager.chrome import ChromeDriverManager

Web Scraping

In [None]:
def scrape_article_text(url, tag='p'):
    """Download a page and return the text of its `tag` elements.

    A GET request is sent with a browser-like User-Agent and a 10 second
    timeout. On HTTP 200 the HTML is parsed and the stripped text of every
    `tag` element is joined with blank lines. When `tag` is 'p' and fewer
    than three such elements are found, all <div> elements are used instead.

    Args:
        url: Address of the page to fetch.
        tag: HTML tag whose text is collected (default 'p').

    Returns:
        str: The joined text, or "" when the status code is not 200 or any
        exception is raised (the problem is printed in both cases).
    """
    #Browser-like User-Agent to get past basic bot blocks
    request_headers = {'User-Agent': 'Mozilla/5.0'}

    try:
        response = requests.get(url, headers=request_headers, timeout=10)

        #Any status other than 200 is reported and treated as "no content"
        if response.status_code != 200:
            print(f"Error {response.status_code} on {url}")
            return ""

        soup = BeautifulSoup(response.text, 'html.parser')
        nodes = soup.find_all(tag)

        #Fallback: with very few <p> tags, grab the <div> content instead
        if tag == 'p' and len(nodes) < 3:
            nodes = soup.find_all('div')

        pieces = []
        for node in nodes:
            pieces.append(node.get_text(strip=True))
        return "\n\n".join(pieces)

    except Exception as e:
        #Connection errors, timeouts and anything else raised above
        print(f"Exception at {url}: {e}")
        return ""

In [None]:
#Creates the 'ww2_sources' folder if it doesn't already exist
os.makedirs("ww2_sources", exist_ok=True)

#NOTE(review): `sources` must map source name -> URL here, but no cell above
#defines it (later cells rebind `sources` to name -> text). Confirm where the
#URL dictionary is defined before running this cell on a fresh kernel.
for name, url in sources.items():
    print(f"Scraping: {name}")

    #Scrapes article text from the URL
    content = scrape_article_text(url, tag='p')
    if not content:
        print(f"No content scraped for {name}")
        continue

    new_words = len(content.split())

    #Creates a clean filename from the source name. '/' is replaced as well
    #(same rule as safe_scrape_and_save); otherwise a name containing a slash
    #points into a sub-folder that does not exist and open() fails.
    filename = name.lower().replace(" ", "_").replace("/", "_") + ".txt"
    file_path = os.path.join("ww2_sources", filename)

    #Word count of the previously saved version (0 when there is none)
    old_words = 0
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            old_words = len(f.read().split())

    #Save new content only if it has more or equal words than the old one
    if new_words >= old_words:
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"Saved: {filename} ({new_words} words)")
    else:
        print(f"Skipped: {filename} (new={new_words} < old={old_words})")

In [None]:
#Prints the word count of every file in 'ww2_sources' and the overall total
total_words = 0
print("\nFile Summary:")

for file in os.listdir("ww2_sources"):
    path = os.path.join("ww2_sources", file)

    #Skip sub-directories (e.g. .ipynb_checkpoints): open() would raise on them
    if not os.path.isfile(path):
        continue

    with open(path, "r", encoding="utf-8") as f:
        wc = len(f.read().split())

    total_words += wc
    print(f"{file}: {wc} words")

print(f"\nTOTAL DATASET SIZE: {total_words} words")

In [None]:
#Backing up original folder
!cp -r ww2_sources ww2_sources_backup

In [None]:
#Lists the files currently in the 'ww2_sources' folder (shell command via IPython '!')
!ls ww2_sources

In [None]:
def safe_scrape_and_save(name, url, tag='p', folder='ww2_timelines'):
    """Scrape one URL and store its text unless a longer copy already exists.

    The text is written to `<folder>/<name>.txt`, where the name is lowercased
    and spaces and '/' are replaced by '_'. An existing file is only
    overwritten when the new text has at least as many words as the old one.
    Progress and outcome are printed.

    Args:
        name: Human-readable source name, used to build the filename.
        url: Page to scrape.
        tag: HTML tag passed on to scrape_article_text (default 'p').
        folder: Target directory, created if missing.

    Returns:
        None
    """
    os.makedirs(folder, exist_ok=True)
    print(f"Scraping: {name}")

    content = scrape_article_text(url, tag)

    #Nothing scraped: report it and leave any existing file untouched
    if not content:
        print(f"No content scraped for {name}")
        return

    new_words = len(content.split())

    filename = name.lower().replace(" ", "_").replace("/", "_") + ".txt"
    file_path = os.path.join(folder, filename)

    #Word count of the stored version; stays 0 when no file exists yet
    old_words = 0
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            old_words = len(f.read().split())

    #A shorter scrape never replaces a longer saved one
    if new_words < old_words:
        print(f"Skipped: {filename} (new={new_words} < old={old_words})")
        return

    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
    print(f"Saved: {filename} ({new_words} words)")

In [None]:
#Scrape every timeline source and save it (only if not shorter than the stored copy) under 'ww2_timelines'
for source_name, source_url in timeline_sources.items():
    safe_scrape_and_save(name=source_name, url=source_url, tag='p', folder='ww2_timelines')

In [None]:
#Lists the files currently in the 'ww2_timelines' folder (shell command via IPython '!')
!ls ww2_timelines

In [None]:
#Preview: print the full text saved for the ThoughtCo timeline
timeline_dir, timeline_file = "ww2_timelines", "thoughtco_timeline.txt"
file_path = os.path.join(timeline_dir, timeline_file)

with open(file_path, "r", encoding="utf-8") as f:
    content = f.read()

print(content)

Building Model

Note: This project showcases the process of building a World War II chatbot, learning along the way.

Test Q/A:
Questions:

Who was the leader of the German Army during World War II?

When did the Battle of Midway take place?

What country did Germany invade in 1940?

When did World War 2 begin?

Which two countries signed the Molotov-Ribbentrop Pact?

Who were the Axis Powers during World War II?

When did World War 2 end?

What event marked the start of World War II?

What was D-day?

When was D-day?

Which two cities were hit with atomic bombs in 1945?

What was the name of the German military strategy used in the early part of the war?

What date was Hiroshima bombed?

When did the Allies liberate Paris?

When did Adolf Hitler die?

When did Germany surrender in World War II?

When did Japan surrender in World War II?

Model 1: DistilBert

In [None]:
#Combining the .txt files together and preprocessing the text
def preprocess_text(text):
    """Collapse whitespace runs to one space, drop non-ASCII characters, trim the ends."""
    single_spaced = re.sub(r'\s+', ' ', text)
    ascii_only = re.sub(r'[^\x00-\x7F]+', '', single_spaced)
    return ascii_only.strip()

#Loads every scraped .txt file into `sources` (file name without extension -> cleaned text)
sources = {}
for folder in ["ww2_sources", "ww2_timelines"]:
    #sorted() keeps the dictionary order identical between runs; os.listdir order is arbitrary
    for file in sorted(os.listdir(folder)):
        path = os.path.join(folder, file)

        #Only regular .txt files are data; skips folders such as .ipynb_checkpoints
        if not (file.endswith(".txt") and os.path.isfile(path)):
            continue

        with open(path, "r", encoding="utf-8") as f:
            content = preprocess_text(f.read())

        #Files with 100 characters or fewer after cleaning are ignored
        if len(content) > 100:
            #splitext removes only the trailing extension, not every ".txt" in the name
            sources[os.path.splitext(file)[0]] = content

In [None]:
#Setting up the question-answering pipeline with the DistilBERT checkpoint distilled on SQuAD
qa_pipeline = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad")

In [None]:
#Breaking text into smaller chunks
def chunk_text(text, max_words=300):
    """Split `text` on whitespace and return consecutive pieces of at most `max_words` words."""
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), max_words):
        pieces.append(' '.join(tokens[start:start + max_words]))
    return pieces

In [None]:
#This function searches for the best answer to the provided question across multiple text sources.
def ask_question(question):
    """Run the QA pipeline on every chunk of every source and print the best answer.

    Args:
        question: The question to answer.

    Returns:
        None: The question, best answer, its source file and confidence are printed.
    """
    best = {"score": 0, "answer": "Not found", "source": None}
    failed_chunks, last_error = 0, None

    for file, content in sources.items():
        for chunk in chunk_text(content):
            try:
                result = qa_pipeline(question=question, context=chunk)
            except Exception as e:
                #Was a bare `except:`, which also swallowed KeyboardInterrupt and
                #made this long loop impossible to stop. Failures are counted and
                #reported once below rather than printed per chunk.
                failed_chunks += 1
                last_error = e
                continue

            if result["score"] > best["score"]:
                best.update({"score": result["score"], "answer": result["answer"], "source": file})

    if failed_chunks:
        print(f"Warning: {failed_chunks} chunk(s) could not be processed (last error: {last_error})")

    #Printing the answer, its source and the confidence score
    print(f"\nQuestion: {question}")
    print(f"Answer: {best['answer']}")
    #Avoids the misleading "None.txt" when no answer was found
    source_label = f"{best['source']}.txt" if best["source"] is not None else "N/A"
    print(f"Source: {source_label}")
    print(f"Confidence: {best['score']:.2f}\n")

In [None]:
#Continuously prompts the user to ask WW2-related questions until 'exit' is typed.
while True:
    #strip() so that " exit " or a trailing newline still ends the loop
    q = input("Ask a WW2 question (or type 'exit'): ").strip()
    if q.lower() == "exit":
        break
    if not q:
        #Blank input: nothing to send to the QA model, prompt again
        continue
    ask_question(q)

Model 2: Roberta & Sentence Transformer 

In [None]:
#Preprocessing text
def preprocess_text(text):
    """Return `text` with whitespace runs collapsed, non-ASCII characters removed and ends trimmed."""
    return re.sub(r'[^\x00-\x7F]+', '', re.sub(r'\s+', ' ', text)).strip()

In [None]:
#Loading all content together: file name without extension -> cleaned text
sources = {}
for folder in ["ww2_sources", "ww2_timelines"]:
    #sorted() keeps the dictionary order identical between runs; os.listdir order is arbitrary
    for file in sorted(os.listdir(folder)):
        path = os.path.join(folder, file)

        #Only regular .txt files are data; skips folders such as .ipynb_checkpoints
        if not (file.endswith(".txt") and os.path.isfile(path)):
            continue

        with open(path, "r", encoding="utf-8") as f:
            content = preprocess_text(f.read())

        #Files with 100 characters or fewer after cleaning are ignored
        if len(content) > 100:
            #splitext removes only the trailing extension, not every ".txt" in the name
            sources[os.path.splitext(file)[0]] = content

In [None]:
#Breaking text into chunks
def chunk_text(text, max_words=300):
    """Group whole sentences into chunks of at most `max_words` words.

    Sentences are split on spaces that follow '.', '!' or '?'. A sentence that
    is longer than `max_words` on its own is kept intact as a single chunk.

    Args:
        text (str): Text to split.
        max_words (int): Word budget per chunk. Default is 300.

    Returns:
        list: Non-empty chunk strings, in document order.
    """
    sentences = re.split(r'(?<=[.!?]) +', text)
    chunks, current_chunk = [], []
    word_count = 0
    for sentence in sentences:
        wc = len(sentence.split())
        if not wc:
            #re.split yields '' for empty input; it must not become a chunk
            continue
        #Flush only when something has been collected: previously an over-long
        #first sentence made this branch append an empty string as a chunk.
        if current_chunk and word_count + wc > max_words:
            chunks.append(' '.join(current_chunk))
            current_chunk, word_count = [], 0
        current_chunk.append(sentence)
        word_count += wc
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks

In [None]:
#Building the chunk index: every chunk of at least 30 words is vectorized with Sentence-BERT
model_embed = SentenceTransformer('all-MiniLM-L6-v2')

#Each item: {embedding, text, source}
chunk_index = [
    {
        "embedding": model_embed.encode(chunk, convert_to_tensor=True),
        "text": chunk,
        "source": filename,
    }
    for filename, content in sources.items()
    for chunk in chunk_text(content)
    if len(chunk.split()) >= 30
]

In [None]:
#Loading the QA model: RoBERTa trained on SQuAD2 (replaces the DistilBERT pipeline bound to `qa_pipeline` earlier)
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")

In [None]:
#q/a smart search
def ask_question(question, top_k=3):
    """Answer `question` from the `top_k` chunks most similar to it.

    The question is embedded, compared by cosine similarity with every entry of
    `chunk_index`, and the QA pipeline is run on the best-matching chunks only.

    Args:
        question: The question to answer.
        top_k: Number of most similar chunks passed to the QA pipeline.

    Returns:
        None: The question, best answer, its source file and confidence are printed.
    """
    print(f"\nQuestion: {question}")
    q_vec = model_embed.encode(question, convert_to_tensor=True)

    #Find top-k most similar chunks
    sims = [util.pytorch_cos_sim(q_vec, entry['embedding'])[0][0].item() for entry in chunk_index]
    top_indices = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:top_k]

    best = {"score": 0, "answer": "Not found", "source": None}
    for i in top_indices:
        entry = chunk_index[i]
        try:
            result = qa_pipeline(question=question, context=entry["text"])
        except Exception as e:
            #Was a bare `except:` that hid every failure and also swallowed
            #KeyboardInterrupt; the error is now reported and the chunk skipped.
            print(f"Error on chunk from {entry['source']}: {e}")
            continue

        #Blank or whitespace-only answers are not candidates
        if result['score'] > best['score'] and result['answer'].strip():
            best.update({"score": result['score'], "answer": result["answer"], "source": entry['source']})

    print(f"Answer: {best['answer']}")
    #Avoids the misleading "None.txt" when no answer was found
    source_label = f"{best['source']}.txt" if best["source"] is not None else "N/A"
    print(f"Source: {source_label}")
    print(f"Confidence: {best['score']:.2f}\n")

In [None]:
#Continuously prompts the user to ask WW2-related questions until 'exit' is typed.
while True:
    #strip() so that " exit " or a trailing newline still ends the loop
    user_input = input("Ask a WW2 question (or type 'exit'): ").strip()
    if user_input.lower() == "exit":
        break
    if not user_input:
        #Blank input: nothing to send to the QA model, prompt again
        continue
    ask_question(user_input)

Model 3: Tuning the chunking and the confidence threshold to compare results with RoBERTa and Sentence Transformers

In [None]:
#Preprocessing text
def preprocess_text(text):
    """Collapse whitespace, strip non-ASCII characters and trim the result."""
    #Applied in order: (pattern, replacement)
    substitutions = (
        (r'\s+', ' '),
        (r'[^\x00-\x7F]+', ''),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

In [None]:
#Loading all content from sources: file name without extension -> cleaned text
sources = {}
for folder in ["ww2_sources", "ww2_timelines"]:
    #sorted() keeps the dictionary order identical between runs; os.listdir order is arbitrary
    for file in sorted(os.listdir(folder)):
        path = os.path.join(folder, file)

        #Only regular .txt files are data; skips folders such as .ipynb_checkpoints
        if not (file.endswith(".txt") and os.path.isfile(path)):
            continue

        with open(path, "r", encoding="utf-8") as f:
            content = preprocess_text(f.read())

        #Files with 100 characters or fewer after cleaning are ignored
        if len(content) > 100:
            #splitext removes only the trailing extension, not every ".txt" in the name
            sources[os.path.splitext(file)[0]] = content

In [None]:
#Smart chunking
def chunk_text(text, max_words=300):
    """Group sentences into chunks of at most `max_words` words, dropping noise.

    Sentences containing "sign up", "click", "cookies" or "learn more"
    (case-insensitive) are skipped. A collected group is kept as a chunk only
    when it holds at least 30 words; shorter groups are discarded.

    Args:
        text: Text to split.
        max_words: Word budget per chunk.

    Returns:
        list: Chunk strings in document order.
    """
    noise_markers = ("sign up", "click", "cookies", "learn more")
    chunks = []
    pending, pending_words = [], 0

    for sentence in re.split(r'(?<=[.!?]) +', text):
        lowered = sentence.lower()
        if any(marker in lowered for marker in noise_markers):
            continue

        n_words = len(sentence.split())

        #Budget exceeded: close the pending group (kept only if long enough)
        if pending_words + n_words > max_words:
            if pending_words >= 30:
                chunks.append(' '.join(pending))
            pending, pending_words = [], 0

        pending.append(sentence)
        pending_words += n_words

    #Trailing group follows the same 30-word minimum
    if pending and pending_words >= 30:
        chunks.append(' '.join(pending))
    return chunks

In [None]:
#Building the vector index: one {embedding, text, source} entry per chunk
model_embed = SentenceTransformer('all-MiniLM-L6-v2')
chunk_index = []
for filename, content in sources.items():
    chunk_index.extend(
        {
            "embedding": model_embed.encode(chunk, convert_to_tensor=True),
            "text": chunk,
            "source": filename,
        }
        for chunk in chunk_text(content)
    )

In [None]:
#Loading the QA model: RoBERTa trained on SQuAD2 (same checkpoint as in Model 2)
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")

In [None]:
#Finds and ranks top answers to a question using similarity scores and a confidence threshold.
def ask_question(question, top_k=5, confidence_threshold=0.4):
    """Retrieve the `top_k` most similar chunks, extract answers and print the best one.

    Candidates are ranked by QA score multiplied by the number of words in the
    answer; the winner is printed only if its plain QA score reaches
    `confidence_threshold`.

    Args:
        question: The question to answer.
        top_k: Number of most similar chunks passed to the QA pipeline.
        confidence_threshold: Minimum QA score required to show the answer.

    Returns:
        None: Everything is printed.
    """
    print(f"\nQuestion: {question}")
    q_vec = model_embed.encode(question, convert_to_tensor=True)

    #Cosine similarity of the question against every indexed chunk
    sims = [util.pytorch_cos_sim(q_vec, entry['embedding'])[0][0].item() for entry in chunk_index]
    top_indices = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:top_k]

    results = []
    for i in top_indices:
        entry = chunk_index[i]
        try:
            result = qa_pipeline(question=question, context=entry["text"])
        except Exception as e:
            print(f"Error on chunk from {entry['source']}: {e}")
            continue

        #Blank answers are not candidates
        if not result['answer'].strip():
            continue
        results.append({
            "answer": result["answer"],
            "score": result["score"],
            "source": entry["source"],
            "context": entry["text"]
        })

    if not results:
        print("No valid answers found.")
        return

    #Rank by confidence * answer length (in words); the first maximum wins
    best = max(results, key=lambda r: r["score"] * len(r["answer"].split()))

    if best['score'] < confidence_threshold:
        print("I'm not confident enough to answer that.")
        return

    print(f"\nTop Answer: {best['answer']}")
    print(f"Source: {best['source']}.txt")
    print(f"Confidence: {best['score']:.2f}")
    print(f"Retrieved From:\n{best['context'][:500]}...\n")

In [None]:
#Continuously prompts the user to ask WW2-related questions until 'exit' is typed.
while True:
    #strip() so that " exit " or a trailing newline still ends the loop
    user_input = input("Ask a WW2 question (or type 'exit'): ").strip()
    if user_input.lower() == "exit":
        break
    if not user_input:
        #Blank input: nothing to send to the QA model, prompt again
        continue
    ask_question(user_input)

Final Model: Best output, using RoBERTa and Sentence Transformers with rigorous cleaning and preprocessing for optimal answers.

In [None]:
#ASCII stand-ins for typographic dashes and quotes. Without this mapping the
#non-ASCII filter deletes them outright ("1939–1945" would become "19391945").
_TYPOGRAPHIC_TO_ASCII = str.maketrans({
    "\u2010": "-", "\u2011": "-", "\u2012": "-", "\u2013": "-", "\u2014": "-", "\u2212": "-",
    "\u2018": "'", "\u2019": "'", "\u201c": '"', "\u201d": '"',
})


def preprocess_text(text):
    """
    Cleans and preprocesses the input text by applying the following transformations:
    - Removes numeric references in square brackets, e.g. "[12]" or "[ 12 ]".
    - Inserts a space between a lowercase letter directly followed by an uppercase one.
    - Inserts a space after , . ; : ! ? when text follows immediately, except
      between two digits, so "1,000", "3.5" and "7:55" stay intact.
    - Converts typographic dashes/quotes to ASCII and accented letters to their
      base letter, then removes any remaining non-ASCII characters.
    - Collapses whitespace runs into a single space and trims both ends.

    Args:
        text: The input text to preprocess.

    Returns:
        str: The cleaned, ASCII-only text.
    """
    #One pattern covers both "[12]" and "[ 12 ]"
    text = re.sub(r'\[\s*\d+\s*\]', '', text)

    text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)

    #(?=\S): something follows directly; (?!(?<=\d.)\d): but not digit-punct-digit
    text = re.sub(r'([,.;:!?])(?=\S)(?!(?<=\d.)\d)', r'\1 ', text)

    #Transliterate before filtering so information is converted, not lost
    text = text.translate(_TYPOGRAPHIC_TO_ASCII)
    text = unicodedata.normalize('NFKD', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)

    #Collapsing last also removes the gaps left by the deletions above
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

In [None]:
#Loads and preprocesses text files from the specified folders, storing them in
#`sources` as file name without extension -> cleaned text
sources = {}
for folder in ["ww2_sources", "ww2_timelines"]:
    #sorted() keeps the dictionary order identical between runs; os.listdir order is arbitrary
    for file in sorted(os.listdir(folder)):
        path = os.path.join(folder, file)

        #Only regular .txt files are data; skips folders such as .ipynb_checkpoints
        if not (file.endswith(".txt") and os.path.isfile(path)):
            continue

        with open(path, "r", encoding="utf-8") as f:
            content = preprocess_text(f.read())

        #Files with 100 characters or fewer after cleaning are ignored
        if len(content) > 100:
            #splitext removes only the trailing extension, not every ".txt" in the name
            sources[os.path.splitext(file)[0]] = content

In [None]:
def chunk_text(text, max_words=300, min_words=30, skip_phrases=("sign up", "cookies", "learn more")):
    """
    Splits the input text into smaller chunks, ensuring each chunk has a manageable number of words.

    The function operates as follows:
    - Splits the text into sentences on spaces that follow '.', '!' or '?'.
    - Skips sentences containing any of `skip_phrases` (case-insensitive substring match).
    - Groups the remaining sentences into chunks of at most `max_words` words.
    - Discards groups with fewer than `min_words` words to avoid overly short sections.

    Args:
        text (str): The input text to be chunked.
        max_words (int): Maximum number of words allowed per chunk. Default is 300.
        min_words (int): Minimum number of words a chunk needs to be kept. Default is 30.
        skip_phrases (iterable of str): Lowercase phrases marking a sentence as irrelevant.

    Returns:
        list: A list of text chunks, where each chunk is a string.
    """

    sentences = re.split(r'(?<=[.!?]) +', text)
    chunks, current_chunk = [], []
    word_count = 0
    for sentence in sentences:
        lowered = sentence.lower()
        if any(phrase in lowered for phrase in skip_phrases):
            continue
        wc = len(sentence.split())
        if word_count + wc > max_words:
            #`current_chunk and` keeps an empty group out even when min_words is 0
            if current_chunk and word_count >= min_words:
                chunks.append(' '.join(current_chunk))
            current_chunk, word_count = [], 0
        current_chunk.append(sentence)
        word_count += wc
    #`word_count > 0` guards against emitting '' for empty text when min_words is 0
    if current_chunk and word_count > 0 and word_count >= min_words:
        chunks.append(' '.join(current_chunk))
    return chunks

In [None]:
#Creates embeddings for text chunks from sources using a sentence transformer model and stores them in 'chunk_index'.
model_embed = SentenceTransformer('all-MiniLM-L6-v2')

#Collect every chunk first so that all of them can be encoded in one batched call
chunk_records = [
    {"text": chunk, "source": filename}
    for filename, content in sources.items()
    for chunk in chunk_text(content)
]

chunk_index = []
if chunk_records:
    #Passing the whole list makes batch_size effective and shows a single
    #progress bar; encoding chunk by chunk printed one bar per chunk.
    embeddings = model_embed.encode(
        [record["text"] for record in chunk_records],
        batch_size=16,
        show_progress_bar=True,
        convert_to_tensor=True,
    )
    #Iterating the returned tensor yields one embedding per chunk, in input order
    chunk_index = [
        {"embedding": embedding, "text": record["text"], "source": record["source"]}
        for embedding, record in zip(embeddings, chunk_records)
    ]

In [None]:
def ask_question(question, top_k=5, confidence_threshold=0.3):
    """
    Answers a question from the precomputed chunk embeddings and prints the result.

    Steps:
    1. Encode the question with the sentence transformer model.
    2. Compute cosine similarity between the question and every entry of `chunk_index`.
    3. Keep the `top_k` most similar chunks.
    4. Run the QA pipeline on each of them and rank the non-blank answers by
       score multiplied by the number of words in the answer.
    5. Print the top-ranked answer if its score reaches `confidence_threshold`;
       otherwise print that the confidence is insufficient.

    Args:
        question: The user query to process.
        top_k: Number of most similar chunks to retrieve.
        confidence_threshold: Minimum confidence score required to present an answer.

    Returns:
        None: The answer, source, confidence and retrieved chunk (or a notice
        when nothing qualifies) are printed.
    """

    print(f"\nQuestion: {question}")
    q_vec = model_embed.encode(question, convert_to_tensor=True)

    sims = [util.pytorch_cos_sim(q_vec, entry['embedding'])[0][0].item() for entry in chunk_index]
    top_indices = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:top_k]

    results = []
    for i in top_indices:
        entry = chunk_index[i]
        try:
            result = qa_pipeline(question=question, context=entry["text"])
        except Exception as e:
            print(f"Error on chunk from {entry['source']}: {e}")
            continue

        #Blank answers are not candidates
        if not result['answer'].strip():
            continue
        results.append({
            "answer": result["answer"],
            "score": result["score"],
            "source": entry["source"],
            "context": entry["text"]
        })

    if not results:
        print("No valid answers found.")
        return

    #Confidence-weighted rank: score * answer length in words; first maximum wins
    best = max(results, key=lambda r: r["score"] * len(r["answer"].split()))

    if best['score'] < confidence_threshold:
        print("I'm not confident enough to answer that :() ")
        return

    print(f"\nTop Answer: {best['answer']}")
    print(f"Source: {best['source']}.txt")
    print(f"Confidence: {best['score']:.2f}")
    print(f"\nRetrieved Chunk:\n{best['context']}\n")
In [None]:
#Continuously prompts the user to ask WW2-related questions until 'exit' is typed.
while True:
    #strip() so that " exit " or a trailing newline still ends the loop
    user_input = input("Ask a WW2 question (or type 'exit'): ").strip()
    if user_input.lower() == "exit":
        break
    if not user_input:
        #Blank input: nothing to send to the QA model, prompt again
        continue
    ask_question(user_input)

UI with  Gradio 

In [None]:
# Suggested questions: each one becomes a button in the interface below that
# copies its text into the question box
suggested_questions = [
    "When did the Battle of Midway take place?",
    "When did World War 2 begin?",
    "What country did Germany invade in 1940?"
]

#QA function
def qa_interface(question, top_k=5, confidence_threshold=0.4):
    """Answer `question` for the Gradio UI.

    Args:
        question: Text from the question box.
        top_k: Number of most similar chunks passed to the QA pipeline.
        confidence_threshold: Minimum QA score required to show an answer.

    Returns:
        tuple: (answer, confidence formatted with two decimals, source name,
        retrieved chunk). The last three are empty strings when the question is
        blank or no answer reaches the threshold.
    """
    #Blank submissions are answered directly instead of being sent to the models
    if not question or not question.strip():
        return "Please enter a question.", "", "", ""

    q_vec = model_embed.encode(question, convert_to_tensor=True)
    sims = [util.pytorch_cos_sim(q_vec, entry['embedding'])[0][0].item() for entry in chunk_index]
    top_indices = sorted(range(len(sims)), key=lambda i: sims[i], reverse=True)[:top_k]

    candidates = []
    for i in top_indices:
        entry = chunk_index[i]
        try:
            result = qa_pipeline(question=question, context=entry["text"])
        except Exception as e:
            #Was a bare `except:`; the failure is now printed (same message
            #format as ask_question) instead of being silently dropped.
            print(f"Error on chunk from {entry['source']}: {e}")
            continue

        if result['answer'].strip():
            candidates.append({
                "answer": result["answer"],
                "score": result["score"],
                "source": entry["source"],
                "chunk": entry["text"]
            })

    #Rank by confidence * answer length (in words)
    for c in candidates:
        c["rank_score"] = c["score"] * len(c["answer"].split())

    best = max(candidates, key=lambda x: x["rank_score"], default=None)

    if not best or best["score"] < confidence_threshold:
        return "I'm not confident enough to answer that.", "", "", ""

    return best["answer"], f"{best['score']:.2f}", best["source"], best["chunk"]

#Defining theme: Base theme with blue / slate / gray hues
custom_theme = gr.themes.Base(
    primary_hue="blue",
    secondary_hue="slate",
    neutral_hue="gray"
)

#Interface: components are created in the order they are declared inside the
#Blocks context, so the statements below also define the page layout
with gr.Blocks(theme=custom_theme, title="World War 2 QA System") as interface:
    gr.Markdown("## Ask me about World War II")
    gr.Markdown("Use the input box or click a suggested question below:")

    # Question box and submit button side by side
    with gr.Row():
        question_input = gr.Textbox(lines=2, placeholder="Ask me a question...", label="Your Question")
        ask_button = gr.Button("Answer me")

    # Add suggested question buttons
    with gr.Row():
        for q in suggested_questions:
            # `q=q` binds the current question as the lambda's default argument;
            # clicking the button writes that text into the question box
            gr.Button(q).click(fn=lambda q=q: q, outputs=question_input)

    # Output area: answer and confidence on the left, source and chunk on the right
    with gr.Row():
        with gr.Column():
            answer_output = gr.Textbox(label="Answer")
            confidence_output = gr.Textbox(label="Confidence Score")
        with gr.Column():
            source_output = gr.Textbox(label="Source File")
            chunk_output = gr.Textbox(label="Retrieved Chunk", lines=8)

    # Bind main button: the four values returned by qa_interface fill the four output boxes in this order
    ask_button.click(
        fn=qa_interface,
        inputs=question_input,
        outputs=[answer_output, confidence_output, source_output, chunk_output]
    )

#Starts the Gradio app
interface.launch()