In [None]:
import sys
import os
import asyncio
import time
import json
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any

# Add src to path
sys.path.insert(0, str(Path.cwd().parent / 'src'))

# Import RAG system components
from husqbot.core.rag_system import HusqvarnaRAGSystem
from husqbot.storage.bigquery_client import BigQueryClient
from husqbot.data.document_processor import DocumentProcessor
from husqbot.models.embedding_models import EmbeddingModel
from husqbot.models.generation_models import GenerationModel

# Configuration
# Environment variables take precedence so the notebook can run unmodified in
# an already-configured environment; unset or empty variables fall back to the
# original defaults. You can still overwrite the values here by hand.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT") or "your-project-id"  # Replace with your project ID
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION") or "us-central1"

print("✅ Testing environment initialized!")
print(f"🔧 Project: {PROJECT_ID}")
print(f"📍 Location: {LOCATION}")

# Surface the unconfigured placeholder early instead of letting later cells
# fail with a less obvious error.
if PROJECT_ID == "your-project-id":
    print("⚠️ PROJECT_ID is still the placeholder; set GOOGLE_CLOUD_PROJECT or edit this cell.")


In [None]:
# Initialize the RAG system
# Builds the shared HusqvarnaRAGSystem instance; the test helpers defined in
# the following cells reach it through the module-level name `rag_system`.
rag_system = HusqvarnaRAGSystem(PROJECT_ID, LOCATION)

print("🏗️ RAG System initialized successfully!")

# Test system status
async def check_system_status():
    """Print a short status report for the RAG system.

    Fetches the statistics mapping from ``rag_system.get_system_stats()`` and
    prints the chunk count, system status and cache hit rate.

    Returns:
        True when the statistics were fetched and printed, False when any
        exception was raised along the way (the error is printed, not raised).
    """
    healthy = False
    try:
        system_stats = await rag_system.get_system_stats()
        print("📊 System Status:")
        # These two rows share one layout and the same 'N/A' fallback.
        for label, key in (
            ("Total chunks", "total_chunks"),
            ("System status", "system_status"),
        ):
            print(f"  {label}: {system_stats.get(key, 'N/A')}")
        hit_rate = system_stats.get('cache_hit_rate', 0)
        print(f"  Cache hit rate: {hit_rate:.2%}")
        healthy = True
    except Exception as exc:
        print(f"❌ System status check failed: {exc}")
    return healthy

# Check if system is ready
# Top-level `await` is only valid in an environment that supports it, such as
# IPython/Jupyter. The boolean outcome is kept in `system_ready`.
system_ready = await check_system_status()


In [None]:
# Test query catalogue: category name -> list of natural-language questions.
# Insertion order is the order in which categories are reported and tested.
test_queries = {
    "maintenance": [
        "How do I check the engine oil level?",
        "What is the recommended tire pressure for road riding?",
        "How often should I change the oil filter?",
        "How do I adjust the chain tension?",
        "What are the service intervals for this motorcycle?",
        "How do I check brake fluid levels?",
        "When should I replace the air filter?",
    ],
    "troubleshooting": [
        "Engine won't start when I press the start button",
        "What should I do if the engine overheats?",
        "Motorcycle is running rough at idle",
        "Brakes feel spongy, what's wrong?",
        "Battery keeps dying, how to diagnose?",
        "Engine turns but won't start",
        "What causes engine vibration at high RPM?",
    ],
    "safety": [
        "What safety precautions should I take when riding?",
        "How dangerous is it to ride with low tire pressure?",
        "What should I do if I smell fuel?",
        "Is it safe to ride with worn brake pads?",
        "What are the risks of operating without proper gear?",
        "How to safely work on the electrical system?",
    ],
    "specifications": [
        "What is the fuel tank capacity?",
        "What are the valve clearance specifications?",
        "What type of engine oil should I use?",
        "What is the maximum weight capacity?",
        "What tire sizes are recommended?",
        "What is the compression ratio?",
    ],
    "procedures": [
        "How do I start the motorcycle properly?",
        "What is the procedure for changing brake pads?",
        "How do I adjust the suspension settings?",
        "What is the proper way to wash the motorcycle?",
        "How to change riding modes?",
        "How to adjust mirror positions?",
    ],
    "complex": [
        "How do I diagnose electrical problems in the lighting system?",
        "What is the complete procedure for valve adjustment?",
        "How to troubleshoot fuel injection issues?",
        "What causes poor fuel economy and how to fix it?",
        "How to set up suspension for different riding conditions?",
    ],
}

