In [2]:
# Add current directory to path for proper imports
import sys
import os


# Direct imports from local files, not from the installed package
try:
    from .tools_function import *
except ImportError:
    from tools_function import *
try:
    from .main_function_code import *
except ImportError:
    from main_function_code import *
try:
    from .Uncertainty_quantification import *
except ImportError:
    from Uncertainty_quantification import *
try:
    from .subclustering import *
except ImportError:
    from subclustering import *
import pandas as pd
import numpy as np
import argparse
import re

# Import the new unified modules for annotation boost
try:
    from annotation_boost import (
        iterative_marker_analysis,
        runCASSIA_annotationboost,
        runCASSIA_annotationboost_additional_task
    )
    from llm_utils import call_llm
    print("Successfully imported unified annotation boost modules")
except ImportError as e:
    print(f"Note: Could not import unified modules: {str(e)}")
    print("Using original implementations from tools_function.py")
    # These will be provided by tools_function import

# Setup configuration variables
output_name = "intestine_detailed"
model_name = "google/gemini-2.5-flash-preview"
provider = "openrouter"
tissue = "large intestine"
species = "human"

# Setup configuration variables for custermized api key (deepseek example)
# Uncomment the following block to use DeepSeek as a custom provider
# output_name = "intestine_detailed"
# model_name = "deepseek-chat"
# provider = "https://api.deepseek.com"
# tissue = "large intestine"
# species = "human"
# api_key = "sk-afb39114f1334ba486505d9425937d16"

Successfully imported unified annotation boost modules


In [None]:
# Load example marker data
def load_example_data():
    """Load example marker datasets from the data directory"""
    data_dir = Path("data")
    
    # Load unprocessed markers (FindAllMarkers format)
    unprocessed_path = data_dir / "unprocessed.csv"
    if unprocessed_path.exists():
        unprocessed_markers = pd.read_csv(unprocessed_path)
        print(f"✓ Loaded unprocessed markers: {unprocessed_markers.shape}")
        print(f"Columns: {list(unprocessed_markers.columns)}")
        print(f"Unique clusters: {unprocessed_markers['cluster'].unique()}")
    else:
        print(f"❌ Could not find {unprocessed_path}")
        return None, None, None
    
    # Load processed markers (simple format)
    processed_path = data_dir / "processed.csv"
    if processed_path.exists():
        processed_markers = pd.read_csv(processed_path)
        print(f"✓ Loaded processed markers: {processed_markers.shape}")
    else:
        processed_markers = None
        print(f"⚠️ Could not find {processed_path}")
    
    # Load subcluster results
    subcluster_path = data_dir / "subcluster_results.csv"
    if subcluster_path.exists():
        subcluster_results = pd.read_csv(subcluster_path)
        print(f"✓ Loaded subcluster results: {subcluster_results.shape}")
    else:
        subcluster_results = None
        print(f"⚠️ Could not find {subcluster_path}")
    
    return unprocessed_markers, processed_markers, subcluster_results

# Load the data
unprocessed_markers, processed_markers, subcluster_results = load_example_data()


✓ Loaded unprocessed markers: (31313, 8)
Columns: ['Unnamed: 0', 'p_val', 'avg_log2FC', 'pct.1', 'pct.2', 'p_val_adj', 'cluster', 'gene']
Unique clusters: ['monocyte' 'plasma cell' 'cd8-positive, alpha-beta t cell'
 'transit amplifying cell of large intestine'
 'intestinal enteroendocrine cell' 'intestinal crypt stem cell']
✓ Loaded processed markers: (6, 3)
✓ Loaded subcluster results: (2023, 7)


In [None]:
# Configure API keys - Replace with your actual API keys
def setup_api_keys():
    """Configure API keys for different providers"""
    
    # Option 1: OpenRouter (Recommended - provides access to multiple models)
    openrouter_key = "your-openrouter-api-key-here"
    if openrouter_key != "your-openrouter-api-key-here":
        os.environ["OPENROUTER_API_KEY"] = openrouter_key
        print("✓ OpenRouter API key configured")
    
    # Option 2: OpenAI
    openai_key = "your-openai-api-key-here"
    if openai_key != "your-openai-api-key-here":
        os.environ["OPENAI_API_KEY"] = openai_key
        print("✓ OpenAI API key configured")
    
    # Option 3: Anthropic
    anthropic_key = "your-anthropic-api-key-here"
    if anthropic_key != "your-anthropic-api-key-here":
        os.environ["ANTHROPIC_API_KEY"] = anthropic_key
        print("✓ Anthropic API key configured")
    
    # Option 4: Custom provider (e.g., DeepSeek)
    custom_key = "your-custom-api-key-here"
    custom_url = "https://api.deepseek.com"  # Example
    if custom_key != "your-custom-api-key-here":
        set_api_key(custom_key, provider=custom_url)
        print(f"✓ Custom provider API key configured for {custom_url}")
    
    print("\n📝 Note: Replace the placeholder API keys above with your actual keys")
    print("Available providers: openrouter, openai, anthropic, or custom URLs")

