In [2]:
# Environment check: load .env settings, then smoke-test the meta_algebra module.
import os
import sys
from pathlib import Path

from dotenv import load_dotenv
import hdf5

# Read the .env file before querying the environment. Without this call the
# variable below is only found when the launching shell already exports it.
# load_dotenv() does not override variables that are already set.
load_dotenv()

path = os.getenv("HLLSETS_PATH")
print("HLLSETS_PATH: ", path)

# Add the sgs_core directory to the Python path. The membership check keeps
# sys.path free of duplicate entries when this cell is re-run.
sgs_core_dir = str(Path.cwd() / "sgs_core")
if sgs_core_dir not in sys.path:
    sys.path.append(sgs_core_dir)

# meta_algebra is resolved through the path entry added above, so this import
# has to stay below the sys.path manipulation.
import meta_algebra

# Smoke test: construct an HllSet with default arguments and print count().
hll = meta_algebra.HllSet()
result = hll.count()
print(result)

HLLSETS_PATH:  /home/alexmy/SGS/SGS.ai/sgs_core/HllSets/src/HllSets.jl
1


In [5]:
# demo_triangulation.ipynb

# Cell 1: Setup and Imports
import random
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# from hll_wrapper import HLLSetWrapper, TokenGenerator
from triangulation import SemanticTriangulation

In [None]:
# Set up plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Seed the global RNGs so the random token samples drawn later in this
# notebook are the same on every Restart & Run All.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

print("🚀 Multiple Triangulation Methods Demo")
print("======================================")

# Cell 2: Initialize Components
# NOTE(review): HLLSetWrapper and TokenGenerator are not imported in any cell
# visible here (the hll_wrapper import is commented out) — confirm they are in
# scope before running this cell, otherwise it fails with NameError.
print("Initializing HLLSet wrapper and token generator...")

hll_wrapper = HLLSetWrapper(p=10)
token_gen = TokenGenerator(vocab_size=500)
triangulation = SemanticTriangulation(hll_wrapper, num_seeds=6)

# Flatten the per-category token collections into one pool, category by
# category, so later cells can sample from the whole vocabulary.
categories = token_gen.get_token_categories()
all_tokens = []
for category_tokens in categories.values():
    all_tokens.extend(category_tokens)

category_names = list(categories.keys())
print(f"Generated {len(all_tokens)} tokens across {len(categories)} categories")
print("Categories:", category_names)

# Cell 3: Create Test Scenario
print("\n🎯 Creating Test Scenario")
print("=========================")

# Requested sample sizes. Both are clamped to what the vocabulary can supply,
# because random.sample() raises ValueError when asked for more items than
# the population holds.
NUM_TARGET_TOKENS = 20
NUM_NOISE_TOKENS = 80

# Select a target set of tokens (what we're trying to recover)
target_tokens = random.sample(all_tokens, min(NUM_TARGET_TOKENS, len(all_tokens)))
print(f"Target tokens ({len(target_tokens)}): {target_tokens[:5]}...")

# Every non-target token is potential noise. Membership is checked against a
# set so the filter stays linear in the vocabulary size; the resulting list
# keeps the original vocabulary order.
target_lookup = set(target_tokens)
noise_tokens = [t for t in all_tokens if t not in target_lookup]

# Create candidate pool (includes target tokens + noise)
candidate_pool = target_tokens + random.sample(
    noise_tokens, min(NUM_NOISE_TOKENS, len(noise_tokens))
)
random.shuffle(candidate_pool)

print(f"Candidate pool: {len(candidate_pool)} tokens")
print(f"Signal-to-noise ratio: {len(target_tokens)}:{len(candidate_pool)-len(target_tokens)}")

# Cell 4: Generate Multi-Seed Observations
print("\n📡 Generating Multi-Seed Observations")
print("====================================")

# One observation of the target tokens per seed, keyed by seed.
observations = triangulation.create_multi_seed_observations(target_tokens)
print(f"Created observations using {len(observations)} seeds")

# Show observation characteristics: a shortened id and the cardinality,
# numbered from 1 in the mapping's iteration order.
for seed_number, obs in enumerate(observations.values(), start=1):
    id_prefix = obs['id'][:8]
    print(f"Seed {seed_number}: ID={id_prefix}..., Cardinality={obs['cardinality']}")

# Cell 5: Basic Triangulation
print("\n🔍 Basic Triangulation (Intersection Method)")
print("===========================================")

basic_result = triangulation.basic_triangulation(observations, candidate_pool)
print(f"Basic triangulation found {len(basic_result)} tokens")

# Calculate precision and recall. The target set is built once and reused
# for all three confusion counts.
target_token_set = set(target_tokens)
true_positives = len(basic_result & target_token_set)
false_positives = len(basic_result - target_token_set)
false_negatives = len(target_token_set - basic_result)

