In [3]:
pip install gradio groq



In [None]:
import os
import tempfile
import re
from typing import List, Tuple, Optional
from pathlib import Path

import gradio as gr
from groq import Groq
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import pandas as pd
import pypdf as PyPDF2
from docx import Document as DocxDocument

# ============================================================================
# DOCUMENT PARSERS
# ============================================================================

class DocumentParser:
    """Extract plain text from PDF, Excel, plain-text and DOCX files.

    The public ``parse_*`` helpers never raise: on failure they return a
    string of the form ``"Error parsing <kind>: <reason>"``.  ``parse_file``
    dispatches on the file extension and reports failures through its status
    message instead, so document content is never mistaken for an error.
    """

    # ------------------------------------------------------------------
    # Private extractors: return the text or raise.
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_pdf(file_path: str) -> str:
        """Return the text of every page, separated by blank lines."""
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # Guard against a page yielding no text so the join cannot fail.
            return "\n\n".join(
                page.extract_text() or "" for page in pdf_reader.pages
            )

    @staticmethod
    def _extract_excel(file_path: str) -> str:
        """Return every sheet rendered as a text table under a sheet header."""
        all_text = []
        # Open the workbook once and close it deterministically.
        with pd.ExcelFile(file_path) as excel_file:
            for sheet_name in excel_file.sheet_names:
                df = excel_file.parse(sheet_name)
                all_text.append(f"=== Sheet: {sheet_name} ===")
                all_text.append(df.to_string(index=False))
                all_text.append("\n")
        return "\n".join(all_text)

    @staticmethod
    def _extract_text(file_path: str) -> str:
        """Return the file content decoded as UTF-8, dropping invalid bytes."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
            return file.read()

    @staticmethod
    def _extract_docx(file_path: str) -> str:
        """Return the paragraph texts of a DOCX file joined by newlines."""
        doc = DocxDocument(file_path)
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    # ------------------------------------------------------------------
    # Public per-format helpers: return the text or an "Error ..." string.
    # ------------------------------------------------------------------

    @staticmethod
    def parse_pdf(file_path: str) -> str:
        """Extract text from PDF files"""
        try:
            return DocumentParser._extract_pdf(file_path)
        except Exception as e:
            return f"Error parsing PDF: {str(e)}"

    @staticmethod
    def parse_excel(file_path: str) -> str:
        """Extract text from Excel files"""
        try:
            return DocumentParser._extract_excel(file_path)
        except Exception as e:
            return f"Error parsing Excel: {str(e)}"

    @staticmethod
    def parse_text(file_path: str) -> str:
        """Extract text from plain text files"""
        try:
            return DocumentParser._extract_text(file_path)
        except Exception as e:
            return f"Error parsing text file: {str(e)}"

    @staticmethod
    def parse_docx(file_path: str) -> str:
        """Extract text from DOCX files"""
        try:
            return DocumentParser._extract_docx(file_path)
        except Exception as e:
            return f"Error parsing DOCX: {str(e)}"

    @classmethod
    def parse_file(cls, file_path: str) -> Tuple[str, str]:
        """Parse any supported file format.

        Args:
            file_path: Path of the file; the (case-insensitive) extension
                selects the extractor.

        Returns:
            ``(text, status)``.  On success ``text`` is the extracted content
            and ``status`` a confirmation message.  On failure or for an
            unsupported extension ``text`` is ``""`` and ``status`` explains
            why.
        """
        file_extension = Path(file_path).suffix.lower()

        if file_extension == '.pdf':
            extractor, label = cls._extract_pdf, "PDF"
        elif file_extension in ['.xlsx', '.xls']:
            extractor, label = cls._extract_excel, "Excel"
        elif file_extension in ['.txt', '.md', '.csv']:
            extractor, label = cls._extract_text, "text file"
        elif file_extension == '.docx':
            extractor, label = cls._extract_docx, "DOCX"
        else:
            return "", f"Unsupported file format: {file_extension}"

        # Failures are detected via exceptions rather than by sniffing the
        # returned text, so a document that starts with "Error" parses fine.
        try:
            text = extractor(file_path)
        except Exception as e:
            return "", f"Error parsing {label}: {str(e)}"

        return text, f"‚úì Successfully parsed {Path(file_path).name}"

# ============================================================================
# TEXT CHUNKING
# ============================================================================

class TextChunker:
    """Split long text into overlapping chunks that prefer natural boundaries."""

    @staticmethod
    def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
        """Split text into overlapping chunks.

        Whitespace runs (including newlines) are collapsed to single spaces
        first.  Each chunk spans at most ``chunk_size`` characters; when a
        chunk would end mid-text, the cut is pulled back to the last period
        (kept in the chunk) or, failing that, the last space, provided that
        boundary lies in the second half of the window.

        Args:
            text: Text to split.
            chunk_size: Maximum chunk length in characters; must be positive.
            overlap: Number of characters the next chunk re-reads from the
                end of the previous one.

        Returns:
            The non-empty, stripped chunks in document order; ``[]`` for
            empty or whitespace-only input.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        if not text or not text.strip():
            return []

        text = re.sub(r'\s+', ' ', text).strip()
        chunks = []
        start = 0
        text_length = len(text)
        half_window = chunk_size // 2

        while start < text_length:
            end = start + chunk_size

            if end < text_length:
                window = text[start:end]
                # Newlines were normalised away above, so the only sentence
                # boundary left to look for is a period.
                last_period = window.rfind('.')

                if last_period > half_window:
                    end = start + last_period + 1
                else:
                    last_space = window.rfind(' ')
                    if last_space > half_window:
                        end = start + last_space

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            if end >= text_length:
                break

            next_start = end - overlap
            # An overlap as large as the chunk just emitted would move the
            # cursor backwards (or nowhere) and loop forever; drop the overlap
            # for this step so the scan always advances.
            if next_start <= start:
                next_start = end
            start = next_start

        return chunks