setup_api_keys()


In [None]:
def test_single_annotation():
    """Test single cluster annotation"""
    if unprocessed_markers is None:
        print("❌ Cannot run test - no marker data loaded")
        return
    
    print("🧪 Testing single cluster annotation...")
    
    # Get markers for one cluster (e.g., plasma cell)
    cluster_name = "plasma cell"
    cluster_markers = unprocessed_markers[unprocessed_markers['cluster'] == cluster_name]
    
    if cluster_markers.empty:
        available_clusters = unprocessed_markers['cluster'].unique()
        print(f"❌ Cluster '{cluster_name}' not found. Available: {available_clusters}")
        cluster_name = available_clusters[0]
        cluster_markers = unprocessed_markers[unprocessed_markers['cluster'] == cluster_name]
        print(f"Using '{cluster_name}' instead")
    
    # Convert to simple format for testing
    marker_list = cluster_markers.nlargest(20, 'avg_log2FC')[['gene', 'cluster']]
    marker_list.columns = ['gene', 'cell_type']
    
    print(f"Testing with {len(marker_list)} markers for {cluster_name}")
    print(f"Top 5 markers: {list(marker_list['gene'].head())}")
    
    try:
        # Run annotation
        result = runCASSIA(
            tissue="large intestine",
            species="human", 
            additional_info=None,
            temperature=0.3,
            marker_list=marker_list,
            model="gpt-4o-mini",  # Use a fast, cheap model for testing
            provider="openai",
            validator_involvement="v1"
        )
        
        print("✅ Single annotation test successful!")
        print(f"Main cell type: {result.get('main_cell_type', 'N/A')}")
        print(f"Sub cell types: {result.get('sub_cell_types', 'N/A')}")
        return result
        
    except Exception as e:
        print(f"❌ Single annotation test failed: {str(e)}")
        return None

# Run the test
single_result = test_single_annotation()


In [None]:
def test_batch_annotation():
    """Test batch annotation of all clusters"""
    if unprocessed_markers is None:
        print("❌ Cannot run test - no marker data loaded")
        return
    
    print("🧪 Testing batch annotation...")
    
    output_name = "local_test_batch"
    
    try:
        # Run batch annotation
        runCASSIA_batch(
            marker=unprocessed_markers,
            output_name=output_name,
            model="gpt-4o-mini",  # Fast model for testing
            tissue="large intestine",
            species="human",
            max_workers=3,  # Reduced for testing
            n_genes=30,  # Fewer genes for faster testing
            additional_info=None,
            provider="openai",
            validator_involvement="v1"
        )
        
        print("✅ Batch annotation test successful!")
        
        # Check output files
        full_file = f"{output_name}_full.csv"
        summary_file = f"{output_name}_summary.csv"
        
        if os.path.exists(full_file):
            results_df = pd.read_csv(full_file)
            print(f"Results saved: {results_df.shape[0]} clusters annotated")
            print(f"Columns: {list(results_df.columns)}")
            return results_df
        else:
            print(f"⚠️ Output file {full_file} not found")
            return None
            
    except Exception as e:
        print(f"❌ Batch annotation test failed: {str(e)}")
        return None

# Run the test
batch_results = test_batch_annotation()


🧪 Testing batch annotation...
Processing FindAllMarkers-style dataframe with 8 columns
Processed to 6 clusters with top markers
Using 'cluster' as cell type column and 'markers' as gene column.
All analyses completed. Results saved to 'local_test_batch'.
Results saved to:
  - Full results: local_test_batch_full.csv
  - Summary: local_test_batch_summary.csv
  - JSON: local_test_batch.json
✅ Batch annotation test successful!
Results saved: 6 clusters annotated
Columns: ['cell_type', 'main_cell_type', 'sub_cell_types', 'possible_mixed_cell_types', 'possible_tissues', 'iterations', 'num_markers', 'final_annotation', 'all_conversations']


