In [1]:
from pathlib import Path
import yaml
# Read the project configuration. The encoding is given explicitly so the
# file is decoded the same way on every platform instead of relying on the
# locale default, matching the other open() calls in this notebook.
with open("config.yml", 'r', encoding='utf-8') as ymlfile:
    cfg = yaml.safe_load(ymlfile)
path_fname = cfg['path_fname']
fname = cfg['fname']
# Directory of the persisted embedding index; passed to EmbeddingToolFAISS
# in the next cell.
output_dir = cfg['output_dir']

In [2]:
# Load chunks from the JSON file
import json
# Build the path from its components so it resolves on any OS; a backslash
# inside a string literal is only a path separator on Windows.
chunks_path = Path("Documents") / "SORA_chunks_cleaned_manual.json"
with open(chunks_path, 'r', encoding='utf-8') as f:
    chunks = json.load(f)
print(f"Number of chunks: {len(chunks)}")

# Load the previously persisted index from `output_dir` (read from config.yml).
from PreProcessing.embeddingToolsFAISSv2 import EmbeddingToolFAISS
embedder = EmbeddingToolFAISS( output_dir=Path(output_dir), index_backend="faiss")
embeddings = embedder.load_index()

# Retrieval layer over the loaded chunks and the embedding tool.
from RAG.ragv2 import RAG
rag_system = RAG(embedding_tool=embedder, chunks=chunks, default_mode="hybrid", reranker="colbert")

# Engine queried by the evaluation pipeline via `engine.answer_indicator(...)`.
from LLM.LLM_openAI_Classification import LLMIndicatorAssistant, InitialOperationInput, IndicatorName
engine = LLMIndicatorAssistant(rag_system=rag_system)

Number of chunks: 120


  from .autonotebook import tqdm as notebook_tqdm


✅ Loaded FAISS index with 120 vectors from PreProcessing\ProcessedFiles\index\faiss.index
   • Loaded metadata for 120 documents from PreProcessing\ProcessedFiles\index\docs.json


In [3]:
# from typing import Dict

# op = InitialOperationInput(
#     maximum_takeoff_mass_category="lt_25kg",  # lt_25kg or gte_25kg
#     vlos_or_bvlos="VLOS", # Visual Line of Sight VLOS or Beyond Visual Line of Sight BVLOS
#     ground_environment="sparsely_populated", # "controlled_area", "sparsely_populated", "populated"
#     airspace_type="uncontrolled", # "controlled", "uncontrolled"
#     maximum_altitude_category="gt_50m_le_120m",  # "le_50m", "gt_50m_le_120m", "gt_120m_le_150m", "gt_150m"
# )

# indicators = [
#     "likely_regulatory_pathway",
#     "initial_ground_risk_orientation",
#     "initial_air_risk_orientation",
#     "expected_assessment_depth",
# ]

# results: Dict[str, dict] = {}

# for indicator in indicators:
#     out = engine.answer_indicator(
#         indicator=indicator,
#         op=op,
#         stream=False,
#         print_sources=False,
#     )
#     results[indicator] = out["result"]


In [4]:
# print("Indicators:")
# for k, v in results.items():
#     print(k, v)

In [5]:
# output_path = "initial_indicators.json"
# with open(output_path, "w", encoding="utf-8") as f:
#     json.dump(results, f, indent=2, ensure_ascii=False)

In [6]:
# =============================================================================
# Consistency & Accuracy Evaluation Pipeline
# =============================================================================

import json
from collections import Counter
from typing import List, Tuple
from difflib import SequenceMatcher
import numpy as np
from pathlib import Path

def text_similarity(text1: str, text2: str) -> float:
    """
    Return a case-insensitive similarity ratio in [0, 1] for two texts.

    Two empty (or None) inputs count as identical (1.0); exactly one empty
    input counts as completely different (0.0). Otherwise the score is
    difflib's SequenceMatcher ratio of the lower-cased strings.
    """
    if text1 and text2:
        matcher = SequenceMatcher(None, text1.lower(), text2.lower())
        return matcher.ratio()

    # At least one side is empty: they only "match" when both are empty.
    if text1 or text2:
        return 0.0
    return 1.0

