# HeadySoul + ATLAS GPU Node — Mission Scoring & Semantic Intelligence

```
╔══════════════════════════════════════════════════════════════════╗
║  ∞ SACRED GEOMETRY ∞  Organic Systems · Breathing Interfaces    ║
║  NODE: HeadySoul + ATLAS Combined (Colab Pro+ GPU)              ║
║  PURPOSE: Mission alignment scoring + semantic search + docs    ║
╚══════════════════════════════════════════════════════════════════╝
```

**Combined capabilities (2-in-1 GPU node):**
- HeadySoul: Mission alignment scoring via semantic similarity
- HeadySoul: Strategy filtering for Monte Carlo engine
- ATLAS: Embedding generation (mpnet + MiniLM)
- ATLAS: Semantic search across documents
- ATLAS: Code documentation & summarization

**Branded domains only:** headysystems.com | headycloud.com | headyconnection.com

In [None]:
# Cell 1: Install dependencies + verify GPU
!pip install -q sentence-transformers fastapi uvicorn pyngrok pyyaml httpx scikit-learn aiohttp nest_asyncio

import torch
print(f'PyTorch: {torch.__version__}')
print(f'CUDA available: {torch.cuda.is_available()}')
if torch.cuda.is_available():
    print(f'GPU: {torch.cuda.get_device_name(0)}')
    print(f'GPU Memory: {torch.cuda.get_device_properties(0).total_mem / 1e9:.1f} GB')
else:
    print('WARNING: No GPU detected — running on CPU (slower)')

In [None]:
# Cell 2: Configuration
import os

HEADY_CONFIG = {
    'node_id': 'soul-atlas-gpu',
    'node_role': 'soul-scorer-archivist',
    'port': 5000,
    'cloud_layers': {
        'headysystems': 'https://headyio.com',
        'headyme': 'https://headycloud.com',
        'headyconnection': 'https://headyconnection.com',
    },
    'registration_endpoints': [
        'https://headyio.com/api/soul/colab/connect',
        'https://headycloud.com/api/soul/colab/connect',
        'https://headyconnection.com/api/soul/colab/connect',
        'https://headyio.com/api/nodes/register',
        'https://headycloud.com/api/nodes/register',
        'https://headyconnection.com/api/nodes/register',
    ],
    'api_key': os.environ.get('HEADY_API_KEY', ''),
    'capabilities': ['soul_scoring', 'embeddings', 'semantic_search', 'documentation', 'summarize'],
    'heartbeat_interval_sec': 30,
}

# HeadySoul mission value weights (must match heady-soul.yaml)
VALUE_WEIGHTS = {
    'access': 0.30,
    'fairness': 0.25,
    'intelligence': 0.20,
    'happiness': 0.15,
    'redistribution': 0.10,
}

THRESHOLDS = {
    'veto': 40,
    'escalate': 60,
    'auto_approve': 75,
}

print('HeadySoul + ATLAS combined node configured')
print(f'Values: {VALUE_WEIGHTS}')
print(f'Capabilities: {HEADY_CONFIG["capabilities"]}')

In [None]:
# Cell 3: Load ALL models on GPU (shared across HeadySoul + ATLAS)
from sentence_transformers import SentenceTransformer
from transformers import pipeline as hf_pipeline
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import json, time
from datetime import datetime

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Shared embedding model (used by both HeadySoul and ATLAS)
print('Loading shared embedding model (all-MiniLM-L6-v2)...')
model_minilm = SentenceTransformer('all-MiniLM-L6-v2', device=device)

# ATLAS additional models
print('Loading ATLAS mpnet model (all-mpnet-base-v2)...')
model_mpnet = SentenceTransformer('sentence-transformers/all-mpnet-base-v2', device=device)

print('Loading ATLAS large model (all-MiniLM-L12-v2)...')
model_large = SentenceTransformer('sentence-transformers/all-MiniLM-L12-v2', device=device)

print('Loading summarizer (bart-large-cnn)...')
summarizer = hf_pipeline('summarization', model='facebook/bart-large-cnn', device=0 if torch.cuda.is_available() else -1, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)

# Pre-compute HeadySoul mission dimension embeddings
MISSION_TEXTS = {
    'access': 'expand access to underserved users free nonprofit students PPP pricing purchasing power parity inclusive education community low-income global south multilingual offline mobile-first open-source',
    'fairness': 'no lock-in no extraction co-ownership open transparent ethical consent privacy fair equitable democratic honest trust accountability no dark patterns portable exportable',
    'intelligence': 'AI learning self-improving adaptive intelligent optimization pattern recognition prediction analysis insight autonomous self-correcting evolution neural network machine learning',
    'happiness': 'user joy delight satisfaction UX user experience beautiful intuitive simple fast responsive pleasant friendly helpful supportive empowering wellbeing positive',
    'redistribution': 'wealth redistribution revenue sharing donation profit sharing cooperative mutual aid social impact giveback scholarship subsidy pay-what-you-can sliding scale fair compensation',
}
SACRED_GEOMETRY_TEXT = 'organic breathing deterministic self-correcting fractal natural rhythmic renewal healing coherent harmonic sacred geometry golden ratio fibonacci'