# Per-category counts followed by the grand total.
print("🎯 Test query categories defined:")
for category in test_queries:
    queries = test_queries[category]
    print(f"  📂 {category.title()}: {len(queries)} queries")

total_queries = sum(map(len, test_queries.values()))
print(f"\n📊 Total test queries: {total_queries}")


In [None]:
async def test_single_query(query: str, skill_level: str = "intermediate", verbose: bool = True):
    """Test a single query with detailed analysis."""
    
    if verbose:
        print(f"🔍 Testing Query: {query}")
        print(f"👤 Skill Level: {skill_level}")
        print("-" * 50)
    
    start_time = time.time()
    
    try:
        result = await rag_system.query_system(
            query=query,
            user_skill_level=skill_level
        )
        
        processing_time = time.time() - start_time
        
        if verbose:
            print(f"✅ Success!")
            print(f"📖 Answer: {result.answer[:200]}...")
            print(f"🎯 Confidence: {result.confidence:.3f}")
            print(f"⚠️ Safety Level: {result.safety_level}")
            print(f"⏱️ Processing Time: {result.processing_time:.2f}s")
            print(f"📚 Sources Used: {len(result.sources)}")
            print(f"🔧 Intent: {result.metadata.get('intent', 'N/A')}")
            
            # Show top sources
            print("\n📋 Top Sources:")
            for i, source in enumerate(result.sources[:3], 1):
                section = source.get('section', 'Unknown')
                page = source.get('page_number', 'N/A')
                chunk_type = source.get('chunk_type', 'N/A')
                print(f"  {i}. {section} (Page {page}, Type: {chunk_type})")
        
        return {
            "query": query,
            "success": True,
            "confidence": result.confidence,
            "safety_level": result.safety_level,
            "processing_time": result.processing_time,
            "sources_count": len(result.sources),
            "answer_length": len(result.answer),
            "intent": result.metadata.get('intent', 'unknown'),
            "answer": result.answer
        }
        
    except Exception as e:
        if verbose:
            print(f"❌ Error: {e}")
        
        return {
            "query": query,
            "success": False,
            "error": str(e),
            "processing_time": time.time() - start_time
        }

# Test a sample query
# Uses the defaults (skill_level="intermediate", verbose=True), so the detailed
# report is printed and the summary dict is kept in `sample_result`.
sample_result = await test_single_query("How do I check the engine oil level?")


In [None]:
async def test_category(category: str, queries: List[str], skill_level: str = "intermediate"):
    """Test all queries in a specific category."""
    
    print(f"🧪 Testing Category: {category.title()}")
    print(f"📊 Queries to test: {len(queries)}")
    print("=" * 60)
    
    results = []
    start_time = time.time()
    
    for i, query in enumerate(queries, 1):
        print(f"\n🔍 Query {i}/{len(queries)}: {query[:50]}...")
        
        result = await test_single_query(query, skill_level, verbose=False)
        results.append(result)
        
        # Quick status
        if result["success"]:
            print(f"  ✅ Success - Confidence: {result['confidence']:.2f}, Time: {result['processing_time']:.1f}s")
        else:
            print(f"  ❌ Failed - {result.get('error', 'Unknown error')[:50]}...")
    
    total_time = time.time() - start_time
    
    # Calculate statistics
    successful = [r for r in results if r["success"]]
    success_rate = len(successful) / len(results) if results else 0
    
    print(f"\n📊 Category Summary: {category.title()}")
    print("-" * 40)
    print(f"✅ Success Rate: {success_rate:.1%} ({len(successful)}/{len(results)})")
    
    if successful:
        avg_confidence = sum(r["confidence"] for r in successful) / len(successful)
        avg_processing_time = sum(r["processing_time"] for r in successful) / len(successful)
        avg_sources = sum(r["sources_count"] for r in successful) / len(successful)
        avg_answer_length = sum(r["answer_length"] for r in successful) / len(successful)
        
        print(f"🎯 Avg Confidence: {avg_confidence:.3f}")
        print(f"⏱️ Avg Processing Time: {avg_processing_time:.2f}s")
        print(f"📚 Avg Sources Used: {avg_sources:.1f}")
        print(f"📝 Avg Answer Length: {avg_answer_length:.0f} chars")
        
        # Safety analysis
        safety_levels = [r["safety_level"] for r in successful]
        if safety_levels:
            avg_safety = sum(safety_levels) / len(safety_levels)
            print(f"⚠️ Avg Safety Level: {avg_safety:.1f}")
    
    print(f"🕒 Total Category Time: {total_time:.1f}s")
    
    return {
        "category": category,
        "results": results,
        "success_rate": success_rate,
        "total_time": total_time,
        "statistics": {
            "avg_confidence": avg_confidence if successful else 0,
            "avg_processing_time": avg_processing_time if successful else 0,
            "avg_sources": avg_sources if successful else 0,
            "avg_answer_length": avg_answer_length if successful else 0
        }
    }

