# VoidBloom / CrisisCore Enhanced Prototype Notebook

This notebook demonstrates the complete Hidden-Gem Scanner pipeline with:
- **Provenance Tracking**: Full lineage tracking of all artifacts and transformations
- **Glossary Generation**: Automatic documentation of technical terms and metrics
- **Feature Extraction**: Time series analysis and market data processing
- **GemScore Calculation**: Weighted scoring with confidence metrics

## 1. Setup and Imports

In [None]:
"""
Setup comprehensive observability infrastructure for the Hidden Gem Scanner.

This includes:
1. Structured JSON logging with context binding
2. Prometheus metrics for monitoring
3. Distributed tracing with OpenTelemetry
"""

# Import observability components
from src.core.logging_config import init_logging, get_logger, LogContext
from src.core.metrics import (
    record_scan_request,
    record_scan_duration,
    record_scan_error,
    record_gem_score,
    record_confidence_score,
    record_flagged_token,
    is_prometheus_available,
)
from src.core.tracing import (
    setup_tracing,
    trace_operation,
    add_span_attributes,
    get_trace_id,
    is_tracing_available,
)

# Initialize structured logging
logger = init_logging(service_name="hidden-gem-scanner", level="INFO")

# Initialize tracing
tracer = setup_tracing(service_name="hidden-gem-scanner", enable_console_export=False)

# Log initialization
logger.info(
    "observability_initialized",
    notebook="hidden_gem_scanner",
    prometheus_available=is_prometheus_available(),
    tracing_available=is_tracing_available(),
)

print("✅ Observability Stack Initialized")
print(f"   📝 Structured Logging: Enabled (JSON format)")
print(f"   📊 Prometheus Metrics: {'Enabled' if is_prometheus_available() else 'Disabled (graceful fallback)'}")
print(f"   🔍 Distributed Tracing: {'Enabled' if is_tracing_available() else 'Disabled (graceful fallback)'}")
print(f"\n📌 Trace ID: {get_trace_id() or 'N/A'}")

In [None]:
# Pipeline, safety, provenance and glossary imports used by the cells below.
import pandas as pd
from datetime import datetime, timedelta
from src.core.features import MarketSnapshot
from src.core.safety import evaluate_contract, liquidity_guardrail
from src.core.provenance_tracking import complete_pipeline_tracked
from src.core.provenance import get_provenance_tracker, reset_provenance_tracker
from src.core.glossary import get_glossary

# Reset the shared provenance tracker so the demo starts from a clean state.
reset_provenance_tracker()

print("✅ Imports complete")
print("📊 Provenance tracker ready")
_glossary_size = len(get_glossary().terms)
print(f"📖 Glossary contains {_glossary_size} terms")

## 0. Logging & Metrics Infrastructure (initialized in the first setup cell above)

## 2. Create Synthetic Market Data

In [None]:
# Deterministic synthetic data for the "VBLOOM" demo token: a steadily rising
# 48-point hourly price series, a market snapshot and a contract-safety report.
from datetime import datetime, timezone, timedelta

# A pinned timestamp (instead of the current time) keeps every run identical.
now = datetime(2025, 10, 9, 12, 0, 0, tzinfo=timezone.utc)
_hours = 48

# Hourly timestamps, oldest first, ending exactly at `now`.
dates = [now - timedelta(hours=back) for back in reversed(range(_hours))]

# Linear uptrend: 0.03 at the oldest point, +0.0002 per hour.
prices = pd.Series(
    [0.03 + 0.0002 * step for step in range(_hours)],
    index=pd.to_datetime(dates),
)

with LogContext(logger, operation="data_generation", token="VBLOOM") as log:
    log.info(
        "synthetic_data_created",
        price_points=len(prices),
        start_price=prices.iloc[0],
        end_price=prices.iloc[-1],
        time_range_hours=_hours,
    )

# The snapshot's spot price is the last point of the synthetic series.
snapshot = MarketSnapshot(
    symbol="VBLOOM",
    timestamp=now,
    price=float(prices.iloc[-1]),
    volume_24h=250000,
    liquidity_usd=180000,
    holders=4200,
    onchain_metrics={"active_wallets": 950, "net_inflows": 125000, "unlock_pressure": 0.2},
    narratives=["AI", "DeFi", "VoidBloom"],
)

# Every contract risk flag is switched off (the "clean contract" scenario).
_contract_flags = {
    "honeypot": False,
    "owner_can_mint": False,
    "owner_can_withdraw": False,
    "unverified": False,
}
contract_report = evaluate_contract(_contract_flags, severity="none")

logger.info(
    "market_snapshot_created",
    symbol=snapshot.symbol,
    price=snapshot.price,
    volume_24h=snapshot.volume_24h,
    liquidity_usd=snapshot.liquidity_usd,
    holders=snapshot.holders,
    contract_safety_score=contract_report.score,
)

print(f"📈 Generated {len(prices)} price points for {snapshot.symbol}")
print(f"💰 Current price: ${snapshot.price:.6f}")
print(f"💧 Liquidity: ${snapshot.liquidity_usd:,.0f}")
print(f"👥 Holders: {snapshot.holders:,}")
print(f"🛡️ Contract safety score: {contract_report.score:.2f}")


## 3. Execute Pipeline with Provenance Tracking

In [None]:
# Run the complete tracked pipeline once, instrumented with structured logs,
# Prometheus metrics and a tracing span, then print the results.
import time

token_symbol = snapshot.symbol

# time.perf_counter() is monotonic; differences of time.time() can jump (or
# even go negative) if the wall clock is adjusted while the pipeline runs.
start_time = time.perf_counter()

logger.info(
    "pipeline_execution_started",
    token=token_symbol,
    data_source="synthetic_demo",
    trace_id=get_trace_id(),
)

try:
    with trace_operation("complete_pipeline", attributes={"token": token_symbol, "source": "synthetic_demo"}):
        results = complete_pipeline_tracked(
            snapshot=snapshot,
            price_series=prices,
            narrative_embedding_score=0.72,
            contract_report=contract_report,
            data_source="synthetic_demo",
        )

        duration_seconds = time.perf_counter() - start_time

        gem_result = results['result']
        # Highest contribution first; sorted once and reused for the log
        # record (top 3) and the full printout below.
        ranked_contributions = sorted(
            gem_result.contributions.items(), key=lambda item: -item[1]
        )

        record_scan_request(token_symbol, "success")
        record_scan_duration(token_symbol, duration_seconds, "success")
        record_gem_score(token_symbol, gem_result.score)
        # Confidence is printed as a percentage below, so it is divided by
        # 100 for the metric (presumably the recorder expects a 0-1 fraction).
        record_confidence_score(token_symbol, gem_result.confidence / 100)

        if results['flagged']:
            record_flagged_token(token_symbol, "manual_review_required")

        logger.info(
            "pipeline_execution_completed",
            token=token_symbol,
            gem_score=gem_result.score,
            confidence=gem_result.confidence,
            flagged=results['flagged'],
            duration_seconds=duration_seconds,
            top_features=[name for name, _ in ranked_contributions[:3]],
            trace_id=get_trace_id(),
        )