def get_operation_key(op) -> str:
    """
    Build the identifying key of an operation input.

    The key is the five operation fields joined with underscores, in the
    order: mass category, VLOS/BVLOS, ground environment, airspace type,
    altitude category. `op` may be a dict (fields read by key) or any
    object exposing the fields as attributes.
    """
    field_names = (
        "maximum_takeoff_mass_category",
        "vlos_or_bvlos",
        "ground_environment",
        "airspace_type",
        "maximum_altitude_category",
    )
    if isinstance(op, dict):
        parts = [op[name] for name in field_names]
    else:
        parts = [getattr(op, name) for name in field_names]
    return "_".join(f"{part}" for part in parts)

def get_operation_file_path(save_path: Path, op) -> Path:
    """Return `save_path / "op_result_<operation key>.json"` for the given operation."""
    file_name = "op_result_{}.json".format(get_operation_key(op))
    return save_path / file_name

def _print_operation_banner(op, position: int, total: int, skipped_file=None) -> None:
    """Print the framed header for one operation input, or its skip notice when `skipped_file` is given."""
    suffix = " - SKIPPED (file exists)" if skipped_file is not None else ""
    print(f"\n{'='*60}")
    print(f"Operation Input {position}/{total}{suffix}")
    print(f"  Mass: {op.maximum_takeoff_mass_category}, VLOS: {op.vlos_or_bvlos}")
    print(f"  Ground: {op.ground_environment}, Airspace: {op.airspace_type}")
    print(f"  Altitude: {op.maximum_altitude_category}")
    if skipped_file is not None:
        print(f"  File: {skipped_file}")
    print(f"{'='*60}")


def _summarize_indicator_runs(runs: list, indicator: str) -> dict:
    """Compute value agreement and mean pairwise explanation similarity of one indicator across runs."""
    values = [run[indicator]["value"] for run in runs]
    explanations = [run[indicator]["explanation"] for run in runs]

    # Value consistency: share of runs that produced the most common value.
    value_counts = Counter(values)
    most_common_value, most_common_count = value_counts.most_common(1)[0]

    # Explanation consistency: average similarity over all unordered pairs.
    pair_scores = [
        text_similarity(explanations[i], explanations[j])
        for i in range(len(explanations))
        for j in range(i + 1, len(explanations))
    ]

    return {
        "value_consistency": most_common_count / len(runs),
        "most_common_value": most_common_value,
        "unique_values": list(value_counts.keys()),
        # Plain float keeps the value JSON-serialisable as a number; a single
        # run has no pairs and is treated as fully self-consistent.
        "explanation_similarity": float(np.mean(pair_scores)) if pair_scores else 1.0,
    }


