Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions migrations/001_initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,15 @@ BEGIN
SELECT pgmq.pop(queue_name, timeout) INTO msg;
RETURN msg;
END;
$$;

-- pgmq_delete(queue_name, msg_id)
CREATE OR REPLACE FUNCTION pgmq_delete(queue_name text, msg_id bigint)
RETURNS boolean
LANGUAGE plpgsql
AS $$
BEGIN
PERFORM pgmq.delete(queue_name, msg_id);
RETURN TRUE;
END;
$$;
1 change: 1 addition & 0 deletions netengine/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from netengine.core.state import RuntimeState
from netengine.core.supabase_client import get_supabase
from netengine.handlers.app_handler import AppHandler
from netengine.handlers.dns import DNSHandler
from netengine.handlers.docker_handler import DockerHandler
from netengine.handlers.oidc_handler import OIDCHandler
from netengine.handlers.pki_handler import PKIHandler
Expand Down
221 changes: 77 additions & 144 deletions netengine/core/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,162 +1,95 @@
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Dict
from typing import Any, Dict, List, Type

from netengine.core.state import RuntimeState
from netengine.handlers._base import BasePhaseHandler
from netengine.handlers.context import PhaseContext
from netengine.handlers.dns import DNSHandler
from netengine.handlers.docker_handler import DockerHandler
from netengine.handlers.phase_pki import PKIPhaseHandler
from netengine.handlers.pki_handler import PKIHandler
from netengine.handlers.substrate import SubstrateHandler
from netengine.phases.phase_ands import ANDsPhaseHandler
from netengine.phases.phase_inworld_identity import InWorldIdentityPhaseHandler
from netengine.phases.phase_platform_identity import PlatformIdentityPhaseHandler
from netengine.phases.phase_registries import RegistriesPhaseHandler
from netengine.phases.phase_services import ServicesPhaseHandler

logger = logging.getLogger(__name__)

phase_handlers = [
DNSHandler(), # phases 1-2
PKIPhaseHandler(), # phase 3
InWorldIdentityPhaseHandler(), # phase 4
# ... more phases later
]


@dataclass
class PhaseContext:
state: RuntimeState
docker: DockerHandler
dns: DNSHandler
# Other handlers will be added later
spec: Dict[str, Any] # loaded YAML spec


class Orchestrator:
"""Phase orchestration for NetEngine bootstrap.

Executes phases 0-8 in sequence with proper dependency tracking,
error handling, and state persistence.
"""

# Phase registry: (phase_number, handler_class)
PHASE_HANDLERS: List[tuple[int, Type[BasePhaseHandler]]] = [
(0, SubstrateHandler),
(1, DNSHandler),
(2, DNSHandler), # DNS phases 1-2 in same handler
(3, PKIPhaseHandler),
(4, PlatformIdentityPhaseHandler),
(5, RegistriesPhaseHandler),
(6, InWorldIdentityPhaseHandler),
(7, ANDsPhaseHandler),
(8, ServicesPhaseHandler),
]

def __init__(self, spec: Dict[str, Any]):
"""Initialize orchestrator with spec.

Args:
spec: Loaded YAML specification with all phase configs
"""
self.spec = spec
self.state = RuntimeState.load()
self.docker = DockerHandler()
self.dns = DNSHandler(self.docker, self.state)
self.context = PhaseContext(state=self.state, docker=self.docker, dns=self.dns, spec=spec)
self.phases = [
self.phase_0_substrate,
self.phase_1_dns_root,
self.phase_2_dns_hierarchy,
self.phase_3_pki, # M2
self.phase_5_registries,
self.phase_6_inworld_identity,
self.phase_7_ands,
self.phase_8_services,
]

async def run(self):
for i, phase_func in enumerate(self.phases):
phase_name = phase_func.__name__
if self.state.phase_completed.get(str(i), False):
logger.info(f"Phase {i} ({phase_name}) already completed, skipping.")
self.runtime_state = RuntimeState.load()
self.context = PhaseContext(
spec=spec,
runtime_state=self.runtime_state,
logger=logger,
)

