In [7]:
# --- Cell 1: Imports and Environment Setup ---
import os
import re
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
# Added loaders for Word, CSV, and Excel document support
from langchain_community.document_loaders import PyPDFLoader, TextLoader, Docx2txtLoader, CSVLoader, UnstructuredExcelLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import ChatGoogleGenerativeAI
from dotenv import load_dotenv
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.documents import Document
from typing import List



In [6]:
# Load environment variables from a local .env file.
# The .env file must provide GOOGLE_API_KEY: the configuration cell reads it
# with os.getenv and the initialization cell refuses to start without it.
# The call is deliberately the last expression of the cell, so the notebook
# displays its return value as the cell output.
load_dotenv()

True

In [None]:
# --- Cell 2: Configuration ---
# Filesystem locations: the folder scanned for source documents and the
# folder where the Chroma index is persisted.
DOCS_PATH = "./documents"
CHROMA_PERSIST_PATH = "./chroma_db"

# Model identifiers: the embedding model name and the chat model name.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL = "gemini-1.5-flash-latest"

# API key taken from the process environment; None when the variable is unset.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

In [None]:
# --- Cell 3: Helper Functions (Document Processors, Vector Store, and RAG Chain Creation) ---

class SmartPDFProcessor:
    """Load a PDF page by page, clean each page's text and split it into chunks.

    Args:
        chunk_size: Chunk size handed to RecursiveCharacterTextSplitter.
        chunk_overlap: Chunk overlap handed to RecursiveCharacterTextSplitter.
    """

    # Pages whose cleaned text is shorter than this are treated as empty.
    MIN_PAGE_CHARS = 50

    def __init__(self, chunk_size=1000, chunk_overlap=100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=["\n\n", "\n", " ", ""],
        )

    def process_pdf(self, pdf_path: str) -> List[Document]:
        """Split one PDF into cleaned, metadata-enriched chunks.

        Every chunk carries the page's loader metadata plus:
        ``page`` (1-based page number), ``total_pages``, ``chunk_method``,
        ``page_char_count`` (length of the cleaned page text) and
        ``char_count`` (length of the chunk itself, matching what the other
        processors in this notebook store under that key).

        Args:
            pdf_path: Path of the PDF file to load.

        Returns:
            The list of chunk Documents; an empty list if anything fails
            (the error is printed, not raised).
        """
        print(f"Processing PDF: {pdf_path}")
        try:
            pages = PyPDFLoader(pdf_path).load()
            total_pages = len(pages)

            processed_chunks = []
            for page_number, page in enumerate(pages, start=1):
                # _clean_text already strips, so the length can be tested directly.
                cleaned_text = self._clean_text(page.page_content)

                # Skip nearly empty pages
                if len(cleaned_text) < self.MIN_PAGE_CHARS:
                    continue

                chunks = self.text_splitter.create_documents(
                    texts=[cleaned_text],
                    metadatas=[{
                        **page.metadata,
                        "page": page_number,
                        "total_pages": total_pages,
                        "chunk_method": "smart_pdf_processor",
                        "page_char_count": len(cleaned_text),
                    }]
                )
                # Record the size of each chunk rather than of the whole page,
                # so "char_count" means the same thing for every processor.
                for chunk in chunks:
                    chunk.metadata["char_count"] = len(chunk.page_content)
                processed_chunks.extend(chunks)

            print(f"Successfully processed {len(processed_chunks)} chunks from {pdf_path}")
            return processed_chunks
        except Exception as e:
            # Best effort: one unreadable PDF must not abort the whole ingestion run.
            print(f"Error processing {pdf_path}: {e}")
            return []

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace runs to single spaces and expand fi/fl ligatures."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        # Fix common PDF extraction issues (ligatures)
        text = text.replace("ﬁ", "fi")
        text = text.replace("ﬂ", "fl")

        return text

