In [None]:
!pip install pymupdf



In [None]:
!pip install -U langchain-community



In [None]:
!pip install faiss-cpu



In [None]:
import os
import fitz
from openai import OpenAI
import gradio as gr

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI

In [None]:
# Read the value stored under the Colab user-data key 'o_key' and export it as
# OPENAI_API_KEY. NOTE(review): presumably the OpenAI-backed LangChain classes
# used below pick the key up from this environment variable - confirm.
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get('o_key')

# Module-level handle to the question-answering chain. upload_pdf() assigns it
# after a PDF has been processed and ask_question() reads it; None means no PDF
# has been processed yet.
qa_chain = None

In [None]:
#loading our PDF
def load_pdf_text(pdf_path):
  """Return the plain text of every page of a PDF, concatenated in page order.

  Args:
    pdf_path: Path of the PDF file, passed straight to ``fitz.open``.

  Returns:
    A single string holding the text of all pages ("" if no page has text).
  """
  # Open the document as a context manager so it is closed even when text
  # extraction fails part-way through; the original never closed it.
  with fitz.open(pdf_path) as doc:
    # join() avoids rebuilding an ever-growing string once per page.
    return "".join(page.get_text() for page in doc)

In [None]:
#Split_text
def split_text(text, chunk_size=300, chunk_overlap=100):
  """Split raw text into overlapping chunks wrapped as LangChain documents.

  Args:
    text: The full text to split.
    chunk_size: Maximum size of each chunk, forwarded to
      RecursiveCharacterTextSplitter (default 300, the previous fixed value).
    chunk_overlap: Overlap between consecutive chunks, forwarded to the
      splitter (default 100, the previous fixed value).

  Returns:
    Whatever ``RecursiveCharacterTextSplitter.create_documents`` returns for
    the single input text.
  """
  splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
  return splitter.create_documents([text])

In [None]:
#Create vector store
#each chunk of text converted into vector using OpenAI embeddings
#FAISS stores those vectors and allows fast similarity search
def create_vector_store(docs):
  """Embed the given document chunks and index them in a FAISS vector store.

  Args:
    docs: Non-empty list of document chunks to index.

  Returns:
    The vector store produced by ``FAISS.from_documents``.

  Raises:
    ValueError: If ``docs`` is empty, which happens when the source file has
      no extractable text.
  """
  # Fail with a readable message instead of the index error the FAISS wrapper
  # produces when it is given no embeddings to size the index from.
  if not docs:
    raise ValueError("No text chunks to index; the document appears to contain no extractable text")
  embeddings = OpenAIEmbeddings()
  return FAISS.from_documents(docs, embeddings)

In [None]:
#Setup RAG pipeline
def setup_rag_qa(vectorstore):
  """Wire a retriever and a chat model into a RetrievalQA chain.

  The retriever runs a similarity search over ``vectorstore`` with k=3, and
  the chat model is ChatOpenAI ("gpt-4.1-nano", temperature 0.3). Both are
  handed to ``RetrievalQA.from_chain_type``, whose result is returned.
  """
  top_chunks_retriever = vectorstore.as_retriever(
      search_type="similarity",
      search_kwargs=dict(k=3),
  )
  chat_model = ChatOpenAI(temperature=0.3, model="gpt-4.1-nano")
  return RetrievalQA.from_chain_type(llm=chat_model, retriever=top_chunks_retriever)

In [None]:
# Handle PDF upload and process

def upload_pdf(file):
  """Gradio callback: index an uploaded PDF and (re)build the global QA chain.

  Args:
    file: The uploaded file as provided by the file component - either an
      object exposing the path as ``.name`` or a plain path string - or None
      when nothing is selected.

  Returns:
    A status string for the UI. Failures are reported as "Error : ..." text
    rather than raised.
  """
  global qa_chain
  if file is None:
    return "No file uploaded"

  # Accept both file-like objects and plain path strings; reading .name
  # unconditionally crashed on a str, outside the try block below.
  pdf_path = file.name if hasattr(file, "name") else str(file)

  try:
    text = load_pdf_text(pdf_path)
    # An image-only PDF yields no text; say so instead of failing later with
    # an obscure indexing error.
    if not text.strip():
      return "Error : no extractable text found in the PDF"
    docs = split_text(text)
    vectorstore = create_vector_store(docs)
    qa_chain = setup_rag_qa(vectorstore)
    return "PDF uploaded and processed successfully"
  except Exception as e:
    return f"Error : {str(e)}"

In [None]:
#Handle question input and answers
def ask_question(query):
  """Gradio callback: answer a question with the chain built by upload_pdf().

  Args:
    query: The question text typed by the user.

  Returns:
    The chain's answer, or a user-facing message when no PDF has been
    processed yet, the question is blank, or the chain raises.
  """
  if qa_chain is None:
    return "Please upload the PDF first"
  # Do not send an empty prompt through retrieval and the LLM.
  if not query or not query.strip():
    return "Please enter a question"
  try:
    return qa_chain.run(query)
  except Exception as e:
    return f"Error : {str(e)}"

