Cell 1: Imports

In [None]:
# Imports and Pydantic Models
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
import pandas as pd
import fitz  # PyMuPDF
import ollama
import json
import os
from langchain_ollama import OllamaEmbeddings, ChatOllama
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Pydantic model for command extraction
class QueryCommand(BaseModel):
    command: str = Field(..., description="Command type (e.g., summarize, answer_question, search, compare_files)")
    params: Dict = Field(default_factory=dict, description="Parameters like keyword, file_name, file_type")

# Pydantic model for response formatting
class ChatbotResponse(BaseModel):
    answer: str = Field(..., description="Concise response to the query, under 100 words", min_length=1, max_length=100)
    sources: List[str] = Field(default_factory=list, description="List of file names referenced in the answer")

Cell 2: Load docs

In [None]:
# Document Loading Module
def load_document(file_path: str) -> tuple[str, List[Dict]]:
    """Parse a document (Excel, PDF, TXT) into chunks with metadata."""
    chunks = []
    file_name = os.path.basename(file_path)
    file_ext = os.path.splitext(file_name)[1].lower()
    
    if file_ext in ['.xlsx', '.xls']:
        sheets = pd.read_excel(file_path, sheet_name=None)
        for sheet_name, df in sheets.items():
            for i, row in df.iterrows():
                chunks.append({"text": f"Sheet: {sheet_name}, Row {i}: {row.to_dict()}", 
                              "metadata": {"file_name": file_name, "file_type": "excel", "sheet": sheet_name}})
    elif file_ext == '.pdf':
        doc = fitz.open(file_path)
        for page in doc:
            text = page.get_text()
            chunks.append({"text": f"Page {page.number}: {text}", 
                          "metadata": {"file_name": file_name, "file_type": "pdf", "page": page.number}})
        doc.close()
    elif file_ext == '.txt':
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
            chunks.append({"text": text, "metadata": {"file_name": file_name, "file_type": "txt"}})
    else:
        return file_name, []
    
    # Split large texts
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
    split_chunks = []
    for chunk in chunks:
        split_texts = splitter.split_text(chunk["text"])
        for split_text in split_texts:
            split_chunks.append({"text": split_text, "metadata": chunk["metadata"]})
    
    return file_name, split_chunks

Cell 3: Extraction

In [None]:
# Command Extraction Module
def extract_command(query: str) -> QueryCommand:
    """Use Mistral to parse query into a structured command."""
    prompt = (
        "You are a document repository assistant. Given this query: '{query}', identify the command "
        "(e.g., 'summarize', 'answer_question', 'search', 'compare_files') and parameters "
        "(e.g., file_name, keyword, file_type). Output JSON: {'command': '...', 'params': {'file_name': '...', 'keyword': '...', 'file_type': '...'}}. "
        "Examples:\n"
        "Query: 'Summarize sales data' → {'command': 'summarize', 'params': {'keyword': 'sales', 'file_type': 'all'}}\n"
        "Query: 'Compare revenue in file1.xlsx and file2.xlsx' → {'command': 'compare_files', 'params': {'file_name': ['file1.xlsx', 'file2.xlsx'], 'keyword': 'revenue'}}\n"
        "Query: 'What is in report.pdf?' → {'command': 'answer_question', 'params': {'file_name': 'report.pdf', 'file_type': 'pdf'}}"
    )
    response = ollama.chat(model='mistral:7b-instruct-v0.3-q4_0', messages=[
        {'role': 'system', 'content': prompt},
        {'role': 'user', 'content': f"Query: {query}"}
    ])
    try:
        command_dict = json.loads(response['message']['content'])
        return QueryCommand(**command_dict)
    except:
        return QueryCommand(command="answer_question", params={"keyword": query, "file_type": "all"})

Cell 4: Retrieve