MISSION_VECTORS = {dim: model_minilm.encode(text) for dim, text in MISSION_TEXTS.items()}
SG_VECTOR = model_minilm.encode(SACRED_GEOMETRY_TEXT)

print(f'\nAll models loaded on {device}')
print(f'Mission embeddings: {len(MISSION_VECTORS)} dimensions + Sacred Geometry')
if torch.cuda.is_available():
    print(f'GPU Memory used: {torch.cuda.memory_allocated(0) / 1e9:.2f} GB')

In [None]:
# Cell 4: HeadySoul scoring functions

def score_task(task):
    """Score task alignment with mission values using GPU-accelerated semantic similarity"""
    task_text = f"{task.get('title', '')} {task.get('description', '')} {task.get('type', '')} {json.dumps(task.get('metadata', {}))}"
    task_vec = model_minilm.encode(task_text)

    breakdown = {}
    for dim, mission_vec in MISSION_VECTORS.items():
        sim = float(cosine_similarity([task_vec], [mission_vec])[0][0])
        breakdown[dim] = round(max(0, min(100, sim * 100)), 2)

    sg_sim = float(cosine_similarity([task_vec], [SG_VECTOR])[0][0])
    sacred_geometry = round(max(0, min(1, sg_sim)), 4)

    total = sum(breakdown[dim] * VALUE_WEIGHTS[dim] for dim in breakdown)
    total += sacred_geometry * 0.15 * 100
    total = round(min(100, total), 2)

    return {
        'total': total,
        'breakdown': breakdown,
        'sacred_geometry': sacred_geometry,
        'veto': total < THRESHOLDS['veto'],
        'escalate': total >= THRESHOLDS['veto'] and total < THRESHOLDS['escalate'],
        'auto_approve': total >= THRESHOLDS['auto_approve'],
        'scored_by': 'colab_gpu',
        'model': 'all-MiniLM-L6-v2',
        'device': device,
        'timestamp': datetime.now().isoformat(),
    }

def score_strategies(strategies, context=''):
    """Batch score strategies for Monte Carlo filtering"""
    results = []
    for s in strategies:
        desc = f"{s.get('name', '')} {json.dumps(s.get('steps', []))} {context}"
        score = score_task({'description': desc, 'type': 'strategy'})
        results.append({'strategy_id': s.get('id'), 'alignment': score['total'], 'breakdown': score['breakdown']})
    return results

# ATLAS functions
def do_embeddings(payload):
    """Generate embeddings for texts"""
    m = model_large if payload.get('large_model') else model_mpnet
    texts = payload.get('texts', [])
    e = m.encode(texts, batch_size=64, device=device, convert_to_numpy=True)
    return {'embeddings': e.tolist(), 'dimensions': e.shape[1], 'count': len(texts)}

def do_search(payload):
    """Semantic search across documents"""
    q = model_mpnet.encode([payload['query']], device=device)
    docs = payload.get('documents', [])
    d = model_mpnet.encode(docs, batch_size=64, device=device)
    s = cosine_similarity(q, d)[0]
    top_k = payload.get('top_k', 5)
    top = s.argsort()[-top_k:][::-1]
    return {
        'query': payload['query'],
        'results': [{'document': docs[i], 'score': float(s[i]), 'rank': r+1} for r, i in enumerate(top)]
    }

def do_docs(payload):
    """Generate documentation for code"""
    code = payload.get('code', '')
    lang = payload.get('language', 'python')
    if len(code) > 500:
        return {'documentation': summarizer(code, max_length=150, min_length=50, do_sample=False)[0]['summary_text']}
    return {'documentation': f'```{lang}\n{code}\n```'}

def do_summarize(payload):
    """Summarize text"""
    s = summarizer(payload['text'], max_length=payload.get('max_length', 150), min_length=payload.get('min_length', 50), do_sample=False)
    return {'summary': s[0]['summary_text'], 'compression_ratio': len(s[0]['summary_text']) / len(payload['text'])}

# Quick test
test_results = [
    ('PPP pricing for nonprofits', score_task({'title': 'Add PPP pricing for nonprofits', 'description': 'Purchasing power parity tiers for underserved communities', 'metadata': {'targetUsers': ['free', 'nonprofit']}})),
    ('Dark pattern engagement hack', score_task({'title': 'Add viral growth hack', 'description': 'Dark pattern to increase engagement and lock users in'})),
]