except Exception as e:
    # Record the failure in metrics and logs, then re-raise so the cell fails.
    duration_seconds = time.perf_counter() - start_time
    record_scan_request(token_symbol, "failure")
    record_scan_duration(token_symbol, duration_seconds, "failure")
    record_scan_error(token_symbol, type(e).__name__)

    logger.error(
        "pipeline_execution_failed",
        token=token_symbol,
        error_type=type(e).__name__,
        error_message=str(e),
        duration_seconds=duration_seconds,
        trace_id=get_trace_id(),
    )
    raise

print("=" * 60)
print("📊 ANALYSIS RESULTS")
print("=" * 60)
print(f"\n💎 GemScore: {gem_result.score:.2f}")
print(f"🎯 Confidence: {gem_result.confidence:.2f}%")
print(f"🚩 Flagged for Review: {results['flagged']}")
print(f"⏱️  Duration: {duration_seconds:.3f}s")
print(f"🔍 Trace ID: {get_trace_id() or 'N/A'}")
print("\n📋 Feature Contributions:")
for feature, contribution in ranked_contributions:
    print(f"  - {feature}: {contribution:.4f}")

print("\n🔍 Debug Info:")
for key, value in results['debug'].items():
    print(f"  - {key}: {value}")

## 4. Explore Artifact Provenance

In [None]:
# Inspect the provenance record behind the final GemScore artifact, then
# list every artifact returned by the tracker's lineage query for it.
tracker = get_provenance_tracker()
score_id = results['provenance']['score_id']

_rule = "=" * 60
print(_rule)
print("🔍 ARTIFACT PROVENANCE")
print(_rule)