In [None]:
#Gradio UI
# Single-PDF question-answering front end: one row for the upload and its
# status, one row for the question box and the answer.
with gr.Blocks() as ui_demo:
  gr.Markdown("# RAG assistant with GPT")
  gr.Markdown("Upload a PDF, then ask questions from its content. We are using GPT + FAISS + langchain")

  with gr.Row():
    pdf_input = gr.File(label = "Upload PDF")
    upload_status = gr.Textbox(label = "Upload Status" , interactive=False)

  # Call upload_pdf whenever the file component changes and show its status
  # string in the read-only status box.
  pdf_input.change(fn=upload_pdf , inputs=pdf_input , outputs=upload_status)

  with gr.Row():
    question_input = gr.Textbox(label="Ask few Questions")
    answer_output = gr.Textbox(label="Answer")

  # Submitting the question box sends its text to ask_question and writes the
  # returned string into the answer box.
  question_input.submit(fn=ask_question , inputs=question_input , outputs=answer_output)


# Start the app (called outside the Blocks context, once the layout is built).
ui_demo.launch()

It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://204d0d6f5f70b8e0a9.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:

import gradio as gr
import openai
import os
from typing import List, Dict, Any
import tempfile
import shutil
from pathlib import Path
import PyPDF2
import docx
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import json
import pickle

