In [None]:
import os

from arize.otel import register
from dotenv import load_dotenv
from openinference.instrumentation.langchain import LangChainInstrumentor

from src.python.generate_queries import generate_questions_from_files
from src.python.RagRetriever import RAGRetriever


# Load settings via python-dotenv before the os.getenv lookups below, so the
# ARIZE_* values can come from a local .env file.
load_dotenv()

# Build the tracer provider from credentials held in the environment and attach
# it to the LangChain instrumentor, so traces are reported under the
# "rag-app-notebook" project.
tracer_provider = register(
    space_id=os.getenv("ARIZE_SPACE_ID"),
    api_key=os.getenv("ARIZE_API_KEY"),
    project_name="rag-app-notebook",
)
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)

# Retriever under test, bound to the "chunk_size_500" collection.
rag_retriever = RAGRetriever(collection_name="chunk_size_500")


In [None]:
# Directory handed to the question generator as its source of documents.
pdf_dir = "./data/arxiv_papers"
# Queries used by the latency benchmark in the following cells.
# NOTE(review): max_files=10 presumably caps how many files are read — confirm
# against src/python/generate_queries.
questions = generate_questions_from_files(pdf_dir, max_files=10)

In [None]:
import time
import statistics
import concurrent.futures
import numpy as np
from typing import List, Dict, Any
from tqdm import tqdm

def process_query(rag_retriever, query: str, similarity_top_k: int = 20, rerank_top_n: int = 3) -> Dict[str, Any]:
    """Run one retrieval query and measure how long the retriever call takes.

    Args:
        rag_retriever: Object exposing
            ``similarity_search(query, similarity_top_k=..., rerank_top_n=...)``.
        query: Query text passed to the retriever.
        similarity_top_k: Forwarded unchanged to ``similarity_search``.
        rerank_top_n: Forwarded unchanged to ``similarity_search``.

    Returns:
        A dict with the keys ``"query"`` (the input query), ``"time"``
        (elapsed seconds for the retriever call) and ``"results"`` (whatever
        ``similarity_search`` returned).

    Any exception raised by the retriever propagates to the caller.
    """
    # perf_counter() is monotonic and high resolution. time.time() follows the
    # system clock, which can jump (NTP sync, manual changes) and produce
    # skewed or even negative latencies.
    started = time.perf_counter()
    results = rag_retriever.similarity_search(
        query,
        similarity_top_k=similarity_top_k,
        rerank_top_n=rerank_top_n,
    )
    elapsed = time.perf_counter() - started

    return {
        "query": query,
        "time": elapsed,
        "results": results,
    }

def run_concurrent_queries(rag_retriever, queries: List[str], max_workers: int = 4,
                         similarity_top_k: int = 20, rerank_top_n: int = 3) -> List[Dict[str, Any]]:
    """Run every query through ``process_query`` on a thread pool.

    Args:
        rag_retriever: Retriever object handed to ``process_query``.
        queries: Query strings to execute.
        max_workers: Size of the thread pool.
        similarity_top_k: Forwarded to ``process_query``.
        rerank_top_n: Forwarded to ``process_query``.

    Returns:
        One dict per query, in completion order (not input order).
        Successful queries yield the dict returned by ``process_query``.
        A query that raised yields
        ``{"query": <query>, "time": None, "error": <message>}``, so callers
        can filter on ``"time" is None`` and still see which query failed.
    """
    all_results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its query so a failure can be attributed to
        # the query that caused it.
        future_to_query = {
            executor.submit(
                process_query,
                rag_retriever,
                query,
                similarity_top_k,
                rerank_top_n,
            ): query
            for query in queries
        }

        for future in tqdm(
            concurrent.futures.as_completed(future_to_query),
            total=len(future_to_query),
            desc="Processing queries",
            unit="query",
        ):
            try:
                all_results.append(future.result())
            except Exception as e:
                # Record the failure instead of raising so one bad query does
                # not abort the whole batch or the progress bar.
                all_results.append({
                    "query": future_to_query[future],
                    "time": None,
                    "error": str(e),
                })

    return all_results

def calculate_percentiles(times: List[float]) -> Dict[str, float]:
    """Summarise timing samples at the p50, p75, p90, p95 and p99 marks.

    Each entry is computed with ``np.percentile`` over ``times`` and stored
    under a label of the form ``"p<level>"``.
    """
    levels = (50, 75, 90, 95, 99)
    return {f"p{level}": np.percentile(times, level) for level in levels}

max_workers = 4

# Time the whole batch from outside the pool. Per-query durations overlap when
# queries run concurrently, so the real wall clock time cannot be recovered
# from them afterwards; it has to be measured around the call itself.
batch_started = time.perf_counter()
results = run_concurrent_queries(
    rag_retriever,
    questions,
    max_workers=max_workers,
    similarity_top_k=20,
    rerank_top_n=3,
)
wall_clock_time = time.perf_counter() - batch_started

# Failed queries are recorded with "time": None; keep only real measurements.
query_times = [result["time"] for result in results if result["time"] is not None]