# Test a specific category (choose one to start)
category_to_test = "maintenance"  # Change this to test different categories

if category_to_test in test_queries:
    category_results = await test_category(category_to_test, test_queries[category_to_test])
else:
    print(f"❌ Category '{category_to_test}' not found. Available: {list(test_queries.keys())}")


In [None]:
async def performance_test():
    """Measure how query complexity, skill level and chunk count affect results.

    Runs three experiments in sequence and prints the measurements:

    1. Five queries labelled by complexity, three runs each, averaging the
       processing time and confidence of the successful runs.
    2. One fixed query at each skill level ("beginner", "intermediate",
       "expert").
    3. One fixed query with ``max_chunks`` set to 3, 5 and 7.

    Returns:
        A dict with ``complexity_results`` (label -> averages and query),
        ``skill_results`` (skill -> time, confidence, answer length) and
        ``chunk_results`` (chunk count -> time, confidence, sources used).
        Failed runs are reported on the console and left out of the dicts.
    """
    print("⚡ Starting Performance Testing")
    print("=" * 50)

    # Test different query complexities
    performance_queries = [
        ("Simple", "What is tire pressure?"),
        ("Medium", "How do I check the engine oil level and what should I look for?"),
        ("Complex", "What should I do if my engine won't start and I've already checked the battery and fuses?"),
        ("Safety", "What are the dangers of riding with low tire pressure and how can I prevent accidents?"),
        ("Technical", "What are the complete valve clearance specifications and adjustment procedures?")
    ]

    # Test different skill levels
    skill_levels = ["beginner", "intermediate", "expert"]

    # Test different chunk counts
    chunk_counts = [3, 5, 7]

    print("🔍 Testing Query Complexity Impact")
    print("-" * 30)

    complexity_results = {}

    for complexity, query in performance_queries:
        print(f"\n📊 Testing {complexity} Query: {query[:40]}...")

        times = []
        confidences = []

        # Run multiple times for average; failed runs are simply excluded.
        for _ in range(3):
            result = await test_single_query(query, verbose=False)
            if result["success"]:
                times.append(result["processing_time"])
                confidences.append(result["confidence"])

        if times:
            avg_time = sum(times) / len(times)
            avg_confidence = sum(confidences) / len(confidences)

            complexity_results[complexity] = {
                "avg_time": avg_time,
                "avg_confidence": avg_confidence,
                "query": query
            }

            print(f"  ⏱️ Avg Time: {avg_time:.2f}s")
            print(f"  🎯 Avg Confidence: {avg_confidence:.3f}")
        else:
            print("  ❌ All runs failed")

    print("\n🎓 Testing Skill Level Impact")
    print("-" * 30)

    skill_results = {}
    test_query = "How do I check the engine oil level?"

    for skill in skill_levels:
        print(f"\n👤 Testing skill level: {skill}")

        result = await test_single_query(test_query, skill_level=skill, verbose=False)

        if result["success"]:
            skill_results[skill] = {
                "processing_time": result["processing_time"],
                "confidence": result["confidence"],
                "answer_length": result["answer_length"]
            }

            print(f"  ⏱️ Time: {result['processing_time']:.2f}s")
            print(f"  📝 Answer Length: {result['answer_length']} chars")
        else:
            # Report the failure so a missing skill level in the results is
            # explained on the console rather than silently skipped.
            print(f"  ❌ Failed - {result.get('error', 'Unknown error')}")

    print("\n📊 Testing Chunk Count Impact")
    print("-" * 30)

    chunk_results = {}

    for chunk_count in chunk_counts:
        print(f"\n📚 Testing with {chunk_count} chunks")

        # Calls the RAG system directly because `max_chunks` is passed here,
        # which the test_single_query call signature used above does not take.
        try:
            result = await rag_system.query_system(
                query="How do I start the motorcycle?",
                max_chunks=chunk_count
            )

            chunk_results[chunk_count] = {
                "processing_time": result.processing_time,
                "confidence": result.confidence,
                "sources_used": len(result.sources)
            }

            print(f"  ⏱️ Time: {result.processing_time:.2f}s")
            print(f"  🎯 Confidence: {result.confidence:.3f}")
            print(f"  📚 Sources: {len(result.sources)}")

        except Exception as e:
            print(f"  ❌ Error: {e}")

    # Summary
    print("\n📈 Performance Summary")
    print("=" * 30)

    if complexity_results:
        fastest = min(complexity_results.items(), key=lambda x: x[1]["avg_time"])
        slowest = max(complexity_results.items(), key=lambda x: x[1]["avg_time"])

        print(f"🏃 Fastest Query Type: {fastest[0]} ({fastest[1]['avg_time']:.2f}s)")
        print(f"🐌 Slowest Query Type: {slowest[0]} ({slowest[1]['avg_time']:.2f}s)")

    return {
        "complexity_results": complexity_results,
        "skill_results": skill_results,
        "chunk_results": chunk_results
    }

