# Ray RLlib Training for Zelda Oracle of Seasons

This notebook deploys a Ray cluster on OpenShift/Kubernetes and submits a distributed training job.

Based on the Double Dragon KubeRay implementation.


## ⚠️ RBAC Setup Required

**Before running this notebook**, grant RBAC permissions to the `zelda-rl-training` service account in the `zelda-hybrid-rl-llm` namespace.

Run these commands in a terminal:

```bash
cd path/to/hello-chris-rl-llm-zelda   # root of your clone of this repository

# Apply RBAC permissions
oc apply -f ops/openshift/rbac.yaml

# Verify permissions
oc auth can-i list rayclusters --as=system:serviceaccount:zelda-hybrid-rl-llm:zelda-rl-training -n zelda-hybrid-rl-llm
oc auth can-i create rayclusters --as=system:serviceaccount:zelda-hybrid-rl-llm:zelda-rl-training -n zelda-hybrid-rl-llm
```

Both `oc auth can-i` checks should print `yes` ✅

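**Then proceed with the cells below.** Run the notebook from the repository root. The cells use relative paths such as `ops/openshift/...`, and they upload `./` as the Ray job's working directory.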


In [None]:
!pip install codeflare-sdk


In [None]:
# Standard library
import getpass
import os
import subprocess
import time

# Updated imports for newer codeflare_sdk versions
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
from codeflare_sdk.cluster.auth import TokenAuthentication
from ray.job_submission import JobSubmissionClient, JobStatus


In [None]:
# Authenticate with OpenShift.
# Credentials are taken from the OPENSHIFT_TOKEN / OPENSHIFT_SERVER environment variables so
# they are never saved in the notebook; if a variable is unset you are prompted for it.
#   token:  oc whoami -t
#   server: oc whoami --show-server
openshift_token = os.environ.get("OPENSHIFT_TOKEN") or getpass.getpass(
    "OpenShift token (oc whoami -t): "
)
openshift_server = os.environ.get("OPENSHIFT_SERVER") or input(
    "OpenShift API server URL (oc whoami --show-server): "
)

auth = TokenAuthentication(
    token=openshift_token,
    server=openshift_server,
    skip_tls=False,
)
auth.login()


In [None]:
# List the keyword arguments the installed ClusterConfiguration accepts.
# The accepted arguments may differ between codeflare-sdk releases, so check this list
# before editing the cluster configuration in the next cell.
import inspect

config_init_signature = inspect.signature(ClusterConfiguration.__init__)
print("Available ClusterConfiguration parameters:")
for param_name, param in config_init_signature.parameters.items():
    if param_name == 'self':
        continue
    # `Parameter.empty` is a sentinel: test it by identity. `!=` would invoke the default's
    # own comparison operator, which can misbehave for some default types.
    default = "REQUIRED" if param.default is inspect.Parameter.empty else param.default
    print(f"  {param_name}: {default}")


In [None]:
# Configure the Ray cluster.
# Target layout: 1 head pod + 7 worker pods spread over 6 GPU nodes and 2 CPU nodes
# (placement is not enforced by this configuration).
# Each pod requests 6-7 CPUs and 24-28 GB RAM, i.e. roughly 75-88% of a g5.2xlarge
# (8 vCPU / 32 GiB), which leaves headroom for node daemons.
# num_gpus=0: the RL workload runs on CPU so the GPUs stay free for LLM inference.
# NOTE(review): min/max CPU and memory are presumably the *worker* pod sizes. The head may be
# sized by separate head_* arguments (see the parameter listing above). The totals printed
# below assume every pod uses the worker size — confirm.

ENVS_PER_WORKER = 12  # parallel environments per Ray worker; keep in sync with the job's ENVS_PER_WORKER

cluster = Cluster(ClusterConfiguration(
    name='zelda-rl',
    namespace='zelda-hybrid-rl-llm',
    num_workers=7,      # 6 GPU nodes + 1 large CPU node
    min_cpus=6,         # ~75% of a g5.2xlarge's 8 vCPU
    max_cpus=7,         # ~88% of a g5.2xlarge's 8 vCPU
    min_memory=24,      # GB, ~75% of a g5.2xlarge's 32 GiB
    max_memory=28,      # GB, ~88% of a g5.2xlarge's 32 GiB
    num_gpus=0,         # preserve GPUs for LLM inference
    image="quay.io/cnuland/dd-kuberay-worker:latest",  # reuses the Double Dragon KubeRay worker image
))

cluster_cfg = cluster.config
total_pods = 1 + cluster_cfg.num_workers

print("🚀 HIGH-PERFORMANCE CLUSTER CONFIGURATION:")
print(f"   Name: {cluster_cfg.name}")
print(f"   Namespace: {cluster_cfg.namespace}")
print(f"   Workers: {cluster_cfg.num_workers}")
print(f"   CPUs per pod: {cluster_cfg.min_cpus}-{cluster_cfg.max_cpus}")
print(f"   Memory per pod: {cluster_cfg.min_memory}-{cluster_cfg.max_memory} GB")
print(f"   Image: {cluster_cfg.image}")
print(f"   Total pods: {total_pods} (1 head + {cluster_cfg.num_workers} workers)")
print(f"   Total resources: {total_pods * cluster_cfg.min_cpus}-{total_pods * cluster_cfg.max_cpus} CPUs, "
      f"{total_pods * cluster_cfg.min_memory}-{total_pods * cluster_cfg.max_memory} GB RAM")
print(f"   Parallel environments: {cluster_cfg.num_workers * ENVS_PER_WORKER} ({ENVS_PER_WORKER} per worker)")
print("\n🚀 MAXIMUM RESOURCE UTILIZATION:")
print("   - Utilizes all 6 GPU nodes + 1 large CPU node")
print(f"   - ~{total_pods * cluster_cfg.min_cpus} CPU cores total")
print(f"   - ~{total_pods * cluster_cfg.min_memory}GB RAM total")
print("   - Preserves GPUs for LLM inference workloads")

# Ensure the Kueue LocalQueue exists for this namespace before the cluster is created.
# subprocess is used instead of `!oc`: a failing `!` command never raises, so a try/except
# around it always reported success.
print("\n📝 Setting up Kueue LocalQueue for Ray cluster scheduling...")
try:
    queue_apply = subprocess.run(
        ["oc", "apply", "-f", "ops/openshift/zelda-localqueue.yaml"],
        capture_output=True, text=True, timeout=60, check=True,
    )
    print(queue_apply.stdout.strip())
    print("✅ Kueue LocalQueue configured successfully!")
    print("✅ Kueue queues (zelda-ray-queue → ray-cluster-queue) are ready!")
except Exception as e:
    # Best-effort: report the oc error (if any) but keep going.
    detail = e.stderr.strip() if isinstance(e, subprocess.CalledProcessError) and e.stderr else e
    print(f"⚠️  Warning: Could not configure Kueue LocalQueue: {detail}")
    print("   This may cause Ray cluster creation to fail.")


In [None]:
# Create the Ray cluster (or connect to an existing one), then wait until it is usable.
# oc calls use subprocess so that a failing command is detected by its return code; `!oc`
# output would mix stderr (e.g. "NotFound") into the captured lines.
from codeflare_sdk.cluster.cluster import CodeFlareClusterStatus

cluster_name = cluster.config.name
cluster_namespace = cluster.config.namespace

print("🚀 Creating/connecting to Ray cluster...")
print("\n📝 Note: If cluster creation fails due to Kueue validation,")
print("   we'll add the required queue label after creation.")

try:
    cluster.up()
except Exception as e:
    print(f"⚠️  Error creating cluster: {e}")
    if 'queue-name' not in str(e):
        raise
    print("\n🔧 Applying Kueue queue label to RayCluster...")
    label_result = subprocess.run(
        ["oc", "label", "raycluster", cluster_name, "-n", cluster_namespace,
         "kueue.x-k8s.io/queue-name=zelda-ray-queue", "--overwrite"],
        capture_output=True, text=True, timeout=60,
    )
    if label_result.returncode == 0:
        print("✅ Kueue queue label applied. Cluster should start now.")
    else:
        # NOTE(review): if the Kueue webhook rejected creation outright, no RayCluster exists
        # to label. Setting the queue in ClusterConfiguration (if the installed codeflare-sdk
        # supports it) would avoid this path — confirm.
        print(f"❌ Could not label RayCluster: {label_result.stderr.strip()}")

# Check status immediately
print("\n📊 Checking cluster status...")
try:
    status_info = cluster.status()
    cluster_state = status_info[0] if isinstance(status_info, tuple) else status_info
except ValueError as e:
    if 'suspended' not in str(e):
        raise
    print("⚠️  Ray cluster is suspended by Kueue but may be operational")
    # Check whether the RayCluster actually exists and has running pods.
    ray_cluster_lookup = subprocess.run(
        ["oc", "get", "rayclusters", cluster_name, "-n", cluster_namespace, "--no-headers"],
        capture_output=True, text=True, timeout=30,
    )
    if ray_cluster_lookup.returncode == 0 and ray_cluster_lookup.stdout.strip():
        print("✅ Ray cluster exists, checking pods...")
        ray_pods = subprocess.run(
            ["oc", "get", "pods", "-n", cluster_namespace,
             "-l", f"ray.io/cluster={cluster_name}", "--no-headers"],
            capture_output=True, text=True, timeout=30,
        )
        if ray_pods.returncode == 0 and any('Running' in line for line in ray_pods.stdout.splitlines()):
            print("✅ Ray cluster has running pods - proceeding with job submission")
            cluster_state = "READY_SUSPENDED"  # custom marker, not a CodeFlareClusterStatus
        else:
            print("❌ No running Ray pods found")
            # Use the enum (not the string "UNKNOWN") so the wait branch below is taken.
            cluster_state = CodeFlareClusterStatus.UNKNOWN
    else:
        print("❌ Ray cluster not found")
        cluster_state = CodeFlareClusterStatus.UNKNOWN

print(f"Cluster state: {cluster_state}")

# Only wait if the cluster is not already active.
if cluster_state == CodeFlareClusterStatus.READY:
    print("✅ Cluster is already READY!")
elif cluster_state in (CodeFlareClusterStatus.STARTING, CodeFlareClusterStatus.UNKNOWN):
    try:
        print("\n⏳ Waiting for cluster to be ready (max 10 minutes)...")
        cluster.wait_ready(timeout=600)
        print("✅ Cluster is ready!")
    except Exception as e:
        print(f"⚠️ Error waiting for cluster: {e}")
        print("\n📋 Cluster details:")
        print(cluster.details())
else:
    print(f"✅ Cluster status: {cluster_state}")

# Show final cluster details
print("\n" + "="*60)
print("📊 FINAL CLUSTER STATUS:")
print("="*60)
cluster.details()


In [None]:
# Diagnostic: inspect the RayCluster, its pods, recent warning events and image pull secrets.
# Use this when the cluster does not become READY (e.g. ImagePullBackOff).
diag_namespace = cluster.config.namespace
diag_image = cluster.config.image


def _run_oc(args):
    """Run `oc <args>` with a 10s timeout; return stdout on success, stderr on failure."""
    result = subprocess.run(["oc", *args], capture_output=True, text=True, timeout=10)
    return result.stdout if result.returncode == 0 else result.stderr


print("🔍 DIAGNOSTIC: Checking Ray cluster deployment...\n")

try:
    print("1️⃣ RayCluster resource:")
    print(_run_oc(["get", "raycluster", "-n", diag_namespace]))

    print("\n2️⃣ Pods in namespace:")
    print(_run_oc(["get", "pods", "-n", diag_namespace]))

    print("\n3️⃣ Recent events (looking for ImagePullBackOff errors):")
    events = _run_oc(["get", "events", "-n", diag_namespace,
                      "--sort-by=.lastTimestamp", "--field-selector=type=Warning"])
    print(events if events.strip() else "No warning events found")

    print("\n4️⃣ Image pull secrets in namespace:")
    secrets = _run_oc(["get", "secrets", "-n", diag_namespace, "-o",
                       'jsonpath={.items[?(@.type=="kubernetes.io/dockerconfigjson")].metadata.name}'])
    print(secrets if secrets.strip() else "⚠️  No dockerconfigjson secrets found!")
except Exception as e:
    # Best-effort diagnostics: report and continue (e.g. `oc` missing or timed out).
    print(f"❌ Error running diagnostic commands: {e}")

print("\n" + "="*60)
print("💡 IMAGE PULL FIX:")
print("="*60)
print("If you see 'ImagePullBackOff' errors, you need to:")
print(f"1. Check if {diag_image} exists")
print("2. If private, create image pull secret:")
print("   oc create secret docker-registry quay-pull-secret \\")
print("     --docker-server=quay.io \\")
print("     --docker-username=YOUR_USERNAME \\")
print("     --docker-password=YOUR_PASSWORD \\")
print(f"     -n {diag_namespace}")
print("3. Then add to ClusterConfiguration:")
print("   image_pull_secrets=['quay-pull-secret']")


In [None]:
# 🖥️ HUD DASHBOARD
# The HUD visualizes training in real time: metrics and progress, the LLM's current vision
# analysis, and epoch/episode/reward stats. It is deployed only if no HUD pod is Running.
# oc calls use subprocess so that error text (e.g. "NotFound") is never mistaken for output.
HUD_NAMESPACE = "zelda-hybrid-rl-llm"

print("🖥️ Checking HUD Dashboard status...")

hud_pods = subprocess.run(
    ["oc", "get", "pods", "-n", HUD_NAMESPACE, "-l", "app=zelda-hud", "--no-headers"],
    capture_output=True, text=True, timeout=30,
)
# Look at every listed pod, not only the first line.
hud_ready = hud_pods.returncode == 0 and any(
    "Running" in line for line in hud_pods.stdout.splitlines()
)

if hud_ready:
    print("✅ HUD Dashboard is already running!")
else:
    print("⚠️  HUD Dashboard not found. Deploying...")
    hud_apply = subprocess.run(
        ["oc", "apply", "-f", "ops/openshift/hud-deployment.yaml"],
        capture_output=True, text=True, timeout=60,
    )
    print((hud_apply.stdout + hud_apply.stderr).strip())
    if hud_apply.returncode == 0:
        hud_wait = subprocess.run(
            ["oc", "wait", "--for=condition=ready", "pod", "-l", "app=zelda-hud",
             "-n", HUD_NAMESPACE, "--timeout=120s"],
            capture_output=True, text=True, timeout=180,
        )
        print((hud_wait.stdout + hud_wait.stderr).strip())
        hud_ready = hud_wait.returncode == 0

# Get HUD route URL
print("\n🌐 Getting HUD Dashboard URL...")
hud_route = subprocess.run(
    ["oc", "get", "route", "zelda-hud-route", "-n", HUD_NAMESPACE, "-o", "jsonpath={.spec.host}"],
    capture_output=True, text=True, timeout=30,
)
hud_host = hud_route.stdout.strip() if hud_route.returncode == 0 else ""
hud_url = f"https://{hud_host}" if hud_host else "Route not found"

if hud_ready:
    print("\n✅ HUD Dashboard ready!")
else:
    print("\n⚠️  HUD Dashboard is not ready yet - check `oc get pods -l app=zelda-hud`.")
print(f"📊 External Dashboard URL: {hud_url}")
print(f"🔌 Internal Service URL: http://zelda-hud-service.{HUD_NAMESPACE}.svc.cluster.local:8086")
print("\n💡 Open the external dashboard URL in your browser to watch training!")
print("   Ray training jobs will automatically connect to the internal service URL.")


In [None]:
# 🔄 RESET HUD SESSION
# Run after the HUD is deployed and before submitting a Ray job. Clearing a stale session
# prevents "409 Conflict" errors when a job is restarted. Any failure is reported, not raised.
import requests

print("🔄 Resetting HUD session for new training job...")
hud_internal_url = "http://zelda-hud-service.zelda-hybrid-rl-llm.svc.cluster.local:8086"
reset_endpoint = hud_internal_url + "/api/reset_session"

try:
    reset_response = requests.post(reset_endpoint, timeout=5)
    if reset_response.status_code != 200:
        print(f"⚠️  Reset request returned: {reset_response.status_code}")
    else:
        previous_session = reset_response.json().get('previous_session')
        print(f"✅ HUD session cleared: {previous_session}" if previous_session
              else "✅ HUD ready (no previous session)")
except Exception as reset_error:
    print(f"⚠️  Could not reset HUD (not critical): {reset_error}")

print("🎯 HUD is ready for new training job!")


In [None]:
# Get cluster details and submit the training job.
from ray.job_submission import JobSubmissionClient

ENVS_PER_WORKER = 12  # parallel environments per Ray worker pod

clusterDetails = cluster.details()
print(f"🌐 External Ray Dashboard URL: {clusterDetails.dashboard}")
print("   (Use this URL in your browser to view the dashboard)")

# Submit through the INTERNAL head service, not the external route. The external HTTPS route
# requires authentication (403 Forbidden); the in-cluster service does not.
# These *.svc addresses only resolve from inside the cluster.
# The head service name follows KubeRay's "<raycluster-name>-head-svc" pattern.
head_service_host = f"{cluster.config.name}-head-svc.{cluster.config.namespace}.svc"
ray_cluster_uri = f"ray://{head_service_host}:10001"
ray_dashboard_internal_url = f"http://{head_service_host}.cluster.local:8265"

print(f"\n📡 Ray Cluster URI: {ray_cluster_uri}")
print(f"🔌 Connecting to internal dashboard: {ray_dashboard_internal_url}")

client = JobSubmissionClient(ray_dashboard_internal_url)

# MinIO credentials come from the environment (or a prompt) so they are never saved in the
# notebook. NOTE(review): if a plaintext MinIO secret exists in this notebook's git history,
# rotate it.
s3_access_key_id = os.environ.get('S3_ACCESS_KEY_ID', 'admin')
s3_secret_access_key = os.environ.get('S3_SECRET_ACCESS_KEY') or getpass.getpass(
    "MinIO secret key (S3_SECRET_ACCESS_KEY): "
)

# Environment variables for the training job.
env_vars = {
    # S3/MinIO storage (model checkpoints and training results)
    'S3_ACCESS_KEY_ID': s3_access_key_id,
    'S3_SECRET_ACCESS_KEY': s3_secret_access_key,
    'S3_REGION_NAME': 'us-east-1',
    'S3_ENDPOINT_URL': 'https://minio-api-route-minio-system.apps.rosa.rosa-58cx6.acrs.p3.openshiftapps.com',
    'S3_BUCKET_NAME': 'zelda-rl-checkpoints',

    # HUD dashboard (real-time visualization via the deployed in-cluster service)
    'HUD_URL': 'http://zelda-hud-service.zelda-hybrid-rl-llm.svc.cluster.local:8086',

    # LLM endpoint: Llama4Scout behind an AWS ELB (the vision model needs /v1/chat/completions).
    # The Host header routes to llama4scout; do NOT put /llama4scout/ in the URL path.
    'LLM_ENDPOINT': 'http://a48d81637dbfe42908405beb02605516-1010388078.us-east-2.elb.amazonaws.com/v1/chat/completions',
    'LLM_HOST_HEADER': 'llama4scout.llm-d.local',
    # Alternative: set 'LLM_ENDPOINT': '' to disable LLM vision and run pure PPO.

    # ROM path (relative to working_dir)
    'ROM_PATH': 'roms/zelda_oracle_of_seasons.gbc',

    # Config paths
    'ENV_CONFIG': 'configs/env.yaml',
    'VISION_PROMPT_CONFIG': 'configs/vision_prompt.yaml',

    # Scaling, derived from the cluster size (7 workers * 12 envs = 84 with the config above)
    'RAY_WORKERS': str(cluster.config.num_workers * ENVS_PER_WORKER),
    'ENVS_PER_WORKER': str(ENVS_PER_WORKER),
    'EPISODE_LENGTH': '61440',        # 2048 * 30
    'BATCH_SIZE': '32768',
}

# Submit training job.
# start_ray_training.sh downloads the ROM from the S3 'roms' bucket, then starts training.
submission_id = client.submit_job(
    entrypoint="bash start_ray_training.sh",
    runtime_env={
        "env_vars": env_vars,
        'working_dir': './',  # uploads the Zelda code, configs and start_ray_training.sh
        'pip': ['boto3', 'pyboy>=2.6.0', 'Pillow>=9.0.0', 'PyYAML>=6.0', 'requests>=2.31.0'],
        "excludes": [
            "*.ipynb", "*.md", "__pycache__", "*.pyc",
            "checkpoints/*", "training_runs/*", "strategic_test_results/*",
            ".git/*", "tmp/*", "notebooks/*", "examples/*",
            "HUD/templates/*", "HUD/static/*", "HUD/Containerfile"
            # NOTE: *.sh is not excluded - start_ray_training.sh is required.
            # NOTE: HUD/hud_client.py is kept so workers can connect to the HUD server.
        ],
    }
)

print(f"\n✅ Job submitted successfully!")
print(f"📋 Submission ID: {submission_id}")
print(f"🌐 Monitor at: {clusterDetails.dashboard}")
print(f"\n💡 TIP: Open the dashboard URL in your browser to watch training in real-time!")


In [None]:
# Monitor the job until it reaches a terminal state, then show the tail of its logs.
# Interrupting this cell only stops monitoring; the job keeps running on the cluster.
from ray.job_submission import JobStatus
import time

POLL_INTERVAL_S = 30
LOG_TAIL_CHARS = 5000
TERMINAL_STATES = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}

print("🔍 Monitoring job status...")
print(f"(This will check every {POLL_INTERVAL_S} seconds until completion)")
print()

while True:
    status = client.get_job_status(submission_id)
    print(f"[{time.strftime('%H:%M:%S')}] Job status: {status}")
    if status in TERMINAL_STATES:
        break
    time.sleep(POLL_INTERVAL_S)

print(f"\n{'='*60}")
print(f"📊 Job completed with status: {status}")
print(f"{'='*60}\n")

if status != JobStatus.SUCCEEDED:
    # The job info message may explain why the job failed or was stopped.
    job_message = client.get_job_info(submission_id).message
    if job_message:
        print(f"ℹ️  Job message: {job_message}\n")

logs = client.get_job_logs(submission_id)
print("📝 Final job logs:")
print(logs[-LOG_TAIL_CHARS:])  # slicing is safe when the log is shorter than the tail


In [None]:
# Fetch and print the complete log of the submitted job; safe to run at any point after submission.
logs = client.get_job_logs(job_id=submission_id)
print(logs)


In [None]:
# 🗑️ CLEANUP: delete the Ray cluster (only when completely done!)
#
# WARNING: deleting the cluster stops all running training jobs, terminates all of its pods,
# and removes the RayCluster resource.
# Set DELETE_CLUSTER = True and run this cell to proceed.
DELETE_CLUSTER = False

if DELETE_CLUSTER:
    cluster.down()
    print(f"🗑️  cluster.down() called. Verify with: oc get raycluster -n {cluster.config.namespace}")
else:
    print("ℹ️  Cleanup skipped (set DELETE_CLUSTER = True to delete the Ray cluster).")
