## Expert Knowledge Worker

### A question answering agent that is an expert knowledge worker
### To be used by employees of Insurellm, an Insurance Tech company
### The agent needs to be accurate and the solution should be low cost.

This project will use RAG (Retrieval Augmented Generation) to ensure our question/answering assistant has high accuracy.

In [2]:
# imports

import os
import glob
from dotenv import load_dotenv
import gradio as gr

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin

In [3]:
# imports for langchain and Chroma and plotly

from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go

In [4]:
# price is a factor for our company, so we're going to use a low cost model

MODEL = "gpt-4o-mini"
db_name = "vector_db"

In [5]:
# Load environment variables in a file called .env

load_dotenv(override=True)
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')

In [None]:
# # A class to represent a Webpage

# # Some websites need you to use proper headers when fetching them:
# headers = {
#  "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
# }

# class Website:
#     """
#     A utility class to represent a Website that we have scraped, now with links
#     """

#     def __init__(self, url):
#         self.url = url
#         response = requests.get(url, headers=headers)
#         self.body = response.content
#         soup = BeautifulSoup(self.body, 'html.parser')
#         self.title = soup.title.string if soup.title else "No title found"
#         if soup.body:
#             for irrelevant in soup.body(["script", "style", "img", "input"]):
#                 irrelevant.decompose()
#             self.text = soup.body.get_text(separator="\n", strip=True)
#         else:
#             self.text = ""
#         links = [link.get('href') for link in soup.find_all('a')]
#         self.links = [link for link in links if link]

#     def get_contents(self):
#         return f"Webpage Title:\n{self.title}\nWebpage Contents:\n{self.text}\n\n"

# ashes = Website("https://ashesofcreation.wiki/")
# ashes.links


In [18]:

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

class Website:
    """
    A utility class to represent a Website that we have scraped, 
    with the ability to crawl internal links.
    """

    def __init__(self, url):
        self.url = url
        response = requests.get(url, headers=headers)
        self.body = response.content
        soup = BeautifulSoup(self.body, 'html.parser')

        # Title
        self.title = soup.title.string if soup.title else "No title found"

        # print(self.title)
        # Clean up the body text: remove scripts, styles, images, inputs
        if soup.body:
            for irrelevant in soup.body(["script", "style", "img", "input"]):
                irrelevant.decompose()
            self.text = soup.body.get_text(separator="\n", strip=True)
        else:
            self.text = ""

        # Extract all raw links
        raw_links = [link.get('href') for link in soup.find_all('a')]
        # Filter out None or empty
        self.links = [link for link in raw_links if link]

    def get_contents(self):
        return f"Webpage Title:\n{self.title}\nWebpage Contents:\n{self.text}\n\n"

    def crawl_internal_links(self, visited=None):
        """
        Crawls only internal links (same domain) from this page
        and returns a list of sub-pages' data.
        
        visited: a set to keep track of URLs we've already visited,
                 so we avoid infinite loops.
        """
        if visited is None:
            visited = set()

        results = []
        base_domain = urlparse(self.url).netloc

        for link in self.links:
            # Build an absolute URL (to handle relative links)
            abs_link = urljoin(self.url, link)
            link_domain = urlparse(abs_link).netloc

            # Check if this link is "internal" (same domain) 
            if link_domain == base_domain and abs_link not in visited:
                visited.add(abs_link)
                try:
                    # Instantiate a new Website object for this internal link
                    sub_site = Website(abs_link)

                    # You could store or process sub_site's data here:
                    results.append({
                        "url": abs_link,
                        "title": sub_site.title,
                        "text": sub_site.text,
                        "links": sub_site.links
                    })

                    # (Optional) Recursively crawl further if you want deeper coverage:
                    deeper_links = sub_site.crawl_internal_links(visited=visited)
                    results.extend(deeper_links)

                except Exception as e:
                    print(f"Error scraping {abs_link}: {e}")

        return results

# site = Website("https://ashesofcreation.wiki/")
site = Website("http://Gratco.com")
# site.links
front_dict = {
    "url": site.url,
    "title": site.title,
    "text": site.text,
    "links": site.links
}
site_dict = site.crawl_internal_links()

site_dict.insert(0, front_dict)

# for i, page_info in enumerate(site_dict, start=0):
#     print(f"\nPage #{i} => URL: {page_info['url']}")
#     print("Links found:")
#     for link in page_info["links"]:
#         print(link)

# now we make a list of the text of each page
documents = []
for page_info in site_dict:
    doc = Document(page_content=page_info["text"])
    doc.metadata = {"doc_type": page_info["title"]}
    documents.append(doc)
    


In [None]:
# # Read in documents using LangChain's loaders
# # Take everything in all the sub-folders of our knowledgebase

# folders = glob.glob("knowledge-base/*")

