# Week 5 Exercise
## Self-knowledge expert



In [None]:
# imports

import os
import glob
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import gradio as gr

# imports for langchain, plotly and Chroma

from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import numpy as np
import plotly.graph_objects as go
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.embeddings import HuggingFaceEmbeddings

In [None]:
# Name of the chat model handed to ChatOpenAI further down (picked as a low cost model)

MODEL = "gpt-4o-mini"
# Directory used as Chroma's persist_directory for the vector store
db_name = "vector_db"

In [None]:
# Load environment variables in a file called .env
# (override=True asks python-dotenv to let .env values replace ones already set)

load_dotenv(override=True)
# Make sure OPENAI_API_KEY is present in the process environment; when it is not
# set anywhere, the placeholder string below is stored instead.
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')

## Website Scraping

In [None]:
# A class to represent a Webpage

# Some websites need you to use proper headers when fetching them:
# this dict is sent with every request made by the Website class below.
headers = {
 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}

class Website:
    """
    A scraped web page: its title, its visible body text and its outgoing links.

    The constructor fetches the page once and fills in:
        url:   the address that was requested
        body:  the raw response bytes
        title: the text of <title>, or "No title found"
        text:  the body text with script/style/img/input elements removed
        links: every non-empty href found on an <a> tag
    """

    def __init__(self, url, timeout=30):
        """
        Fetch and parse `url`.

        Args:
            url: address of the page to download.
            timeout: seconds passed to requests.get so that an unresponsive
                server cannot block the notebook forever.
        """
        self.url = url
        response = requests.get(url, headers=headers, timeout=timeout)
        self.body = response.content
        soup = BeautifulSoup(self.body, 'html.parser')

        # <title>.string is None when the tag is empty or contains nested markup;
        # fall back to the placeholder instead of storing None.
        title = soup.title.string if soup.title else None
        self.title = title if title else "No title found"

        if soup.body:
            # Drop elements that carry no readable text before extracting it
            for irrelevant in soup.body(["script", "style", "img", "input"]):
                irrelevant.decompose()
            self.text = soup.body.get_text(separator="\n", strip=True)
        else:
            self.text = ""

        # Anchors without an href (or with an empty one) are skipped
        links = [link.get('href') for link in soup.find_all('a')]
        self.links = [link for link in links if link]

    def get_contents(self):
        """Return the title and text formatted as one string."""
        return f"Webpage Title:\n{self.title}\nWebpage Contents:\n{self.text}\n\n"

def write_output_to_file(filename: str, content: str, mode: str = 'w') -> None:
    """
    Writes the given content to a specified file as UTF-8 text.

    Missing parent directories are created first. Failures are reported with
    print() rather than raised, so a failed write does not stop the notebook.

    Args:
        filename (str): The name of the file to write to.
        content (str): The string content to write into the file.
        mode (str): The file opening mode.
                    'w' for write (overwrites existing file or creates new).
                    'a' for append (adds to the end of the file or creates new).
                    'x' for exclusive creation (fails if file already exists).
                    (default is 'w')
    """
    # Create the target folder up front; handled separately so that a problem
    # here is not reported as "file already exists".
    directory = os.path.dirname(filename)
    if directory:
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            print(f"An I/O error occurred while writing to '{filename}': {e}")
            return

    try:
        # Explicit UTF-8: the knowledge base is read back with encoding='utf-8',
        # so the platform default encoding must not be used here.
        with open(filename, mode, encoding='utf-8') as f:
            f.write(content)
        print(f"Content successfully written to '{filename}' in mode '{mode}'.")
    except FileExistsError:
        print(f"Error: File '{filename}' already exists. Use mode 'w' to overwrite or 'a' to append.")
    except OSError as e:
        print(f"An I/O error occurred while writing to '{filename}': {e}")
    except Exception as e:
        # Last-resort report (e.g. an invalid mode string); deliberately best-effort
        print(f"An unexpected error occurred: {e}")

In [None]:
# Markdown file that receives the scraped website text; it sits inside the
# knowledge base folder that is globbed further down.
WEBSITE_OUTPUT = "alan-knowledge-base/website-scraping/alanjponte.com-website.md"

# NOTE(review): `alan_website` is not created anywhere in this file as shown -
# presumably a Website(...) instance from a cell that is not included; confirm.
write_output_to_file(WEBSITE_OUTPUT, alan_website.get_contents())

## Read in folders


In [None]:
# Read in documents using LangChain's loaders
# Take everything in all the sub-folders of our knowledgebase

KNOWLEDGE_BASE = "alan-knowledge-base/*"

# Keep directories only: the glob also matches plain files sitting at the top
# level of the knowledge base, and DirectoryLoader refuses a path that is not a
# directory, which would abort the loading loop.
folders = [path for path in glob.glob(KNOWLEDGE_BASE) if os.path.isdir(path)]

