In [None]:
import os
import glob
import gradio as gr
from dotenv import load_dotenv

# RAG imports
import numpy as np
from sklearn.manifold import TSNE
import plotly.graph_objects as go
from langchain_chroma import Chroma

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import (
    TextLoader,
    DirectoryLoader,
)

In [None]:
# Constants

# Chat model name passed to ChatOpenAI when the conversation chains are built.
MODEL = "gpt-4o-mini"
# Directory handed to Chroma as its persist_directory.
VECTOR_DB = "../vector_db"
# Root of the knowledge base; each entry directly beneath it is loaded and its
# name becomes the 'doc_type' metadata of the documents found inside.
KNOWLEDGE_BASE = "../knowledge-base/linkedin"

In [None]:
# Load environment variables from .env

# override=True: values from the .env file take precedence over variables
# already present in the process environment.
load_dotenv(override=True)
# NOTE(review): when OPENAI_API_KEY is not defined anywhere, the placeholder string
# below is exported as the key, so the problem presumably only shows up at the first
# OpenAI request. Set the variable (or a .env entry) rather than editing the literal.
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your-key-if-not-using-env")

In [None]:
def add_metadata(doc, doc_type):
    """
    Tag a document with the category it belongs to.

    The document is modified in place: its metadata gains (or overwrites) the
    'doc_type' key.

    Args:
        doc (Document): Document whose metadata should be annotated.
        doc_type (str): Category label, typically the name of the source folder.

    Returns:
        Document: The very same document object that was passed in.
    """
    doc.metadata.update({"doc_type": doc_type})
    return doc


def load_folder_documents(folder):
    """
    Load every file matched under a folder as text and tag it with the folder name.

    All paths matching the "**/*" glob inside the folder are read with
    TextLoader as UTF-8 (not only markdown files), and each resulting document
    gets a 'doc_type' metadata entry equal to the folder's own name.

    Args:
        folder (str): Path to the folder containing documents. A trailing path
            separator is tolerated.

    Returns:
        list: List of documents with 'doc_type' metadata added.
    """
    # normpath drops a trailing separator first; without it
    # basename("kb/profile/") is "" and every document would be tagged with an
    # empty doc_type.
    doc_type = os.path.basename(os.path.normpath(folder))
    loader = DirectoryLoader(
        folder,
        glob="**/*",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf-8"},
    )
    folder_docs = loader.load()
    return [add_metadata(doc, doc_type) for doc in folder_docs]


def load_documents(folders):
    """
    Gather the documents of several folders into one flat list.

    Folders are processed in the order given, and the documents of each folder
    keep the order in which load_folder_documents returned them.

    Args:
        folders (list): Folder paths to load from.

    Returns:
        list: All documents from all folders, concatenated.
    """
    return [
        document
        for folder_path in folders
        for document in load_folder_documents(folder_path)
    ]


def load_documents_from_knowledge_base(knowledge_base: str):
    """
    Load all documents from the sub-folders of a knowledge base directory.

    Every directory directly beneath `knowledge_base` is treated as one
    document category: its files are loaded and tagged with the directory name
    as 'doc_type'. Plain files sitting directly in `knowledge_base` are ignored.

    Args:
        knowledge_base (str): Path to the knowledge base root (e.g., "knowledge-base").
            "/*" is appended to this value internally, so pass the root itself
            rather than a pattern that already ends in a wildcard.

    Returns:
        list: A list of LangChain Document objects with added metadata.
    """
    # glob also matches regular files next to the category folders; those are
    # not folders to load from, so keep directories only. Sorting gives a
    # stable folder order from run to run, which glob itself does not promise.
    folders = sorted(
        path for path in glob.glob(f"{knowledge_base}/*") if os.path.isdir(path)
    )
    return load_documents(folders)


def split_into_chunks(documents, chunk_size=1000, chunk_overlap=200):
    """
    Break documents into smaller chunks with a CharacterTextSplitter.

    Args:
        documents (list): LangChain Document objects to split.
        chunk_size (int): Size limit passed to the splitter for each chunk.
        chunk_overlap (int): Overlap between consecutive chunks, passed to the splitter.

    Returns:
        list: The chunked Document objects produced by the splitter.
    """
    return CharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    ).split_documents(documents)

In [None]:
# Load every document under KNOWLEDGE_BASE, then split them with the default
# chunk size and overlap of split_into_chunks.
documents = load_documents_from_knowledge_base(KNOWLEDGE_BASE)
chunks = split_into_chunks(documents)