# # With thanks to CG and Jon R, students on the course, for this fix needed for some users 
# text_loader_kwargs = {'encoding': 'utf-8'}
# # If that doesn't work, some Windows users might need to uncomment the next line instead
# # text_loader_kwargs={'autodetect_encoding': True}

# documents = []
# for folder in folders:
#     doc_type = os.path.basename(folder)
#     loader = DirectoryLoader(folder, glob="**/*.md", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs)
#     folder_docs = loader.load()
#     for doc in folder_docs:
#         doc.metadata["doc_type"] = doc_type
#         documents.append(doc)

# Please note:

In the next cell, we split the text into chunks.

2 students let me know that the next cell crashed their computer.  
They were able to fix it by changing the chunk_size from 1,000 to 2,000 and the chunk_overlap from 200 to 400.  
This shouldn't be required; but if it happens to you, please make that change!  
(Note that LangChain may give a warning about a chunk being larger than 1,000 - this can be safely ignored).

_With much thanks to Steven W and Nir P for this valuable contribution._

In [19]:
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(documents)

In [20]:
len(chunks)

18

In [21]:
doc_types = set(chunk.metadata['doc_type'] for chunk in chunks)
print(f"Document types found: {', '.join(doc_types)}")

Document types found: Kill the Messenger Support Site, Gratco Productions, Gratco Productions Reference, File Not Found, Gratco Games,  About Gratco Productions , Grat's Googly Eyes! Support Site, Grat's Slate/Film Clapboard Support Site


## A sidenote on Embeddings, and "Auto-Encoding LLMs"

We will be mapping each chunk of text into a Vector that represents the meaning of the text, known as an embedding.

OpenAI offers a model to do this, which we will use by calling their API with some LangChain code.

This model is an example of an "Auto-Encoding LLM" which generates an output given a complete input.
It's different to all the other LLMs we've discussed today, which are known as "Auto-Regressive LLMs", and generate future tokens based only on past context.

Another example of an Auto-Encoding LLMs is BERT from Google. In addition to embedding, Auto-encoding LLMs are often used for classification.

### Sidenote

In week 8 we will return to RAG and vector embeddings, and we will use an open-source vector encoder so that the data never leaves our computer - that's an important consideration when building enterprise systems and the data needs to remain internal.

In [22]:
# Put the chunks of data into a Vector Store that associates a Vector Embedding with each chunk

embeddings = OpenAIEmbeddings()

# If you would rather use the free Vector Embeddings from HuggingFace sentence-transformers
# Then replace embeddings = OpenAIEmbeddings()
# with:
# from langchain.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

In [23]:
# Check if a Chroma Datastore already exists - if so, delete the collection to start from scratch

if os.path.exists(db_name):
    Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()

In [24]:
# Create our Chroma vectorstore!
vectorstore = Chroma.from_documents(documents=chunks, embedding=embeddings, persist_directory=db_name)
print(f"Vectorstore created with {vectorstore._collection.count()} documents")

Vectorstore created with 18 documents


In [25]:
# Get one vector and find how many dimensions it has

collection = vectorstore._collection
sample_embedding = collection.get(limit=1, include=["embeddings"])["embeddings"][0]
dimensions = len(sample_embedding)
print(f"The vectors have {dimensions:,} dimensions")

The vectors have 1,536 dimensions


## Visualizing the Vector Store

Let's take a minute to look at the documents and their embedding vectors to see what's going on.

In [27]:
# Prework

result = collection.get(include=['embeddings', 'documents', 'metadatas'])
vectors = np.array(result['embeddings'])
documents = result['documents']
doc_types = [metadata['doc_type'] for metadata in result['metadatas']]
# colors = [['blue', 'green', 'red', 'orange'][['products', 'employees', 'contracts', 'company'].index(t)] for t in doc_types]

In [29]:
# We humans find it easier to visalize things in 2D!
# Reduce the dimensionality of the vectors to 2D using t-SNE
# (t-distributed stochastic neighbor embedding)
# Ensure the perplexity is less than the number of samples
perplexity_value = min(30, len(vectors) - 1)

tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity_value)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 2D scatter plot
fig = go.Figure(data=[go.Scatter(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    mode='markers',
    marker=dict(size=5, color='blue', opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='2D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x',yaxis_title='y'),
    width=800,
    height=600,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

In [32]:
# Let's try 3D!

tsne = TSNE(n_components=3, random_state=42, perplexity=perplexity_value)
reduced_vectors = tsne.fit_transform(vectors)

# Create the 3D scatter plot
fig = go.Figure(data=[go.Scatter3d(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    z=reduced_vectors[:, 2],
    mode='markers',
    marker=dict(size=5, color="blue", opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

fig.update_layout(
    title='3D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x', yaxis_title='y', zaxis_title='z'),
    width=900,
    height=700,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()