def add_metadata(doc, doc_type):
    """Record `doc_type` under doc.metadata["doc_type"] (in place) and return the same doc."""
    doc.metadata.update(doc_type=doc_type)
    return doc

# With thanks to CG and Jon R, students on the course, for this fix needed for some users
text_loader_kwargs = dict(encoding='utf-8')
# If that doesn't work, some Windows users might need to uncomment the next line instead
# text_loader_kwargs={'autodetect_encoding': True}

# Load every markdown file below each knowledge-base folder; the folder's own
# name becomes the doc_type recorded on each of its documents.
documents = []
for folder in folders:
    doc_type = os.path.basename(folder)
    loader = DirectoryLoader(
        folder,
        glob="**/*.md",
        loader_cls=TextLoader,
        loader_kwargs=text_loader_kwargs,
    )
    folder_docs = loader.load()
    documents += [add_metadata(doc, doc_type) for doc in folder_docs]

# Split the loaded documents into overlapping chunks for embedding
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = text_splitter.split_documents(documents)

print(f"Total number of chunks: {len(chunks)}")
print(f"Document types found: {set(doc.metadata['doc_type'] for doc in documents)}")

## Vector Embeddings

In [None]:
# Put the chunks of data into a Vector Store that associates a Vector Embedding with each chunk
# Chroma is a popular open source Vector Database based on SQLite

embeddings = OpenAIEmbeddings()

# If you would rather use the free Vector Embeddings from HuggingFace sentence-transformers
# Then replace embeddings = OpenAIEmbeddings()
# with:
# from langchain.embeddings import HuggingFaceEmbeddings
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Delete if already exists
# (presumably so that re-running this cell starts from an empty collection
# instead of adding to what an earlier run persisted in db_name)

if os.path.exists(db_name):
    Chroma(persist_directory=db_name, embedding_function=embeddings).delete_collection()

# Create vectorstore: embeds `chunks` with `embeddings` and persists them under db_name

vectorstore = Chroma.from_documents(documents=chunks, embedding=embeddings, persist_directory=db_name)
# NOTE: _collection is a private attribute of the Chroma wrapper, used here only to count entries
print(f"Vectorstore created with {vectorstore._collection.count()} documents")

In [None]:
# Let's investigate the vectors

# Underlying collection object of the vector store (private attribute)
collection = vectorstore._collection
count = collection.count()

# Fetch a single stored embedding; its length is the embedding dimensionality
sample_embedding = collection.get(limit=1, include=["embeddings"])["embeddings"][0]
dimensions = len(sample_embedding)
print(f"There are {count:,} vectors with {dimensions:,} dimensions in the vector store")

## Visualizing the Vector Store


In [None]:
# Prework (with thanks to Jon R for identifying and fixing a bug in this!)

# Pull every stored embedding together with its chunk text and metadata
result = collection.get(include=['embeddings', 'documents', 'metadatas'])
vectors = np.array(result['embeddings'])
# NOTE: from here on `documents` holds the stored chunk texts returned by the
# collection, no longer the list built while loading the knowledge base.
documents = result['documents']
metadatas = result['metadatas']
doc_types = [metadata['doc_type'] for metadata in metadatas]

# One colour per knowledge-base folder. A dict lookup replaces the nested
# list.index() call, and a doc_type that is not listed here is drawn in gray
# instead of raising ValueError.
color_by_doc_type = {
    'about': 'blue',
    'career-history': 'green',
    'relationships': 'red',
    'hobbies': 'orange',
    'website-scraping': 'yellow',
}
colors = [color_by_doc_type.get(t, 'gray') for t in doc_types]

In [None]:
from sklearn.manifold import TSNE
import plotly.graph_objects as go

# 'vectors', 'colors', 'doc_types' and 'documents' normally come from the
# prework cell; dummy data is created for any of them that is missing so the
# cell can still be run on its own for demonstration purposes.
import numpy as np
if 'vectors' not in locals():
    vectors = np.random.rand(15, 50)  # 15 samples, 50 dimensions
if 'colors' not in locals():
    colors = np.random.randint(0, 5, 15)
if 'doc_types' not in locals():
    doc_types = [f"Type_{i%3}" for i in range(15)]
if 'documents' not in locals():
    documents = [f"This is document number {i} with some content." for i in range(15)]


# Reduce the dimensionality of the vectors to 2D using t-SNE.
# perplexity must be less than the number of samples, so it is capped at
# len(vectors) - 1 for very small collections.
tsne = TSNE(n_components=2, random_state=42, perplexity=min(5, len(vectors) - 1))

reduced_vectors = tsne.fit_transform(vectors)

