In [None]:
"""Useage Gemini API for RAG"""
# Install required packages
!pip install sentence-transformers faiss-cpu google-generativeai
!pip install PyMuPDF  # For PDF processing

import fitz  # PyMuPDF for PDF extraction
import json
import os
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
import google.generativeai as genai
from google.colab import userdata
# Read the Gemini key from the Colab secret named "api_key"; a falsy value
# falls back to the literal placeholder below.
# NOTE(review): the placeholder is presumably not a usable key — set the Colab
# secret (or replace the placeholder) before making Gemini requests.
api_key = userdata.get("api_key") or "YOUR_GEMINI_API_KEY"
genai.configure(api_key=api_key)

In [None]:
# ------------------------------
# 1. PDF Text Extraction & Preprocessing
# ------------------------------

def extract_pdf_text(pdf_path):
    """Extract the plain text of every page in a PDF file.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF (``fitz``).

    Returns:
        The text of all pages concatenated in page order, or an empty string
        if the file cannot be opened or a page cannot be read (the error is
        printed rather than raised).
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            # Join once instead of growing a string page by page.
            return "".join(page.get_text() for page in doc)
        finally:
            # Release the document even when a page fails to extract.
            doc.close()
    except Exception as e:
        print(f"Error extracting text from {pdf_path}: {e}")
        return ""

def clean_text(text):
    """Flatten raw extracted text into a single space-separated string.

    The input is split on newlines and each line is stripped of surrounding
    whitespace. Lines that are 10 characters or shorter after stripping are
    discarded (treated as likely headers/footers); the remaining lines are
    joined with single spaces.
    """
    stripped_lines = (raw.strip() for raw in text.split('\n'))
    return ' '.join(candidate for candidate in stripped_lines if len(candidate) > 10)

def chunk_text(text, chunk_size=500, overlap=50, min_chars=50):
    """Split text into overlapping word-based chunks.

    Args:
        text: Input text; it is split on whitespace into words.
        chunk_size: Maximum number of words per chunk (must be positive).
        overlap: Number of words shared between consecutive chunks; must be
            non-negative and smaller than ``chunk_size``.
        min_chars: Chunks whose joined text is not longer than this many
            characters are dropped.

    Returns:
        A list of chunk strings in document order. Windowing stops as soon as
        a window reaches the last word, so no chunk is emitted that lies
        entirely inside the previous one.

    Raises:
        ValueError: If ``chunk_size``/``overlap`` would give a non-positive
            step between windows.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []

    for start in range(0, len(words), step):
        chunk = " ".join(words[start:start + chunk_size])
        if len(chunk) > min_chars:  # Only keep substantial chunks
            chunks.append(chunk)
        # Once a window has covered the final word, any further window would
        # only repeat the tail of this one.
        if start + chunk_size >= len(words):
            break

    return chunks

In [None]:
# ------------------------------
# 2. Build Legal Corpus
# ------------------------------