class RAGSystem:
    """In-memory retrieval-augmented Q&A over a set of uploaded documents.

    Text is extracted from PDF, DOCX and TXT files, split into overlapping
    character chunks, embedded with a SentenceTransformer model and indexed
    in a flat L2 FAISS index. Questions are answered by retrieving the
    closest chunks and passing them as context to an OpenAI chat model.
    """

    def __init__(self):
        self.embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
        # One dict per chunk: {'text', 'source', 'chunk_id'}; the position in
        # this list is the row id of the chunk's vector in the FAISS index.
        self.documents = []
        self.document_embeddings = None
        self.index = None
        self.openai_client = None

    def setup_openai(self, api_key: str):
        """Create the OpenAI client from ``api_key`` and return a status string.

        Surrounding whitespace (common when a key is pasted) is removed; a
        missing or blank key leaves the current client untouched.
        """
        api_key = (api_key or "").strip()
        if api_key:
            self.openai_client = openai.OpenAI(api_key=api_key)
            return "✅ OpenAI API key configured successfully!"
        return "❌ Please provide a valid API key"

    def _load_text(self, file_path: str):
        """Read a file and return ``(text, error)``.

        Exactly one element is meaningful: on success ``error`` is None, on
        failure ``text`` is None and ``error`` is a human-readable message.
        Keeping the error out of band lets documents whose own text starts
        with words such as "Error" be processed normally.
        """
        file_extension = Path(file_path).suffix.lower()

        try:
            if file_extension == '.pdf':
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    # A page without a text layer may yield nothing; treat it
                    # as empty. Pages are newline-separated so the last word
                    # of one page is not glued to the first word of the next.
                    text = "\n".join((page.extract_text() or "") for page in pdf_reader.pages)
                return text, None

            if file_extension == '.docx':
                doc = docx.Document(file_path)
                return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs), None

            if file_extension == '.txt':
                with open(file_path, 'r', encoding='utf-8') as file:
                    return file.read(), None

            return None, f"Unsupported file format: {file_extension}"

        except Exception as e:
            return None, f"Error reading file: {str(e)}"

    def extract_text_from_file(self, file_path: str) -> str:
        """Extract text from various file formats

        Returns the extracted text, or - for backward compatibility - a
        message starting with "Unsupported file format:" or
        "Error reading file:" when the file cannot be read.
        """
        text, error = self._load_text(file_path)
        return text if error is None else error

    def process_documents(self, files: List[Any]) -> str:
        """Process uploaded documents and create embeddings

        Replaces any previously indexed documents. ``files`` may contain
        objects exposing the path as ``.name`` or plain path strings.
        Returns a status report listing every file; failed files are marked
        with ❌ and are not counted as processed.
        """
        if not files:
            return "No files uploaded"

        # Start from a clean slate so a failed run cannot leave an index
        # behind that no longer matches self.documents.
        self.documents = []
        self.document_embeddings = None
        self.index = None

        report_lines = []
        success_count = 0

        for file in files:
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_name = Path(file_path).name

            text, error = self._load_text(file_path)
            if error is not None:
                report_lines.append(f"❌ {file_name}: {error}")
                continue
            if not text.strip():
                report_lines.append(f"❌ {file_name}: No extractable text found")
                continue

            # Split text into chunks
            for i, chunk in enumerate(self.split_text(text, chunk_size=1000, overlap=200)):
                self.documents.append({
                    'text': chunk,
                    'source': file_name,
                    'chunk_id': i
                })
            report_lines.append(file_name)
            success_count += 1

        if self.documents:
            # Create embeddings
            texts = [doc['text'] for doc in self.documents]
            embeddings = self.embeddings_model.encode(texts)

            # Create FAISS index
            self.document_embeddings = embeddings
            self.index = faiss.IndexFlatL2(embeddings.shape[1])
            self.index.add(embeddings.astype('float32'))

            return f"✅ Successfully processed {success_count} files:\n" + "\n".join(report_lines)

        # Nothing usable: include the per-file reasons so the user can act.
        return "\n".join(["❌ No documents could be processed"] + report_lines)

    def split_text(self, text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks

        Consecutive chunks start ``chunk_size - overlap`` characters apart.
        The last chunk is the one that reaches the end of the text; no extra
        chunk consisting only of already-covered overlap is produced.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                not smaller than ``chunk_size`` (the window would never
                advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")

        chunks = []
        step = chunk_size - overlap
        start = 0

        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= len(text):
                break
            start += step

        return chunks

    def search_documents(self, query: str, top_k: int = 3) -> List[Dict]:
        """Search for relevant documents using semantic similarity

        Returns up to ``top_k`` dicts with 'text', 'source' and 'score'
        (the L2 distance reported by the index; smaller is closer).
        """
        if self.index is None or not self.documents:
            return []

        # Never ask for more neighbours than there are chunks.
        k = min(top_k, len(self.documents))
        if k <= 0:
            return []

        query_embedding = self.embeddings_model.encode([query])
        distances, indices = self.index.search(query_embedding.astype('float32'), k)

        results = []
        for distance, idx in zip(distances[0], indices[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with -1, which would otherwise
            # index the last chunk from the end of the list.
            if 0 <= idx < len(self.documents):
                doc = self.documents[idx]
                results.append({
                    'text': doc['text'],
                    'source': doc['source'],
                    'score': float(distance)
                })

        return results

    def generate_response(self, query: str, chat_history: List[List[str]]) -> str:
        """Generate response using RAG with GPT

        ``chat_history`` is a list of ``[user_message, assistant_message]``
        pairs (or None); only the last five pairs are replayed to the model.
        Returns the model's answer followed by a sources line, or a
        ❌-prefixed message when a prerequisite is missing or the API call
        fails.
        """
        if not self.openai_client:
            return "❌ Please configure your OpenAI API key first"

        if not self.documents:
            return "❌ Please upload and process documents first"

        # Search for relevant documents
        relevant_docs = self.search_documents(query, top_k=3)

        if not relevant_docs:
            return "❌ No relevant documents found"

        # Create context from relevant documents
        context = "\n\n".join([
            f"Source: {doc['source']}\nContent: {doc['text']}"
            for doc in relevant_docs
        ])

        # Create messages for GPT
        messages = [
            {
                "role": "system",
                "content": f"""You are a helpful assistant that answers questions based on the provided context.
                Use the context below to answer the user's question. If the answer cannot be found in the context,
                say so clearly. Always cite the source documents when possible.

                Context:
                {context}"""
            }
        ]

        # Add chat history
        for user_msg, assistant_msg in (chat_history or [])[-5:]:  # Last 5 exchanges
            messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})

        # Add current query
        messages.append({"role": "user", "content": query})

        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=1000,
                temperature=0.7
            )

            answer = response.choices[0].message.content or ""

            # Add source information (de-duplicated, in retrieval order)
            sources = list(dict.fromkeys(doc['source'] for doc in relevant_docs))
            answer += f"\n\n📚 Sources: {', '.join(sources)}"

            return answer

        except Exception as e:
            return f"❌ Error generating response: {str(e)}"

# Initialize RAG system
# Single shared instance used by every Gradio callback below; constructing it
# loads the sentence-transformer model in RAGSystem.__init__.
rag_system = RAGSystem()

# Gradio interface functions
def setup_api_key(api_key):
    """Gradio callback: hand the entered key to the shared RAG system and return its status text."""
    status_message = rag_system.setup_openai(api_key)
    return status_message

def process_files(files):
    """Gradio callback: index the uploaded files via the shared RAG system and return its report."""
    processing_report = rag_system.process_documents(files)
    return processing_report

def chat_function(message, history):
    """Gradio callback: answer ``message`` and append the exchange to the chat.

    Args:
        message: Text from the question box (may be None or blank).
        history: Chat so far as a list of ``[user, assistant]`` pairs, or
            None when the chat has no value yet.

    Returns:
        ``(history, "")`` - the updated chat and an empty string that clears
        the question box. Blank messages leave the chat unchanged.
    """
    # A chatbot without a value may be passed as None; normalise so append()
    # and the slice inside generate_response cannot fail.
    if history is None:
        history = []

    if not message or not message.strip():
        return history, ""

    response = rag_system.generate_response(message, history)
    history.append([message, response])
    return history, ""

def clear_chat():
    """Gradio callback: return an empty chat history and an empty question box."""
    empty_history = []
    empty_question = ""
    return empty_history, empty_question

def clear_documents():
    """Gradio callback: drop all indexed chunks, embeddings and the index from the shared RAG system."""
    rag_system.index = None
    rag_system.document_embeddings = None
    rag_system.documents = []
    return "✅ Documents cleared successfully"

# Create Gradio interface
# Layout: a narrow left column for API-key setup and document management, a
# wider right column for the chat, and usage notes underneath. All callbacks
# operate on the module-level rag_system instance.
with gr.Blocks(title="RAG-based Q&A System", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 RAG-based Q&A System with GPT

    Upload multiple documents and ask questions about their content. The system uses:
    - **RAG (Retrieval-Augmented Generation)** for finding relevant information
    - **GPT** for generating natural language responses
    - **Semantic Search** for accurate document retrieval

    Supported formats: PDF, DOCX, TXT
    """)

    with gr.Row():
        # Left column: credentials and document management.
        with gr.Column(scale=1):
            gr.Markdown("### 🔑 Setup")
            api_key_input = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="Enter your OpenAI API key..."
            )
            api_key_btn = gr.Button("Configure API Key", variant="primary")
            api_key_status = gr.Textbox(label="Status", interactive=False)

            gr.Markdown("### 📁 Document Upload")
            # The accepted extensions mirror the formats handled by
            # RAGSystem.extract_text_from_file.
            file_upload = gr.Files(
                label="Upload Documents",
                file_count="multiple",
                file_types=[".pdf", ".docx", ".txt"]
            )
            process_btn = gr.Button("Process Documents", variant="primary")
            process_status = gr.Textbox(label="Processing Status", interactive=False)

            clear_docs_btn = gr.Button("Clear Documents", variant="secondary")
            clear_docs_status = gr.Textbox(label="Clear Status", interactive=False)

        # Right column: chat transcript plus question input.
        with gr.Column(scale=2):
            gr.Markdown("### 💬 Chat Interface")

            chatbot = gr.Chatbot(
                label="Q&A Chat",
                height=400,
                show_label=True,
                container=True
            )

            with gr.Row():
                msg_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask a question about your documents...",
                    scale=4
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Row():
                clear_btn = gr.Button("Clear Chat", variant="secondary")

    gr.Markdown("""
    ### 📖 How to Use:
    1. **Configure API Key**: Enter your OpenAI API key
    2. **Upload Documents**: Select multiple PDF, DOCX, or TXT files
    3. **Process Documents**: Click to extract text and create embeddings
    4. **Ask Questions**: Type your questions and get AI-powered responses

    ### 🔍 Features:
    - **Multi-file Support**: Upload and process multiple documents simultaneously
    - **Semantic Search**: Find relevant information using AI embeddings
    - **Context-aware Responses**: GPT generates answers based on document content
    - **Source Attribution**: Responses include source document references
    - **Interactive Chat**: Maintain conversation context
    """)

    # Event handlers
    # Setup and document buttons write the callback's status string into the
    # matching read-only status box.
    api_key_btn.click(setup_api_key, inputs=[api_key_input], outputs=[api_key_status])
    process_btn.click(process_files, inputs=[file_upload], outputs=[process_status])
    clear_docs_btn.click(clear_documents, outputs=[clear_docs_status])

    # Pressing Enter in the question box and clicking Send run the same
    # callback; its two return values update the chat and the question box.
    msg_input.submit(chat_function, inputs=[msg_input, chatbot], outputs=[chatbot, msg_input])
    submit_btn.click(chat_function, inputs=[msg_input, chatbot], outputs=[chatbot, msg_input])
    clear_btn.click(clear_chat, outputs=[chatbot, msg_input])

# Launch the app only when this code runs as the main module.
if __name__ == "__main__":
    # Alternative kept for reference: bind to all interfaces on port 5000 with
    # sharing disabled.
    #demo.launch(server_name="0.0.0.0", server_port=5000, share=False)
    demo.launch()

  chatbot = gr.Chatbot(


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://745346a372a6851983.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


In [None]:
!pip install PyPDF2

Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1


In [None]:
!pip install python-docx

Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading python_docx-1.2.0-py3-none-any.whl (252 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/253.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━[0m [32m163.8/253.0 kB[0m [31m4.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-docx
Successfully installed python-docx-1.2.0


**Assignment: Create a RAG app that supports multiple files**