In [1]:
import time
import os
from typing import List, Dict, Tuple
import pandas as pd
import torch
import re
from collections import defaultdict

# LangChain components
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.vectorstores import Milvus, Chroma, FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from sentence_transformers import SentenceTransformer


In [2]:
def prepare_documents_enhanced(text: str, chunk_size: int = 800, chunk_overlap: int = 200) -> List[Document]:
    """Enhanced document preparation with tunable chunking and legal metadata"""
    
    # Clean the text and remove injected compliance questions
    lines = text.split('\n')
    cleaned_lines = []
    skip_section = False
    
    for line in lines:
        if 'COMPLIANCE_QUESTIONS' in line or 'How must HOA board elections' in line:
            skip_section = True
            continue
        if skip_section and (line.strip() == '' or line.startswith('    "')):
            continue
        if skip_section and not line.startswith('    '):
            skip_section = False
        if not skip_section:
            cleaned_lines.append(line)
    
    cleaned_text = '\n'.join(cleaned_lines)
    print(f"🧼 Cleaned text: {len(cleaned_text)} characters (removed test questions)")
    
    # Create splitter with adjustable chunking
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=[
            "\n=== ", "\nARTICLE ", "\nSECTION ", "\n\n", "\n", ". ", " "
        ],
        length_function=len,
        is_separator_regex=False,
    )
    
    docs = splitter.create_documents([cleaned_text])
    
    enhanced_docs = []
    for i, doc in enumerate(docs):
        content = doc.page_content.lower()
        
        # Infer source file from known keywords
        source_doc = "unknown"
        if "arroyo park" in content:
            source_doc = "arroyo_park"
        elif "camino place" in content:
            source_doc = "camino_place"
        elif "jackson oaks" in content:
            source_doc = "jackson_oaks"
        
        metadata = {
            "chunk_id": i,
            "source_document": source_doc,
            "chunk_size": len(doc.page_content),

            # Legal concept flags
            "has_voting": any(term in content for term in ["vote", "ballot", "election", "poll", "voting", "electoral"]),
            "has_proxy": "proxy" in content,
            "has_absentee": "absentee" in content or "mail" in content,
            "has_director": any(term in content for term in ["director", "board", "officer", "president", "secretary", "treasurer"]),
            "has_quorum": "quorum" in content,
            "has_term": any(term in content for term in ["term", "tenure", "serve", "serving"]),
            "has_meeting": any(term in content for term in ["meeting", "assembly", "session", "gathering"]),
            "has_notice": any(term in content for term in ["notice", "notification", "notify", "inform"]),
            "has_agenda": "agenda" in content,
            "has_california_law": any(term in content for term in ["civil code", "california", "state law", "statute", "regulation"]),
            "has_davis_stirling": "davis" in content and "stirling" in content,
            "has_corporate_code": "corporation" in content and "code" in content,
            "has_assessment": any(term in content for term in ["assessment", "fee", "dues", "charge", "levy"]),
            "has_collection": any(term in content for term in ["collection", "delinquent", "lien", "foreclosure"]),
            "has_architectural": any(term in content for term in ["architectural", "improvement", "modification", "construction", "building"]),
            "has_enforcement": any(term in content for term in ["violation", "fine", "penalty", "enforcement", "compliance"]),
            "has_ccr": "cc&r" in content or "covenants" in content,
            "is_bylaw": "bylaw" in content,
            "is_article": content.strip().startswith("article"),
            "is_section": content.strip().startswith("section"),
        }
        
        enhanced_docs.append(Document(page_content=doc.page_content, metadata=metadata))
    
    print(f"📦 Created {len(enhanced_docs)} chunks (avg length: {round(sum(len(d.page_content) for d in enhanced_docs)/len(enhanced_docs))} chars)")
    return enhanced_docs


In [3]:
# FIXED: Improved embedding setup with corrected parameters
def setup_embeddings_enhanced(hf_token=None):
    """Enhanced embedding setup with models optimized for legal text - FIXED VERSION"""
    models = {}
    model_timings = {}
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🖥️ Using device: {device}")

    # Optimized models for legal/compliance text
    embedding_configs = [
        ("BGE-Large", "BAAI/bge-large-en-v1.5"),  # Excellent for legal text
        ("E5-Large", "intfloat/e5-large-v2"),     # Strong general performance
        ("GTE-Large", "Alibaba-NLP/gte-large-en-v1.5"),  # Your current best
        ("BGE-Base", "BAAI/bge-base-en-v1.5"),    # Faster alternative
        ("E5-Base", "intfloat/e5-base-v2"),       # Updated version
        ("MiniLM-L6", "sentence-transformers/all-MiniLM-L6-v2"),  # Fallback
    ]

    print(f"📥 Loading {len(embedding_configs)} embedding models...")
    total_start = time.time()

    for name, model_id in embedding_configs:
        model_start = time.time()
        try:
            # FIXED: Removed show_progress_bar from encode_kwargs to avoid conflict
            kwargs = {"device": device}
            
            if "gte-large" in model_id.lower():
                kwargs["trust_remote_code"] = True

            model = HuggingFaceEmbeddings(
                model_name=model_id,
                model_kwargs=kwargs,
                encode_kwargs={
                    "normalize_embeddings": True, 
                    "batch_size": 32,  # Smaller batch for stability
                    # REMOVED: "show_progress_bar": False - this causes the conflict
                }
            )
            load_time = time.time() - model_start
            models[name] = model
            model_timings[name] = load_time
            print(f"✓ Loaded {name} ({load_time:.2f}s)")
        
        except Exception as e:
            load_time = time.time() - model_start
            model_timings[name] = float('inf')
            print(f"✗ Failed to load {name}: {str(e)[:100]}")

    return models, model_timings