score_record = tracker.get_record(score_id)
if score_record:
    art = score_record.artifact
    print(f"\n📦 Artifact: {art.name}")
    # Only a 16-character prefix of the id is shown.
    print(f"🆔 ID: {score_id[:16]}...")
    # NOTE(review): the "UTC" suffix is hard-coded; assumes created_at is UTC.
    print(f"📅 Created: {art.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print(f"🏷️ Tags: {', '.join(art.tags)}")

    print("\n🔄 Transformations Applied:")
    for step_no, step in enumerate(score_record.transformations, start=1):
        print(f"  {step_no}. {step.function_name} ({step.transformation_type.value})")
        print(f"     ⏱️ Duration: {step.duration_ms:.2f}ms")

    print("\n📊 Quality Metrics:")
    # Only the first five metrics, in the record's own order, are displayed.
    first_five = list(score_record.quality_metrics.items())[:5]
    for metric_name, metric_value in first_five:
        print(f"  - {metric_name}: {metric_value:.4f}")

    if score_record.annotations:
        print("\n💬 Annotations:")
        for note in score_record.annotations:
            print(f"  - {note}")

lineage = tracker.get_lineage(score_id)
print(f"\n🌳 Lineage Depth: {len(lineage)} artifacts")
print("📈 Artifact chain:")
for position, artifact_id in enumerate(lineage, start=1):
    record = tracker.get_record(artifact_id)
    if not record:
        # Ids without a stored record are skipped but still consume a number.
        continue
    print(f"  {position}. {record.artifact.artifact_type.value}: {record.artifact.name}")

## 5. Visualize Lineage Graph

In [None]:
# Render the score's lineage as Mermaid text, then summarise tracker-wide
# provenance statistics.
mermaid_diagram = tracker.export_lineage_graph(score_id, format="mermaid")

_rule = "=" * 60
print(_rule)
print("🎨 LINEAGE DIAGRAM (Mermaid)")
print(_rule)
print("\nCopy this to https://mermaid.live for visualization:\n")
print(mermaid_diagram)
print("\n" + _rule)

stats = tracker.get_statistics()
print("\n📊 PROVENANCE STATISTICS")
print(_rule)
_stat_rows = (
    ("Total Artifacts", "total_artifacts"),
    ("Total Transformations", "total_transformations"),
    ("Lineage Edges", "lineage_edges"),
)
for _label, _stat_key in _stat_rows:
    print(f"{_label}: {stats[_stat_key]}")
print("\nArtifacts by Type:")
for artifact_type, count in stats['artifacts_by_type'].items():
    print(f"  - {artifact_type}: {count}")

## 6. Explore Technical Glossary

In [None]:
# Summarise the technical glossary by category and print a few sample entries.
glossary = get_glossary()
_rule = "=" * 60

print(_rule)
print("📖 TECHNICAL GLOSSARY")
print(_rule)

glossary_stats = glossary.get_statistics()
print(f"\nTotal Terms: {glossary_stats['total_terms']}")
print(f"Categories: {glossary_stats['categories_count']}")
print("\nTerms by Category:")
for category, count in glossary_stats['category_breakdown'].items():
    print(f"  - {category}: {count}")

print("\n" + _rule)
print("📚 SAMPLE TERMS")
print(_rule)

key_terms = ["GemScore", "RSI", "ContractSafety", "Confidence"]
for term_name in key_terms:
    term = glossary.get_term(term_name)
    if not term:
        # Names missing from the glossary are skipped silently.
        continue
    print(f"\n{term.term}")
    print("-" * 40)
    print(f"Category: {term.category.value}")
    print(f"Definition: {term.definition}")
    if term.formula:
        print(f"Formula: {term.formula}")
    if term.range:
        low, high = term.range[0], term.range[1]
        print(f"Range: [{low}, {high}]")
    if term.related_terms:
        # At most three related terms are listed.
        print(f"Related: {', '.join(list(term.related_terms)[:3])}")

## 7. Search Glossary

In [None]:
# Free-text glossary search for "risk", then list every RISK_FACTOR term.
def _banner(title, *, lead=""):
    """Print *title* between two 60-char '=' rules; *lead* prefixes the first rule."""
    print(lead + "=" * 60)
    print(title)
    print("=" * 60)


search_results = glossary.search("risk")

_banner("🔍 SEARCH RESULTS: 'risk'")

for term in search_results:
    print(f"\n✓ {term.term} ({term.category.value})")
    # Definitions are cut to 100 characters; "..." is appended unconditionally.
    print(f"  {term.definition[:100]}...")

from src.core.glossary import TermCategory

risk_factors = glossary.get_by_category(TermCategory.RISK_FACTOR)
_banner("⚠️ ALL RISK FACTORS", lead="\n")

for term in risk_factors:
    print(f"\n• {term.term}")
    print(f"  {term.definition}")
    if term.range:
        print(f"  Range: [{term.range[0]}, {term.range[1]}]")

## 8. Export Documentation

In [None]:
from pathlib import Path

# Resolve docs/: when the kernel runs inside notebooks/, use the sibling
# ../docs directory; otherwise use docs/ under the current working directory.
notebook_dir = Path.cwd()
_docs_root = notebook_dir.parent if notebook_dir.name == "notebooks" else notebook_dir
docs_dir = _docs_root / "docs"
docs_dir.mkdir(exist_ok=True)

# Markdown export (with table of contents, grouped by category).
glossary_path = docs_dir / "GLOSSARY.md"
glossary_markdown = glossary.export_markdown(
    output_path=glossary_path,
    include_toc=True,
    group_by_category=True,
)

_rule = "=" * 60
print(_rule)
print("📄 EXPORTED DOCUMENTATION")
print(_rule)
print(f"\n✅ Glossary exported to: {glossary_path}")
print(f"📏 Document size: {len(glossary_markdown)} characters")

# JSON export for programmatic consumers.
glossary_json_path = docs_dir / "glossary.json"
glossary_json = glossary.export_json(output_path=glossary_json_path)

print(f"✅ JSON export to: {glossary_json_path}")

print("\n" + _rule)
print("📄 GLOSSARY PREVIEW (first 500 chars)")
print(_rule)
print(glossary_markdown[:500] + "...")


## 9. Extended Backtest Metrics (IC & Risk-Adjusted Performance)

In [None]:
import numpy as np
from backtest.extended_metrics import (
    calculate_extended_metrics,
    calculate_ic_metrics,
    format_ic_summary,
)

# Seed the *global* NumPy RNG so the synthetic backtest below is reproducible.
np.random.seed(42)

_N_SNAPSHOTS = 50

# Synthetic GemScore-like predictions drawn uniformly from [0.3, 0.9) ...
predictions = np.random.uniform(0.3, 0.9, _N_SNAPSHOTS)
# ... and returns linearly tied to them (slope 0.05) plus Gaussian noise (sd 0.02).
actual_returns = predictions * 0.05 + np.random.normal(0, 0.02, _N_SNAPSHOTS)


class MockSnapshot:
    """Lightweight stand-in for a backtest snapshot.

    Holds a token name, a feature dict and the realised 7-day forward return
    (stored as ``future_return_7d``).
    """

    def __init__(self, token, features, future_return):
        self.token = token
        self.features = features
        self.future_return_7d = future_return


snapshots = [
    MockSnapshot(f"TOKEN{idx:02d}", {}, realised)
    for idx, realised in enumerate(actual_returns)
]

_rule = "=" * 70
print(_rule)
print("📊 SYNTHETIC BACKTEST DATA (SEED=42)")
print(_rule)
print(f"Total Snapshots: {len(snapshots)}")
print(f"Prediction Range: [{predictions.min():.4f}, {predictions.max():.4f}]")
print(f"Return Range: [{actual_returns.min():.4f}, {actual_returns.max():.4f}]")
print(f"Mean Return: {actual_returns.mean():.4f}")
print(f"Return Std Dev: {actual_returns.std():.4f}")


In [None]:
# Information Coefficient analysis of synthetic predictions vs. realised returns.
ic_metrics = calculate_ic_metrics(predictions, actual_returns)

# Static reading guide printed after the computed summary.
_IC_GUIDE = """
The Information Coefficient measures the correlation between predicted
scores and actual returns. Key insights:

1. **Pearson IC**: Measures linear correlation
   - IC > 0.05: Strong predictive power
   - IC > 0.02: Moderate predictive power
   - IC < 0.02: Weak predictive power

2. **Spearman IC**: Measures rank correlation (robust to outliers)
   - Useful when returns are skewed or have outliers

3. **Kendall's Tau**: Alternative rank correlation
   - More conservative than Spearman

4. **Hit Rate**: Percentage of correct direction predictions
   - > 55%: Better than random for direction
   - > 60%: Strong directional signal

5. **IC IR (Information Ratio)**: IC_mean / IC_std
   - Measures consistency of IC across periods
   - Higher is better (more stable predictions)
"""

_rule = "=" * 70
print(_rule)
print("📈 INFORMATION COEFFICIENT ANALYSIS")
print(_rule)
print(format_ic_summary(ic_metrics))

print("\n" + _rule)
print("🔍 IC INTERPRETATION")
print(_rule)
print(_IC_GUIDE)

In [None]:
# Full extended-metric report for the GemScore predictions: evaluate the top
# 10 picks with a zero risk-free rate and periods_per_year=52 (weekly returns).
extended_metrics = calculate_extended_metrics(
    snapshots=snapshots,
    predictions=predictions,
    top_k=10,
    risk_free_rate=0.0,
    periods_per_year=52,
)

_rule = "=" * 70
for _heading in (_rule, "💰 COMPREHENSIVE BACKTEST METRICS", _rule):
    print(_heading)
print(extended_metrics.summary_string())

In [None]:
# Benchmark the GemScore predictions against simple baseline selection strategies.
from backtest.baseline_strategies import (
    RandomStrategy,
    CapWeightedStrategy,
    SimpleMomentumStrategy,
)
from backtest.extended_metrics import compare_extended_metrics

# The baselines rank on MarketCap / PriceChange7d. Draw these from a dedicated,
# seeded Generator so they are identical on every (re-)run of this cell,
# regardless of how much of the global RNG earlier cells have consumed.
_feature_rng = np.random.default_rng(42)
for snap in snapshots:
    snap.features = {
        'MarketCap': _feature_rng.uniform(100000, 10000000),
        'PriceChange7d': _feature_rng.uniform(-0.1, 0.2),
    }

baseline_strategies = [
    RandomStrategy(),
    CapWeightedStrategy(),
    SimpleMomentumStrategy(),
]

baseline_metrics = {}
for strategy in baseline_strategies:
    selected = strategy.select_assets(snapshots, top_k=10, seed=42)

    # Baselines only pick assets and do not score them, so encode the picks
    # as a 1.0/0.0 membership indicator that the metric code can rank.
    baseline_predictions = np.array([1.0 if snap in selected else 0.0 for snap in snapshots])

    baseline_metrics[strategy.get_name()] = calculate_extended_metrics(
        snapshots=snapshots,
        predictions=baseline_predictions,
        top_k=10,
        risk_free_rate=0.0,
        periods_per_year=52,
    )

comparisons = compare_extended_metrics(extended_metrics, baseline_metrics)

print("=" * 70)
print("🎯 BASELINE COMPARISONS")
print("=" * 70)
for baseline_name, comparison in comparisons.items():
    print(f"\n{baseline_name.replace('_', ' ').title()}:")
    print(f"  IC Improvement:     {comparison['ic_improvement']:>8.4f}  {'✅' if comparison['ic_better'] else '❌'}")
    print(f"  Sharpe Improvement: {comparison['sharpe_improvement']:>8.4f}  {'✅' if comparison['sharpe_better'] else '❌'}")
    print(f"  Sortino Improvement:{comparison['sortino_improvement']:>8.4f}")
    print(f"  Return Improvement: {comparison['return_improvement']:>8.4f}")
    print(f"  Risk-Adjusted Better: {'✅ YES' if comparison['risk_adjusted_better'] else '❌ NO'}")

In [None]:
# Multi-period IC stability study plus diagnostic plots for the synthetic backtest.
from pathlib import Path

import pandas as pd
import matplotlib.pyplot as plt
# Aliased (and imported once, outside the loop) so the scipy module does not
# overwrite the provenance-statistics dict bound to `stats` earlier.
from scipy import stats as scipy_stats

# Re-seed the global RNG so the simulated periods are reproducible.
np.random.seed(42)
n_periods = 20
_PERIOD_SIZE = 30
period_ics = []

for period in range(n_periods):
    # Same generative model as the backtest data, with slightly more noise.
    period_preds = np.random.uniform(0.3, 0.9, _PERIOD_SIZE)
    period_actuals = period_preds * 0.05 + np.random.normal(0, 0.025, _PERIOD_SIZE)
    ic, _ = scipy_stats.pearsonr(period_preds, period_actuals)
    period_ics.append(ic)

ic_values = np.asarray(period_ics)
ic_mean = ic_values.mean()
ic_std = ic_values.std()

# Indices of the top-N snapshots by predicted score, best first
# (argsort is ascending, hence the reversal).
top_n = min(30, len(predictions))
top_idx = np.argsort(predictions)[::-1][:top_n]

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
(ax_ic, ax_hist), (ax_scatter, ax_cum) = axes

# IC over time
ax_ic.plot(range(1, n_periods + 1), period_ics, marker='o', linewidth=2)
ax_ic.axhline(y=0, color='gray', linestyle='--', alpha=0.5)
ax_ic.axhline(y=0.02, color='orange', linestyle='--', alpha=0.5, label='Moderate IC')
ax_ic.axhline(y=0.05, color='green', linestyle='--', alpha=0.5, label='Strong IC')
ax_ic.set_xlabel('Period')
ax_ic.set_ylabel('Information Coefficient')
ax_ic.set_title('IC Over Time')
ax_ic.legend()
ax_ic.grid(alpha=0.3)

# IC distribution
ax_hist.hist(period_ics, bins=15, edgecolor='black', alpha=0.7)
ax_hist.axvline(x=ic_mean, color='red', linestyle='--', linewidth=2, label=f'Mean: {ic_mean:.4f}')
ax_hist.set_xlabel('Information Coefficient')
ax_hist.set_ylabel('Frequency')
ax_hist.set_title('IC Distribution')
ax_hist.legend()
ax_hist.grid(alpha=0.3)

# Predictions vs actuals for every snapshot, with the noiseless generating line.
ax_scatter.scatter(predictions, actual_returns, alpha=0.6)
ax_scatter.plot([predictions.min(), predictions.max()],
                [predictions.min() * 0.05, predictions.max() * 0.05],
                'r--', label='Expected Relationship')
ax_scatter.set_xlabel('Predicted Score')
ax_scatter.set_ylabel('Actual Return')
ax_scatter.set_title('Predictions vs Actual Returns')
ax_scatter.legend()
ax_scatter.grid(alpha=0.3)

# Cumulative returns of the top-N assets, genuinely ordered by GemScore
# so the panel matches its axis label.
cumulative_returns = np.cumsum(actual_returns[top_idx])
ranks = range(1, top_n + 1)
ax_cum.plot(ranks, cumulative_returns, linewidth=2)
ax_cum.fill_between(ranks, 0, cumulative_returns, alpha=0.3)
ax_cum.set_xlabel('Asset Index (sorted by GemScore)')
ax_cum.set_ylabel('Cumulative Return')
ax_cum.set_title(f'Cumulative Returns (Top {top_n})')
ax_cum.grid(alpha=0.3)

plt.tight_layout()

# Resolve docs/ the same way as the documentation-export cell instead of a
# hard-coded '../docs', which fails unless the CWD is notebooks/.
_cwd = Path.cwd()
_fig_dir = (_cwd.parent if _cwd.name == "notebooks" else _cwd) / "docs"
_fig_dir.mkdir(parents=True, exist_ok=True)
ic_figure_path = _fig_dir / "ic_analysis.png"
plt.savefig(ic_figure_path, dpi=150, bbox_inches='tight')
print(f"📊 Visualization saved to: {ic_figure_path}")
plt.show()

# IC IR is undefined when every period has the same IC (zero dispersion).
ic_ir = ic_mean / ic_std if ic_std > 0 else float("nan")
n_positive = int((ic_values > 0).sum())

print("\n" + "=" * 70)
print("📊 IC STATISTICS (Multi-Period)")
print("=" * 70)
print(f"Mean IC:        {ic_mean:>8.4f}")
print(f"Median IC:      {np.median(ic_values):>8.4f}")
print(f"Std Dev IC:     {ic_std:>8.4f}")
print(f"IC IR:          {ic_ir:>8.4f}")
print(f"Min IC:         {ic_values.min():>8.4f}")
print(f"Max IC:         {ic_values.max():>8.4f}")
print(f"Positive ICs:   {n_positive}/{n_periods} ({100 * n_positive / n_periods:.1f}%)")

## 📊 Observability Dashboard: View Metrics & Logs

In [None]:
"""
Observability Dashboard: View collected metrics and logs.

This cell demonstrates:
1. Viewing Prometheus metrics programmatically
2. Accessing structured logs
3. Checking trace information
4. System health overview
"""

from prometheus_client import REGISTRY, generate_latest
import json

print("=" * 80)
print("📊 PROMETHEUS METRICS DASHBOARD")
print("=" * 80)

if is_prometheus_available():
    # Get all metrics
    metrics_output = generate_latest(REGISTRY).decode('utf-8')
    
    # Parse and display scanner-specific metrics
    scanner_metrics = [
        line for line in metrics_output.split('\n')
        if any(keyword in line for keyword in ['scan_', 'gem_score', 'confidence_score', 'flagged_tokens'])
        and not line.startswith('#')
        and line.strip()
    ]
    
    print("\n🔍 Scanner Metrics:\n")
    for metric in scanner_metrics[:20]:  # Show first 20
        print(f"  {metric}")
    
    if len(scanner_metrics) > 20:
        print(f"\n  ... and {len(scanner_metrics) - 20} more metrics")
    
    # Count metrics by type
    scan_requests = len([m for m in scanner_metrics if 'scan_requests_total' in m])
    gem_scores = len([m for m in scanner_metrics if 'gem_score_distribution' in m])
    
    print(f"\n📈 Metrics Summary:")
    print(f"  - Scan requests tracked: {scan_requests}")
    print(f"  - GemScore distributions: {gem_scores}")
    
else:
    print("\n⚠️  Prometheus client not available - metrics are mocked")

print("\n" + "=" * 80)
print("📝 STRUCTURED LOGGING STATUS")
print("=" * 80)
print(f"\n✅ Logging Format: JSON")
print(f"✅ Service Name: hidden-gem-scanner")
print(f"✅ Context Binding: Enabled")
print(f"✅ Trace Correlation: {get_trace_id() or 'N/A'}")

print("\n💡 To view all logs in JSON format:")
print("   - Logs are written to stdout/stderr")
print("   - Each log entry is a valid JSON object")
print("   - Can be piped to jq for parsing: python script.py 2>&1 | jq .")

print("\n" + "=" * 80)
print("🔍 DISTRIBUTED TRACING")
print("=" * 80)

if is_tracing_available():
    print(f"\n✅ Tracing: Enabled")
    print(f"✅ Current Trace ID: {get_trace_id() or 'N/A'}")
    print(f"✅ Service: hidden-gem-scanner")
    print("\n💡 Traces can be exported to:")
    print("   - Jaeger (set JAEGER_ENDPOINT)")
    print("   - Zipkin")
    print("   - OTLP-compatible backends")
else:
    print(f"\n⚠️  Tracing: Disabled (graceful fallback)")

print("\n" + "=" * 80)
print("🎯 OBSERVABILITY QUICK ACTIONS")
print("=" * 80)
print("""
1. Start Metrics Server:
   python -m src.services.metrics_server --port 9090

2. View Metrics Endpoint:
   curl http://localhost:9090/metrics

3. Query with Prometheus:
   - Install Prometheus: https://prometheus.io/download/
   - Configure scrape target: localhost:9090
   - Query: rate(scan_requests_total[5m])

4. View Logs with jq:
   python your_script.py 2>&1 | jq 'select(.event=="scan_completed")'

5. Monitor in Real-Time:
   watch -n 1 'curl -s http://localhost:9090/metrics | grep scan_requests'
""")

## 🚀 Production Monitoring Patterns

In [None]:
"""
Production-Ready Monitoring Patterns

Demonstrates:
1. Error handling with metrics
2. Performance monitoring
3. Alert-worthy conditions
4. Batch processing with metrics
"""

from contextlib import contextmanager
from typing import List, Dict, Any

@contextmanager
def monitored_operation(operation_name: str, token: str):
    """Context manager for monitored operations with automatic logging and metrics."""
    start = time.time()
    operation_logger = logger.bind(operation=operation_name, token=token)
    
    operation_logger.info(f"{operation_name}_started")
    
    try:
        yield operation_logger
        duration = time.time() - start
        
        # Log success
        operation_logger.info(
            f"{operation_name}_completed",
            duration_seconds=duration,
            status="success"
        )
        
    except Exception as e:
        duration = time.time() - start
        
        # Log and record error
        operation_logger.error(
            f"{operation_name}_failed",
            duration_seconds=duration,
            error_type=type(e).__name__,
            error_message=str(e),
            status="failure"
        )
        
        # Record error metric
        record_scan_error(token, type(e).__name__)
        raise


# Example: Batch processing with monitoring
import time
import zlib
from typing import Any, Callable, Dict, List, Optional, Tuple


def _mock_scan_scores(token: str) -> Tuple[float, float]:
    """Return a deterministic mock ``(gem_score, confidence)`` pair for *token*.

    Uses ``zlib.crc32`` rather than the builtin ``hash()``: ``str`` hashes are
    salted per interpreter process (PYTHONHASHSEED), so ``hash()``-based mock
    values would change on every kernel restart.
    Scores fall in [75, 94] and confidences in [0.80, 0.99].
    """
    bucket = zlib.crc32(token.encode("utf-8")) % 20
    return 75.0 + bucket, 0.8 + bucket / 100


def process_token_batch(
    tokens: List[str],
    *,
    scan_fn: Optional[Callable[[str], Any]] = None,
) -> Dict[str, Any]:
    """Process a batch of tokens with per-token and batch-level monitoring.

    Each token is scanned inside ``monitored_operation("token_scan", token)``.
    On success, the request, duration, GemScore and confidence metrics are
    recorded. A failure is caught per token, so one bad token does not abort
    the batch. It is recorded as a failed request with its duration and
    reported in the returned mapping.

    Args:
        tokens: Token symbols to scan.
        scan_fn: Optional hook called as ``scan_fn(token)`` before the mock
            scoring; if it raises, that token is marked as failed. Defaults to
            no hook, in which case every token succeeds.

    Returns:
        Mapping of token to ``{"score", "confidence", "status": "success"}``
        or ``{"error", "status": "error"}``.
    """
    results: Dict[str, Any] = {}
    batch_start = time.perf_counter()
    batch_size = len(tokens)

    logger.info(
        "batch_processing_started",
        batch_size=batch_size,
        tokens=tokens[:5],  # log only a preview of large batches
    )

    success_count = 0
    error_count = 0

    for token in tokens:
        scan_start = time.perf_counter()
        try:
            with monitored_operation("token_scan", token):
                if scan_fn is not None:
                    scan_fn(token)

                # Mock scan (replace with actual scan logic)
                mock_score, mock_confidence = _mock_scan_scores(token)

                scan_duration = time.perf_counter() - scan_start

                record_scan_request(token, "success")
                record_scan_duration(token, scan_duration, "success")
                record_gem_score(token, mock_score)
                record_confidence_score(token, mock_confidence)

                results[token] = {
                    "score": mock_score,
                    "confidence": mock_confidence,
                    "status": "success"
                }

                success_count += 1

        except Exception as e:
            # monitored_operation has already logged the failure and recorded
            # the error type; add the failed-request and duration metrics so
            # failures are counted the same way as the single-pipeline run.
            error_count += 1
            record_scan_request(token, "failure")
            record_scan_duration(token, time.perf_counter() - scan_start, "failure")
            results[token] = {
                "error": str(e),
                "status": "error"
            }

    batch_duration = time.perf_counter() - batch_start

    # Per-token averages are guarded so an empty batch doesn't divide by zero.
    logger.info(
        "batch_processing_completed",
        batch_size=batch_size,
        success_count=success_count,
        error_count=error_count,
        batch_duration_seconds=batch_duration,
        avg_duration_per_token=batch_duration / batch_size if batch_size else 0.0,
        success_rate=success_count / batch_size if batch_size else 0.0,
    )

    return results


# Demo: Process sample batch
sample_tokens = ["TOKEN1", "TOKEN2", "TOKEN3", "ERROR_TOKEN", "TOKEN5"]

print("=" * 80)
print("🔄 BATCH PROCESSING WITH MONITORING")
print("=" * 80)


# Simulate an error for ERROR_TOKEN
def simulate_scan(token):
    """Raise for the sentinel token "ERROR_TOKEN"; succeed for everything else."""
    if token == "ERROR_TOKEN":
        raise ValueError("Simulated error for demonstration")
    return True


# Inject the hook so ERROR_TOKEN actually exercises the error path.
batch_results = process_token_batch(sample_tokens, scan_fn=simulate_scan)

print("\n✅ Batch completed:")
print(f"   Total: {len(sample_tokens)} tokens")
print(f"   Success: {sum(1 for r in batch_results.values() if r['status'] == 'success')}")
print(f"   Errors: {sum(1 for r in batch_results.values() if r['status'] == 'error')}")

print("\n📊 Results:")
for token, result in batch_results.items():
    if result['status'] == 'success':
        print(f"   ✓ {token}: Score={result['score']:.1f}, Confidence={result['confidence']:.2f}")
    else:
        print(f"   ✗ {token}: {result.get('error', 'Unknown error')}")

print("\n" + "=" * 80)
print("🎯 PRODUCTION MONITORING CHECKLIST")
print("=" * 80)
print("""
✅ Structured Logging
   - All events logged with context
   - Errors include stack traces
   - JSON format for log aggregation

✅ Metrics Collection
   - Request counts per token
   - Latency histograms
   - Error rates by type
   - Score distributions

✅ Distributed Tracing
   - Trace IDs for correlation
   - Span attributes for context
   - Cross-service tracing ready

✅ Error Handling
   - Graceful degradation
   - Error metrics recorded
   - Detailed error logging

✅ Performance Tracking
   - Operation timing
   - Batch processing metrics
   - Resource utilization

📋 Next Steps for Production:
   1. Set up Prometheus server
   2. Configure Grafana dashboards
   3. Set up alerting rules
   4. Configure log aggregation (ELK/Datadog)
   5. Enable Jaeger for distributed tracing
""")

## 🚨 Alert Engine v2 - Compound Logic & Suppression

Advanced alerting with:
- **Compound conditions** (AND/OR/NOT logic)
- **Alert suppression** (prevent alert fatigue)
- **Deduplication** (fingerprint-based)
- **Escalation policies** (tiered notifications)

In [None]:
# Alert Engine v2: register three example rules on a fresh AlertManager.
from src.services.alerting_v2 import (
    AlertCondition, CompoundCondition, AlertRule, AlertManager,
    SuppressionRule, EscalationPolicy, Alert
)
from datetime import timedelta

alert_manager = AlertManager()

# Rule 1 - single threshold: warn whenever gem_score < 30.
simple_condition = AlertCondition(
    metric="gem_score",
    operator="lt",
    threshold=30
)
simple_rule = AlertRule(
    id="low_score_warning",
    name="Low GemScore Warning",
    condition=simple_condition,
    severity="warning",
    message="Token has low GemScore: {gem_score}"
)

# Rule 2 - AND: a low score together with a detected honeypot is critical.
critical_condition = CompoundCondition(
    operator="AND",
    conditions=[
        AlertCondition("gem_score", "lt", 30),
        AlertCondition("honeypot_detected", "eq", True)
    ]
)
critical_rule = AlertRule(
    id="critical_risk",
    name="Critical Risk Detected",
    condition=critical_condition,
    severity="critical",
    message="CRITICAL: Low score AND honeypot detected!"
)

# Rule 3 - nested AND/OR: a high score (>= 70) is suspicious when liquidity
# is below 10,000 USD OR the safety score is below 0.5.
_red_flags = CompoundCondition(
    operator="OR",
    conditions=[
        AlertCondition("liquidity_usd", "lt", 10000),
        AlertCondition("safety_score", "lt", 0.5)
    ]
)
suspicious_condition = CompoundCondition(
    operator="AND",
    conditions=[
        AlertCondition("gem_score", "gte", 70),
        _red_flags,
    ]
)
suspicious_rule = AlertRule(
    id="suspicious_high_score",
    name="Suspicious High Score",
    condition=suspicious_condition,
    severity="warning",
    message="High score but with red flags: liquidity={liquidity_usd}, safety={safety_score}"
)

# Registration order is preserved: low score, critical risk, suspicious score.
for _new_rule in (simple_rule, critical_rule, suspicious_rule):
    alert_manager.add_rule(_new_rule)

print("✅ Alert Engine v2 initialized with 3 rules")
print(f"📊 Rules: {[r.id for r in alert_manager.rules.values()]}")

In [None]:
# Evaluate every registered rule against one hand-crafted metrics payload,
# show what fired, and acknowledge the first fired alert.
metrics_data = dict(
    gem_score=25,
    honeypot_detected=True,
    liquidity_usd=5000,
    safety_score=0.3,
)

print("\n🔍 Evaluating rules with test data:")
print(f"   Metrics: {metrics_data}\n")

fired_alerts = alert_manager.evaluate(metrics_data)

print(f"\n🚨 Alerts fired: {len(fired_alerts)}")
for alert in fired_alerts:
    print(f"\n   Alert: {alert.rule_id}")
    print(f"   Severity: {alert.severity}")
    print(f"   Message: {alert.message}")
    # Only a 16-character prefix of the fingerprint is displayed.
    print(f"   Fingerprint: {alert.fingerprint[:16]}...")
    print(f"   Status: {alert.status}")

active = alert_manager.get_active_alerts()
print(f"\n📈 Active alerts: {len(active)}")

# Acknowledge the first fired alert (if any) with a free-text note.
if fired_alerts:
    first_alert = fired_alerts[0]
    _ack_note = "Investigating via notebook"
    alert_manager.acknowledge_alert(first_alert.id, _ack_note)
    print(f"\n✅ Acknowledged alert: {first_alert.id}")

In [None]:
# Alert suppression / deduplication demo.
print("\n🔇 Testing Alert Suppression\n")

# Suppress alerts whose `token_name` matches /.*test.*/ for one hour.
# NOTE(review): metrics_data has no "token_name" key, so this rule presumably
# does not match here; any drop in fired alerts below is expected to come from
# fingerprint deduplication instead. Confirm against SuppressionRule semantics.
suppression = SuppressionRule(
    pattern=r".*test.*",
    field="token_name",
    duration=timedelta(hours=1)
)

alert_manager.add_suppression_rule(suppression)

# metrics_data was already evaluated in the previous cell, so even this
# "first" evaluation may be deduplicated.
print("1️⃣ First evaluation:")
alerts1 = alert_manager.evaluate(metrics_data)
print(f"   Fired: {len(alerts1)} alerts")

print("\n2️⃣ Second evaluation (same metrics):")
alerts2 = alert_manager.evaluate(metrics_data)
print(f"   Fired: {len(alerts2)} alerts (should be 0 - suppressed)")

from src.core.metrics import ALERTS_SUPPRESSED

# `_value` is a private attribute of an unlabelled prometheus_client Counter.
# It is absent on labelled counters and possibly on the fallback used when
# prometheus_client is unavailable, so read it defensively instead of crashing.
_suppressed_value = getattr(ALERTS_SUPPRESSED, "_value", None)
if _suppressed_value is not None:
    print(f"\n📊 Total alerts suppressed: {_suppressed_value.get()}")
else:
    print("\n📊 Total alerts suppressed: N/A (counter value not readable)")

# Different metrics should produce a new fingerprint and therefore new alerts.
print("\n3️⃣ Third evaluation (different metrics):")
different_metrics = {
    "gem_score": 20,  # Changed
    "honeypot_detected": False,  # Changed
    "liquidity_usd": 3000,
    "safety_score": 0.2
}
alerts3 = alert_manager.evaluate(different_metrics)
print(f"   Fired: {len(alerts3)} alerts (new fingerprint)")

In [None]:
# Escalation Policies - Tiered notifications
print("\n📢 Escalation Policy Example\n")

# Three-step policy: each level pairs a delay with the channels to notify.
escalation = EscalationPolicy(
    levels=[
        {"delay": timedelta(seconds=0), "channels": ["slack"]},
        {"delay": timedelta(minutes=5), "channels": ["telegram"]},
        {"delay": timedelta(minutes=15), "channels": ["pagerduty"]},
    ]
)

# Create rule with escalation
escalated_rule = AlertRule(
    id="critical_with_escalation",
    name="Critical Alert with Escalation",
    condition=AlertCondition("gem_score", "lt", 20),
    severity="critical",
    escalation_policy=escalation,
    message="Critical: GemScore below 20!",
)

alert_manager.add_rule(escalated_rule)

# Trigger the alert
critical_metrics = {"gem_score": 15}
escalated_alerts = alert_manager.evaluate(critical_metrics)

# Other registered rules may also fire on these metrics, so select the alert
# produced by *this* rule instead of assuming it is first in the list.
alert = next(
    (a for a in escalated_alerts if a.rule_id == escalated_rule.id),
    None,
)
policy = getattr(alert, "escalation_policy", None)

if policy is not None:
    print(f"Alert: {alert.rule_id}")
    print(f"Escalation levels: {len(policy.levels)}")

    for i, level in enumerate(policy.levels, start=1):
        print(f"\n   Level {i}:")
        print(f"   - Delay: {level['delay']}")
        print(f"   - Channels: {level['channels']}")

    # Derive the count from the policy rather than hard-coding it.
    print(f"\n✅ Alert will escalate through {len(policy.levels)} notification levels")
else:
    # Nothing to escalate (possibly deduplicated if this cell was re-run).
    print("\nℹ️ No escalating alert fired for these metrics")

## 📊 Drift Monitor MVP - Statistical Drift Detection

Monitor drift in both input features and model predictions using these statistical methods:
- **Kolmogorov-Smirnov test** (continuous features)
- **Population Stability Index (PSI)** (distribution shift)
- **Chi-square test** (categorical features)
- **Baseline management** (save/load reference distributions)

In [None]:
# Initialize Drift Monitor
from src.monitoring.drift_monitor import DriftMonitor, DriftDetector, Baseline
import numpy as np
from pathlib import Path

# Seed NumPy's global generator so the synthetic data in this cell and in the
# following drift cells (which also draw from np.random) is reproducible.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Create drift monitor
drift_monitor = DriftMonitor()

# Generate synthetic baseline data (replace with real scan results)
print("🔧 Creating baseline from historical data...\n")

# Simulate historical scans
BASELINE_SAMPLES = 1000
baseline_features = {
    "gem_score": np.random.normal(60, 15, BASELINE_SAMPLES).clip(0, 100),
    "liquidity_usd": np.random.lognormal(10, 2, BASELINE_SAMPLES),
    "holder_count": np.random.poisson(500, BASELINE_SAMPLES),
    "safety_score": np.random.beta(5, 2, BASELINE_SAMPLES),
}

baseline_predictions = np.random.normal(65, 20, BASELINE_SAMPLES).clip(0, 100)

# Convert feature arrays to plain lists, as is already done for predictions.
# The baseline is saved to a .json file (presumably via the json module), and the
# standard json encoder rejects NumPy arrays and np.int64 values (from poisson).
baseline = Baseline(
    features={name: values.tolist() for name, values in baseline_features.items()},
    predictions=baseline_predictions.tolist(),
)

# Save baseline for future use
baseline_path = Path("artifacts/baselines/gem_scanner_baseline.json")
baseline_path.parent.mkdir(parents=True, exist_ok=True)
baseline.save(str(baseline_path))

print(f"✅ Baseline created and saved to {baseline_path}")
print(f"   Features: {list(baseline_features.keys())}")
print(f"   Samples: {len(baseline_predictions)}")

In [None]:
# Detect Feature Drift
print("\n📊 Testing Feature Drift Detection\n")


def _print_feature_drift(report):
    """Print a feature drift report: the overall flag, then KS/PSI per feature."""
    print(f"\n   Drift detected: {report.drift_detected}")
    print("   Feature results:")
    for feature, result in report.feature_drift.items():
        print(f"   - {feature}:")
        print(f"     KS statistic: {result.ks_statistic:.3f} (p={result.ks_p_value:.3f})")
        print(f"     PSI: {result.psi:.3f}")
        print(f"     Drift: {'⚠️ YES' if result.drift_detected else '✅ NO'}")


# Scenario 1: No drift - same generating distributions as the baseline
print("1️⃣ Scenario: Normal market conditions (no drift)")
normal_features = {
    "gem_score": np.random.normal(60, 15, 200).clip(0, 100),
    "liquidity_usd": np.random.lognormal(10, 2, 200),
    "holder_count": np.random.poisson(500, 200),
    "safety_score": np.random.beta(5, 2, 200),
}

drift_report_1 = drift_monitor.detect_feature_drift(baseline, normal_features)
_print_feature_drift(drift_report_1)

# Scenario 2: Drift detected - shifted distributions
print("\n\n2️⃣ Scenario: Market shift detected (drift expected)")
drifted_features = {
    "gem_score": np.random.normal(40, 15, 200).clip(0, 100),  # Lower mean
    "liquidity_usd": np.random.lognormal(8, 2, 200),  # Lower liquidity
    "holder_count": np.random.poisson(300, 200),  # Fewer holders
    "safety_score": np.random.beta(2, 5, 200),  # Lower safety
}

drift_report_2 = drift_monitor.detect_feature_drift(baseline, drifted_features)
_print_feature_drift(drift_report_2)

In [None]:
# Detect Prediction Drift
print("\n🎯 Testing Prediction Drift Detection\n")


def _show_prediction_drift(title, report):
    """Print *title*, then the drift flag and KS/PSI figures of *report*."""
    print(title)
    print(f"   Drift detected: {report.drift_detected}")
    stats = report.prediction_drift
    print(f"   KS statistic: {stats.ks_statistic:.3f}")
    print(f"   KS p-value: {stats.ks_p_value:.3f}")
    print(f"   PSI: {stats.psi:.3f}")


# Predictions drawn from the same distribution as the baseline predictions.
normal_predictions = np.random.normal(65, 20, 200).clip(0, 100)
pred_report_1 = drift_monitor.detect_prediction_drift(baseline, normal_predictions.tolist())
_show_prediction_drift("1️⃣ Normal predictions:", pred_report_1)

# Shifted predictions: lower mean, higher variance.
drifted_predictions = np.random.normal(45, 25, 200).clip(0, 100)
pred_report_2 = drift_monitor.detect_prediction_drift(baseline, drifted_predictions.tolist())
_show_prediction_drift("\n2️⃣ Drifted predictions:", pred_report_2)

# Statistical interpretation, shown only when the shifted set is flagged.
if pred_report_2.drift_detected:
    for line in (
        "\n   ⚠️ DRIFT ALERT:",
        "   - Model predictions have shifted significantly",
        "   - Possible causes: market regime change, data quality issues",
        "   - Action: Review recent predictions, retrain model if needed",
    ):
        print(line)

In [None]:
# Comprehensive Drift Report with Visualization
print("\n📈 Comprehensive Drift Analysis\n")

import matplotlib.pyplot as plt

# PSI bands shared by the textual LOW/MEDIUM/HIGH label and the reference
# lines in the PSI chart, so the two can never disagree.
PSI_MEDIUM_THRESHOLD = 0.1
PSI_HIGH_THRESHOLD = 0.2
# NOTE(review): fixed visual reference line for the KS chart; it is not read
# from the drift detector, whose KS decision rule may be p-value based — confirm.
KS_REFERENCE_LINE = 0.1


def _psi_level(psi):
    """Return 'HIGH', 'MEDIUM' or 'LOW' for a PSI value using the bands above."""
    if psi > PSI_HIGH_THRESHOLD:
        return "HIGH"
    if psi > PSI_MEDIUM_THRESHOLD:
        return "MEDIUM"
    return "LOW"


# Full drift detection (features + predictions) on the shifted scenario
full_report = drift_monitor.detect_drift(
    baseline=baseline,
    current_features=drifted_features,
    current_predictions=drifted_predictions.tolist(),
)

# Print summary
print("=" * 60)
print(f"DRIFT REPORT - {full_report.timestamp}")
print("=" * 60)
print(f"\nOverall Drift Detected: {full_report.drift_detected}")
print(f"Total Features Checked: {len(full_report.feature_drift)}")
print(f"Features with Drift: {sum(1 for r in full_report.feature_drift.values() if r.drift_detected)}")

# Detailed breakdown
print("\n" + "-" * 60)
print("FEATURE DRIFT DETAILS")
print("-" * 60)
for feature, result in full_report.feature_drift.items():
    status = "🔴 DRIFT" if result.drift_detected else "🟢 STABLE"
    print(f"\n{feature}: {status}")
    print(f"  KS Test: stat={result.ks_statistic:.3f}, p={result.ks_p_value:.4f}")
    print(f"  PSI: {result.psi:.3f} ({_psi_level(result.psi)})")

print("\n" + "-" * 60)
print("PREDICTION DRIFT")
print("-" * 60)
pred = full_report.prediction_drift
status = "🔴 DRIFT" if pred.drift_detected else "🟢 STABLE"
print(f"Status: {status}")
print(f"KS Test: stat={pred.ks_statistic:.3f}, p={pred.ks_p_value:.4f}")
# Same PSI banding as the features, for a consistent report.
print(f"PSI: {pred.psi:.3f} ({_psi_level(pred.psi)})")

# Visualization: 2x2 grid of KS, PSI, and two distribution overlays
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('Drift Detection Analysis', fontsize=16, fontweight='bold')

# Plot 1: KS Statistics (bars colored by each feature's drift flag)
features = list(full_report.feature_drift.keys())
ks_stats = [full_report.feature_drift[f].ks_statistic for f in features]
colors = ['red' if full_report.feature_drift[f].drift_detected else 'green' for f in features]

axes[0, 0].bar(range(len(features)), ks_stats, color=colors, alpha=0.7)
axes[0, 0].axhline(y=KS_REFERENCE_LINE, color='orange', linestyle='--', label='Threshold')
axes[0, 0].set_xticks(range(len(features)))
axes[0, 0].set_xticklabels(features, rotation=45, ha='right')
axes[0, 0].set_ylabel('KS Statistic')
axes[0, 0].set_title('Kolmogorov-Smirnov Test Results')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: PSI Scores with the same band lines used for the text labels
psi_scores = [full_report.feature_drift[f].psi for f in features]
axes[0, 1].bar(range(len(features)), psi_scores, color=colors, alpha=0.7)
axes[0, 1].axhline(y=PSI_MEDIUM_THRESHOLD, color='yellow', linestyle='--', label='Low threshold')
axes[0, 1].axhline(y=PSI_HIGH_THRESHOLD, color='orange', linestyle='--', label='High threshold')
axes[0, 1].set_xticks(range(len(features)))
axes[0, 1].set_xticklabels(features, rotation=45, ha='right')
axes[0, 1].set_ylabel('PSI Score')
axes[0, 1].set_title('Population Stability Index')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Distribution comparison (gem_score)
axes[1, 0].hist(baseline.features['gem_score'], bins=30, alpha=0.5, label='Baseline', color='blue')
axes[1, 0].hist(drifted_features['gem_score'], bins=30, alpha=0.5, label='Current', color='red')
axes[1, 0].set_xlabel('GemScore')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].set_title('Feature Distribution: gem_score')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Prediction distribution
axes[1, 1].hist(baseline.predictions, bins=30, alpha=0.5, label='Baseline', color='blue')
axes[1, 1].hist(drifted_predictions, bins=30, alpha=0.5, label='Current', color='red')
axes[1, 1].set_xlabel('Prediction Value')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].set_title('Prediction Distribution')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n✅ Drift analysis complete with visualizations")