# Run performance test
# Executes as soon as this cell runs; the three result dicts returned by
# performance_test() are kept in `perf_results`.
perf_results = await performance_test()


In [None]:
async def comprehensive_analysis():
    """Run the first three queries of every category and compare the results.

    Each category in ``test_queries`` is run through ``test_category``
    (limited to three queries per category). The function then prints overall
    totals, a per-category comparison table, performance highlights, the
    distribution of detected intents and a safety-level summary.

    Returns:
        A dict with ``all_results`` (category -> ``test_category`` output),
        ``overall_stats``, ``category_stats`` (list of per-category dicts),
        ``intent_analysis`` (intent -> count) and ``safety_analysis``
        (average level and number of results with level >= 2; both 0 when no
        query succeeded).
    """
    # Local import: the file's import block does not bring in `collections`.
    from collections import Counter

    print("📊 Starting Comprehensive Analysis")
    print("=" * 60)

    all_results = {}
    overall_start = time.time()

    # Test all categories
    for category, queries in test_queries.items():
        print(f"\n🔄 Processing {category.title()} Category...")
        # Limit to 3 queries per category for demo
        all_results[category] = await test_category(category, queries[:3])

    total_analysis_time = time.time() - overall_start

    # Successful per-query results across all categories, reused below.
    successful_results = [
        query_result
        for result in all_results.values()
        for query_result in result["results"]
        if query_result["success"]
    ]

    # Overall statistics
    print("\n📈 Overall System Analysis")
    print("=" * 40)

    total_queries = sum(len(result["results"]) for result in all_results.values())
    total_successful = len(successful_results)
    overall_success_rate = total_successful / total_queries if total_queries > 0 else 0

    print(f"📊 Total Queries Tested: {total_queries}")
    print(f"✅ Overall Success Rate: {overall_success_rate:.1%}")
    print(f"🕒 Total Analysis Time: {total_analysis_time:.1f}s")

    # Category comparison
    print("\n📋 Category Performance Comparison")
    print("-" * 50)

    category_stats = []
    for category, result in all_results.items():
        stats = {
            "category": category,
            "success_rate": result["success_rate"],
            "avg_confidence": result["statistics"]["avg_confidence"],
            "avg_time": result["statistics"]["avg_processing_time"]
        }
        category_stats.append(stats)

        print(f"{category.title():15} | Success: {stats['success_rate']:5.1%} | "
              f"Confidence: {stats['avg_confidence']:5.3f} | "
              f"Time: {stats['avg_time']:5.2f}s")

    # Best and worst performing categories
    if category_stats:
        best_success = max(category_stats, key=lambda x: x["success_rate"])
        worst_success = min(category_stats, key=lambda x: x["success_rate"])
        best_confidence = max(category_stats, key=lambda x: x["avg_confidence"])

        # A category without any successful query reports avg_time == 0 as a
        # placeholder; ranking it would crown a fully failed category as the
        # fastest, so only categories with at least one success compete.
        timed_stats = [s for s in category_stats if s["success_rate"] > 0]

        print("\n🏆 Performance Highlights")
        print("-" * 30)
        print(f"🥇 Best Success Rate: {best_success['category'].title()} ({best_success['success_rate']:.1%})")
        print(f"🥉 Worst Success Rate: {worst_success['category'].title()} ({worst_success['success_rate']:.1%})")
        print(f"🎯 Highest Confidence: {best_confidence['category'].title()} ({best_confidence['avg_confidence']:.3f})")
        if timed_stats:
            fastest_category = min(timed_stats, key=lambda x: x["avg_time"])
            print(f"⚡ Fastest Category: {fastest_category['category'].title()} ({fastest_category['avg_time']:.2f}s)")
        else:
            print("⚡ Fastest Category: N/A (no successful queries)")

    # Intent analysis
    print("\n🔧 Intent Detection Analysis")
    print("-" * 30)

    intent_counts = Counter(
        query_result.get("intent", "unknown") for query_result in successful_results
    )

    # most_common() lists intents by descending count, ties in first-seen order.
    for intent, count in intent_counts.most_common():
        print(f"  {intent}: {count} queries")

    # Safety analysis
    print("\n⚠️ Safety Analysis")
    print("-" * 20)

    safety_levels = [query_result["safety_level"] for query_result in successful_results]

    # Bound up front so the return value never depends on a branch having run.
    avg_safety = 0
    high_safety_count = 0

    if safety_levels:
        avg_safety = sum(safety_levels) / len(safety_levels)
        high_safety_count = sum(1 for level in safety_levels if level >= 2)

        print(f"  Average Safety Level: {avg_safety:.2f}")
        print(f"  High Safety Queries: {high_safety_count}/{len(safety_levels)} ({high_safety_count/len(safety_levels):.1%})")

    return {
        "all_results": all_results,
        "overall_stats": {
            "total_queries": total_queries,
            "total_successful": total_successful,
            "success_rate": overall_success_rate,
            "analysis_time": total_analysis_time
        },
        "category_stats": category_stats,
        # Plain dict, as before, so callers see the same type.
        "intent_analysis": dict(intent_counts),
        "safety_analysis": {
            "avg_safety_level": avg_safety,
            "high_safety_count": high_safety_count
        }
    }