In [4]:
# Enhanced retrieval with hybrid search
def create_hybrid_retriever(vector_store, documents, top_k=5):
    """Create hybrid retriever combining vector and BM25 search"""
    try:
        # BM25 retriever for exact keyword matching
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = top_k
        
        # Vector retriever
        vector_retriever = vector_store.as_retriever(search_kwargs={"k": top_k})
        
        # Ensemble retriever (combines both)
        ensemble_retriever = EnsembleRetriever(
            retrievers=[bm25_retriever, vector_retriever],
            weights=[0.3, 0.7]  # Favor vector search but include keyword matching
        )
        
        return ensemble_retriever
    except Exception as e:
        print(f"Failed to create hybrid retriever: {e}")
        return vector_store.as_retriever(search_kwargs={"k": top_k})


In [5]:
# Enhanced relevance scoring
def calculate_enhanced_relevance(question: str, results: List[Document]) -> float:
    """Enhanced relevance calculation with legal concept matching"""
    if not results:
        return 0.0
    
    question_lower = question.lower()
    
    # Legal concept keywords for different question types
    concept_keywords = {
        "elections": ["election", "vote", "ballot", "director", "board", "candidate"],
        "voting": ["vote", "voting", "ballot", "proxy", "absentee", "poll"],
        "qualifications": ["qualifications", "eligible", "candidate", "director", "board"],
        "notice": ["notice", "notification", "inform", "days", "advance", "prior"],
        "proxy": ["proxy", "voting", "authorization", "delegate"],
        "quorum": ["quorum", "minimum", "present", "meeting", "percentage"],
        "term": ["term", "serve", "serving", "years", "length", "duration"],
        "removal": ["removal", "remove", "recall", "dismissed", "terminated"],
        "assessment": ["assessment", "fee", "dues", "collection", "payment"],
        "architectural": ["architectural", "approval", "modification", "improvement", "construction"],
        "enforcement": ["enforcement", "violation", "fine", "penalty", "compliance"],
        "amendment": ["amendment", "amend", "change", "modify", "vote", "approval"]
    }
    
    # Identify question type
    question_type = None
    for concept, keywords in concept_keywords.items():
        if any(keyword in question_lower for keyword in keywords):
            question_type = concept
            break
    
    total_score = 0.0
    for doc in results:
        doc_content = doc.page_content.lower()
        
        # Base keyword overlap
        question_words = set(question_lower.split())
        doc_words = set(doc_content.split())
        keyword_overlap = len(question_words.intersection(doc_words)) / len(question_words)
        
        # Concept-specific bonus
        concept_bonus = 0.0
        if question_type and question_type in concept_keywords:
            concept_words = concept_keywords[question_type]
            concept_matches = sum(1 for word in concept_words if word in doc_content)
            concept_bonus = min(concept_matches / len(concept_words), 0.5)
        
        # Metadata bonus for relevant categorization
        metadata_bonus = 0.0
        if hasattr(doc, 'metadata'):
            metadata = doc.metadata
            if question_type == "elections" and (metadata.get('has_voting') or metadata.get('has_director')):
                metadata_bonus += 0.1
            elif question_type == "voting" and metadata.get('has_voting'):
                metadata_bonus += 0.1
            elif question_type == "notice" and metadata.get('has_notice'):
                metadata_bonus += 0.1
            # Add more metadata bonuses as needed
        
        # California law bonus
        legal_bonus = 0.1 if any(term in doc_content for term in [
            "civil code", "california", "davis-stirling", "state law"
        ]) else 0.0
        
        doc_score = keyword_overlap + concept_bonus + metadata_bonus + legal_bonus
        total_score += min(doc_score, 1.0)  # Cap individual document scores
    
    return min(total_score / len(results), 1.0)