print('=== HeadySoul Scoring Test ===')
for name, result in test_results:
    status = 'VETO' if result['veto'] else 'ESCALATE' if result['escalate'] else 'APPROVED'
    print(f'  {name}: {result["total"]:.1f}/100 [{status}]')

print('\n=== ATLAS Embedding Test ===')
test_embed = do_embeddings({'texts': ['test sentence']})
print(f'  Dimensions: {test_embed["dimensions"]}, Count: {test_embed["count"]}')

In [None]:
# Cell 5: Combined FastAPI server (HeadySoul + ATLAS endpoints)
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
import uvicorn
import threading
import asyncio
import httpx
import aiohttp

app = FastAPI(title='HeadySoul + ATLAS GPU Node', version='2.0.0')
node_state = {
    'status': 'initializing',
    'tasks_completed': 0,
    'tasks_failed': 0,
    'uptime_start': datetime.now().isoformat(),
    'current_tasks': [],
    'registered_with': [],
}

class HeadyTask(BaseModel):
    task_id: str
    task_type: str
    payload: Dict[str, Any]
    priority: str = 'P1'
    source_cloud: str = 'unknown'

# ─── Health ─────────────────────────────────────────
@app.get('/health')
async def health():
    gpu_mem = torch.cuda.memory_allocated(0) / 1e9 if torch.cuda.is_available() else 0
    return {
        'status': node_state['status'],
        'node_id': HEADY_CONFIG['node_id'],
        'node_role': HEADY_CONFIG['node_role'],
        'combined_nodes': ['heady-soul', 'atlas'],
        'gpu': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'cpu',
        'gpu_memory_used_gb': round(gpu_mem, 2),
        'models': ['all-MiniLM-L6-v2', 'all-mpnet-base-v2', 'all-MiniLM-L12-v2', 'bart-large-cnn'],
        'capabilities': HEADY_CONFIG['capabilities'],
        'dimensions': list(VALUE_WEIGHTS.keys()),
        'tasks_completed': node_state['tasks_completed'],
        'current_load': len(node_state['current_tasks']),
    }

# ─── HeadySoul Endpoints ────────────────────────────
@app.post('/api/soul/evaluate')
async def evaluate_task(request: Request):
    task = await request.json()
    return score_task(task)

@app.post('/api/soul/filter-strategies')
async def filter_strategies(request: Request):
    data = await request.json()
    strategies = data.get('strategies', [])
    context = data.get('context', '')
    min_score = data.get('minAlignmentScore', THRESHOLDS['escalate'])
    scored = score_strategies(strategies, context)
    approved = [s for s in scored if s['alignment'] >= min_score]
    rejected = [s for s in scored if s['alignment'] < min_score]
    return {'approved': approved, 'rejected': rejected, 'total': len(strategies), 'passed': len(approved)}

@app.get('/api/soul/values')
async def get_values():
    return {'weights': VALUE_WEIGHTS, 'thresholds': THRESHOLDS}

# ─── ATLAS Endpoints ───────────────────────────────
@app.post('/api/atlas/embeddings')
async def api_embeddings(request: Request):
    data = await request.json()
    return do_embeddings(data)

@app.post('/api/atlas/search')
async def api_search(request: Request):
    data = await request.json()
    return do_search(data)

@app.post('/api/atlas/docs')
async def api_docs(request: Request):
    data = await request.json()
    return do_docs(data)

@app.post('/api/atlas/summarize')
async def api_summarize(request: Request):
    data = await request.json()
    return do_summarize(data)

# ─── Universal Task Executor ───────────────────────
@app.post('/task')
async def execute_task_atlas(task: HeadyTask, authorization: Optional[str] = Header(None)):
    start = datetime.now()
    try:
        node_state['current_tasks'].append(task.task_id)
        handlers = {
            'generate_embeddings': do_embeddings,
            'semantic_search': do_search,
            'documentation': do_docs,
            'summarize': do_summarize,
            'soul_evaluate': lambda p: score_task(p),
            'soul_filter': lambda p: {'results': score_strategies(p.get('strategies', []), p.get('context', ''))},
        }
        handler = handlers.get(task.task_type)
        if not handler:
            raise ValueError(f'Unknown task type: {task.task_type}')
        result = handler(task.payload)
        node_state['tasks_completed'] += 1
        node_state['current_tasks'].remove(task.task_id)
        return {'task_id': task.task_id, 'status': 'success', 'result': result,
                'execution_time_ms': (datetime.now() - start).total_seconds() * 1000, 'gpu_accelerated': True}
    except Exception as e:
        node_state['tasks_failed'] += 1
        if task.task_id in node_state['current_tasks']:
            node_state['current_tasks'].remove(task.task_id)
        return {'task_id': task.task_id, 'status': 'failed', 'error': str(e)}