# Comprehensive analysis is opt-in: the call at the bottom of this cell is
# left commented out and only the notice below is printed. Uncomment it to run
# every category through comprehensive_analysis().
print("⚠️ Comprehensive analysis will test multiple categories")
print("💡 This may take several minutes and will consume API quota")
print("🔄 Uncomment the line below to run full analysis")

# comprehensive_results = await comprehensive_analysis()


In [None]:
def debug_query(query: str, expected_answer_contains: str = None):
    """Print a full diagnostic trace for one query.

    The heading is printed immediately; the query itself runs only when the
    returned coroutine is awaited.

    Args:
        query: Question sent to ``rag_system.query_system``.
        expected_answer_contains: Optional text that should appear in the
            answer; the comparison is case-insensitive.

    Returns:
        A coroutine that resolves to the query result, or to None when the
        query or the reporting raised (the error and troubleshooting hints
        are printed instead).
    """
    print(f"🛠️ Debugging Query: {query}")
    print("=" * 60)

    async def debug_analysis():
        try:
            # Run the query with the system defaults.
            outcome = await rag_system.query_system(query=query)

            print("✅ Query Status: SUCCESS")
            print(f"📖 Answer: {outcome.answer}")
            print(f"🎯 Confidence: {outcome.confidence:.3f}")
            print(f"⚠️ Safety Level: {outcome.safety_level}")
            print(f"⏱️ Processing Time: {outcome.processing_time:.2f}s")
            print(f"🔧 Intent: {outcome.metadata.get('intent', 'N/A')}")

            print(f"\n📚 Retrieved Sources ({len(outcome.sources)}):")
            for rank, chunk in enumerate(outcome.sources, 1):
                print(f"  {rank}. Section: {chunk.get('section', 'Unknown')}")
                print(f"     Subsection: {chunk.get('subsection', 'N/A')}")
                print(f"     Page: {chunk.get('page_number', 'N/A')}")
                print(f"     Type: {chunk.get('chunk_type', 'N/A')}")
                print(f"     Content: {chunk.get('content', '')[:100]}...")
                print()

            # Optional case-insensitive check for the expected text.
            if expected_answer_contains:
                missing = expected_answer_contains.lower() not in outcome.answer.lower()
                if missing:
                    print(f"❌ Expected content '{expected_answer_contains}' NOT found in answer")
                else:
                    print(f"✅ Expected content '{expected_answer_contains}' found in answer")

            # Heuristic checks, evaluated in this order; each entry pairs a
            # condition with the warning shown when it holds.
            checks = (
                (outcome.confidence < 0.5,
                 "Low confidence score - may indicate poor chunk retrieval"),
                (len(outcome.sources) < 2,
                 "Few sources retrieved - may need better query or more chunks"),
                (outcome.processing_time > 5,
                 "Slow processing time - potential performance issue"),
                (len(outcome.answer) < 50,
                 "Very short answer - may indicate insufficient context"),
            )
            warnings = [message for triggered, message in checks if triggered]

            if not warnings:
                print("✅ No obvious issues detected")
            else:
                print("⚠️ Potential Issues Detected:")
                for message in warnings:
                    print(f"  • {message}")

            return outcome

        except Exception as exc:
            print("❌ Query Status: FAILED")
            print(f"🐛 Error: {exc}")
            print(f"📊 Error Type: {type(exc).__name__}")

            # Generic hints printed for every failure.
            print("\n🔧 Troubleshooting Suggestions:")
            for hint in (
                "Check if BigQuery dataset and table exist",
                "Verify embeddings are populated in BigQuery",
                "Confirm Vertex AI authentication and quotas",
                "Check network connectivity",
            ):
                print(f"  • {hint}")

            return None

    return debug_analysis()