def build_legal_corpus(pdf_files=None, qa_file="/content/IndicLegalQA Dataset_10K_Revised.json"):
    """Build the legal corpus from PDF legislation and a Q&A JSON dataset.

    Args:
        pdf_files: Mapping of source label -> PDF path. Defaults to the three
            acts expected under ``/content`` (CPA2019, MVA, RTI).
        qa_file: Path of a JSON file holding a list of Q&A records, each with
            "question" and "answer" keys and optional "case_name" /
            "judgement_date". Pass ``None`` to skip the Q&A dataset.

    Returns:
        A list of ``{"text": ..., "metadata": {...}}`` dicts. PDF chunks carry
        ``source``, ``chunk_id`` and ``doc_type="legislation"``; Q&A entries
        carry ``source="IndicLegalQA"``, ``case_name``, ``judgement_date``,
        ``doc_type="qa_pair"`` and ``qa_id`` (the record's position in the
        dataset). Missing files and unreadable data are reported with a
        printed message and skipped.
    """
    if pdf_files is None:
        # PDF files (update paths as needed)
        pdf_files = {
            "CPA2019": "/content/CPA2019.pdf",
            "MVA": "/content/MVA.pdf",
            "RTI": "/content/rti-act.pdf"
        }

    corpus = []

    # Process PDF files
    print("Processing PDF files...")
    for label, path in pdf_files.items():
        if not os.path.exists(path):
            print(f"Warning: {path} not found")
            continue

        print(f"Processing {label}...")
        cleaned_text = clean_text(extract_pdf_text(path))
        chunks = chunk_text(cleaned_text, chunk_size=500, overlap=50)
        corpus.extend(
            {
                "text": chunk,
                "metadata": {
                    "source": label,
                    "chunk_id": i,
                    "doc_type": "legislation"
                }
            }
            for i, chunk in enumerate(chunks)
        )
        print(f"Added {len(chunks)} chunks from {label}")

    # Process Q&A dataset
    if qa_file is None:
        return corpus
    if not os.path.exists(qa_file):
        print(f"Warning: {qa_file} not found")
        return corpus

    print("Processing Q&A dataset...")
    try:
        with open(qa_file, "r", encoding="utf-8") as f:
            qa_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error loading Q&A data: {e}")
        return corpus

    if not isinstance(qa_data, list):
        print("Error loading Q&A data: expected a JSON list of Q&A records")
        return corpus

    added = 0
    skipped = 0
    for i, qa in enumerate(qa_data):
        # Skip a malformed record on its own instead of abandoning the rest
        # of the dataset part-way through.
        if not isinstance(qa, dict) or "question" not in qa or "answer" not in qa:
            skipped += 1
            continue
        corpus.append({
            "text": f"Question: {qa['question']} Answer: {qa['answer']}",
            "metadata": {
                "source": "IndicLegalQA",
                "case_name": qa.get("case_name", "Unknown"),
                "judgement_date": qa.get("judgement_date", ""),
                "doc_type": "qa_pair",
                "qa_id": i
            }
        })
        added += 1

    print(f"Added {added} Q&A pairs")
    if skipped:
        print(f"Warning: skipped {skipped} malformed Q&A records")

    return corpus

# Build the corpus when this cell runs; `corpus` is the module-level list that
# LegalRAGSystem is constructed from in a later cell.
corpus = build_legal_corpus()
print(f"\nLegal corpus created with {len(corpus)} entries.")


In [None]:
# ------------------------------
# 3. Create Vector Embeddings & FAISS Index
# ------------------------------