def run_consistency_evaluation(
    engine,
    operation_inputs: "List[InitialOperationInput]",
    indicators: List[str],
    num_runs: int = 3,
    verbose: bool = True,
    save_path: str = None,
    delay_between_calls: float = 1.0,
    skip_existing: bool = True
) -> dict:
    """
    Run the model multiple times for each input combination to check consistency.

    For every operation input, each indicator is queried `num_runs` times via
    `engine.answer_indicator`. Per indicator, the share of runs agreeing on
    the most common value and the mean pairwise explanation similarity are
    computed. When `save_path` is given, the results of each operation are
    written to their own JSON file as soon as that operation is finished.

    Args:
        engine: The LLM engine (must provide `answer_indicator`)
        operation_inputs: List of operation inputs to test
        indicators: List of indicator names to evaluate
        num_runs: Number of runs per input combination (must be >= 1)
        verbose: Whether to print progress
        save_path: Directory to save results after each operation (optional)
        delay_between_calls: Delay in seconds between API calls to avoid rate limits
        skip_existing: Whether to skip operations that already have saved results

    Returns:
        Results dictionary of the last operation evaluated in this call, or an
        empty dict when no operation was evaluated (none given, or all skipped).

    Raises:
        ValueError: If `num_runs` is smaller than 1.
    """
    import time

    # Without at least one run there is nothing to compute metrics from.
    if num_runs < 1:
        raise ValueError(f"num_runs must be at least 1, got {num_runs}")

    # Setup save path
    if save_path:
        save_path = Path(save_path)
        save_path.mkdir(parents=True, exist_ok=True)

    total = len(operation_inputs)
    last_results: dict = {}

    for op_idx, op in enumerate(operation_inputs):
        op_file = get_operation_file_path(save_path, op) if save_path else None

        # Skip operations whose result file is already on disk.
        if op_file is not None and skip_existing and op_file.exists():
            if verbose:
                _print_operation_banner(op, op_idx + 1, total, skipped_file=op_file)
            continue

        if verbose:
            _print_operation_banner(op, op_idx + 1, total)

        op_results = {
            "operation_input": {
                "maximum_takeoff_mass_category": op.maximum_takeoff_mass_category,
                "vlos_or_bvlos": op.vlos_or_bvlos,
                "ground_environment": op.ground_environment,
                "airspace_type": op.airspace_type,
                "maximum_altitude_category": op.maximum_altitude_category,
            },
            "runs": [],
            "consistency_metrics": {}
        }

        # Run multiple times
        for run_idx in range(num_runs):
            if verbose:
                print(f"  Run {run_idx + 1}/{num_runs}...", end=" ", flush=True)

            run_results = {}
            for indicator in indicators:
                out = engine.answer_indicator(
                    indicator=indicator,
                    op=op,
                    stream=False,
                    print_sources=False,
                )
                run_results[indicator] = {
                    "value": out["result"].get("value"),
                    "explanation": out["result"].get("explanation")
                }

                # Add delay between API calls to avoid rate limiting
                if delay_between_calls > 0:
                    time.sleep(delay_between_calls)

            op_results["runs"].append(run_results)
            if verbose:
                print("Done")

        # Calculate consistency metrics for this operation input
        for indicator in indicators:
            metrics = _summarize_indicator_runs(op_results["runs"], indicator)
            op_results["consistency_metrics"][indicator] = metrics

            if verbose:
                value_consistency = metrics["value_consistency"]
                most_common_value = metrics["most_common_value"]
                explanation_similarity = metrics["explanation_similarity"]
                print(f"  {indicator}:")
                print(f"    Value consistency: {value_consistency:.1%} (most common: '{most_common_value}')")
                print(f"    Explanation similarity: {explanation_similarity:.1%}")

        # Save individual operation results after all runs are complete
        if op_file is not None:
            with open(op_file, 'w', encoding='utf-8') as f:
                json.dump(op_results, f, indent=2, ensure_ascii=False, default=str)

            if verbose:
                print(f"  Saved operation results to {op_file}")

        # Only the most recent operation is kept in memory; earlier ones are
        # released when the name is rebound and remain available on disk.
        last_results = op_results

    return last_results



def load_operation_results(save_dir: str, verbose: bool = False) -> List[dict]:
    """
    Load all operation results from saved files in the directory.

    Only files matching `op_result_*.json` directly inside `save_dir` are
    read, in sorted path order. The parsed JSON content is returned as-is.

    Args:
        save_dir: Directory containing saved operation result files
        verbose: Whether to print information about loaded files

    Returns:
        List of operation result dictionaries (empty if no file matches)
    """
    save_dir = Path(save_dir)

    # Find all op_result_*.json files in the directory
    op_files = sorted(save_dir.glob("op_result_*.json"))

    if verbose:
        print(f"Loading results from {save_dir}")
        print(f"Found {len(op_files)} operation result files")

    results = []
    for op_file in op_files:
        with open(op_file, 'r', encoding='utf-8') as f:
            op_result = json.load(f)
        results.append(op_result)

        # Reporting reads only optional fields, so verbose and silent loading
        # accept exactly the same files.
        if verbose:
            num_runs = len(op_result.get("runs", []))
            print(f"  Loaded: {op_file.name} ({num_runs} runs)")

    return results