# Guard both ratios against an empty denominator.
precision = true_positives / len(basic_result) if basic_result else 0
recall = true_positives / len(target_tokens) if target_tokens else 0

print(f"Precision: {precision:.2%}")
print(f"Recall: {recall:.2%}")

# F1 is undefined when precision and recall are both zero. Print the label in
# either case so the output line is always identifiable.
if (precision + recall) > 0:
    f1_score = 2 * precision * recall / (precision + recall)
    print(f"F1-score: {f1_score:.2%}")
else:
    print("F1-score: N/A")

# Cell 6: Weighted Triangulation
print("\n⚖️ Weighted Triangulation")
print("=======================")

# Weights grow linearly with a seed's position in `observations`
# (1.0, 1.5, 2.0, ...), so later seeds count more than earlier ones.
# NOTE(review): the earlier comment claimed the *first* seeds get the higher
# weights (simulating better "satellites") — confirm which order is intended.
seed_weights = {seed: 1.0 + 0.5 * i for i, seed in enumerate(observations.keys())}
weighted_scores = triangulation.weighted_triangulation(observations, candidate_pool, seed_weights)

# Get top candidates
threshold = 0.7
weighted_candidates = [token for token, score in weighted_scores.items() if score >= threshold]

print(f"Weighted triangulation found {len(weighted_candidates)} candidates (score ≥ {threshold})")

# Calculate metrics for weighted approach. The hit count is shared by both
# ratios, and each ratio is guarded against an empty denominator.
weighted_hits = len(set(weighted_candidates) & set(target_tokens))
weighted_precision = weighted_hits / len(weighted_candidates) if weighted_candidates else 0
weighted_recall = weighted_hits / len(target_tokens) if target_tokens else 0

print(f"Precision: {weighted_precision:.2%}")
print(f"Recall: {weighted_recall:.2%}")

# Cell 7: Progressive Triangulation
print("\n🔄 Progressive Triangulation")
print("===========================")

progressive_result = triangulation.progressive_triangulation(observations, candidate_pool)

print(f"Final result: {len(progressive_result['final_candidates'])} tokens")
print(f"Seeds used: {len(progressive_result['seeds_used'])}")
history = progressive_result['convergence_history']
print(f"Convergence achieved at iteration {len(history)}")

# Split the per-iteration records into parallel series for plotting.
iterations, candidate_sizes, confidences = [], [], []
for step in history:
    iterations.append(step['iteration'])
    candidate_sizes.append(step['candidate_size'])
    confidences.append(step['confidence'])

# Plot convergence: one panel per series, side by side.
panels = [
    (candidate_sizes, 'bo-', 'Candidate Set Size', 'Progressive Disambiguation'),
    (confidences, 'ro-', 'Confidence', 'Confidence Progression'),
]