# Sanity check: how much was loaded and which doc_type labels were assigned.
print(f"Total documents: {len(documents)}")
print(f"Total chunks: {len(chunks)}")
print(f"Document types: {set(doc.metadata['doc_type'] for doc in documents)}")

In [None]:
def create_vectorstore(chunks, persist_directory, overwrite=True):
    """
    Embed document chunks with OpenAI embeddings and store them in Chroma.

    Args:
        chunks (list): Document chunks to embed and store.
        persist_directory (str): Directory in which Chroma persists the store.
        overwrite (bool): When True and the directory already exists, the
            existing collection is deleted before the new one is built.

    Returns:
        Chroma: The vector store built from the chunks.
    """
    embedding_model = OpenAIEmbeddings()

    # Only touch an existing store when the caller asked for a fresh start.
    should_reset = overwrite and os.path.exists(persist_directory)
    if should_reset:
        stale_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=embedding_model,
        )
        stale_store.delete_collection()

    return Chroma.from_documents(
        documents=chunks,
        embedding=embedding_model,
        persist_directory=persist_directory,
    )


# Build the store in VECTOR_DB; with the default overwrite=True an existing
# collection in that directory is deleted first.
vectorstore = create_vectorstore(chunks, VECTOR_DB)
# _collection is an underscore-prefixed attribute of the store, read here only
# to report how many entries were written.
print(f"Vectorstore created with {vectorstore._collection.count()} documents")

In [None]:
def get_collection(vectorstore):
    """
    Return the collection object that backs a Chroma vector store.

    Args:
        vectorstore (Chroma): The vector store to look inside.

    Returns:
        Collection: The object held in the store's `_collection` attribute.
    """
    underlying_collection = vectorstore._collection
    return underlying_collection


def inspect_vectorstore(collection):
    """
    Inspect the contents of a Chroma vector store collection.

    Prints the number of vectors and their embedding dimensionality, or a
    notice when the collection returns no embeddings.

    Args:
        collection: The underlying Chroma collection object. It must provide
            `count()` and `get(limit=..., include=[...])`.
    """
    count = collection.count()

    sample = collection.get(limit=1, include=["embeddings"])
    # A default passed to .get() only covers a missing key; the key can also be
    # present with a None value, which would make len() below raise TypeError.
    embeddings = sample.get("embeddings")
    if embeddings is None:
        embeddings = []

    # len() instead of plain truthiness so that array-like results, whose truth
    # value may be ambiguous, are handled as well.
    if len(embeddings) > 0:
        dimensions = len(embeddings[0])
        print(
            f"There are {count:,} vectors with {dimensions:,} dimensions in the vector store"
        )
    else:
        print("No embeddings found in the vector store.")


# Keep a handle on the underlying collection; later cells read the stored
# embeddings, texts and metadata directly from it.
collection = get_collection(vectorstore)
inspect_vectorstore(collection)

In [None]:
# Get all vectors, documents, and metadata from the collection
result = collection.get(include=["embeddings", "documents", "metadatas"])

# NOTE(review): `documents` is rebound here. An earlier cell set it to the loaded
# Document objects; from this point on it holds whatever the collection returns
# under "documents" (presumably the stored chunk texts, one per vector). Re-running
# an earlier cell that prints per-document metadata after this one will not see
# Document objects any more.
vectors = np.array(result["embeddings"])
documents = result["documents"]
metadatas = result["metadatas"]

# Extract 'doc_type' from each metadata dictionary
# (entries without the key are labelled "unknown")
doc_types = [metadata.get("doc_type", "unknown") for metadata in metadatas]

# Map doc_types to dark colors
# Keys are expected to match the knowledge-base folder names used as doc_type.
color_map = {
    "profile": "#1e3a8a",          # Deep blue for profile information
    "experience": "#065f46",        # Dark green for work experience and career
    "education": "#7c2d12",         # Brown for education and academic achievements
    "skills": "#ca8a04",           # Gold for skills and competencies
    "certifications": "#7c3aed",   # Purple for certifications and credentials
    "projects": "#ea580c",         # Orange for projects and portfolio work
    "publications": "#dc2626",     # Red for publications and written work
    "networking": "#0d9488",       # Teal for connections and networking
    "communications": "#059669",   # Green for messages and communications
    "preferences": "#6b7280",      # Gray for settings and preferences
}