# ============================================================================
# RAG SYSTEM
# ============================================================================

class GroqRAGSystem:
    """Retrieval-augmented question answering over uploaded documents.

    Chunks are embedded locally with a SentenceTransformer model, stored in an
    in-process Chroma collection, and the retrieved chunks are passed to a
    Groq-hosted chat model as context.  Intended call order:
    ``verify_api_key`` -> ``ingest_documents`` -> ``query``.
    """

    def __init__(self):
        self.groq_client: Optional[Groq] = None
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        # allow_reset=True is required for initialize_vector_store(), which
        # wipes the client before every ingestion.
        self.chroma_client = chromadb.Client(Settings(
            anonymized_telemetry=False,
            allow_reset=True
        ))
        self.collection = None
        self.document_count = 0
        self.chunk_count = 0
        self.api_key_valid = False

    def verify_api_key(self, api_key: str) -> Tuple[bool, str]:
        """Verify Groq API key validity.

        Sends a tiny chat completion with the key.  The client is stored on
        the instance only when the call succeeds, so a rejected key never
        leaves a usable-looking client behind.

        Returns:
            ``(is_valid, message)`` where ``message`` is user-facing.
        """
        if not api_key or not api_key.strip():
            return False, "‚ùå API key is empty"

        try:
            client = Groq(api_key=api_key.strip())

            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[
                    {"role": "user", "content": "Say 'API key verified' if you can read this."}
                ],
                max_tokens=50,
                temperature=0.1
            )

            if response.choices[0].message.content:
                self.groq_client = client
                self.api_key_valid = True
                return True, "‚úÖ API Key Verified Successfully! You can now upload documents."

            self.groq_client = None
            self.api_key_valid = False
            return False, "‚ùå API key validation failed: No response from Groq"

        except Exception as e:
            self.groq_client = None
            self.api_key_valid = False
            error_msg = str(e)
            if "401" in error_msg or "authentication" in error_msg.lower():
                return False, "‚ùå Invalid API Key. Please check your Groq API key."
            elif "rate_limit" in error_msg.lower():
                return False, "‚ùå Rate limit exceeded. Please try again later."
            else:
                return False, f"‚ùå Error verifying API key: {error_msg}"

    def initialize_vector_store(self):
        """Initialize or reset the vector store.

        Returns a status message.  On failure ``self.collection`` is ``None``
        so callers can detect that no store is available.
        """
        # Drop the old handle and counters first: after a reset attempt the
        # previous collection must not be used, whether or not the reset worked.
        self.collection = None
        self.document_count = 0
        self.chunk_count = 0
        try:
            self.chroma_client.reset()
            self.collection = self.chroma_client.create_collection(
                name="documents",
                metadata={"hnsw:space": "cosine"}
            )
            return "‚úì Vector store initialized"
        except Exception as e:
            return f"‚úó Error initializing vector store: {str(e)}"

    def ingest_documents(self, files) -> str:
        """Ingest multiple documents into the vector store.

        The store is reset first, so each call replaces the previous corpus.

        Args:
            files: Iterable of uploaded files, each either a path
                (``str``/``os.PathLike``) or an object whose ``name``
                attribute is the path.

        Returns:
            A per-file report followed by a summary, or an error message when
            the key is unverified, no files were given, or the store could
            not be initialised.
        """
        if not self.api_key_valid:
            return "‚ùå Please verify your API key first before uploading documents."

        if not files:
            return "‚ö†Ô∏è No files provided"

        init_status = self.initialize_vector_store()
        if self.collection is None:
            # Without a collection every chunk insert would fail; stop early.
            return init_status

        results = []
        total_chunks = 0

        for file_index, file in enumerate(files):
            if isinstance(file, (str, os.PathLike)):
                file_path = os.fspath(file)
            else:
                file_path = file.name
            filename = Path(file_path).name

            text, parse_status = DocumentParser.parse_file(file_path)

            if not text:
                results.append(f"‚úó {filename}: {parse_status}")
                continue

            chunks = TextChunker.chunk_text(text, chunk_size=1000, overlap=200)

            if not chunks:
                results.append(f"‚úó {filename}: No content to process")
                continue

            # Embed all chunks of the file in one call so the model can batch.
            try:
                embeddings = self.embedding_model.encode(chunks).tolist()
            except Exception as e:
                results.append(f"‚úó {filename}: Error embedding chunks: {str(e)}")
                continue

            stored = 0
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                try:
                    self.collection.add(
                        embeddings=[embedding],
                        documents=[chunk],
                        metadatas=[{
                            "source": filename,
                            "chunk_id": i,
                            "total_chunks": len(chunks)
                        }],
                        # The upload index keeps IDs unique when two files
                        # share the same basename.
                        ids=[f"{file_index}_{filename}_chunk_{i}"]
                    )
                    stored += 1
                except Exception as e:
                    results.append(f"‚úó Error storing chunk {i} from {filename}: {str(e)}")

            total_chunks += stored
            # Only report and count a document that actually reached the store.
            if stored:
                results.append(f"‚úì {filename}: {stored} chunks processed")
                self.document_count += 1

        self.chunk_count = total_chunks

        summary = f"\n\nüìä Summary:\n"
        summary += f"‚Ä¢ Documents processed: {self.document_count}\n"
        summary += f"‚Ä¢ Total chunks created: {self.chunk_count}\n"
        summary += f"‚Ä¢ Vector store ready for queries"

        return "\n".join(results) + summary

    def retrieve_context(self, query: str, top_k: int = 5) -> List[dict]:
        """Retrieve relevant document chunks for a query.

        Args:
            query: Natural-language question.
            top_k: Maximum number of chunks to return.

        Returns:
            A list of dicts with ``text``, ``source``, ``chunk_id`` and
            ``relevance_score`` (``1 - distance``); ``[]`` when there is no
            collection, the collection is empty, or retrieval fails.
        """
        if self.collection is None:
            return []

        try:
            available = self.collection.count()
            if available == 0:
                return []
            # Never ask for more neighbours than are indexed, and make sure
            # the value is a positive int even if the UI hands over a float.
            n_results = max(1, min(int(top_k), available))

            query_embedding = self.embedding_model.encode(query).tolist()

            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )

            context_chunks = []
            if results['documents'] and len(results['documents'][0]) > 0:
                for doc, metadata, distance in zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                ):
                    context_chunks.append({
                        "text": doc,
                        "source": metadata.get("source", "Unknown"),
                        "chunk_id": metadata.get("chunk_id", 0),
                        "relevance_score": 1 - distance
                    })

            return context_chunks

        except Exception as e:
            print(f"Error retrieving context: {str(e)}")
            return []

    def generate_answer(self, query: str, context_chunks: List[dict]) -> Tuple[str, str]:
        """Generate answer using Groq LLM.

        Args:
            query: The user's question.
            context_chunks: Chunks as returned by ``retrieve_context``.

        Returns:
            ``(answer, sources_markdown)``.  On any failure the first element
            is an error message and the second is ``""``.
        """
        if not self.groq_client:
            return "‚ùå Groq client not initialized. Please verify your API key.", ""

        if not context_chunks:
            return "‚ùå No relevant information found in the uploaded documents.", ""

        # chunk_id is stored 0-based; it is shown 1-based to the model and user.
        context_text = "\n\n".join([
            f"[Source: {chunk['source']}, Chunk: {chunk['chunk_id']+1}]\n{chunk['text']}"
            for chunk in context_chunks
        ])

        system_prompt = """You are a helpful AI assistant that answers questions based strictly on the provided context from uploaded documents.

Rules:
1. Answer ONLY using information from the provided context
2. If the context doesn't contain enough information, say so clearly
3. Cite the source document name when providing information
4. Be concise but comprehensive
5. Do not make up or infer information not present in the context"""

        user_prompt = f"""Context from uploaded documents:
{context_text}

Question: {query}

Please provide a detailed answer based solely on the context above. Cite sources when possible."""

        try:
            response = self.groq_client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.2,
                max_tokens=2000,
                top_p=0.9
            )

            answer = response.choices[0].message.content

            # Group the cited chunk numbers by source file, in retrieval order.
            sources = "\n\n---\n\n### üìö Sources Used:\n\n"
            unique_sources = {}
            for chunk in context_chunks:
                unique_sources.setdefault(chunk['source'], []).append(chunk['chunk_id'] + 1)

            for source, chunks in unique_sources.items():
                sources += f"**{source}** ‚Äî Chunks: {', '.join(map(str, chunks))}\n\n"

            return answer, sources

        except Exception as e:
            return f"‚ùå Error generating answer: {str(e)}", ""

    def query(self, question: str, top_k: int = 5) -> str:
        """Complete RAG query pipeline.

        Validates state and input, retrieves up to ``top_k`` chunks and
        returns the generated answer followed by the source list, or a
        user-facing message explaining why no answer was produced.
        """
        if not self.api_key_valid:
            return "‚ùå Please verify your API key first."

        if self.collection is None or self.chunk_count == 0:
            return "‚ö†Ô∏è No documents have been uploaded yet. Please upload documents first."

        if not question or not question.strip():
            return "‚ö†Ô∏è Please enter a question."

        context_chunks = self.retrieve_context(question, top_k)

        if not context_chunks:
            return "‚ùå No relevant information found in the uploaded documents for your query."

        answer, sources = self.generate_answer(question, context_chunks)

        response = f"{answer}\n\n{sources}"

        return response

    def get_stats(self) -> str:
        """Get system statistics as a Markdown snippet."""
        stats = f"""üìä **System Statistics**

**API Key Status:** {'‚úÖ Verified' if self.api_key_valid else '‚ùå Not Verified'}
**Documents Indexed:** {self.document_count}
**Total Chunks:** {self.chunk_count}
**Vector Store:** {'‚úÖ Active' if self.collection is not None else '‚ùå Not Initialized'}
**Embedding Model:** all-MiniLM-L6-v2
**LLM Model:** Groq Llama 3.3 70B
"""
        return stats