def _labels_match(predicted, expected) -> bool:
    """Compare two labels ignoring case and surrounding whitespace; a missing prediction never matches."""
    if predicted is None:
        return False
    return str(predicted).strip().lower() == str(expected).strip().lower()


def calculate_accuracy_vs_ground_truth(
    save_dir: str,
    ground_truth: dict,
    indicators: List[str],
    verbose: bool = True
) -> dict:
    """
    Compare model outputs with ground truth and calculate accuracy.

    For every saved operation result, the most common predicted value of each
    indicator is compared with the ground truth value. The comparison ignores
    case and surrounding whitespace. Operations without a ground truth entry,
    and indicators without a saved prediction or ground truth value, are
    skipped and not counted in the totals.

    Args:
        save_dir: Directory containing saved operation result files
        ground_truth: Dict keyed by operation key. Each entry maps an
            indicator name to either {"value": ...} or the bare value.
        indicators: List of indicator names
        verbose: Whether to print progress and details

    Returns:
        Accuracy metrics dictionary with "per_indicator", "per_operation"
        and "overall" sections. Each count section holds "correct", "total"
        and "accuracy" (0.0 when total is 0).
    """
    # Load all saved results from files
    results = load_operation_results(save_dir, verbose=verbose)

    if not results:
        print(f"Warning: No operation results found in {save_dir}")
        return {
            "per_indicator": {ind: {"correct": 0, "total": 0, "accuracy": 0.0} for ind in indicators},
            "per_operation": [],
            "overall": {"correct": 0, "total": 0, "accuracy": 0.0}
        }

    if verbose:
        print(f"\nCalculating accuracy for {len(results)} operations against ground truth...")

    accuracy_metrics = {
        "per_indicator": {ind: {"correct": 0, "total": 0} for ind in indicators},
        "per_operation": [],
        "overall": {"correct": 0, "total": 0}
    }

    for op_result in results:
        # Create a key to match with ground truth
        op_key = get_operation_key(op_result["operation_input"])

        if op_key not in ground_truth:
            if verbose:
                print(f"Warning: No ground truth found for operation key: {op_key}")
            continue

        gt = ground_truth[op_key]
        saved_metrics = op_result.get("consistency_metrics", {})
        op_accuracy = {"operation_key": op_key, "indicators": {}}

        for indicator in indicators:
            # A saved file may predate an indicator added to `indicators`.
            if indicator not in saved_metrics:
                if verbose:
                    print(f"  Warning: No saved prediction for indicator '{indicator}' in operation '{op_key}'")
                continue

            # Use the most common value from the runs
            predicted_value = saved_metrics[indicator]["most_common_value"]

            # Accept both {"value": ...} entries and bare values.
            gt_entry = gt.get(indicator)
            gt_value = gt_entry.get("value") if isinstance(gt_entry, dict) else gt_entry

            if gt_value is None:
                if verbose:
                    print(f"  Warning: No ground truth value for indicator '{indicator}' in operation '{op_key}'")
                continue

            is_correct = _labels_match(predicted_value, gt_value)

            accuracy_metrics["per_indicator"][indicator]["total"] += 1
            accuracy_metrics["overall"]["total"] += 1

            if is_correct:
                accuracy_metrics["per_indicator"][indicator]["correct"] += 1
                accuracy_metrics["overall"]["correct"] += 1

            op_accuracy["indicators"][indicator] = {
                "predicted": predicted_value,
                "ground_truth": gt_value,
                "correct": is_correct
            }

        accuracy_metrics["per_operation"].append(op_accuracy)

    # Calculate percentages
    for indicator in indicators:
        stats = accuracy_metrics["per_indicator"][indicator]
        stats["accuracy"] = stats["correct"] / stats["total"] if stats["total"] > 0 else 0.0

    overall = accuracy_metrics["overall"]
    overall["accuracy"] = overall["correct"] / overall["total"] if overall["total"] > 0 else 0.0

    if verbose:
        print(f"Accuracy calculation complete. Evaluated {len(accuracy_metrics['per_operation'])} operations.")

    return accuracy_metrics