async def execute_phases(self, up_to_phase: int = 8) -> None:
"""Execute phases 0 through up_to_phase.

Args:
up_to_phase: Highest phase number to execute (default 8, all phases)

Raises:
RuntimeError: If any phase fails or dependency validation fails
"""
for phase_num, handler_class in self.PHASE_HANDLERS:
if phase_num > up_to_phase:
break

# Instantiate handler
handler = handler_class()

# Check if should skip (already executed)
if await handler.should_skip(self.context):
logger.info(
f"Phase {phase_num}: {handler_class.__name__} already completed, skipping"
)
self.runtime_state.phase_completed[str(phase_num)] = True
continue
logger.info(f"Running Phase {i}: {phase_name}")

logger.info(f"Phase {phase_num}: {handler_class.__name__} starting")
try:
await phase_func()
self.state.phase_completed[str(i)] = True
self.state.save()
logger.info(f"Phase {i} completed successfully.")
except Exception as e:
logger.error(f"Phase {i} failed: {e}")
raise
# Execute phase
await handler.execute(self.context)

# --- Phase 0: Substrate (stub, assume already implemented) ---
async def phase_0_substrate(self):
# M0 already built, so we just ensure networks exist.
# For demonstration, we'll create the core network if missing.
# In real code, this would use the spec to create networks.
pass

# --- Phase 1: DNS Root (stub) ---
async def phase_1_dns_root(self):
# Already implemented in M1.
pass

# --- Phase 2: DNS Hierarchy (stub) ---
async def phase_2_dns_hierarchy(self):
# Already implemented in M1.
pass

# --- Phase 3: PKI + ACME (M2) ---
async def phase_3_pki(self):
pki = PKIHandler(self.docker, self.state)
await pki.bootstrap()
# Register DNS record for ca.platform.internal
await self.dns.add_zone_record(
zone="platform.internal", record_type="A", name="ca", value=pki.ca_ip, ttl=300
)
# Optionally, ensure platform zone exists
await self.dns.ensure_zone(
"platform.internal", "ns1.platform.internal.", "ns1.platform.internal."
)
# Healthcheck
if not await handler.healthcheck(self.context):
raise RuntimeError(f"Phase {phase_num} healthcheck failed")

# Mark complete
self.runtime_state.phase_completed[str(phase_num)] = True
self.runtime_state.save()
logger.info(f"Phase {phase_num} completed successfully")

async def phase_4_platform_identity(context):
# 1. Ensure Supabase migrations run (idempotent)
from netengine.utils.run_migrations import apply_migrations

await apply_migrations(context.supabase)

# 2. Start Keycloak container
from netengine.handlers.oidc_handler import OIDCHandler

oidc = OIDCHandler(context.state, context.supabase)

# Get cert from PKI handler for auth.platform.internal
pki = PKIHandler(context.docker, context.state)
cert, key = await pki.issue_cert(common_name="auth.platform.internal", sans=[])

# Start container (use docker_handler)
await context.docker.start_container(
name="netengine_keycloak_platform",
image="quay.io/keycloak/keycloak:23.0.7",
command=["start"],
volumes={...}, # mount cert/key
network="core",
ip="10.0.0.7",
environment={
"KC_HOSTNAME": "auth.platform.internal",
"KC_HTTPS_CERTIFICATE_FILE": "/certs/tls.crt",
"KC_HTTPS_CERTIFICATE_KEY_FILE": "/certs/tls.key",
"KC_BOOTSTRAP_ADMIN_USERNAME": "admin",
"KC_BOOTSTRAP_ADMIN_PASSWORD": context.bootstrap_admin_password,
},
)

# 3. Healthcheck
# wait for /health/ready

# 4. Register DNS
await context.dns.add_zone_record("platform.internal", "A", "auth", "10.0.0.7", 300)

# 5. Bootstrap realm (via Admin API)
# Need to get an admin token. Use the bootstrap admin credentials.
await oidc.create_platform_realm()

# 6. Create OIDC scopes (if needed)
# 7. Update state
context.state.phase_completed["4"] = True
await context.state.save()