class SmartDocProcessor:
    """Load, normalise and chunk plain-text (.txt) and Word (.docx) files."""

    def __init__(self, chunk_size=1000, chunk_overlap=100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", " ", ""],
        )

    def _clean_text(self, text: str) -> str:
        """Collapse every whitespace run to one space and trim both ends."""
        collapsed = re.sub(r'\s+', ' ', text)
        return collapsed.strip()

    def _loader_for(self, doc_path):
        """Return the loader matching the file extension, or None if unsupported."""
        lowered = doc_path.lower()
        if lowered.endswith(".docx"):
            return Docx2txtLoader(doc_path)
        if lowered.endswith(".txt"):
            return TextLoader(doc_path, encoding='utf-8')
        return None

    def process_document(self, doc_path: str) -> List[Document]:
        """Read a .txt or .docx file and return its cleaned chunks.

        Loaded documents whose cleaned text is shorter than 50 characters are
        dropped. Each returned chunk gets ``chunk_method`` and ``char_count``
        (length of the chunk) added to its metadata. Any failure is printed
        and results in an empty list.
        """
        print(f"Processing document: {doc_path}")
        try:
            loader = self._loader_for(doc_path)
            if loader is None:
                print(f"Unsupported file type: {doc_path}")
                return []

            # Pair each loaded document with its cleaned text, then keep only
            # those with enough content, as fresh Document objects so the
            # loader's originals are left untouched.
            cleaned_pairs = (
                (self._clean_text(raw.page_content), raw) for raw in loader.load()
            )
            kept_docs = [
                Document(page_content=text, metadata=raw.metadata.copy())
                for text, raw in cleaned_pairs
                if len(text.strip()) >= 50
            ]

            chunks = self.text_splitter.split_documents(kept_docs)
            for chunk in chunks:
                chunk.metadata.update(
                    chunk_method="smart_doc_processor",
                    char_count=len(chunk.page_content),
                )

            print(f"Successfully processed {len(chunks)} chunks from {doc_path}")
            return chunks
        except Exception as e:
            print(f"Error processing {doc_path}: {e}")
            return []

class SmartSheetProcessor:
    """Turn spreadsheet files (.csv, .xlsx) into cleaned Documents."""

    def _clean_text(self, text: str) -> str:
        """Collapse every whitespace run to one space and trim both ends."""
        collapsed = re.sub(r'\s+', ' ', text)
        return collapsed.strip()

    def process_sheet(self, sheet_path: str) -> List[Document]:
        """Load a .csv or .xlsx file and return the loader's Documents, cleaned.

        Documents whose cleaned content is shorter than 10 characters are
        dropped. Each kept Document has its content replaced by the cleaned
        text and gains ``chunk_method`` and ``char_count`` metadata. Any
        failure is printed and results in an empty list.
        """
        print(f"Processing sheet: {sheet_path}")
        try:
            lowered = sheet_path.lower()
            is_csv = lowered.endswith(".csv")
            if not (is_csv or lowered.endswith(".xlsx")):
                print(f"Unsupported sheet type: {sheet_path}")
                return []

            if is_csv:
                loader = CSVLoader(file_path=sheet_path, encoding='utf-8')
            else:
                # mode="elements" is good for unstructured tables;
                # mode="single" might be better if each sheet is a cohesive document.
                loader = UnstructuredExcelLoader(sheet_path, mode="elements")

            kept = []
            for entry in loader.load():
                text = self._clean_text(entry.page_content)
                # Rows can be short but not empty
                if len(text.strip()) >= 10:
                    entry.page_content = text
                    entry.metadata.update(
                        chunk_method="smart_sheet_processor",
                        char_count=len(text),
                    )
                    kept.append(entry)

            print(f"Successfully processed {len(kept)} rows/elements from {sheet_path}")
            return kept

        except Exception as e:
            print(f"Error processing {sheet_path}: {e}")
            return []


In [None]:
def get_vectorstore(embedding_function):
    """Return a Chroma vector store, loading a persisted one or building it from DOCS_PATH.

    A persisted store is reused only when CHROMA_PERSIST_PATH is a non-empty
    directory. Otherwise every regular file in DOCS_PATH with a supported
    extension (.pdf, .txt, .docx, .csv, .xlsx) is processed, in sorted
    filename order, and the resulting chunks are embedded and persisted.

    Args:
        embedding_function: Embedding object passed to Chroma, both when
            loading an existing store and when building a new one.

    Returns:
        The Chroma vector store.

    Raises:
        FileNotFoundError: If a new store must be built but DOCS_PATH does not exist.
        ValueError: If no chunks could be produced from the files in DOCS_PATH.
    """
    # An empty directory is not a usable index: loading it would produce a
    # store with no documents and every query would silently come back empty.
    has_persisted_store = (
        os.path.isdir(CHROMA_PERSIST_PATH)
        and len(os.listdir(CHROMA_PERSIST_PATH)) > 0
    )
    if has_persisted_store:
        print("Loading existing vector store...")
        return Chroma(persist_directory=CHROMA_PERSIST_PATH, embedding_function=embedding_function)

    print("Creating new vector store...")
    if not os.path.exists(DOCS_PATH):
        raise FileNotFoundError(
            f"Documents directory not found: {DOCS_PATH}. "
            "Please create it and add your curriculum files."
        )

    pdf_processor = SmartPDFProcessor()
    doc_processor = SmartDocProcessor()
    sheet_processor = SmartSheetProcessor()

    # Extension groups mapped to the processor method that handles them.
    handlers = (
        ((".pdf",), pdf_processor.process_pdf),
        ((".txt", ".docx"), doc_processor.process_document),
        ((".csv", ".xlsx"), sheet_processor.process_sheet),
    )

    all_splits = []
    # sorted(): os.listdir order is arbitrary; a fixed order makes runs reproducible.
    for filename in sorted(os.listdir(DOCS_PATH)):
        file_path = os.path.join(DOCS_PATH, filename)
        if not os.path.isfile(file_path):
            # Sub-directories (even ones named like "x.pdf") are not documents.
            continue

        lowered = filename.lower()
        for suffixes, handler in handlers:
            if lowered.endswith(suffixes):
                all_splits.extend(handler(file_path))
                break

    if not all_splits:
        raise ValueError(f"No processable documents found in {DOCS_PATH}. Please add your curriculum files.")

    # Create and persist the vector store
    print(f"Creating vector store with {len(all_splits)} document chunks...")
    vectorstore = Chroma.from_documents(
        documents=all_splits,
        embedding=embedding_function,
        persist_directory=CHROMA_PERSIST_PATH
    )
    print("Vector store created successfully.")
    return vectorstore