def print_summary(save_dir: str, accuracy_metrics: dict, indicators: List[str], verbose: bool = True):
    """
    Print the consistency and accuracy overview and return it as a dict.

    Consistency figures are averaged over the operation result files found
    in `save_dir`. Accuracy figures are read from `accuracy_metrics`.

    Args:
        save_dir: Directory containing saved operation result files
        accuracy_metrics: Output of calculate_accuracy_vs_ground_truth
        indicators: Indicator names to report, in display order
        verbose: Accepted for interface compatibility; not used here
            (result files are always loaded silently)

    Returns:
        Summary dictionary with "num_operations", "consistency" and
        "accuracy" sections, or None when no result files were found
    """
    loaded_results = load_operation_results(save_dir, verbose=False)

    if not loaded_results:
        print(f"Warning: No operation results found in {save_dir}")
        return None

    rule = "=" * 70
    print("\n" + rule)
    print("EVALUATION SUMMARY")
    print(rule)
    print(f"\nLoaded {len(loaded_results)} operation results from {save_dir}")

    # ---- Consistency: per-indicator averages over all loaded operations ----
    print("\n--- CONSISTENCY METRICS ---")
    avg_value_consistency = {}
    avg_explanation_similarity = {}

    for indicator in indicators:
        # Metrics of every operation that actually evaluated this indicator.
        per_operation = [
            entry["consistency_metrics"][indicator]
            for entry in loaded_results
            if indicator in entry.get("consistency_metrics", {})
        ]

        if not per_operation:
            print(f"\n{indicator}: No data available")
            avg_value_consistency[indicator] = 0.0
            avg_explanation_similarity[indicator] = 0.0
            continue

        avg_value_consistency[indicator] = np.mean(
            [metrics["value_consistency"] for metrics in per_operation]
        )
        avg_explanation_similarity[indicator] = np.mean(
            [metrics["explanation_similarity"] for metrics in per_operation]
        )

        print(f"\n{indicator}:")
        print(f"  Avg Value Consistency:      {avg_value_consistency[indicator]:.1%}")
        print(f"  Avg Explanation Similarity: {avg_explanation_similarity[indicator]:.1%}")

    # Overall figures are the unweighted mean of the per-indicator averages.
    if avg_value_consistency:
        overall_value_consistency = np.mean(list(avg_value_consistency.values()))
    else:
        overall_value_consistency = 0.0
    if avg_explanation_similarity:
        overall_explanation_similarity = np.mean(list(avg_explanation_similarity.values()))
    else:
        overall_explanation_similarity = 0.0

    print(f"\nOVERALL CONSISTENCY:")
    print(f"  Value Consistency:      {overall_value_consistency:.1%}")
    print(f"  Explanation Similarity: {overall_explanation_similarity:.1%}")

    # ---- Accuracy: taken directly from the supplied metrics ----
    print("\n--- ACCURACY VS GROUND TRUTH ---")
    for indicator in indicators:
        stats = accuracy_metrics["per_indicator"].get(indicator, {"accuracy": 0, "correct": 0, "total": 0})
        print(f"\n{indicator}:")
        print(f"  Accuracy: {stats['accuracy']:.1%} ({stats['correct']}/{stats['total']})")

    overall = accuracy_metrics.get("overall", {"accuracy": 0, "correct": 0, "total": 0})
    print(f"\nOVERALL ACCURACY: {overall['accuracy']:.1%} ({overall['correct']}/{overall['total']})")

    consistency_section = {
        "avg_value_consistency": overall_value_consistency,
        "avg_explanation_similarity": overall_explanation_similarity,
        "per_indicator_value_consistency": avg_value_consistency,
        "per_indicator_explanation_similarity": avg_explanation_similarity,
    }
    accuracy_section = {
        "overall": overall["accuracy"],
        "per_indicator": {
            ind: accuracy_metrics["per_indicator"][ind]["accuracy"]
            for ind in indicators
            if ind in accuracy_metrics["per_indicator"]
        },
    }
    return {
        "num_operations": len(loaded_results),
        "consistency": consistency_section,
        "accuracy": accuracy_section,
    }