def system_diagnostics():
    """Print a health report for the components, configuration and GCP auth.

    Each component is constructed inside its own try block so one failing
    constructor does not stop the remaining checks. Nothing is returned; all
    findings are printed.
    """
    print("🏥 System Diagnostics")
    print("=" * 30)

    # (label, zero-argument factory) pairs; the lambdas defer construction
    # until the guarded call below.
    component_factories = (
        ("BigQuery Client", lambda: BigQueryClient(PROJECT_ID, LOCATION)),
        ("Document Processor", lambda: DocumentProcessor()),
        ("Embedding Model", lambda: EmbeddingModel()),
        ("Generation Model", lambda: GenerationModel()),
    )

    for label, build in component_factories:
        try:
            build()  # only construction success matters; the instance is discarded
            print(f"✅ {label}: OK")
        except Exception as exc:
            print(f"❌ {label}: FAILED - {exc}")

    # Configuration currently in effect.
    print("\n🌍 Environment Check:")
    print(f"  📋 PROJECT_ID: {PROJECT_ID}")
    print(f"  📍 LOCATION: {LOCATION}")
    print(f"  🐍 Python Path: {sys.path[0]}")

    # Google Cloud credentials; the import is inside the try so a missing
    # package is reported like any other auth failure.
    try:
        import google.auth
        _, detected_project = google.auth.default()
        print(f"  🔐 GCP Auth: OK (Project: {detected_project})")
    except Exception as exc:
        print(f"  🔐 GCP Auth: FAILED - {exc}")

# Run system diagnostics
# Synchronous call: results are printed and nothing is returned.
system_diagnostics()

# Example debug session
# debug_query() returns a coroutine, so the commented-out example below awaits
# it; the second argument is the text expected to appear in the answer.
print(f"\n🔍 Example Debug Session:")
print("Uncomment the line below to debug a specific query")

# debug_result = await debug_query("How do I check the engine oil level?", "oil level viewer")