In [6]:
def test_vector_store_enhanced(
    store_type: str,
    documents: List[Document],
    embeddings,
    store_name: str,
    use_hybrid: bool = True,
    show_samples: bool = True
):
    """Benchmark vector store with multiple HOA compliance questions using hybrid or standard retrieval"""
    print(f"\n--- Testing {store_name} with {store_type} (Hybrid: {use_hybrid}) ---")
    start_time = time.time()

    # Vector store creation
    try:
        if store_type == "FAISS":
            vector_store = FAISS.from_documents(documents, embeddings)
        elif store_type == "Chroma":
            vector_store = Chroma.from_documents(
                documents, embeddings,
                collection_name=f"hoa_enhanced_{int(time.time())}"
            )
        elif store_type == "Milvus":
            vector_store = Milvus.from_documents(
                documents, embeddings,
                connection_args={"host": "localhost", "port": "19530"},
                collection_name=f"hoa_enhanced_{int(time.time())}"
            )
        else:
            raise ValueError(f"Unsupported vector store: {store_type}")

        setup_time = time.time() - start_time
    except Exception as e:
        print(f"❌ Vector store setup failed: {e}")
        return None

    # Retriever setup
    retriever = (
        create_hybrid_retriever(vector_store, documents, top_k=5)
        if use_hybrid else
        vector_store.as_retriever(search_kwargs={"k": 5})
    )
    print(f"🧠 Retriever ready (setup time: {setup_time:.2f}s)")

    # HOA compliance questions
    compliance_questions = [
        "How must HOA board elections be conducted under California law?",
        "What voting methods are required for board elections?",
        "What qualifications are required for board candidates?",
        "What notice requirements exist for board meetings?",
        "How are proxy votes handled in HOA elections?",
        "What constitutes a quorum for member meetings?",
        "How long do directors serve on the board?",
        "Under what circumstances can a director be removed?",
        "What are the assessment collection procedures?",
        "How are architectural review requests processed?",
        "What enforcement actions can the HOA take for violations?",
        "What are the requirements for amending CC&Rs or bylaws?"
    ]

    query_times, relevance_scores, detailed_results = [], [], []

    for i, question in enumerate(compliance_questions):
        try:
            q_start = time.time()
            results = retriever.get_relevant_documents(question)
            query_time = time.time() - q_start
            query_times.append(query_time)

            relevance = calculate_enhanced_relevance(question, results)
            relevance_scores.append(relevance)

            top_chunk = results[0] if results else None
            detailed_results.append({
                "question": question,
                "relevance": relevance,
                "query_time": query_time,
                "top_result": top_chunk.page_content[:200] if top_chunk else "No results",
                "source": top_chunk.metadata.get("source_document", "unknown") if top_chunk else "unknown"
            })

            # Optional debug print
            if show_samples and i == 0:
                print(f"\n📌 Sample result: {question}")
                if top_chunk:
                    print(f"  → Source: {top_chunk.metadata.get('source_document', 'unknown')}")
                    print(f"  → Chunk ID: {top_chunk.metadata.get('chunk_id', 'N/A')}")
                    print(f"  → Content: {top_chunk.page_content[:200]}...")
                    print(f"  → Relevance: {relevance:.3f}")
                else:
                    print("  → No results found")
        except Exception as e:
            print(f"⚠️ Query error: '{question[:50]}...' → {e}")
            query_times.append(float('inf'))
            relevance_scores.append(0.0)

    # Final scoring
    avg_relevance = sum(relevance_scores) / len(relevance_scores)
    valid_times = [qt for qt in query_times if qt != float('inf')]
    avg_query_time = sum(valid_times) / len(valid_times) if valid_times else float('inf')
    time_score = min(1.0 / (1.0 + avg_query_time * 10), 1.0)
    compliance_score = avg_relevance * 0.85 + time_score * 0.15
    high_quality = sum(1 for score in relevance_scores if score > 0.7)

    print(f"\n📊 Compliance Benchmark Summary:")
    print(f"→ Average relevance: {avg_relevance:.3f}")
    print(f"→ Average query time: {avg_query_time*1000:.1f}ms")
    print(f"→ Compliance score: {compliance_score:.3f}")
    print(f"→ High-quality answers (>0.7): {high_quality}/12")

    return {
        "embedding": store_name,
        "vector_store": store_type,
        "hybrid": use_hybrid,
        "setup_time": setup_time,
        "avg_relevance": avg_relevance,
        "avg_query_time": avg_query_time,
        "compliance_score": compliance_score,
        "high_quality_count": high_quality,
        "detailed_results": detailed_results
    }