class LegalRAGSystem:
    """Retrieval-augmented question answering over a legal corpus.

    Corpus entries are dicts with a "text" string and a "metadata" dict that
    contains at least a "source" key. Texts are embedded with a
    SentenceTransformer model and stored in a FAISS inner-product index over
    L2-normalised vectors (i.e. cosine similarity). The top matches for a
    question are handed to a Gemini model as context for the answer.
    """

    def __init__(self, corpus, embedding_model='all-MiniLM-L6-v2', gemini_model_name='gemini-pro'):
        """Create the embedder and Gemini client; the index starts unbuilt.

        Args:
            corpus: List of ``{"text": ..., "metadata": {...}}`` entries.
            embedding_model: SentenceTransformer model name.
            gemini_model_name: Model id passed to ``genai.GenerativeModel``.
                NOTE(review): confirm the default id is still served by the
                Gemini API; pass a current id here if it is not.
        """
        self.corpus = corpus
        self.embedder = SentenceTransformer(embedding_model)  # Fast, good embeddings
        self.index = None
        self.gemini_model = genai.GenerativeModel(gemini_model_name)

    def build_index(self, batch_size=100):
        """Embed every corpus text and build the FAISS index.

        Args:
            batch_size: Number of texts encoded per call to the embedder.

        Raises:
            ValueError: If the corpus is empty (there is nothing to index).
        """
        texts = [item["text"] for item in self.corpus]
        if not texts:
            raise ValueError("Cannot build an index from an empty corpus")

        print("Creating embeddings...")

        # Create embeddings in batches to avoid memory issues
        batches = []
        for start in range(0, len(texts), batch_size):
            batches.append(self.embedder.encode(texts[start:start + batch_size]))
            print(f"Processed {min(start + batch_size, len(texts))}/{len(texts)} texts")

        # FAISS works on contiguous float32 arrays; convert once up front so
        # the in-place normalisation and the add() see the same buffer.
        embeddings = np.ascontiguousarray(np.vstack(batches), dtype='float32')

        # Build FAISS index
        print("Building FAISS index...")
        # Normalize embeddings so inner product equals cosine similarity
        faiss.normalize_L2(embeddings)
        index = faiss.IndexFlatIP(embeddings.shape[1])  # Inner product for similarity
        index.add(embeddings)
        self.index = index

        print(f"Index built with {self.index.ntotal} vectors")

    def search(self, query, top_k=5):
        """Return up to ``top_k`` corpus entries most similar to ``query``.

        Returns:
            A list of dicts with "text", "metadata" and "similarity_score"
            (float), best match first.

        Raises:
            RuntimeError: If no index has been built or loaded yet.
        """
        if self.index is None:
            raise RuntimeError("Index not available; call build_index() or load_index() first")

        query_embedding = np.ascontiguousarray(self.embedder.encode([query]), dtype='float32')
        faiss.normalize_L2(query_embedding)

        scores, indices = self.index.search(query_embedding, top_k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with label -1; without the lower
            # bound, -1 would silently select the last corpus entry.
            if 0 <= idx < len(self.corpus):
                results.append({
                    "text": self.corpus[idx]["text"],
                    "metadata": self.corpus[idx]["metadata"],
                    "similarity_score": float(score)
                })

        return results

    def generate_answer(self, query, search_results):
        """Ask Gemini to answer ``query`` using the retrieved passages.

        Args:
            query: The user's question.
            search_results: Entries as returned by :meth:`search`.

        Returns:
            The model's text response, or an ``"Error generating response: ..."``
            string if the call (or reading the response text) fails.
        """
        # Prepare context from search results
        context = "\n\n".join(
            f"Source: {result['metadata']['source']}\n{result['text']}"
            for result in search_results
        )

        # Create prompt for Gemini
        prompt = f"""You are a legal expert AI assistant. Use the following legal documents and information to answer the user's question accurately and comprehensively.

CONTEXT FROM LEGAL DOCUMENTS:
{context}

USER QUESTION: {query}

INSTRUCTIONS:
1. Provide a clear, accurate answer based on the provided legal context
2. Cite specific sources when possible (CPA2019, MVA, RTI, or case references)
3. If the context doesn't contain enough information, clearly state this
4. Use professional legal language but make it understandable
5. Structure your response with clear points when appropriate

ANSWER:"""

        try:
            response = self.gemini_model.generate_content(prompt)
            return response.text
        except Exception as e:
            return f"Error generating response: {e}"

    def query(self, question, top_k=5):
        """Main query function: retrieve passages, then generate an answer.

        Returns:
            A dict with "question", "answer" and "sources" (the search hits).
        """
        print(f"Searching for: {question}")

        # Search for relevant documents
        search_results = self.search(question, top_k=top_k)

        print(f"Found {len(search_results)} relevant documents")
        for i, result in enumerate(search_results):
            print(f"  {i+1}. Source: {result['metadata']['source']} (Score: {result['similarity_score']:.3f})")

        # Generate answer using Gemini
        answer = self.generate_answer(question, search_results)

        return {
            "question": question,
            "answer": answer,
            "sources": search_results
        }

    def save_index(self, index_path="legal_index.faiss", metadata_path="legal_metadata.json"):
        """Write the FAISS index and the corpus (as JSON) to disk.

        Does nothing except print a notice when no index has been built.
        """
        # Compare against None explicitly rather than relying on the
        # truthiness of the FAISS index object.
        if self.index is None:
            print("No index to save; call build_index() first")
            return

        faiss.write_index(self.index, index_path)
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(self.corpus, f, ensure_ascii=False, indent=2)
        print(f"Index saved to {index_path}, metadata to {metadata_path}")

    def load_index(self, index_path="legal_index.faiss", metadata_path="legal_metadata.json"):
        """Load a previously saved index and its corpus metadata.

        On success ``self.index`` and ``self.corpus`` are both replaced by the
        saved versions.

        Returns:
            True if both files exist and are consistent with each other;
            False otherwise (the current state is left untouched).
        """
        if not (os.path.exists(index_path) and os.path.exists(metadata_path)):
            return False

        index = faiss.read_index(index_path)
        with open(metadata_path, "r", encoding="utf-8") as f:
            corpus = json.load(f)

        # An index whose vector count differs from the metadata length would
        # map search hits to the wrong (or missing) corpus entries.
        if index.ntotal != len(corpus):
            print(f"Saved index has {index.ntotal} vectors but metadata has {len(corpus)} entries; ignoring it")
            return False

        self.index = index
        self.corpus = corpus
        print(f"Index loaded from {index_path}")
        return True


In [None]:
# ------------------------------
# 4. Initialize and Build RAG System
# ------------------------------

# Create the RAG system from the corpus built in the earlier cell.
rag_system = LegalRAGSystem(corpus)

# Try to load an existing index from the default paths, otherwise build a new
# one and save it for the next run.
# NOTE: on a successful load, load_index() replaces rag_system.corpus with the
# saved metadata, so a previously saved index takes precedence over the corpus
# passed in above. Delete the saved files to force a rebuild after changing
# the source documents.
if not rag_system.load_index():
    print("Building new index...")
    rag_system.build_index()
    rag_system.save_index()
else:
    print("Loaded existing index")


In [None]:
# ------------------------------
# 5. Test the System
# ------------------------------

def test_legal_rag():
    """Test the RAG system with sample questions"""

    test_questions = [
        "What are the consumer rights under CPA 2019?",
        "What is the procedure for filing RTI application?",
        "What are the penalties under Motor Vehicle Act?",
        "How to file a consumer complaint?",
        "What information can be sought under RTI Act?"
    ]

    print("\n" + "="*80)
    print("TESTING LEGAL RAG SYSTEM")
    print("="*80)

    for i, question in enumerate(test_questions, 1):
        print(f"\n--- TEST {i} ---")
        result = rag_system.query(question, top_k=3)

        print(f"Question: {result['question']}")
        print(f"Answer: {result['answer']}")
        print("\nTop Sources:")
        for j, source in enumerate(result['sources'], 1):
            print(f"  {j}. {source['metadata']['source']} (Score: {source['similarity_score']:.3f})")
        print("-" * 60)

# Run the sample questions through rag_system; each question triggers a
# retrieval plus a Gemini generate_content call via rag_system.query().
test_legal_rag()


In [None]:
# ------------------------------
# 6. Interactive Query Function
# ------------------------------

def interactive_legal_assistant():
    """Run an interactive question/answer loop against the global ``rag_system``.

    Reads questions with ``input()`` until the user types ``exit``, ``quit``
    or ``bye``, interrupts the kernel, or the input stream ends. Each
    non-empty question is answered via ``rag_system.query(question, top_k=3)``
    and printed together with the sources consulted. Errors raised while
    answering a question are printed and the loop continues.
    """
    print("\n" + "="*60)
    print("LEGAL AI ASSISTANT (Powered by Gemini)")
    print("="*60)
    print("Ask me any legal questions about CPA 2019, MVA, RTI Act, or general legal queries!")
    print("Type 'exit' to quit")
    print("-" * 60)

    while True:
        try:
            question = input("\n🏛️  Your question: ").strip()

            if question.lower() in ['exit', 'quit', 'bye']:
                print("Thank you for using Legal AI Assistant!")
                break

            if not question:
                print("Please enter a question.")
                continue

            print("\n🔍 Searching legal database...")
            result = rag_system.query(question, top_k=3)

            print(f"\n📖 Answer:")
            print(result['answer'])

            print(f"\n📚 Sources consulted:")
            for i, source in enumerate(result['sources'], 1):
                source_name = source['metadata']['source']
                score = source['similarity_score']
                print(f"  {i}. {source_name} (Relevance: {score:.1%})")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop too: once stdin is exhausted every
            # further input() call raises it again, and the generic handler
            # below would otherwise spin forever.
            print("\n\nGoodbye!")
            break
        except Exception as e:
            print(f"Error: {e}")

# Start the interactive assistant. This call blocks on input() until the user
# types exit/quit/bye or interrupts the kernel.
interactive_legal_assistant()