# Last statement of the cell: printed once all definitions above have executed.
print("Consistency & Accuracy Pipeline loaded successfully!")

Consistency & Accuracy Pipeline loaded successfully!


In [7]:
# =============================================================================
# Define test operation inputs (different combinations)
# =============================================================================

# One InitialOperationInput per scenario. Tuple columns are:
# (mass category, VLOS/BVLOS, ground environment, airspace type, altitude category)
test_operations = [
    InitialOperationInput(
        maximum_takeoff_mass_category=mass,
        vlos_or_bvlos=line_of_sight,
        ground_environment=ground,
        airspace_type=airspace,
        maximum_altitude_category=altitude,
    )
    for mass, line_of_sight, ground, airspace, altitude in (
        ("lt_25kg", "VLOS", "sparsely_populated", "uncontrolled", "gt_50m_le_120m"),
        ("lt_25kg", "BVLOS", "populated", "controlled", "gt_120m_le_150m"),
        ("gte_25kg", "BVLOS", "controlled_area", "uncontrolled", "le_50m"),
    )
]

# Indicators queried for every operation input
eval_indicators = [
    "likely_regulatory_pathway",
    "initial_ground_risk_orientation",
    "initial_air_risk_orientation",
    "expected_assessment_depth",
]

print(f"Testing {len(test_operations)} operation inputs with {len(eval_indicators)} indicators each")

Testing 3 operation inputs with 4 indicators each


In [8]:
# =============================================================================
# Run Consistency Evaluation (multiple runs per input)
# =============================================================================

NUM_RUNS = 10  # Number of times to run each input combination
# Assembled from path components so the directory resolves on any OS; on
# Windows the resulting string is the same as the previous literal.
SAVE_DIR = str(Path("Results") / "ClassificationTask")
DELAY_BETWEEN_CALLS = 1.0  # Seconds between API calls to avoid rate limits
SKIP_EXISTING = True  # Skip operations that already have saved results

# Results are written per operation into SAVE_DIR. With SKIP_EXISTING set,
# operations whose result file already exists are not re-run.
run_consistency_evaluation(
    engine=engine,
    operation_inputs=test_operations,
    indicators=eval_indicators,
    num_runs=NUM_RUNS,
    verbose=True,
    save_path=SAVE_DIR,
    delay_between_calls=DELAY_BETWEEN_CALLS,
    skip_existing=SKIP_EXISTING
)

print("\nConsistency evaluation complete!")
print(f"Individual operation results saved to {SAVE_DIR}")


Operation Input 1/3 - SKIPPED (file exists)
  Mass: lt_25kg, VLOS: VLOS
  Ground: sparsely_populated, Airspace: uncontrolled
  Altitude: gt_50m_le_120m
  File: Results\ClassificationTask\op_result_lt_25kg_VLOS_sparsely_populated_uncontrolled_gt_50m_le_120m.json

Operation Input 2/3 - SKIPPED (file exists)
  Mass: lt_25kg, VLOS: BVLOS
  Ground: populated, Airspace: controlled
  Altitude: gt_120m_le_150m
  File: Results\ClassificationTask\op_result_lt_25kg_BVLOS_populated_controlled_gt_120m_le_150m.json