In [None]:
def test_validator_levels():
    """Test different validator involvement levels"""
    if unprocessed_markers is None:
        print("❌ Cannot run test - no marker data loaded")
        return
    
    print("🧪 Testing validator involvement levels...")
    
    # Test both v0 (stricter) and v1 (moderate) validators
    validator_levels = ["v0", "v1"]
    results = {}
    
    for validator in validator_levels:
        print(f"\n--- Testing {validator} validator ---")
        output_name = f"local_test_{validator}_validator"
        
        try:
            runCASSIA_batch(
                marker=unprocessed_markers,
                output_name=output_name,
                model="gpt-4o-mini",
                tissue="large intestine",
                species="human",
                max_workers=2,  # Minimal for testing
                n_genes=20,     # Even fewer genes
                additional_info=None,
                provider="openai",
                validator_involvement=validator
            )
            
            print(f"✅ {validator} validator test successful!")
            
            # Check results
            full_file = f"{output_name}_full.csv"
            if os.path.exists(full_file):
                df = pd.read_csv(full_file)
                results[validator] = df
                print(f"Results: {df.shape[0]} clusters annotated")
                
                # Show differences if both validators completed
                if len(results) == 2:
                    print(f"\n📊 Comparing validator results:")
                    v0_df = results.get("v0")
                    v1_df = results.get("v1") 
                    if v0_df is not None and v1_df is not None:
                        print(f"v0 clusters: {v0_df.shape[0]}, v1 clusters: {v1_df.shape[0]}")
            
        except Exception as e:
            print(f"❌ {validator} validator test failed: {str(e)}")
            results[validator] = None
    
    return results

# Run the test
validator_results = test_validator_levels()


In [None]:
def test_quality_scoring():
    """Test quality scoring of annotation results"""
    input_file = "local_test_batch_full.csv"
    output_file = "local_test_scored.csv"
    
    if not os.path.exists(input_file):
        print(f"❌ Cannot run test - {input_file} not found")
        print("Run the batch annotation test first")
        return
    
    print("🧪 Testing quality scoring...")
    
    try:
        # Run quality scoring
        runCASSIA_score_batch(
            input_file=input_file,
            output_file=output_file,
            max_workers=3,
            model="gpt-4o-mini",
            provider="openai"
        )
        
        print("✅ Quality scoring test successful!")
        
        # Check results
        if os.path.exists(output_file):
            scored_df = pd.read_csv(output_file)
            print(f"Scored results: {scored_df.shape[0]} clusters")
            if 'score' in scored_df.columns:
                scores = scored_df['score'].dropna()
                print(f"Average score: {scores.mean():.1f}")
                print(f"Score range: {scores.min()}-{scores.max()}")
            return scored_df
        else:
            print(f"⚠️ Output file {output_file} not found")
            return None
            
    except Exception as e:
        print(f"❌ Quality scoring test failed: {str(e)}")
        return None

# Run the test
scoring_results = test_quality_scoring()


In [None]:
def summarize_tests():
    """Summarize all test results"""
    print("\n" + "="*50)
    print("🎯 CASSIA LOCAL TESTING SUMMARY")
    print("="*50)
    
    # Check which tests passed by looking for expected variables
    tests = {
        "Single Annotation": 'single_result' in globals() and single_result is not None,
        "Batch Annotation": 'batch_results' in globals() and batch_results is not None,
        "Validator Testing": 'validator_results' in globals() and validator_results is not None,
        "Quality Scoring": 'scoring_results' in globals() and scoring_results is not None,
    }
    
    for test_name, passed in tests.items():
        status = "✅ PASSED" if passed else "❌ FAILED"
        print(f"{test_name:<20}: {status}")
    
    # Show generated files
    print("\n📁 Generated files:")
    test_files = [
        "local_test_batch_full.csv",
        "local_test_batch_summary.csv", 
        "local_test_scored.csv",
        "local_test_v0_validator_full.csv",
        "local_test_v1_validator_full.csv",
    ]
    
    for file in test_files:
        if os.path.exists(file):
            size = os.path.getsize(file)
            print(f"  ✓ {file} ({size} bytes)")
    
    passed_count = sum(tests.values())
    total_count = len(tests)
    
    print(f"\n🏆 Overall: {passed_count}/{total_count} tests passed")
    
    if passed_count == total_count:
        print("🎉 All tests passed! CASSIA is working correctly.")
    elif passed_count >= total_count // 2:
        print("⚠️ Most tests passed. Check failed tests above.")
    else:
        print("🚨 Multiple tests failed. Check API keys and error messages.")
    
    # Provide next steps
    print("\n🚀 Next Steps:")
    print("1. Set your actual API keys in the configuration cell")
    print("2. Try with your own marker data")
    print("3. Experiment with different models and parameters")
    print("4. Use runCASSIA_pipeline() for complete analysis")
    print("5. Explore advanced features like annotation boost")

# Run summary
summarize_tests()


In [None]:
# Uncomment to clean up test files
# test_files = [
#     "local_test_batch_full.csv", "local_test_batch_summary.csv",
#     "local_test_scored.csv", "local_test_v0_validator_full.csv",
#     "local_test_v0_validator_summary.csv", "local_test_v1_validator_full.csv",
#     "local_test_v1_validator_summary.csv"
# ]
# for file in test_files:
#     if os.path.exists(file):
#         os.remove(file)
#         print(f"Deleted {file}")
# print("Cleanup complete!")