def create_conversational_rag_chain(retriever, llm):
    """Build the history-aware retrieval-augmented QA chain.

    The chain has two stages: the latest user question is first rewritten into
    a standalone question using the chat history, then the retrieved documents
    are stuffed into a QA prompt that is answered by the same LLM.

    Args:
        retriever: Retriever used to fetch context documents.
        llm: Chat model used both for question rewriting and for answering.

    Returns:
        The chain produced by ``create_retrieval_chain``; it is invoked with
        ``{"input": ..., "chat_history": ...}``.
    """
    # --- Contextualizing the Question ---
    contextualize_q_system_prompt = (
        "Given a chat history and the latest user question "
        "which might reference context in the chat history, "
        "formulate a standalone question which can be understood "
        "without the chat history. Do NOT answer the question, "
        "just reformulate it if needed and otherwise return it as is."
    )
    contextualize_q_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )
    history_aware_retriever = create_history_aware_retriever(
        llm, retriever, contextualize_q_prompt
    )

    # --- Answering the Question ---
    # The prompt restricts the model to the retrieved context. The previous
    # wording ("You may use any external knowledge") contradicted the
    # "based ONLY on the provided context" instruction in the same prompt.
    qa_system_prompt = (
        "You are an expert AI Curriculum Assistant. Your task is to answer user questions "
        "accurately and concisely based ONLY on the provided context from the user's uploaded documents. "
        "If the context does not contain the answer, state that you cannot find the information "
        "in the provided materials. Do NOT use any external knowledge. "
        "Be friendly, helpful, and clear in your response.\n\n"
        "Context:\n{context}"
    )
    qa_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", qa_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )

    question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
    return rag_chain


In [None]:
# --- Cell 4: Initialization ---
# Builds the embeddings, vector store, LLM, retriever and RAG chain, and resets
# the chat history. On success it defines `rag_chain` and `chat_history`.

print("--- AI Curriculum Assistant ---")

# Discard any chain/history left in the kernel by an earlier run of this cell.
# Without this, a failed re-initialization would leave the Q&A cell silently
# using the stale chain, because it only checks that `rag_chain` exists.
globals().pop("rag_chain", None)
globals().pop("chat_history", None)

if not GOOGLE_API_KEY:
    print("Error: GOOGLE_API_KEY not found. Please set it in your .env file.")
else:
    # Initialize embeddings
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

    # Get or create the vector store
    try:
        vectorstore = get_vectorstore(embeddings)

        # Initialize the Gemini LLM with the API key
        llm = ChatGoogleGenerativeAI(model=LLM_MODEL, google_api_key=GOOGLE_API_KEY)

        # Create the retriever (top-5 similarity search)
        retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 5})

        # Create the conversational RAG chain
        rag_chain = create_conversational_rag_chain(retriever, llm)

        # Initialize chat history
        chat_history = []

        print("\nAssistant is ready! You can now ask questions in the next cell.")

    except ValueError as e:
        # Raised by get_vectorstore when no documents could be processed.
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during initialization: {e}")

In [None]:
# --- Cell 5: Interactive Q&A ---
# Edit `user_input` below and re-run this cell for each new question; the
# conversation so far is carried along in `chat_history`.

user_input = "What is the main topic of the documents?"

if 'rag_chain' not in locals():
    # Initialization has not produced a chain in this kernel.
    print("The RAG chain is not initialized. Please run the previous cells successfully.")
else:
    # Ask the chain, passing the running conversation as context.
    response = rag_chain.invoke({"input": user_input, "chat_history": chat_history})
    answer = response["answer"]
    print(f"Assistant: {answer}")

    # Record both turns so follow-up questions can refer back to them.
    chat_history.extend([
        HumanMessage(content=user_input),
        AIMessage(content=answer),
    ])

# You can check the chat history by running a new cell with:
# print(chat_history)