Operation Input 3/3 - SKIPPED (file exists)
  Mass: gte_25kg, VLOS: BVLOS
  Ground: controlled_area, Airspace: uncontrolled
  Altitude: le_50m
  File: Results\ClassificationTask\op_result_gte_25kg_BVLOS_controlled_area_uncontrolled_le_50m.json

Consistency evaluation complete!
Individual operation results saved to Results\ClassificationTask


In [18]:
# =============================================================================
# Load Ground Truth and Calculate Accuracy
# =============================================================================

# Load ground truth from file

# Assembled from path components so the file is also found on non-Windows systems.
gt_path = Path("Results") / "ClassificationTask" / "gt.json"
with open(gt_path, "r", encoding="utf-8") as f:
    ground_truth = json.load(f)
print(f"Loaded ground truth with {len(ground_truth)} operation inputs")

# Calculate accuracy using saved results
accuracy_metrics = calculate_accuracy_vs_ground_truth(
    save_dir=SAVE_DIR,
    ground_truth=ground_truth,
    indicators=eval_indicators
)

Loaded ground truth with 3 operation inputs
Loading results from Results\ClassificationTask
Found 3 operation result files
  Loaded: op_result_gte_25kg_BVLOS_controlled_area_uncontrolled_le_50m.json (10 runs)
  Loaded: op_result_lt_25kg_BVLOS_populated_controlled_gt_120m_le_150m.json (10 runs)
  Loaded: op_result_lt_25kg_VLOS_sparsely_populated_uncontrolled_gt_50m_le_120m.json (10 runs)

Calculating accuracy for 3 operations against ground truth...
Accuracy calculation complete. Evaluated 3 operations.


In [19]:
# =============================================================================
# Print Summary of All Metrics
# =============================================================================

# print_summary returns None when no saved operation results are found, so
# `summary` can be None in that case.
summary = print_summary(SAVE_DIR, accuracy_metrics, eval_indicators)


EVALUATION SUMMARY

Loaded 3 operation results from Results\ClassificationTask

--- CONSISTENCY METRICS ---

likely_regulatory_pathway:
  Avg Value Consistency:      83.3%
  Avg Explanation Similarity: 59.3%

initial_ground_risk_orientation:
  Avg Value Consistency:      86.7%
  Avg Explanation Similarity: 43.9%

initial_air_risk_orientation:
  Avg Value Consistency:      96.7%
  Avg Explanation Similarity: 60.2%

expected_assessment_depth:
  Avg Value Consistency:      100.0%
  Avg Explanation Similarity: 66.9%

OVERALL CONSISTENCY:
  Value Consistency:      91.7%
  Explanation Similarity: 57.6%

--- ACCURACY VS GROUND TRUTH ---

likely_regulatory_pathway:
  Accuracy: 0.0% (0/3)

initial_ground_risk_orientation:
  Accuracy: 33.3% (1/3)

initial_air_risk_orientation:
  Accuracy: 33.3% (1/3)

expected_assessment_depth:
  Accuracy: 100.0% (3/3)

OVERALL ACCURACY: 41.7% (5/12)


In [20]:
# =============================================================================
# Save Detailed Results
# =============================================================================

# Load consistency results from saved files for final output
consistency_results_all = load_operation_results(SAVE_DIR)

# Save all results to file for further analysis
evaluation_output = {
    "num_runs": NUM_RUNS,
    "num_operations": len(test_operations),
    "indicators": eval_indicators,
    "consistency_results": consistency_results_all,
    "accuracy_metrics": accuracy_metrics,
    "summary": summary
}

# Assembled from path components so the file is written to the intended
# directory on any OS. The name does not match "op_result_*.json", so
# load_operation_results does not pick it up.
output_path = Path("Results") / "ClassificationTask" / "evaluation_results.json"
with open(output_path, "w", encoding="utf-8") as f:
    # default=str stringifies any value json cannot encode natively.
    json.dump(evaluation_output, f, indent=2, ensure_ascii=False, default=str)

print(f"Detailed results saved to {output_path}")

Detailed results saved to Results\ClassificationTask\evaluation_results.json