# Assign colors to each vector based on its doc_type
# (any doc_type missing from color_map, including "unknown", falls back to "gray")
colors = [color_map.get(doc_type, "gray") for doc_type in doc_types]

In [None]:
def visualize_vectorstore_2d(
    vectors, documents, doc_types, colors, title="2D Chroma Vector Store Visualization"
):
    """
    Reduce vector embeddings to 2D using t-SNE and return a Plotly figure.

    Args:
        vectors (ndarray): The high-dimensional vector embeddings, one row per point.
        documents (list): The original text documents (same order as vectors).
        doc_types (list): The type of each document, shown in the hover text.
        colors (list): Marker color for each point (same order as vectors).
        title (str): Title for the Plotly figure.

    Returns:
        go.Figure: A Plotly figure object ready to be displayed.

    Raises:
        ValueError: If fewer than two vectors are supplied, since t-SNE cannot
            project a single point.
    """
    n_samples = len(vectors)
    if n_samples < 2:
        raise ValueError(
            f"t-SNE needs at least 2 vectors to build a projection, got {n_samples}"
        )

    # scikit-learn requires perplexity < n_samples and defaults to 30, which
    # fails on small stores. Clamp it; for more than 30 vectors this is exactly
    # the default, so larger stores are projected as before.
    perplexity = min(30.0, n_samples - 1)
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    reduced_vectors = tsne.fit_transform(vectors)

    # Hover label: document type plus the first 100 characters of the text.
    hover_texts = [
        f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)
    ]

    fig = go.Figure(
        data=[
            go.Scatter(
                x=reduced_vectors[:, 0],
                y=reduced_vectors[:, 1],
                mode="markers",
                marker=dict(size=5, color=colors, opacity=0.8),
                text=hover_texts,
                hoverinfo="text",
            )
        ]
    )

    fig.update_layout(
        title=title,
        xaxis_title="x",
        yaxis_title="y",
        width=800,
        height=600,
        margin=dict(r=20, b=10, l=10, t=40),
    )

    return fig


# Project the stored embeddings to 2D and render the interactive scatter plot.
fig = visualize_vectorstore_2d(vectors, documents, doc_types, colors)
fig.show()

In [None]:
def visualize_vectorstore_3d(
    vectors, documents, doc_types, colors, title="3D Chroma Vector Store Visualization"
):
    """
    Reduce vector embeddings to 3D using t-SNE and return a Plotly 3D scatter plot.

    Args:
        vectors (ndarray): High-dimensional vector embeddings, one row per point.
        documents (list): Corresponding document texts (same order as vectors).
        doc_types (list): Document types, shown in the hover text.
        colors (list): Marker color for each point (same order as vectors).
        title (str): Title for the Plotly figure.

    Returns:
        go.Figure: A Plotly 3D scatter plot figure object.

    Raises:
        ValueError: If fewer than two vectors are supplied, since t-SNE cannot
            project a single point.
    """
    n_samples = len(vectors)
    if n_samples < 2:
        raise ValueError(
            f"t-SNE needs at least 2 vectors to build a projection, got {n_samples}"
        )

    # scikit-learn requires perplexity < n_samples and defaults to 30, which
    # fails on small stores. Clamp it; for more than 30 vectors this is exactly
    # the default, so larger stores are projected as before.
    perplexity = min(30.0, n_samples - 1)
    tsne = TSNE(n_components=3, random_state=42, perplexity=perplexity)
    reduced_vectors = tsne.fit_transform(vectors)

    # Hover label: document type plus the first 100 characters of the text.
    hover_texts = [
        f"Type: {t}<br>Text: {d[:100]}..." for t, d in zip(doc_types, documents)
    ]

    fig = go.Figure(
        data=[
            go.Scatter3d(
                x=reduced_vectors[:, 0],
                y=reduced_vectors[:, 1],
                z=reduced_vectors[:, 2],
                mode="markers",
                marker=dict(size=5, color=colors, opacity=0.8),
                text=hover_texts,
                hoverinfo="text",
            )
        ]
    )

    fig.update_layout(
        title=title,
        scene=dict(xaxis_title="x", yaxis_title="y", zaxis_title="z"),
        width=900,
        height=700,
        margin=dict(r=20, b=10, l=10, t=40),
    )

    return fig


