<a href="https://colab.research.google.com/github/hlin-0420/Llama-Chatbot-Notebook/blob/main/Llama_Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import Libraries

In [1]:
import os
from bs4 import BeautifulSoup
from transformers import BertTokenizer, BertModel
import torch
import re
import pandas as pd
from tabulate import tabulate
from langchain.schema import Document as LangchainDocument
import logging
from bs4 import XMLParsedAsHTMLWarning
import warnings

warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)

## Load File Directories

In [2]:
class HTMFileFinder:
    def __init__(self, base_directory):
        self.base_directory = base_directory

    def _list_htm_files(self):
        """
        Recursively finds all .htm files in the base directory and its subdirectories.
        """
        htm_files = []
        for root, _, files in os.walk(self.base_directory):
            for file in files:
                if file.endswith(".htm"):
                    relative_path = os.path.relpath(os.path.join(root, file), start=self.base_directory)
                    htm_files.append(os.path.join(self.base_directory, relative_path))
        return htm_files

In [3]:
htm_file_finder_model = HTMFileFinder("Data")

htm_file_directories = htm_file_finder_model._list_htm_files()

## Load Different Document Structures

In [4]:
def extract_text(soup):
    """
    Extracts clean and meaningful text from the parsed HTML soup, excluding navigation and short texts.
    """
    # Define navigation-related keyword patterns
    navigation_keywords = [
        r'contact\s+us', r'click\s+(here|for)', r'guidance', r'help', r'support', r'assistance',
        r'maximize\s+screen', r'view\s+details', r'read\s+more', r'convert.*file', r'FAQ', r'learn\s+more'
    ]
    navigation_pattern = re.compile(r"|".join(navigation_keywords), re.IGNORECASE)

    # Remove navigation-related text
    for tag in soup.find_all("p"):
        if navigation_pattern.search(tag.text):
            tag.decompose()

    # Extract meaningful paragraphs (length > 20)
    paragraphs = [p.get_text(strip=True) for p in soup.find_all("p") if len(p.get_text(strip=True)) > 20]
    return "\n\n".join(paragraphs)

In [5]:
def extract_table_as_text_block(soup, file_path):
    """
    Extract tables from HTML as a formatted text block, skipping navigation and NaN-only tables.
    """
    try:
        tables = pd.read_html(file_path)

        def is_navigation_table(table):
            """Check if the table contains only navigation-related keywords."""
            flattened = [str(cell).strip().lower() for cell in table.to_numpy().flatten()]
            return set(flattened).issubset({"back", "forward"})

        def is_nan_only_table(table):
            """Check if the entire table contains only NaN values."""
            return table.isna().all().all()

        table_texts = []
        for idx, table in enumerate(tables):
            if is_navigation_table(table) or is_nan_only_table(table):
                continue

            # Drop rows where both the second and third columns are NaN
            if table.shape[1] == 2:
                table = table.dropna(how='all')
                table[table.columns[-1]] = table[table.columns[-1]].fillna("")

            formatted_table = tabulate(table, headers="keys", tablefmt="grid")
            beautified_table = f"""
╔════════════════════════════════════════════════════╗
║            📊 Table {idx+1} from {file_path}              ║
╚════════════════════════════════════════════════════╝

{formatted_table}

╔════════════════════════════════════════════════════╗
║            🔚 End of Table {idx+1}                       ║
╚════════════════════════════════════════════════════╝
"""
            table_texts.append(beautified_table)

        return "\n".join(table_texts) if table_texts else ""
    except ValueError:
        return ""


In [6]:
def extract_list(soup):
    """
    Extracts lists from HTML and formats them as bullet points.
    """
    lists = []
    for ul in soup.find_all("ul"):
        items = [li.get_text(strip=True) for li in ul.find_all("li")]
        if items:
            formatted_list = "\n".join([f"• {item}" for item in items])
            lists.append(formatted_list)
    return "\n\n".join(lists)

## Load Content