In [None]:
# Data Retrieval Module
def setup_rag(chunks: List[Dict]) -> any:
    """Set up RAG pipeline with Chroma for repository-wide retrieval."""
    texts = [chunk["text"] for chunk in chunks]
    metadatas = [chunk["metadata"] for chunk in chunks]
    embeddings = OllamaEmbeddings(model='nomic-embed-text')
    vectorstore = Chroma.from_texts(texts, embeddings, metadatas=metadatas, persist_directory='./db')
    return vectorstore.as_retriever(search_kwargs={"k": 5})

Cell 5: Response Generation

In [None]:
# Response Generation Module
def generate_response(chunks: List[Dict], query: str, command: QueryCommand) -> ChatbotResponse:
    """Generate a concise, document-grounded response using RAG."""
    retriever = setup_rag(chunks)
    # Apply metadata filters if specified
    filter_params = {}
    if command.params.get("file_type") != "all":
        filter_params["file_type"] = command.params["file_type"]
    if command.params.get("file_name"):
        filter_params["file_name"] = {"$in": command.params["file_name"]} if isinstance(command.params["file_name"], list) else command.params["file_name"]
    retriever.search_kwargs["filter"] = filter_params if filter_params else None
    
    llm = ChatOllama(model='mistral:7b-instruct-v0.3-q4_0')
    prompt = ChatPromptTemplate.from_template(
        "Context: {context}\nQuestion: {query}\nProvide a concise answer (under 100 words) based only on the document data. Cite file names."
    )
    chain = {"context": retriever, "query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
    answer = chain.invoke(query)
    
    # Extract sources from retrieved documents
    retrieved_docs = retriever.get_relevant_documents(query)
    sources = list(set(doc.metadata.get("file_name", "unknown") for doc in retrieved_docs))
    
    return ChatbotResponse(answer=answer, sources=sources)

Cell 6: Commands

In [None]:
# Testing the Pipeline
import glob

# Load all documents in a directory (replace with your path)
repo_path = "./documents"  # Create a folder and add .xlsx, .pdf, .txt files
all_chunks = []
for file_path in glob.glob(f"{repo_path}/*"):
    file_name, chunks = load_document(file_path)
    if chunks:
        print(f"Loaded {file_name}")
        all_chunks.extend(chunks)

# Test query
query = "Summarize sales data"
response = handle_query(all_chunks, query)
print(f"Query: {query}")
print(f"Answer: {response.answer}")
print(f"Sources: {response.sources}")

Cell 7: Testing

In [None]:
# Testing the Pipeline
import glob

# Load all documents in a directory (replace with your path)
repo_path = "./documents"  # Create a folder and add .xlsx, .pdf, .txt files
all_chunks = []
for file_path in glob.glob(f"{repo_path}/*"):
    file_name, chunks = load_document(file_path)
    if chunks:
        print(f"Loaded {file_name}")
        all_chunks.extend(chunks)

# Test query
query = "Summarize sales data"
response = handle_query(all_chunks, query)
print(f"Query: {query}")
print(f"Answer: {response.answer}")
print(f"Sources: {response.sources}")

In [None]:
# Streamlit UI (Save as separate app.py file and run with `streamlit run app.py`)
import streamlit as st

st.title("Local Document Repository RAG Chatbot")
uploaded_files = st.file_uploader("Upload documents", type=['xlsx', 'xls', 'pdf', 'txt'], accept_multiple_files=True)
if uploaded_files:
    all_chunks = []
    for file in uploaded_files:
        file_path = f"./temp/{file.name}"
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(file.read())
        file_name, chunks = load_document(file_path)
        if chunks:
            all_chunks.extend(chunks)
            st.success(f"Loaded {file_name}")
    
    query = st.text_input("Ask about the document repository (e.g., 'Summarize sales data', 'Compare revenue in file1.xlsx and file2.xlsx')")
    if query:
        response = handle_query(all_chunks, query)
        st.write(f"**Answer**: {response.answer}")
        st.write(f"**Sources**: {', '.join(response.sources)}")