# Same data projected to 3D; note that `fig` from the 2D cell is replaced here.
fig = visualize_vectorstore_3d(vectors, documents, doc_types, colors)
fig.show()

In [None]:
def create_conversational_chain(vectorstore, model_name, temperature=0.7, k=25):
    """
    Build a ConversationalRetrievalChain over the given vector store.

    The chain combines three parts: a ChatOpenAI model, a buffer memory stored
    under the "chat_history" key, and a retriever that fetches `k` chunks per
    query from the vector store.

    Args:
        vectorstore (Chroma): Vector store holding the embedded document chunks.
        model_name (str): OpenAI chat model to use (e.g., 'gpt-3.5-turbo').
        temperature (float): Sampling temperature passed to the model.
        k (int): Number of chunks the retriever returns for each query.

    Returns:
        ConversationalRetrievalChain: The assembled chain, with its own memory.
    """
    chain_parts = {
        "llm": ChatOpenAI(temperature=temperature, model_name=model_name),
        "memory": ConversationBufferMemory(
            memory_key="chat_history", return_messages=True
        ),
        "retriever": vectorstore.as_retriever(search_kwargs={"k": k}),
    }
    return ConversationalRetrievalChain.from_llm(**chain_parts)


# Module-level chain used by chat(); it carries its own conversation memory, so
# re-run this cell to start from an empty history.
conversation_chain = create_conversational_chain(vectorstore, model_name=MODEL)

In [None]:
def chat(question, history):
    """
    Answer one user question through the module-level conversation chain.

    Args:
        question (str): The user's input question.
        history: Conversation history supplied by the caller (the chat UI passes
            it on every turn). It is ignored here, because the chain is built
            with its own memory and only the question is forwarded to it.

    Returns:
        str: The value stored under "answer" in the chain's result.
    """
    chain_input = {"question": question}
    return conversation_chain.invoke(chain_input)["answer"]

In [None]:
# JavaScript handed to Gradio through `js=`: when the page URL does not carry
# `__theme=dark`, it adds that query parameter and navigates to the new URL.
force_dark_mode = """
function refresh() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""

# Chat UI backed by chat(); `view` holds whatever launch() returns.
# NOTE(review): share=True presumably publishes a public Gradio link, and chat()
# always uses the single module-level conversation_chain, so all visitors would
# share one conversation memory — confirm this is intended before sharing the link.
view = gr.ChatInterface(chat, type="messages", js=force_dark_mode).launch(
    inbrowser=True, share=True
)

In [None]:
from langchain.prompts import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)


def create_conversational_chain_with_fallback_prompt(
    vectorstore, model_name, temperature=0.7, k=25
):
    """
    Build a conversational retrieval chain whose prompt permits general-knowledge answers.

    The answering prompt consists of a system message telling the model to use
    the retrieved context but allowing it to fall back on its own knowledge,
    followed by a human message carrying the `{context}` and `{question}` slots.

    Args:
        vectorstore (Chroma): Vector store used for retrieval.
        model_name (str): Model name like 'gpt-4' or 'gpt-3.5-turbo'.
        temperature (float): Sampling temperature passed to the model.
        k (int): Number of chunks the retriever returns for each query.

    Returns:
        ConversationalRetrievalChain: The chain configured with the custom prompt.
    """
    chat_model = ChatOpenAI(temperature=temperature, model_name=model_name)
    history_buffer = ConversationBufferMemory(
        memory_key="chat_history", return_messages=True
    )
    chunk_retriever = vectorstore.as_retriever(search_kwargs={"k": k})

    # System instructions first, then the per-question template.
    fallback_prompt = ChatPromptTemplate.from_messages(
        [
            SystemMessagePromptTemplate.from_template(
                "You are a helpful assistant. Use the following retrieved context to answer the user's question. "
                "If the context does not contain the answer, you may answer using your own knowledge."
            ),
            HumanMessagePromptTemplate.from_template(
                "Context:\n{context}\n\nQuestion:\n{question}"
            ),
        ]
    )

    chain = ConversationalRetrievalChain.from_llm(
        llm=chat_model,
        retriever=chunk_retriever,
        memory=history_buffer,
        combine_docs_chain_kwargs={"prompt": fallback_prompt},
    )
    return chain


# Call this once: the chain is created with its own conversation memory, so
# building it again would start from an empty history.
general_conversation_chain = create_conversational_chain_with_fallback_prompt(
    vectorstore, model_name=MODEL
)