@app.post('/api/tasks/execute')
async def execute_task_generic(request: Request):
    data = await request.json()
    task_type = data.get('type', 'soul_evaluate')
    payload = data.get('payload', data)
    try:
        if task_type in ('soul_evaluate', 'soul_scoring'):
            result = score_task(payload)
        elif task_type == 'soul_filter':
            result = {'results': score_strategies(payload.get('strategies', []), payload.get('context', ''))}
        elif task_type in ('generate_embeddings', 'embeddings'):
            result = do_embeddings(payload)
        elif task_type == 'semantic_search':
            result = do_search(payload)
        elif task_type == 'documentation':
            result = do_docs(payload)
        elif task_type == 'summarize':
            result = do_summarize(payload)
        else:
            result = score_task(payload)
        return {'success': True, 'node_id': HEADY_CONFIG['node_id'], 'task_type': task_type, 'result': result}
    except Exception as e:
        return {'success': False, 'error': str(e), 'node_id': HEADY_CONFIG['node_id']}

print('Combined FastAPI ready with endpoints:')
print('  HeadySoul: /api/soul/evaluate, /api/soul/filter-strategies, /api/soul/values')
print('  ATLAS: /api/atlas/embeddings, /api/atlas/search, /api/atlas/docs, /api/atlas/summarize')
print('  Universal: /health, /task, /api/tasks/execute')

In [None]:
# Cell 6: ngrok tunnel + auto-register with all HeadyCloud layers
from pyngrok import ngrok, conf
import nest_asyncio
nest_asyncio.apply()

# Authenticate ngrok
conf.get_default().auth_token = "39ZBirdUD63xgta7yN7OFZpE84m_3QZyJTDno1b8Yhv9Nfy8s"

# Start ngrok tunnel
public_url = ngrok.connect(HEADY_CONFIG['port']).public_url
print(f'HeadySoul+ATLAS GPU Node live at: {public_url}')

PUBLIC_URL = public_url

async def register_with_clouds():
    """Register this combined node with all HeadyCloud layers"""
    reg_data = {
        'node_id': HEADY_CONFIG['node_id'],
        'node_type': 'ai-node',
        'node_role': HEADY_CONFIG['node_role'],
        'cloud_layer': 'colab_pro_plus',
        'url': PUBLIC_URL,
        'primary_tool': 'soul-scoring-and-autodoc',
        'triggers': ['soul', 'evaluate', 'score', 'embeddings', 'semantic_search', 'documentation', 'summarize'],
        'capabilities': {
            'gpu': torch.cuda.is_available(),
            'gpu_type': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'none',
            'combined_nodes': ['heady-soul', 'atlas'],
            'primary_functions': ['score_task', 'filter_strategies', 'generate_embeddings', 'semantic_search', 'documentation', 'summarize'],
        },
    }
    async with httpx.AsyncClient(timeout=15) as client:
        for endpoint in HEADY_CONFIG['registration_endpoints']:
            try:
                resp = await client.post(endpoint, json=reg_data)
                if resp.status_code == 200:
                    node_state['registered_with'].append(endpoint)
                    print(f'  Registered with {endpoint}: {resp.status_code}')
                else:
                    print(f'  Registration pending: {endpoint} ({resp.status_code})')
            except Exception as e:
                print(f'  Could not reach {endpoint}: {e}')
    node_state['status'] = 'active' if node_state['registered_with'] else 'standalone'

async def heartbeat_loop():
    """Send heartbeats every 30s to all cloud layers"""
    while True:
        await asyncio.sleep(HEADY_CONFIG['heartbeat_interval_sec'])
        async with httpx.AsyncClient(timeout=10) as client:
            for layer, url in HEADY_CONFIG['cloud_layers'].items():
                try:
                    await client.post(f'{url}/api/nodes/heartbeat', json={
                        'node_id': HEADY_CONFIG['node_id'],
                        'status': node_state['status'],
                        'url': PUBLIC_URL,
                        'metrics': {
                            'gpu': torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'cpu',
                            'gpu_memory_used': torch.cuda.memory_allocated(0) / 1e9 if torch.cuda.is_available() else 0,
                            'combined_nodes': ['heady-soul', 'atlas'],
                            'tasks_completed': node_state['tasks_completed'],
                            'current_load': len(node_state['current_tasks']),
                        }
                    })
                except:
                    pass

# Register and start heartbeats
asyncio.run(register_with_clouds())
threading.Thread(target=lambda: asyncio.run(heartbeat_loop()), daemon=True).start()
print(f'HeadySoul+ATLAS node live at {PUBLIC_URL} — accepting scoring & search requests')
# Colab VM bind address for uvicorn (dual-stack all-interfaces)
uvicorn.run(app, host='::', port=HEADY_CONFIG['port'])