# ============================================================================
# GRADIO INTERFACE WITH CUSTOM CSS
# ============================================================================

# Custom CSS for enhanced styling.
#
# This stylesheet is handed to `gr.Blocks(css=CUSTOM_CSS, ...)` in
# create_gradio_interface().  Rules are attached to components either through
# `elem_classes=[...]` (input-field, output-text, stats-box, file-upload,
# slider, answer-box, markdown-content) or through class attributes in the raw
# `gr.HTML` snippets (header-title, header-subtitle, section-header, divider,
# tips-section).  The remaining classes (custom-card, primary-button,
# secondary-button, success-msg, error-msg, warning-msg, loading) are not
# referenced by the interface code in this file.
#
# NOTE(review): `.answer-box` declares `background: 0707ED;` without a leading
# `#`, which is not a valid CSS colour as written -- presumably the declaration
# is ignored by browsers; confirm the intended colour before "fixing" it, since
# a solid #0707ED background would sit behind dark text.
CUSTOM_CSS = """
/* Main container styling */
.gradio-container {
    font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif !important;
    max-width: 1400px !important;
    margin: 0 auto !important;
}

/* Header styling */
.header-title {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    -webkit-background-clip: text;
    -webkit-text-fill-color: transparent;
    background-clip: text;
    font-size: 2.5rem !important;
    font-weight: 800 !important;
    text-align: center;
    margin-bottom: 1rem;
}

.header-subtitle {
    text-align: center;
    color: #6b7280;
    font-size: 1.1rem;
    margin-bottom: 2rem;
}

/* Section headers */
.section-header {
    background: linear-gradient(to right, #f3f4f6, #ffffff);
    border-left: 4px solid #667eea;
    padding: 12px 20px;
    margin: 25px 0 15px 0;
    border-radius: 8px;
    font-weight: 600;
    font-size: 1.1rem;
    color: #1f2937;
}

/* Card styling */
.custom-card {
    background: white;
    border-radius: 12px;
    padding: 20px;
    box-shadow: 0 1px 3px rgba(0,0,0,0.1);
    border: 1px solid #e5e7eb;
}

/* Button styling */
.primary-button {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
    border: none !important;
    color: white !important;
    font-weight: 600 !important;
    padding: 12px 24px !important;
    border-radius: 8px !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 4px 6px rgba(102, 126, 234, 0.3) !important;
}

.primary-button:hover {
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 12px rgba(102, 126, 234, 0.4) !important;
}

.secondary-button {
    background: white !important;
    border: 2px solid #e5e7eb !important;
    color: #374151 !important;
    font-weight: 600 !important;
    border-radius: 8px !important;
    transition: all 0.3s ease !important;
}

.secondary-button:hover {
    border-color: #667eea !important;
    color: #667eea !important;
}

/* Input field styling */
.input-field {
    border-radius: 8px !important;
    border: 2px solid #e5e7eb !important;
    padding: 10px !important;
    transition: all 0.3s ease !important;
}

.input-field:focus {
    border-color: #667eea !important;
    box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important;
}

/* Stats box styling */
.stats-box {
    background: linear-gradient(135deg, #f3f4f6 0%, #ffffff 100%);
    border-radius: 12px;
    padding: 20px;
    border: 2px solid #e5e7eb;
    font-family: 'Monaco', 'Courier New', monospace;
    font-size: 0.9rem;
}

/* Output text styling */
.output-text {
    background: #f9fafb;
    border-radius: 8px;
    padding: 15px;
    border: 1px solid #e5e7eb;
    font-size: 0.95rem;
    line-height: 1.6;
    color: #1f2937; /* Darker text color */
}

/* Answer box styling */
.answer-box {
    background: 0707ED;
    border-radius: 12px;
    padding: 25px;
    border: 2px solid #e5e7eb;
    box-shadow: 0 4px 6px rgba(0,0,0,0.05);
    line-height: 1.8;
}

.answer-box h3 {
    color: #02024F;
    border-bottom: 2px solid #667eea;
    padding-bottom: 8px;
    margin-bottom: 15px;
}

/* Tips section */
.tips-section {
    background: linear-gradient(135deg, #fef3c7 0%, #fde68a 100%);
    border-radius: 12px;
    padding: 20px;
    border-left: 4px solid #f59e0b;
    margin-top: 25px;
}

.tips-section h3 {
    color: #92400e;
    margin-top: 0;
}

.tips-section ul {
    color: #78350f;
    line-height: 1.8;
}

/* File upload area */
.file-upload {
    border: 2px dashed #cbd5e1 !important;
    border-radius: 12px !important;
    padding: 30px !important;
    text-align: center !important;
    background: #f8fafc !important;
    transition: all 0.3s ease !important;
}

.file-upload:hover {
    border-color: #667eea !important;
    background: #f0f4ff !important;
}

/* Slider styling */
.slider {
    accent-color: #667eea !important;
}

/* Success message */
.success-msg {
    color: #059669;
    background: #d1fae5;
    padding: 12px;
    border-radius: 8px;
    border-left: 4px solid #059669;
}

/* Error message */
.error-msg {
    color: #dc2626;
    background: #fee2e2;
    padding: 12px;
    border-radius: 8px;
    border-left: 4px solid #dc2626;
}

/* Warning message */
.warning-msg {
    color: #d97706;
    background: #fef3c7;
    padding: 12px;
    border-radius: 8px;
    border-left: 4px solid #d97706;
}

/* Markdown content styling */
.markdown-content {
    line-height: 1.8;
    color: #374151;
}

.markdown-content h1, .markdown-content h2, .markdown-content h3 {
    color: #1f2937;
    margin-top: 1.5rem;
    margin-bottom: 0.75rem;
}

.markdown-content code {
    background: #f3f4f6;
    padding: 2px 6px;
    border-radius: 4px;
    font-size: 0.9em;
}

.markdown-content pre {
    background: #1f2937;
    color: #f9fafb;
    padding: 15px;
    border-radius: 8px;
    overflow-x: auto;
}

/* Divider */
.divider {
    height: 2px;
    background: linear-gradient(to right, transparent, #e5e7eb, transparent);
    margin: 30px 0;
}

/* Animation for buttons */
@keyframes pulse {
    0%, 100% { opacity: 1; }
    50% { opacity: 0.8; }
}

.loading {
    animation: pulse 1.5s ease-in-out infinite;
}
"""