plt.figure(figsize=(12, 4))
for panel_index, (series, line_format, y_label, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(iterations, series, line_format, linewidth=2)
    plt.xlabel('Iteration')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.grid(True)

plt.tight_layout()
plt.show()

# Cell 8: Bayesian Triangulation
print("\n🎲 Bayesian Triangulation")
print("=======================")

# Draw a non-uniform raw prior for every candidate (0.5-1.0 range) ...
priors = {}
for candidate in candidate_pool:
    priors[candidate] = 0.5 + random.random() * 0.5

# ... then rescale so the priors sum to one.
total_prior = sum(priors.values())
priors = {candidate: weight/total_prior for candidate, weight in priors.items()}

bayesian_probs = triangulation.bayesian_triangulation(observations, candidate_pool, priors)

# Keep the tokens whose returned probability reaches the threshold.
prob_threshold = 0.1
high_prob_tokens = []
for candidate, probability in bayesian_probs.items():
    if probability >= prob_threshold:
        high_prob_tokens.append(candidate)

print(f"Bayesian approach found {len(high_prob_tokens)} high-probability tokens (P ≥ {prob_threshold})")

# Show the five most probable tokens, marking which ones are real targets.
ranked_probs = sorted(bayesian_probs.items(), key=lambda item: item[1], reverse=True)
top_tokens = ranked_probs[:5]
print("Top 5 tokens by probability:")
for token, prob in top_tokens:
    if token in target_tokens:
        status = "✓"
    else:
        status = "✗"
    print(f"  {status} {token}: {prob:.3f}")

# Cell 9: Robust Triangulation
print("\n🛡️ Robust Triangulation (Outlier Detection)")
print("==========================================")

# Simulate a noisy observation by adding an outlier: a set built from 15
# random vocabulary tokens, stored under a seed one past the largest
# existing one. The original observations mapping is left untouched.
# NOTE(review): assumes seeds are numeric and that create_set() returns the
# same kind of object as the existing observations — confirm.
noisy_observations = observations.copy()
outlier_seed = max(observations.keys()) + 1
outlier_tokens = random.sample(all_tokens, 15)
noisy_observations[outlier_seed] = hll_wrapper.create_set(outlier_tokens, seed=outlier_seed)

robust_result = triangulation.robust_triangulation(noisy_observations, candidate_pool)

outlier_count = len(robust_result['outlier_seeds'])
print(f"Robust method detected {outlier_count} outlier seeds")
print(f"Final tokens: {len(robust_result['tokens'])}")
print(f"Method used: {robust_result['method']}")

# Compare with non-robust approach on noisy data
non_robust_result = triangulation.basic_triangulation(noisy_observations, candidate_pool)
print(f"Non-robust approach found {len(non_robust_result)} tokens")

# Cell 10: Comparative Analysis
print("\n📊 Comparative Analysis")
print("======================")

# Coerce every result to a set: the set algebra below requires one, and the
# container type returned by the progressive/robust methods is not visible
# here (a list would raise TypeError on `&`).
methods = {
    "Basic": set(basic_result),
    "Weighted": set(weighted_candidates),
    "Progressive": set(progressive_result['final_candidates']),
    "Bayesian": set(high_prob_tokens),
    "Robust": set(robust_result['tokens']),
}

# Calculate metrics for each method. The target set is built once, outside
# the loop, instead of once per comparison.
target_token_set = set(target_tokens)
comparison_data = []
for method_name, result_tokens in methods.items():
    true_pos = len(result_tokens & target_token_set)

    # Each ratio falls back to 0 when its denominator would be empty/zero.
    precision = true_pos / len(result_tokens) if result_tokens else 0
    recall = true_pos / len(target_tokens) if target_tokens else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    comparison_data.append({
        'Method': method_name,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'Tokens Found': len(result_tokens),
        'True Positives': true_pos
    })

# Display comparison table
df_comparison = pd.DataFrame(comparison_data)
print(df_comparison.round(3))

# Plot comparison: three bars per method, shifted left / centre / right of
# the method's tick position.
plt.figure(figsize=(10, 6))
x_pos = np.arange(len(methods))
width = 0.25

for offset, metric in zip((-width, 0, width), ('Precision', 'Recall', 'F1-Score')):
    plt.bar(x_pos + offset, df_comparison[metric], width, label=metric, alpha=0.8)

plt.xlabel('Method')
plt.ylabel('Score')
plt.title('Triangulation Method Comparison')
plt.xticks(x_pos, df_comparison['Method'])
plt.legend()
plt.grid(True, alpha=0.3)
plt.ylim(0, 1)
plt.tight_layout()
plt.show()

# Cell 11: Military Triangulation Analogy
print("\n🎖️ Military Triangulation Analogy")
print("================================")

# Side-by-side mapping of geolocation concepts onto their counterparts in
# this demo, followed by the key takeaway.
analogy_text = """
Geo-Location Concept → Semantic Disambiguation
------------------------------------------------
Satellites           → Hash seeds
Signal measurements  → Bit position observations  
Triangulation        → Set intersection
Error ellipses       → Candidate token sets
GPS accuracy         → Disambiguation precision
Signal noise         → Hash collisions

Key Insight: Just as military GPS uses multiple satellites to 
triangulate position, we use multiple hash seeds to triangulate 
token identity through consensus across independent observations.
"""
print(analogy_text)

# Demonstrate the "semantic GPS" concept with the numbers from this run.
print("Semantic GPS Demonstration:")
best_precision = df_comparison['Precision'].max()
gps_summary = [
    f"• Target tokens: {len(target_tokens)} 'emitters' to locate",
    f"• Satellite seeds: {triangulation.num_seeds} independent observers",
    f"• Candidate area: {len(candidate_pool)} possible locations",
    f"• Best precision: {best_precision:.1%}",
]
for summary_line in gps_summary:
    print(summary_line)

# Cell 12: Conclusion and Insights
print("\n💡 Key Insights")
print("==============")

# Narrative summary of the demo, printed verbatim.
insights_text = """
1. **Multiple Seeds Essential**: Single hash seeds suffer from ambiguity, 
   but multiple seeds provide independent "votes" for disambiguation.

2. **Progressive Refinement**: Adding seeds sequentially dramatically 
   reduces candidate set size, similar to GPS getting more satellite fixes.

3. **Robustness Matters**: Real-world data has noise; robust methods 
   that detect and ignore outliers perform better.

4. **Prior Knowledge Helps**: Bayesian methods leverage domain knowledge 
   to improve disambiguation when available.

5. **Military Analogy Holds**: The triangulation concept from geolocation 
   directly applies to semantic disambiguation with remarkable similarity.
"""
print(insights_text)

# Closing banner, one output line per entry.
closing_lines = (
    "🎯 Demo Complete! The multiple triangulation approach successfully",
    "transforms HLLSet ambiguity from a weakness into a powerful",
    "disambiguation mechanism through consensus across hash seeds.",
)
print(*closing_lines, sep="\n")