async def phase_3_pki(context):
pki = PKIHandler(context.docker, context.state)
# 1. Generate CA (if not already generated)
if not context.state.ca_cert_pem:
await pki.generate_root_ca()
# 2. Start step-ca server
await pki.start_ca_server()
# 3. Healthcheck
if not await pki.healthcheck():
raise RuntimeError("step-ca not responding")
# 4. Register DNS record for ca.platform.internal
dns = DNSHandler(context.docker, context.state)
await dns.add_zone_record(
zone="platform.internal", record_type="A", name="ca", value=pki.ca_ip, ttl=300
)
# 5. Update state
context.state.phase_completed["3"] = True
await context.state.save()
except Exception as e:
logger.error(f"Phase {phase_num} failed: {e}")
self.runtime_state.error = str(e)
self.runtime_state.save()
raise
1 change: 1 addition & 0 deletions netengine/handlers/and_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import ipaddress
from typing import Any, Dict

from netengine.core.pgmq_client import PGMQClient
Expand Down
6 changes: 3 additions & 3 deletions netengine/handlers/phase_pki.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ async def healthcheck(self, context: PhaseContext) -> bool:

async def should_skip(self, context: PhaseContext) -> bool:
"""Skip if already bootstrapped."""
return context.runtime_state.get("pki_bootstrapped", False)
return context.runtime_state.phase_completed.get("3", False)

async def _emit_event(self, context, event_type, payload):
event = EventEnvelope.create(
event_type=event_type,
emitted_by="pki_phase",
payload=payload,
correlation_id=context.runtime_state.correlation_id,
parent_event_id=context.runtime_state.parent_event_id,
correlation_id=getattr(context.runtime_state, "correlation_id", None),
parent_event_id=getattr(context.runtime_state, "parent_event_id", None),
)
context.logger.info(f"Event emitted: {event_type}")
# In M4+ you would queue to pgmq
2 changes: 1 addition & 1 deletion netengine/phases/phase_ands.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from datetime import datetime

from netengine.handlers import BasePhaseHandler
from netengine.handlers._base import BasePhaseHandler
from netengine.handlers.and_handler import ANDHandler
from netengine.handlers.context import PhaseContext
from netengine.handlers.docker_handler import DockerHandler
Expand Down
48 changes: 47 additions & 1 deletion netengine/phases/phase_platform_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from netengine.handlers.dns import DNSHandler
from netengine.handlers.docker_handler import DockerHandler
from netengine.handlers.oidc_handler import OIDCHandler
from netengine.handlers.pki_handler import PKIHandler
from netengine.utils.run_migrations import apply_migrations


Expand All @@ -23,7 +24,7 @@ async def execute(self, context: PhaseContext) -> None:
await apply_migrations()

# 2. Generate or retrieve bootstrap admin password for Keycloak
admin_password = context.runtime_state.get("bootstrap_admin_password")
admin_password = getattr(context.runtime_state, "bootstrap_admin_password", None)
if not admin_password:
admin_password = secrets.token_urlsafe(16)
context.runtime_state.bootstrap_admin_password = admin_password
Expand Down Expand Up @@ -89,6 +90,51 @@ async def execute(self, context: PhaseContext) -> None:

logger.info("Phase 4 complete: platform identity bootstrapped")

async def healthcheck(self, context: PhaseContext) -> bool:
"""Check if Keycloak platform is ready."""
import asyncio

import aiohttp

try:
# Check if container ID is set
container_id = getattr(context.runtime_state, "keycloak_platform_container_id", None)
if not container_id:
return False

# Check container is running
docker = DockerHandler()
try:
container = docker.client.containers.get(container_id)
if container.status != "running":
return False
except Exception:
return False

# Check OIDC discovery endpoint
ssl_context = __import__("ssl").create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = __import__("ssl").CERT_NONE

async with aiohttp.ClientSession() as session:
try:
async with session.get(
"https://auth.platform.internal/.well-known/openid-configuration",
ssl=ssl_context,
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
return resp.status == 200
except asyncio.TimeoutError:
return False
except aiohttp.ClientError:
return False
except Exception:
return False

async def should_skip(self, context: PhaseContext) -> bool:
"""Skip if Phase 4 already completed."""
return context.runtime_state.phase_completed.get("4", False)

async def _wait_for_keycloak(self, url: str, timeout: int = 60):
import asyncio

Expand Down
1 change: 1 addition & 0 deletions netengine/phases/phase_registries.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
from datetime import datetime

from netengine.core.pgmq_client import PGMQClient
Expand Down
Loading
Loading