def create_gradio_interface():
    """Build and return the Gradio Blocks app.

    The page walks the user through three steps -- verify the Groq API key,
    upload documents, ask questions -- all backed by a single
    ``GroqRAGSystem`` instance created here and shared by every handler.
    """

    rag = GroqRAGSystem()

    # ---- callbacks -------------------------------------------------------
    # Each callback returns values in the order of its `outputs` list below.

    def verify_key(api_key):
        _, message = rag.verify_api_key(api_key)
        return message, rag.get_stats()

    def upload_docs(files):
        report = rag.ingest_documents(files)
        return report, rag.get_stats()

    def ask_question(question, top_k):
        return rag.query(question, top_k)

    def clear_system():
        # Wipe the indexed documents and forget the verified key.
        rag.initialize_vector_store()
        rag.api_key_valid = False
        rag.groq_client = None
        return "System cleared. Please verify API key again.", rag.get_stats()

    # ---- layout ----------------------------------------------------------
    with gr.Blocks(theme=gr.themes.Soft(), title="RAG System with Groq", css=CUSTOM_CSS) as app:

        gr.HTML("""
        <div style="text-align: center; margin-bottom: 30px;">
            <h1 class="header-title">üöÄ Education_Content_Analyser_Chatbot</h1>
            <p class="header-subtitle">
                Upload documents and ask questions powered by Groq's ultra-fast inference
            </p>
        </div>
        """)

        # Step 1: key entry on the left, live statistics on the right.
        with gr.Row():
            with gr.Column(scale=2):
                gr.HTML('<div class="section-header">üîë Step 1: API Key Verification</div>')

                key_box = gr.Textbox(
                    type="password",
                    label="Groq API Key",
                    placeholder="Enter your Groq API key (get it from console.groq.com)",
                    lines=1,
                    elem_classes=["input-field"],
                )

                with gr.Row():
                    verify_button = gr.Button("üîç Verify API Key", scale=3, variant="primary")
                    clear_button = gr.Button("üóëÔ∏è Clear System", scale=1, variant="stop")

                verify_status = gr.Textbox(
                    label="Verification Status",
                    lines=2,
                    interactive=False,
                    elem_classes=["output-text"],
                )

            with gr.Column(scale=1):
                stats_panel = gr.Markdown(
                    elem_classes=["stats-box"],
                    value=rag.get_stats(),
                )

        gr.HTML('<div class="divider"></div>')

        # Step 2: document upload.
        gr.HTML('<div class="section-header">üìÑ Step 2: Upload Documents</div>')

        uploader = gr.File(
            label="Upload Documents",
            file_types=[".pdf", ".xlsx", ".xls", ".txt", ".md", ".csv", ".docx"],
            file_count="multiple",
            elem_classes=["file-upload"],
        )

        upload_button = gr.Button("üì§ Process Documents", size="lg", variant="primary")

        upload_status = gr.Textbox(
            label="Upload Status",
            lines=10,
            interactive=False,
            elem_classes=["output-text"],
        )

        gr.HTML('<div class="divider"></div>')

        # Step 3: question, retrieval depth and answer.
        gr.HTML('<div class="section-header">üí¨ Step 3: Ask Questions</div>')

        with gr.Row():
            with gr.Column(scale=4):
                question_box = gr.Textbox(
                    label="Your Question",
                    lines=3,
                    placeholder="Ask a question about your documents...",
                    elem_classes=["input-field"],
                )
            with gr.Column(scale=1):
                k_slider = gr.Slider(
                    label="Chunks to Retrieve",
                    minimum=1,
                    maximum=10,
                    step=1,
                    value=5,
                    elem_classes=["slider"],
                )

        ask_button = gr.Button("üîç Get Answer", size="lg", variant="primary")

        answer_panel = gr.Markdown(
            elem_classes=["answer-box", "markdown-content"],
            label="Answer",
        )

        # ---- event wiring ------------------------------------------------
        verify_button.click(
            fn=verify_key,
            inputs=[key_box],
            outputs=[verify_status, stats_panel],
        )

        upload_button.click(
            fn=upload_docs,
            inputs=[uploader],
            outputs=[upload_status, stats_panel],
        )

        ask_button.click(
            fn=ask_question,
            inputs=[question_box, k_slider],
            outputs=[answer_panel],
        )

        clear_button.click(
            fn=clear_system,
            inputs=[],
            outputs=[verify_status, stats_panel],
        )

        gr.HTML("""
        <div class="tips-section">
            <h3>üí° Tips for Best Results</h3>
            <ul>
                <li><strong>Get your API key:</strong> Visit <a href="https://console.groq.com" target="_blank">console.groq.com</a></li>
                <li><strong>Supported formats:</strong> PDF, Excel (.xlsx, .xls), Text (.txt, .md, .csv), DOCX</li>
                <li><strong>Multiple files:</strong> Upload multiple documents at once for comprehensive answers</li>
                <li><strong>Adjust retrieval:</strong> Increase "Chunks to Retrieve" for more context in answers</li>
                <li><strong>Document-based:</strong> All answers are strictly derived from your uploaded documents</li>
            </ul>
        </div>
        """)

    return app

# ============================================================================
# MAIN EXECUTION
# ============================================================================

if __name__ == "__main__":
    # Launch settings: request a public share link, keep debug output on,
    # and serve on port 7860 with host "0.0.0.0".
    launch_options = {
        "share": True,
        "debug": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }

    demo = create_gradio_interface()
    demo.launch(**launch_options)

  with gr.Blocks(css=CUSTOM_CSS, title="RAG System with Groq", theme=gr.themes.Soft()) as demo:
  with gr.Blocks(css=CUSTOM_CSS, title="RAG System with Groq", theme=gr.themes.Soft()) as demo:


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://37ffb19b6ab49b095b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fastapi/applications.py", line 1133, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/error