In [7]:
def _load_content(htm_files, selectedOptions=None):
    """
    Load and process all .htm files from the base directory, extracting text, tables, and lists.
    """

    # Set default options if none are specified
    if selectedOptions is None:
        selectedOptions = ["text", "table", "list"]

    web_documents = []

    for file_path in htm_files:
        try:
            with open(file_path, encoding="utf-8") as file:
                content = file.read()
                content = content[content.find("<body>")+6:content.find("</body>")]
                soup = BeautifulSoup(content, "html.parser")

                # Extract content based on selected options
                clean_text = extract_text(soup) if "text" in selectedOptions else ""
                formatted_table = extract_table_as_text_block(soup, file_path) if "table" in selectedOptions else ""
                formatted_list = extract_list(soup) if "list" in selectedOptions else ""

                # Combine extracted content into a single text block
                page_text = "\n\n".join(filter(None, [clean_text, formatted_table, formatted_list]))

                if page_text:
                    document = LangchainDocument(page_content=page_text)
                    web_documents.append(document)
                    logging.info(f"✅ Loaded document from {file_path}")

        except UnicodeDecodeError:
            logging.error(f"❌ Could not read the file {file_path}. Check the file encoding.")

    logging.info(f"✅ Total documents loaded: {len(web_documents)}")
    
    return web_documents

In [8]:
training_web_documents = _load_content(htm_file_directories)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  table[table.columns[-1]] = table[table.columns[-1]].fillna("")


## Data Preparation

In [9]:
def prepare_context_response_pairs(web_documents):
    """
    Prepare context-response pairs from loaded web documents.
    """
    context_response_pairs = []
    for doc in web_documents:
        context = doc.page_content  # Treat the entire document as context
        # Placeholder for response (you might replace it with manually labeled responses)
        response = "Generated response based on context"
        context_response_pairs.append((context, response))
    return context_response_pairs

In [10]:
# Example usage
context_response_pairs = prepare_context_response_pairs(training_web_documents)
print(f"Total context-response pairs: {len(context_response_pairs)}")

Total context-response pairs: 264


## Load Models

In [11]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

In [12]:
# Load the model and tokenizer from Hugging Face
model_name = "t5-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

In [13]:
# Directory to save the model and tokenizer
save_directory = "./local_models/t5-base"

In [14]:
# Save the tokenizer and model locally
tokenizer.save_pretrained(save_directory)
model.save_pretrained(save_directory)

print(f"✅ Model and tokenizer saved locally at {save_directory}")

✅ Model and tokenizer saved locally at ./local_models/t5-base


In [15]:
# Load the locally saved model and tokenizer
local_tokenizer = AutoTokenizer.from_pretrained(save_directory)
local_model = AutoModelForSeq2SeqLM.from_pretrained(save_directory)

print("✅ Model and tokenizer loaded successfully from local directory.")

✅ Model and tokenizer loaded successfully from local directory.


## Apply Encoding

In [16]:
def encode_context(context):
    """
    Encode the context using a transformer model.
    """
    inputs = tokenizer.encode(context, return_tensors="pt", max_length=512, truncation=True)
    return inputs

## Train Model

In [17]:
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
import torch

def train_cag_model(context_response_pairs, epochs=3, learning_rate=5e-5):
    optimizer = AdamW(model.parameters(), lr=learning_rate)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        for context, response in context_response_pairs:
            # Encode context and response
            context_inputs = encode_context(context)
            response_inputs = encode_context(response)

            # Generate outputs
            outputs = model(input_ids=context_inputs, labels=response_inputs)

            # Calculate loss and backpropagate
            loss = outputs.loss
            total_loss += loss.item()
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/len(context_response_pairs)}")

# Train the model
train_cag_model(context_response_pairs)

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch 1/3 - Loss: 0.8295803365475546
Epoch 2/3 - Loss: 0.05924200758553608
Epoch 3/3 - Loss: 0.004273090744887172


In [20]:
def generate_local_response(context):
    """
    Generate a response based on the given context using the locally saved model.
    """
    local_model.eval()
    inputs = local_tokenizer.encode(context, return_tensors="pt")
    output = local_model.generate(inputs, max_length=50, num_beams=5, early_stopping=True)
    response = local_tokenizer.decode(output[0], skip_special_tokens=True)
    return response

# Test the locally loaded model
context = "What is the importance of context-aware generation in AI?"
print("Generated Response:")
print(generate_local_response(context))

Generated Response:
What is the importance of context-aware generation in AI?