print(f"Processed {len(results)} queries in {wall_clock_time:.4f}s (measured wall clock time)")
if query_times and wall_clock_time > 0:
    # Sum of per-query latencies divided by the measured batch duration.
    print(f"Measured speedup over sequential execution: {sum(query_times) / wall_clock_time:.2f}x")



In [None]:
# Summarise the latencies of the successful queries from the benchmark run.
if query_times:
    avg_time = statistics.mean(query_times)
    min_time = min(query_times)
    max_time = max(query_times)

    percentiles = calculate_percentiles(query_times)

    print("\nQuery Latency Statistics:")
    print("-" * 40)
    print(f"Average: {avg_time:.4f}s")
    print(f"Minimum: {min_time:.4f}s")
    print(f"Maximum: {max_time:.4f}s")

    print("\nPercentile Latencies:")
    print(f"p50 (median): {percentiles['p50']:.4f}s")
    print(f"p75: {percentiles['p75']:.4f}s")
    print(f"p90: {percentiles['p90']:.4f}s")
    print(f"p95: {percentiles['p95']:.4f}s")
    print(f"p99: {percentiles['p99']:.4f}s")

    # statistics.stdev needs at least two samples.
    if len(query_times) > 1:
        stddev = statistics.stdev(query_times)
        print(f"\nStandard deviation: {stddev:.4f}s")

    print(f"\nTotal queries: {len(query_times)}")
    print(f"Failed queries: {len(results) - len(query_times)}")

    total_sequential_time = sum(query_times)
    # The slowest single query is NOT the batch wall clock time: with
    # `max_workers` threads the batch cannot finish before its slowest query,
    # nor faster than the total work spread evenly over all workers. Using the
    # slowest query alone previously produced speedups far above the worker
    # count. Only a lower bound can be derived from per-query latencies; time
    # the run_concurrent_queries call itself to get the real figure.
    elapsed_wall_time = max(max_time, total_sequential_time / max_workers)

    print(f"\nTotal sequential processing time: {total_sequential_time:.4f}s")
    print(f"Estimated wall clock time with {max_workers} workers (lower bound): {elapsed_wall_time:.4f}s")
    if elapsed_wall_time > 0:
        # Equivalent to min(total / slowest, max_workers).
        print(f"Speedup from concurrency (upper bound): {total_sequential_time / elapsed_wall_time:.2f}x")
else:
    # Every query failed (or none were run); say so instead of printing nothing.
    print(f"No successful queries to report ({len(results)} failed).")


Query Latency Statistics:
----------------------------------------
Average: 0.5096s
Minimum: 0.1914s
Maximum: 6.8483s

Percentile Latencies:
p50 (median): 0.3454s
p75: 0.4359s
p90: 0.6828s
p95: 1.1492s
p99: 4.9238s

Standard deviation: 0.7121s

Total queries: 522
Failed queries: 0

Total sequential processing time: 266.0272s
Wall clock time with concurrency: 6.8483s
Speedup from concurrency: 38.85x


In [None]:
import time
import statistics
import concurrent.futures
import numpy as np
from typing import List, Dict, Any
from tqdm import tqdm

def process_query(rag_retriever, query: str, similarity_top_k: int = 20, rerank_top_n: int = 3) -> Dict[str, Any]:
    """Run one query through ``get_relevant_documents`` and time the call.

    Args:
        rag_retriever: Object exposing ``get_relevant_documents(query)``.
        query: Query text passed to the retriever.
        similarity_top_k: Accepted for signature compatibility; not used by
            this variant, which calls ``get_relevant_documents(query)`` only.
        rerank_top_n: Accepted for signature compatibility; not used by this
            variant.

    Returns:
        A dict with the keys ``"query"`` (the input query), ``"time"``
        (elapsed seconds for the retriever call) and ``"results"`` (whatever
        ``get_relevant_documents`` returned).

    Any exception raised by the retriever propagates to the caller.
    """
    # perf_counter() is monotonic and high resolution. time.time() follows the
    # system clock, which can jump (NTP sync, manual changes) and produce
    # skewed or even negative latencies.
    started = time.perf_counter()
    results = rag_retriever.get_relevant_documents(query)
    elapsed = time.perf_counter() - started

    return {
        "query": query,
        "time": elapsed,
        "results": results,
    }