# Create the 2D scatter plot; hovering a point shows its type and the first 100 characters of its text
fig = go.Figure(data=[go.Scatter(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

# Axis titles are set on the layout's xaxis/yaxis: `scene` only applies to 3D
# traces, so titles passed through it never appear on a 2D scatter.
fig.update_layout(
    title='2D Chroma Vector Store Visualization',
    xaxis_title='x',
    yaxis_title='y',
    width=800,
    height=600,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

In [None]:
from sklearn.manifold import TSNE
import plotly.graph_objects as go

# 'vectors', 'colors', 'doc_types' and 'documents' normally come from the
# prework cell; dummy data is created for any of them that is missing so the
# cell can still be run on its own for demonstration purposes.
import numpy as np
if 'vectors' not in locals():
    vectors = np.random.rand(15, 50) # 15 samples, 50 dimensions
if 'colors' not in locals():
    colors = np.random.randint(0, 5, 15)
if 'doc_types' not in locals():
    doc_types = [f"Type_{i%3}" for i in range(15)]
if 'documents' not in locals():
    documents = [f"This is document number {i} with some content." for i in range(15)]


# Reduce the dimensionality of the vectors to 3D using t-SNE.
# perplexity must be less than the number of samples, so it is capped at
# len(vectors) - 1 for very small collections.
tsne = TSNE(n_components=3, random_state=42, perplexity=min(5, len(vectors) - 1))

reduced_vectors = tsne.fit_transform(vectors)

# Create the 3D scatter plot; hovering a point shows its type and the first 100 characters of its text
fig = go.Figure(data=[go.Scatter3d(
    x=reduced_vectors[:, 0],
    y=reduced_vectors[:, 1],
    z=reduced_vectors[:, 2],
    mode='markers',
    marker=dict(size=5, color=colors, opacity=0.8),
    text=[f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)],
    hoverinfo='text'
)])

# For 3D traces the axis titles live under `scene`
fig.update_layout(
    title='3D Chroma Vector Store Visualization',
    scene=dict(xaxis_title='x', yaxis_title='y', zaxis_title='z'),
    width=900,
    height=700,
    margin=dict(r=20, b=10, l=10, t=40)
)

fig.show()

## LangChain

In [None]:
# create a new Chat with OpenAI
llm = ChatOpenAI(temperature=0.7, model_name=MODEL)

# Alternative - if you'd like to use Ollama locally, uncomment this line instead
# llm = ChatOpenAI(temperature=0.7, model_name='llama3.2', base_url='http://localhost:11434/v1', api_key='ollama')

# set up the conversation memory for the chat
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

# the retriever is an abstraction over the VectorStore that will be used during RAG
retriever = vectorstore.as_retriever()

# putting it together: set up the conversation chain with the LLM selected by MODEL, the vector store and memory
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)

In [None]:
# Let's try a simple question

query = "Please explain who Alan is in a couple of sentences"
# The chain takes the user text under "question" and returns a dict; the reply is under "answer"
result = conversation_chain.invoke({"question": query})
print(result["answer"])

In [None]:
# set up a new conversation memory for the chat
# (a fresh, empty buffer, so the chain built below does not reuse the earlier memory object)
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

# putting it together: set up the conversation chain with the GPT 4o-mini LLM, the vector store and memory
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)

In [None]:
# Wrapping that in a function

def chat(question, history):
    """
    Chat callback: send `question` through the conversation chain and return its answer text.

    `history` is accepted because the chat UI passes it, but it is not used
    here; the chain is built with its own memory object.
    """
    return conversation_chain.invoke({"question": question})["answer"]

In [None]:
# And in Gradio:
# NOTE(review): `view` receives whatever launch() returns, not the ChatInterface object itself.

view = gr.ChatInterface(chat, type="messages").launch(inbrowser=True)

In [None]:
# Let's investigate what gets sent behind the scenes

from langchain_core.callbacks import StdOutCallbackHandler

llm = ChatOpenAI(temperature=0.7, model_name=MODEL)

memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

retriever = vectorstore.as_retriever()

# Same chain as before, but with a StdOutCallbackHandler attached
# (presumably so the chain's intermediate steps are printed to stdout)
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory, callbacks=[StdOutCallbackHandler()])

query = "Who was Alan's dad"
result = conversation_chain.invoke({"question": query})
answer = result["answer"]
print("\nAnswer:", answer)

In [None]:
# create a new Chat with OpenAI
llm = ChatOpenAI(temperature=0.7, model_name=MODEL)

# set up the conversation memory for the chat
memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)

# the retriever is an abstraction over the VectorStore that will be used during RAG; k is how many chunks to use
retriever = vectorstore.as_retriever(search_kwargs={"k": 25})

# putting it together: set up the conversation chain with the LLM selected by MODEL, the vector store and memory
conversation_chain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever, memory=memory)

In [None]:
def chat(question, history):
    """
    Chat callback: send `question` through the conversation chain and return its answer text.

    `history` is accepted because the chat UI passes it, but it is not used
    here; the chain is built with its own memory object.
    """
    return conversation_chain.invoke({"question": question})["answer"]

In [None]:
# Launch the chat UI again, now wired to the most recent `chat` definition
view = gr.ChatInterface(chat, type="messages").launch(inbrowser=True)