In [7]:
# Debug and diagnostic functions
def check_system_setup():
    """Check system setup and dependencies"""
    print("🔍 SYSTEM DIAGNOSTICS")
    print("=" * 40)
    
    # Check CUDA
    print(f"CUDA Available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"CUDA Device: {torch.cuda.get_device_name(0)}")
        print(f"CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    
    # Check imports
    try:
        from sentence_transformers import SentenceTransformer
        print("✓ sentence-transformers available")
    except ImportError:
        print("✗ sentence-transformers not available")
    
    try:
        from langchain.retrievers import BM25Retriever
        print("✓ BM25Retriever available")
    except ImportError:
        print("✗ BM25Retriever not available - hybrid search disabled")
    
    try:
        from pymilvus import connections
        print("✓ Milvus client available")
    except ImportError:
        print("✗ Milvus client not available")
    
    print()


In [8]:
def test_single_embedding_model(model_name: str, model_id: str):
    # Test a single embedding model with fallback for non-compatible architectures
    print(f"🧪 Testing {model_name}...")

    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model_kwargs = {"device": device}

        # Add GTE-specific trust logic to model_kwargs only
        if "gte-large" in model_id.lower():
            model_kwargs["trust_remote_code"] = True
            print("🔐 Enabled trust_remote_code for GTE-Large")

        start_time = time.time()

        # Fallback for LegalBERT-style raw transformer models
        if "legal-bert" in model_id.lower():
            from transformers import AutoModel, AutoTokenizer
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModel.from_pretrained(model_id).to(device)

            def embed_documents(texts):
                inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
                with torch.no_grad():
                    outputs = model(**inputs)
                return outputs.last_hidden_state[:, 0, :].cpu().numpy()

            load_time = time.time() - start_time
            print(f"  ⚖️ LegalBERT loaded in {load_time:.2f}s (transformers fallback)")

        else:
            model = HuggingFaceEmbeddings(
                model_name=model_id,
                model_kwargs=model_kwargs,  # ✅ trust_remote_code nested here
                encode_kwargs={
                    "normalize_embeddings": True,
                    "batch_size": 8
                }
            )
            embed_documents = model.embed_documents
            load_time = time.time() - start_time
            print(f"  ✓ {model_name} loaded in {load_time:.2f}s")

        # Test embedding output
        test_text = ["HOA board elections under California law", "Proxy voting procedures"]
        embed_start = time.time()
        embeddings = embed_documents(test_text)
        embed_time = time.time() - embed_start

        print(f"  ✓ Embedding test: {embed_time:.3f}s for {len(test_text)} docs")
        print(f"  ✓ Embedding dimension: {len(embeddings[0])}")
        print(f"  ✓ GPU Active: {torch.cuda.is_available() and torch.cuda.memory_allocated() > 0}")

        return model if isinstance(model, HuggingFaceEmbeddings) else embed_documents, True

    except Exception as e:
        print(f"  ✗ Failed: {e}")
        return None, False

In [9]:
# Simplified main function with extensive debugging
def run_enhanced_compliance_test():
    """Run the enhanced compliance test with comprehensive debugging"""
    print("🚀 ENHANCED HOA COMPLIANCE RAG TEST")
    print("=" * 60)
    
    # System diagnostics first
    check_system_setup()
    
    # Load documents with verification
    print("📁 LOADING DOCUMENTS")
    print("-" * 30)
    base_path = r"C:\Users\default.LAPTOP-4HP0OBME\OneDrive\Documents\GitHub\RAG\pro_elect"
    hoa_files = ["arroyo_park.txt", "camino_place.txt", "jackson_oaks.txt"]
    
    combined_text = ""
    for filename in hoa_files:
        filepath = os.path.join(base_path, filename)
        try:
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
                combined_text += f"\n\n=== {filename.replace('.txt', '').replace('_', ' ').title()} HOA ===\n"
                combined_text += content
                print(f"✓ Loaded {filename} ({len(content):,} characters)")
        except Exception as e:
            print(f"✗ Error loading {filename}: {e}")
            return []
    
    if not combined_text:
        print("❌ No documents loaded successfully!")
        return []
    
    print(f"📊 Total text: {len(combined_text):,} characters")
    
    # Prepare documents with verification
    print(f"\n📝 PREPARING DOCUMENTS")
    print("-" * 30)
    documents = prepare_documents_enhanced(combined_text)
    
    if not documents:
        print("❌ No documents prepared!")
        return []
    
    print(f"✓ Created {len(documents)} document chunks")
    
    # Test embedding models one by one
    print(f"\n🤖 TESTING EMBEDDING MODELS")
    print("-" * 35)
    
    # Start with just one reliable model
    working_models = {}
    
    # Test models in order of preference
    models_to_test = [
        ("E5-Base", "intfloat/e5-base-v2"),
        ("BGE-Base", "BAAI/bge-base-en-v1.5"),
        ("MiniLM-L6", "sentence-transformers/all-MiniLM-L6-v2"),  # Most reliable fallback
        ("GTE-Large", "Alibaba-NLP/gte-large-en-v1.5"),
    ]
    
    for name, model_id in models_to_test:
        model, success = test_single_embedding_model(name, model_id)
        if success:
            working_models[name] = model
            print(f"  ✅ {name} working - will use for testing")
        else:
            print(f"  ❌ {name} failed")
    
    if not working_models:
        print("❌ No embedding models working!")
        return []
    
    # Test vector stores with the working model
    print(f"\n🗄️  TESTING VECTOR STORES")
    print("-" * 30)
    
    results = []
    embed_name, embed_model = next(iter(working_models.items()))
    
    # Test FAISS first (most reliable)
    print(f"Testing FAISS with {embed_name}...")
    try:
        faiss_result = test_vector_store_enhanced(
            "FAISS", documents, embed_model, embed_name, use_hybrid=False
        )
        if faiss_result:
            results.append(faiss_result)
            print(f"✅ FAISS test successful: {faiss_result['compliance_score']:.3f}")
        else:
            print("❌ FAISS test failed")
    except Exception as e:
        print(f"❌ FAISS error: {e}")
    
    # Try hybrid search if BM25 is available
    try:
        from langchain.retrievers import BM25Retriever
        print(f"Testing Hybrid FAISS with {embed_name}...")
        hybrid_result = test_vector_store_enhanced(
            "FAISS", documents, embed_model, embed_name, use_hybrid=True
        )
        if hybrid_result:
            results.append(hybrid_result)
            print(f"✅ Hybrid test successful: {hybrid_result['compliance_score']:.3f}")
        else:
            print("❌ Hybrid test failed") 
    except Exception as e:
        print(f"❌ Hybrid search not available: {e}")
    
    # Display results
    if results:
        print(f"\n📊 COMPLIANCE TEST RESULTS")
        print("=" * 50)
        
        results.sort(key=lambda x: x['compliance_score'], reverse=True)
        
        for i, result in enumerate(results, 1):
            hybrid_text = " + Hybrid" if result['hybrid'] else ""
            print(f"{i}. {result['embedding']} + {result['vector_store']}{hybrid_text}")
            print(f"   🎯 Compliance Score: {result['compliance_score']:.3f}")
            print(f"   📈 Relevance: {result['avg_relevance']:.3f}")
            print(f"   ⚡ Query Time: {result['avg_query_time']*1000:.1f}ms")
            print(f"   ⭐ High Quality: {result['high_quality_count']}/12")
            print()
        
        best = results[0]
        if best['compliance_score'] >= 0.70:
            print(f"🎯 SUCCESS! Best configuration achieves {best['compliance_score']:.3f}")
            print(f"   Recommended: {best['embedding']} + {best['vector_store']}")
        else:
            print(f"⚠️  Best score: {best['compliance_score']:.3f}")
            print("   Try additional optimizations or different models")
            
        return results
    else:
        print("❌ No successful test results!")
        return []


In [10]:
# =============================================================================
# CONFIGURATION - CHANGE THESE FLAGS TO CONTROL WHAT RUNS
# =============================================================================

RUN_QUICK_TEST = False      # Set to True to run quick test only
RUN_FULL_TEST = True        # Set to True to run full compliance test
SKIP_QUICK_IF_FULL = True   # Set to True to skip quick test when running full test

# =============================================================================
# QUICK TEST FUNCTION
# =============================================================================

def quick_test(model_name="E5-Base", model_id="intfloat/e5-base-v2"):
    """Quick test to verify preferred embedding model functionality"""
    print("🚀 QUICK FUNCTIONALITY TEST")
    print("=" * 40)

    try:
        # Use your enhanced embedding tester
        embed_model, success = test_single_embedding_model(model_name, model_id)

        if not success:
            print(f"❌ {model_name} embedding test failed.")
            return False

        # Confirm it embeds correctly
        test_input = ["HOA board elections under California law"]
        test_embeddings = embed_model.embed_documents(test_input)

        print(f"✅ {model_name} embedding successful: {len(test_embeddings[0])} dims")
        print(f"🧠 Sample embedding norm: {round(float(sum(x*x for x in test_embeddings[0])**0.5), 3)}")
        return True

    except Exception as e:
        print(f"❌ Embedding test exception: {e}")
        return False
    
    

In [11]:

# =============================================================================
# CONFIGURATION - CHANGE THESE FLAGS TO CONTROL WHAT RUNS
# =============================================================================

RUN_QUICK_TEST = False      # Set to True to run quick test only
RUN_FULL_TEST = True        # Set to True to run full compliance test
SKIP_QUICK_IF_FULL = True   # Set to True to skip quick test when running full test

In [12]:
# =============================================================================
# QUICK TEST FUNCTION
# =============================================================================

def quick_test(model_name="E5-Base", model_id="intfloat/e5-base-v2"):
    """Quick test to verify preferred embedding model functionality"""
    print("🚀 QUICK FUNCTIONALITY TEST")
    print("=" * 40)

    try:
        # Use your enhanced embedding tester
        embed_model, success = test_single_embedding_model(model_name, model_id)

        if not success:
            print(f"❌ {model_name} embedding test failed.")
            return False

        # Confirm it embeds correctly
        test_input = ["HOA board elections under California law"]
        test_embeddings = embed_model.embed_documents(test_input)

        print(f"✅ {model_name} embedding successful: {len(test_embeddings[0])} dims")
        print(f"🧠 Sample embedding norm: {round(float(sum(x*x for x in test_embeddings[0])**0.5), 3)}")
        return True

    except Exception as e:
        print(f"❌ Embedding test exception: {e}")
        return False
    
# =============================================================================
# MAIN EXECUTION LOGIC
# =============================================================================

def run_tests():
    """Main function to run tests based on configuration flags"""
    
    # Determine what to run
    should_run_quick = RUN_QUICK_TEST and (not RUN_FULL_TEST or not SKIP_QUICK_IF_FULL)
    should_run_full = RUN_FULL_TEST
    
    results = None
    
    # Run quick test if requested
    if should_run_quick:
        print("📣 Running quick test...")
        quick_success = quick_test()
        
        if not quick_success:
            print("❌ Quick test failed - trying alternative...")
            quick_success = quick_test_alternative()
        
        if not quick_success:
            print("❌ All quick tests failed!")
            if RUN_FULL_TEST:
                print("⚠️  Proceeding with full test anyway...")
            else:
                return None
        else:
            print("✅ Quick test passed!")
    
    # Run full test if requested
    if should_run_full:
        if should_run_quick:
            print("\n" + "=" * 60)
        
        print("📣 Running full compliance test...")
        try:
            results = run_enhanced_compliance_test()
        except Exception as e:
            print(f"❌ Full test failed with error: {e}")
            return None
    
    # Summary
    if should_run_quick and should_run_full:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Both quick test and full compliance test")
    elif should_run_quick:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Quick test only")
    elif should_run_full:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Full compliance test only")
    
    return results

In [13]:
# =============================================================================
# MAIN EXECUTION LOGIC
# =============================================================================

def run_tests():
    """Main function to run tests based on configuration flags"""
    
    # Determine what to run
    should_run_quick = RUN_QUICK_TEST and (not RUN_FULL_TEST or not SKIP_QUICK_IF_FULL)
    should_run_full = RUN_FULL_TEST
    
    results = None
    
    # Run quick test if requested
    if should_run_quick:
        print("📣 Running quick test...")
        quick_success = quick_test()
        
        if not quick_success:
            print("❌ Quick test failed - trying alternative...")
            quick_success = quick_test_alternative()
        
        if not quick_success:
            print("❌ All quick tests failed!")
            if RUN_FULL_TEST:
                print("⚠️  Proceeding with full test anyway...")
            else:
                return None
        else:
            print("✅ Quick test passed!")
    
    # Run full test if requested
    if should_run_full:
        if should_run_quick:
            print("\n" + "=" * 60)
        
        print("📣 Running full compliance test...")
        try:
            results = run_enhanced_compliance_test()
        except Exception as e:
            print(f"❌ Full test failed with error: {e}")
            return None
    
    # Summary
    if should_run_quick and should_run_full:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Both quick test and full compliance test")
    elif should_run_quick:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Quick test only")
    elif should_run_full:
        print("\n" + "=" * 60)
        print("🎯 COMPLETED: Full compliance test only")
    
    return results

In [14]:
# =============================================================================
# JUPYTER EXECUTION CELL
# =============================================================================

# Just run this cell after setting your flags above:
if __name__ == "__main__" or True:  # This makes it work in Jupyter
    results = run_tests()

📣 Running full compliance test...
🚀 ENHANCED HOA COMPLIANCE RAG TEST
🔍 SYSTEM DIAGNOSTICS
CUDA Available: True
CUDA Device: NVIDIA GeForce RTX 4070 Laptop GPU
CUDA Memory: 8.6 GB
✓ sentence-transformers available
✓ BM25Retriever available
✓ Milvus client available

📁 LOADING DOCUMENTS
------------------------------
✓ Loaded arroyo_park.txt (10,214 characters)
✓ Loaded camino_place.txt (67,574 characters)
✓ Loaded jackson_oaks.txt (22,236 characters)
📊 Total text: 100,104 characters

📝 PREPARING DOCUMENTS
------------------------------
🧼 Cleaned text: 97327 characters (removed test questions)
📦 Created 172 chunks (avg length: 693 chars)
✓ Created 172 document chunks

🤖 TESTING EMBEDDING MODELS
-----------------------------------
🧪 Testing E5-Base...
  ✓ E5-Base loaded in 2.07s
  ✓ Embedding test: 0.127s for 2 docs
  ✓ Embedding dimension: 768
  ✓ GPU Active: True
  ✅ E5-Base working - will use for testing
🧪 Testing BGE-Base...
  ✓ BGE-Base loaded in 1.39s
  ✓ Embedding test: 0.008s for 

  results = retriever.get_relevant_documents(question)


🧠 Retriever ready (setup time: 1.36s)

📌 Sample result: How must HOA board elections be conducted under California law?
  → Source: unknown
  → Chunk ID: 135
  → Content: amended as set forth below: 
Section 3.02. Election; Tenure; Vacancies. 
(a) 
At each annual meeting the members shall by simple majority vote of those 
present in person or by proxy by secret ballot,...
  → Relevance: 0.780

📊 Compliance Benchmark Summary:
→ Average relevance: 0.699
→ Average query time: 5.6ms
→ Compliance score: 0.737
→ High-quality answers (>0.7): 7/12
✅ Hybrid test successful: 0.737

📊 COMPLIANCE TEST RESULTS
1. E5-Base + FAISS
   🎯 Compliance Score: 0.776
   📈 Relevance: 0.753
   ⚡ Query Time: 10.1ms
   ⭐ High Quality: 8/12

2. E5-Base + FAISS + Hybrid
   🎯 Compliance Score: 0.737
   📈 Relevance: 0.699
   ⚡ Query Time: 5.6ms
   ⭐ High Quality: 7/12

🎯 SUCCESS! Best configuration achieves 0.776
   Recommended: E5-Base + FAISS

🎯 COMPLETED: Full compliance test only


In [15]:
"""RUN_QUICK_TEST = True  # Toggle to False to skip quick test

def quick_test(model_name="E5-Base", model_id="intfloat/e5-base-v2"):
    #Quick test to verify preferred embedding model functionality
    print("🚀 QUICK FUNCTIONALITY TEST")
    print("=" * 40)

    try:
        # Use your enhanced embedding tester
        embed_model, success = test_single_embedding_model(model_name, model_id)

        if not success:
            print(f"❌ {model_name} embedding test failed.")
            return False

        # Confirm it embeds correctly
        test_input = ["HOA board elections under California law"]
        test_embeddings = embed_model.embed_documents(test_input)

        print(f"✅ {model_name} embedding successful: {len(test_embeddings[0])} dims")
        print(f"🧠 Sample embedding norm: {round(float(sum(x*x for x in test_embeddings[0])**0.5), 3)}")
        return True

    except Exception as e:
        print(f"❌ Embedding test exception: {e}")
        return False
if RUN_QUICK_TEST:
    if quick_test():
        run_enhanced_compliance_test()
    else:
        print("🛑 Quick test failed. Skipping full benchmark.")"""


'RUN_QUICK_TEST = True  # Toggle to False to skip quick test\n\ndef quick_test(model_name="E5-Base", model_id="intfloat/e5-base-v2"):\n    #Quick test to verify preferred embedding model functionality\n    print("🚀 QUICK FUNCTIONALITY TEST")\n    print("=" * 40)\n\n    try:\n        # Use your enhanced embedding tester\n        embed_model, success = test_single_embedding_model(model_name, model_id)\n\n        if not success:\n            print(f"❌ {model_name} embedding test failed.")\n            return False\n\n        # Confirm it embeds correctly\n        test_input = ["HOA board elections under California law"]\n        test_embeddings = embed_model.embed_documents(test_input)\n\n        print(f"✅ {model_name} embedding successful: {len(test_embeddings[0])} dims")\n        print(f"🧠 Sample embedding norm: {round(float(sum(x*x for x in test_embeddings[0])**0.5), 3)}")\n        return True\n\n    except Exception as e:\n        print(f"❌ Embedding test exception: {e}")\n        ret

In [16]:
"""RUN_QUICK_TEST = False  # Toggle this to False to skip quick test

def quick_test():
    #Quick test to verify basic embedding functionality
    print("🚀 QUICK FUNCTIONALITY TEST")
    print("=" * 40)
    
    try:
        from sentence_transformers import SentenceTransformer
        print("Testing sentence-transformers directly...")
        model = SentenceTransformer("all-MiniLM-L6-v2")
        embeddings = model.encode(["test sentence"])
        print(f"✅ Direct embedding test successful: {len(embeddings[0])} dims")
        return True
    except Exception as e:
        print(f"❌ Basic embedding test failed: {e}")
        return False


# Jupyter-safe launcher
if RUN_QUICK_TEST:
    if quick_test():
        print("\n" + "=" * 60)
        results = run_enhanced_compliance_test()
    else:
        print("❌ Quick test failed - skipping full benchmark")
else:
    print("📣 Skipping quick test – launching full benchmark directly")
    results = run_enhanced_compliance_test()"""


'RUN_QUICK_TEST = False  # Toggle this to False to skip quick test\n\ndef quick_test():\n    #Quick test to verify basic embedding functionality\n    print("🚀 QUICK FUNCTIONALITY TEST")\n    print("=" * 40)\n\n    try:\n        from sentence_transformers import SentenceTransformer\n        print("Testing sentence-transformers directly...")\n        model = SentenceTransformer("all-MiniLM-L6-v2")\n        embeddings = model.encode(["test sentence"])\n        print(f"✅ Direct embedding test successful: {len(embeddings[0])} dims")\n        return True\n    except Exception as e:\n        print(f"❌ Basic embedding test failed: {e}")\n        return False\n\n\n# Jupyter-safe launcher\nif RUN_QUICK_TEST:\n    if quick_test():\n        print("\n" + "=" * 60)\n        results = run_enhanced_compliance_test()\n    else:\n        print("❌ Quick test failed - skipping full benchmark")\nelse:\n    print("📣 Skipping quick test – launching full benchmark directly")\n    results = run_enhanced_compli

In [17]:
#run_enhanced_compliance_test()

In [18]:
run_enhanced_compliance_test()

🚀 ENHANCED HOA COMPLIANCE RAG TEST
🔍 SYSTEM DIAGNOSTICS
CUDA Available: True
CUDA Device: NVIDIA GeForce RTX 4070 Laptop GPU
CUDA Memory: 8.6 GB
✓ sentence-transformers available
✓ BM25Retriever available
✓ Milvus client available

📁 LOADING DOCUMENTS
------------------------------
✓ Loaded arroyo_park.txt (10,214 characters)
✓ Loaded camino_place.txt (67,574 characters)
✓ Loaded jackson_oaks.txt (22,236 characters)
📊 Total text: 100,104 characters

📝 PREPARING DOCUMENTS
------------------------------
🧼 Cleaned text: 97327 characters (removed test questions)
📦 Created 172 chunks (avg length: 693 chars)
✓ Created 172 document chunks

🤖 TESTING EMBEDDING MODELS
-----------------------------------
🧪 Testing E5-Base...
  ✓ E5-Base loaded in 0.94s
  ✓ Embedding test: 0.007s for 2 docs
  ✓ Embedding dimension: 768
  ✓ GPU Active: True
  ✅ E5-Base working - will use for testing
🧪 Testing BGE-Base...
  ✓ BGE-Base loaded in 0.95s
  ✓ Embedding test: 0.011s for 2 docs
  ✓ Embedding dimension: 76

[{'embedding': 'E5-Base',
  'vector_store': 'FAISS',
  'hybrid': False,
  'setup_time': 1.4776365756988525,
  'avg_relevance': 0.7529695767195768,
  'avg_query_time': 0.005413909753163655,
  'compliance_score': 0.7823203517374614,
  'high_quality_count': 8,
  'detailed_results': [{'question': 'How must HOA board elections be conducted under California law?',
    'relevance': 0.8733333333333333,
    'query_time': 0.0060007572174072266,
    'top_result': 'amended as set forth below: \nSection 3.02. Election; Tenure; Vacancies. \n(a) \nAt each annual meeting the members shall by simple majority vote of those \npresent in person or by proxy by secret ballot,',
    'source': 'unknown'},
   {'question': 'What voting methods are required for board elections?',
    'relevance': 0.9800000000000001,
    'query_time': 0.005040168762207031,
    'top_result': 'ARTICLE 4 - MEMBER MEETINGS AND VOTING\n4.6 Voting. Members in Good Standing shall be entitled to cast one (1) vote for each Unit owned or, 

# Run the model 

In [19]:
# Define question list
QUESTIONS = [
    "How must HOA board elections be conducted under California law?",
    "What voting methods are required for board elections?",
    "What qualifications are required for board candidates?",
    "What notice requirements exist for board meetings?",
    "How are proxy votes handled in HOA elections?",
    "What constitutes a quorum for member meetings?",
    "How long do directors serve on the board?",
    "Under what circumstances can a director be removed?",
    "What are the assessment collection procedures?",
    "How are architectural review requests processed?",
    "What enforcement actions can the HOA take for violations?",
    "What are the requirements for amending CC&Rs or bylaws?"
]

# Global setup function (place outside query function)
def setup_retriever(store_type: str, docs: List[Document], embed_model, use_hybrid=True):
    try:
        if store_type == "FAISS":
            vector_store = FAISS.from_documents(docs, embed_model)
        elif store_type == "Chroma":
            vector_store = Chroma.from_documents(docs, embed_model)
        elif store_type == "Milvus":
            vector_store = Milvus.from_documents(
                docs,
                embed_model,
                connection_args={"host": "localhost", "port": "19530"},
                collection_name=f"hoa_query_{int(time.time())}"
            )
        else:
            raise ValueError(f"Unsupported vector store type: {store_type}")
        
        return create_hybrid_retriever(vector_store, docs, top_k=5) if use_hybrid else vector_store.as_retriever(search_kwargs={"k": 5})
    except Exception as e:
        print(f"❌ Error setting up retriever: {e}")
        return None

# Main query function
def query_hoa_by_number(question_number: int, hoa_label: str, embed_model, vector_store_name="FAISS"):
    """Retrieve answer for specified question from a selected HOA document"""
    if not (1 <= question_number <= len(QUESTIONS)):
        print("❌ Invalid question number. Please select 1–12.")
        return None

    question = QUESTIONS[question_number - 1]
    print(f"\n🔎 Question {question_number}: {question}")
    print(f"📂 Searching in HOA: {hoa_label}")

    base_path = r"C:\Users\default.LAPTOP-4HP0OBME\OneDrive\Documents\GitHub\RAG\pro_elect"
    file_map = {
        "arroyo park": "arroyo_park.txt",
        "camino place": "camino_place.txt",
        "jackson oaks": "jackson_oaks.txt"
    }

    file_name = file_map.get(hoa_label.lower())
    if not file_name:
        print("❌ Invalid HOA name. Choose from: arroyo park, camino place, jackson oaks.")
        return None

    try:
        with open(os.path.join(base_path, file_name), "r", encoding="utf-8") as f:
            text = f.read()
        docs = prepare_documents_enhanced(text)
    except Exception as e:
        print(f"❌ Error loading file: {e}")
        return None

    retriever = setup_retriever(vector_store_name, docs, embed_model)
    if not retriever:
        print("❌ Retriever setup failed.")
        return None

    try:
        results = retriever.get_relevant_documents(question)
        print("\n📄 Top Retrieval Result:")
        print(f"→ Source: {hoa_label}")
        print(f"→ Content: {results[0].page_content[:400].strip()}...")
        return results[0].page_content
    except Exception as e:
        print(f"❌ Retrieval error: {e}")
        return None


In [20]:
# Assuming your embedding model was loaded like:
embed_model, _ = test_single_embedding_model("E5-Base", "intfloat/e5-base-v2")

# Run QA
query_hoa_by_number(3, "camino place", embed_model)


🧪 Testing E5-Base...
  ✓ E5-Base loaded in 0.98s
  ✓ Embedding test: 0.020s for 2 docs
  ✓ Embedding dimension: 768
  ✓ GPU Active: True

🔎 Question 3: What qualifications are required for board candidates?
📂 Searching in HOA: camino place
🧼 Cleaned text: 67574 characters (removed test questions)
📦 Created 120 chunks (avg length: 691 chars)

📄 Top Retrieval Result:
→ Source: camino place
→ Content: 6 
Director, and two or more Members of the Association. The Nominating 
Committee may make as many nominations for election to the Board as it deems 
appropriate. 
Any Member who satisfies the qualifications set forth in these Bylaws and 
Rules adopted pursuant to Civil Code section 5105 may place his or her name in 
nomination for election to the Board of Directors by giving written notice to th...


"6 \nDirector, and two or more Members of the Association. The Nominating \nCommittee may make as many nominations for election to the Board as it deems \nappropriate. \nAny Member who satisfies the qualifications set forth in these Bylaws and \nRules adopted pursuant to Civil Code section 5105 may place his or her name in \nnomination for election to the Board of Directors by giving written notice to the \nAssociation's managing agent and/or the Board. Notice of self-nomination must \nbe received prior to the published deadline for nominations. Nominations may \nnot be made from the floor at any meeting. \n5.4 \nElection. \nDirectors shall be elected annually by secret ballot in \naccordance with Civil Code sections 5100 through 5135 and Rules adopted"