def run_concurrent_queries(rag_retriever, queries: List[str], max_workers: int = 4,
                         similarity_top_k: int = 20, rerank_top_n: int = 3) -> List[Dict[str, Any]]:
    """Execute all queries on a thread pool via ``process_query``.

    Args:
        rag_retriever: Retriever object handed to ``process_query``.
        queries: Query strings to execute.
        max_workers: Size of the thread pool.
        similarity_top_k: Forwarded to ``process_query``.
        rerank_top_n: Forwarded to ``process_query``.

    Returns:
        One dict per query, in completion order (not input order).
        Successful queries yield the dict returned by ``process_query``.
        A query that raised yields
        ``{"query": <query>, "time": None, "error": <message>}``, so callers
        can filter on ``"time" is None`` and still see which query failed.
    """
    all_results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its query so a failure can be attributed to
        # the query that caused it.
        query_by_future = {
            executor.submit(
                process_query,
                rag_retriever,
                query,
                similarity_top_k,
                rerank_top_n,
            ): query
            for query in queries
        }

        for future in tqdm(
            concurrent.futures.as_completed(query_by_future),
            total=len(query_by_future),
            desc="Processing queries",
            unit="query",
        ):
            try:
                all_results.append(future.result())
            except Exception as e:
                # Record the failure instead of raising so one bad query does
                # not abort the whole batch or the progress bar.
                all_results.append({
                    "query": query_by_future[future],
                    "time": None,
                    "error": str(e),
                })

    return all_results

def calculate_percentiles(times: List[float]) -> Dict[str, float]:
    """Return the p50, p75, p90, p95 and p99 values of the timing samples.

    Every value comes from ``np.percentile(times, q)`` and is keyed by
    ``"p<q>"``.
    """
    return {
        f"p{q}": np.percentile(times, q)
        for q in (50, 75, 90, 95, 99)
    }

max_workers = 4

# Time the whole batch from outside the pool. Per-query durations overlap when
# queries run concurrently, so the real wall clock time cannot be recovered
# from them afterwards; it has to be measured around the call itself.
batch_started = time.perf_counter()
# NOTE: similarity_top_k / rerank_top_n are passed along, but the process_query
# defined in this cell calls get_relevant_documents(query) and does not use them.
results = run_concurrent_queries(
    rag_retriever,
    questions,
    max_workers=max_workers,
    similarity_top_k=20,
    rerank_top_n=3,
)
wall_clock_time = time.perf_counter() - batch_started

# Failed queries are recorded with "time": None; keep only real measurements.
query_times = [result["time"] for result in results if result["time"] is not None]

print(f"Processed {len(results)} queries in {wall_clock_time:.4f}s (measured wall clock time)")
if query_times and wall_clock_time > 0:
    # Sum of per-query latencies divided by the measured batch duration.
    print(f"Measured speedup over sequential execution: {sum(query_times) / wall_clock_time:.2f}x")


  return self.base_retriever.get_relevant_documents(query)
Processing queries: 100%|██████████| 1000/1000 [02:01<00:00,  8.23query/s]


Query Latency Statistics:
----------------------------------------
Average: 0.4795s
Minimum: 0.1698s
Maximum: 8.3467s

Percentile Latencies:
p50 (median): 0.3598s
p75: 0.4461s
p90: 0.6161s
p95: 0.8421s
p99: 3.6308s

Standard deviation: 0.6205s

Total queries: 1000
Failed queries: 0

Total sequential processing time: 479.5331s
Wall clock time with concurrency: 8.3467s
Speedup from concurrency: 57.45x





In [None]:
# Summarise the latencies of the successful queries from the benchmark run.
if query_times:
    avg_time = statistics.mean(query_times)
    min_time = min(query_times)
    max_time = max(query_times)

    percentiles = calculate_percentiles(query_times)

    print("\nQuery Latency Statistics:")
    print("-" * 40)
    print(f"Average: {avg_time:.4f}s")
    print(f"Minimum: {min_time:.4f}s")
    print(f"Maximum: {max_time:.4f}s")

    print("\nPercentile Latencies:")
    print(f"p50 (median): {percentiles['p50']:.4f}s")
    print(f"p75: {percentiles['p75']:.4f}s")
    print(f"p90: {percentiles['p90']:.4f}s")
    print(f"p95: {percentiles['p95']:.4f}s")
    print(f"p99: {percentiles['p99']:.4f}s")

    # statistics.stdev needs at least two samples.
    if len(query_times) > 1:
        stddev = statistics.stdev(query_times)
        print(f"\nStandard deviation: {stddev:.4f}s")

    print(f"\nTotal queries: {len(query_times)}")
    print(f"Failed queries: {len(results) - len(query_times)}")

    total_sequential_time = sum(query_times)
    # The slowest single query is NOT the batch wall clock time: with
    # `max_workers` threads the batch cannot finish before its slowest query,
    # nor faster than the total work spread evenly over all workers. Using the
    # slowest query alone previously produced speedups far above the worker
    # count. Only a lower bound can be derived from per-query latencies; time
    # the run_concurrent_queries call itself to get the real figure.
    elapsed_wall_time = max(max_time, total_sequential_time / max_workers)

    print(f"\nTotal sequential processing time: {total_sequential_time:.4f}s")
    print(f"Estimated wall clock time with {max_workers} workers (lower bound): {elapsed_wall_time:.4f}s")
    if elapsed_wall_time > 0:
        # Equivalent to min(total / slowest, max_workers).
        print(f"Speedup from concurrency (upper bound): {total_sequential_time / elapsed_wall_time:.2f}x")
else:
    # Every query failed (or none were run); say so instead of printing nothing.
    print(f"No successful queries to report ({len(results)} failed).")