# 🦅 Ostrich-Legs: Colab Worker Node
Run the cells below in sequence (or click **Runtime > Run all**) to connect this Colab instance to your compute swarm.

In [None]:
# 1. INSTALL DEPENDENCIES
# Notebook shell escape: installs python-socketio (with its "client" extra),
# requests, torch and numpy; --quiet suppresses most of pip's output.
!pip install python-socketio[client] requests torch numpy --quiet


In [None]:
# 2. IMPORTS & HARDWARE DETECTION
import sys, os, time, uuid, threading, multiprocessing
import numpy as np
import socketio

# Start from the CPU assumption; the flag is only flipped once CUDA is confirmed.
HAS_GPU = False
try:
    import torch

    if not torch.cuda.is_available():
        print("‚ö†Ô∏è GPU not found. Falling back to CPU compute.")
    else:
        HAS_GPU = True
        print(f"üöÄ GPU DETECTED: {torch.cuda.get_device_name(0)}")
except ImportError:
    # torch is optional here: without it HAS_GPU simply stays False.
    print("‚ö†Ô∏è PyTorch not installed. Falling back to CPU compute.")


In [None]:
# 3. SWARM CONFIGURATION
# @markdown Enter your server details and swarm invite code below:

SERVER_URL = "https://your-tunnel-url.trycloudflare.com" # @param {type:"string"}
JOIN_CODE = "" # @param {type:"string"}
DEVICE_NAME = "Colab-Titan-Node" # @param {type:"string"}

# --- IDENTITY MANAGEMENT ---
id_file = "device_identity.txt"


def load_device_id(path):
    """Return the node ID stored at *path*, creating and saving one if needed.

    A missing file, or one that is empty or whitespace-only (for example after
    an interrupted write), yields a fresh ``node-colab-<8 chars>`` ID that is
    written back to *path* so later runs reuse it.

    Args:
        path: Location of the identity file.

    Returns:
        The non-empty device ID string.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            stored = f.read().strip()
        # A blank file must not become a blank identity.
        if stored:
            return stored

    fresh = f"node-colab-{str(uuid.uuid4())[:8]}"
    with open(path, "w", encoding="utf-8") as f:
        f.write(fresh)
    return fresh


DEVICE_ID = load_device_id(id_file)

print(f"üÜî Identity Loaded: {DEVICE_ID} ({DEVICE_NAME})")


In [None]:
# 4. KERNELS & SOCKET PROTOCOLS
# Shared Socket.IO client used by every handler in this notebook; created with
# reconnection enabled and reconnection_delay=5.
sio = socketio.Client(reconnection=True, reconnection_delay=5)
# Set to True by on_batch while a batch is being processed; poller_loop skips
# its job request while this is True.
is_working = False
# Timestamp of the most recently completed job (updated in process_job);
# poller_loop only requests work once more than 1.0 s has passed since then.
last_work_time = time.time()

def run_stress_test(iterations):
    """Run a small NumPy sin/sqrt workload and return the elapsed seconds.

    Args:
        iterations: Requested workload size. Falsy values (``None``, ``0``)
            fall back to 100000. The array actually processed holds
            ``iterations // 100`` random samples.

    Returns:
        Elapsed time in seconds as a ``float``.
    """
    # perf_counter is monotonic, so the measured duration cannot go negative
    # or jump if the system clock is adjusted mid-run.
    start = time.perf_counter()
    count = int(iterations or 100000)
    samples = np.random.rand(count // 100)
    # The product is discarded; only the elapsed time is returned.
    np.sin(samples) * np.sqrt(samples)
    return float(time.perf_counter() - start)

def run_matrix_mul(data):
    """Time one square matrix multiplication and return the elapsed seconds.

    Uses torch on CUDA when ``HAS_GPU`` is set, NumPy otherwise.

    Args:
        data: Job payload; ``data['size']`` is the matrix dimension
            (default 1024). ``None`` or an empty payload uses the default.

    Returns:
        Elapsed time in seconds as a ``float``.
    """
    # perf_counter is monotonic, so clock adjustments cannot skew the result.
    start = time.perf_counter()
    # Treat a None payload like an empty one instead of failing on .get().
    size = int((data or {}).get('size', 1024))
    if HAS_GPU:
        a = torch.rand(size, size, device='cuda')
        b = torch.rand(size, size, device='cuda')
        torch.matmul(a, b)
        # Wait for the queued GPU work to finish before stopping the clock.
        torch.cuda.synchronize()
    else:
        a = np.random.rand(size, size)
        b = np.random.rand(size, size)
        np.dot(a, b)
    return float(time.perf_counter() - start)

def process_job(job):
    """Execute one job and report its outcome to the server.

    Args:
        job: Mapping with ``'id'`` and ``'type'`` keys and an optional
            ``'data'`` payload. Supported types are ``'MATH_STRESS'`` and
            ``'MAT_MUL'``.

    Returns:
        ``True`` if the job ran and a SUCCESS result was emitted; ``False`` if
        the client is disconnected or the job failed (in which case an error
        result is emitted instead).
    """
    global last_work_time
    if not sio.connected:
        return False

    job_id = job['id']
    job_type = job['type']

    try:
        # An explicit null payload must behave like a missing one.
        payload = job.get('data') or {}
        if job_type == 'MATH_STRESS':
            duration = run_stress_test(payload.get('iterations', 100000))
        elif job_type == 'MAT_MUL':
            duration = run_matrix_mul(payload)
        else:
            # Never acknowledge work that was not performed: unknown types go
            # through the error report below instead of a zero-duration SUCCESS.
            raise ValueError(f"Unsupported job type: {job_type!r}")

        last_work_time = time.time()
        sio.emit('job:complete', {
            'chunkId': job_id,
            'workerId': DEVICE_ID,
            'result': 'SUCCESS',
            'durationMs': duration * 1000
        })
        return True
    except Exception as e:
        print(f"\n‚ùå Job Error: {e}")
        sio.emit('job:complete', {
            'chunkId': job_id,
            'workerId': DEVICE_ID,
            'error': str(e)
        })
        return False

@sio.event
def connect():
    """Socket.IO connect handler: register this node and its hardware summary."""
    print(f"\nüü¢ Linked to Swarm! Registering as {DEVICE_NAME}...")

    core_count = multiprocessing.cpu_count()
    if HAS_GPU:
        gpu_label = torch.cuda.get_device_name(0)
    else:
        gpu_label = 'None'

    hardware = {
        'cpuCores': core_count,
        'memoryGB': 16,  # fixed value, not measured
        'gpuAvailable': HAS_GPU,
        'gpuName': gpu_label,
    }
    sio.emit('device:register', {'name': DEVICE_NAME, 'capabilities': hardware})

@sio.on('job:batch')
def on_batch(jobs):
    """Handle a 'job:batch' event: run each job, then ask for the next batch.

    Args:
        jobs: Sequence of job mappings, each passed to ``process_job``.

    ``is_working`` is True for the duration of the batch and is always reset,
    even if processing raises.
    """
    global is_working
    is_working = True
    try:
        print(f"\rüì¶ Processing Batch: {len(jobs)} jobs | GPU: {'ON' if HAS_GPU else 'OFF'}", end="")
        for job in jobs:
            if not sio.connected:
                break
            process_job(job)
    finally:
        # Always clear the flag: if it stayed True after an unexpected error,
        # poller_loop would never request work again.
        is_working = False
    if sio.connected:
        sio.emit('job:request_batch')

@sio.on('cmd:run_benchmark')
def on_benchmark():
    """Handle 'cmd:run_benchmark': time a fixed workload and emit its score.

    The score is a nominal operation count divided by the elapsed seconds.
    Any failure is printed and no result is emitted.
    """
    print("\nüöÄ Running Operations Benchmark...", end="")
    try:
        t0 = time.time()
        if not HAS_GPU:
            run_stress_test(5000000)
            nominal_ops = 5000000
        else:
            lhs = torch.rand(2000, 2000, device='cuda')
            rhs = torch.rand(2000, 2000, device='cuda')
            torch.matmul(lhs, rhs)
            torch.cuda.synchronize()
            nominal_ops = 5000000000
        elapsed = time.time() - t0
        # The small epsilon keeps the division defined when elapsed is 0.
        score = int(nominal_ops / (elapsed + 0.00001))

        print(f" Score: {score:,} OPS/s")
        sio.emit('benchmark:result', {'score': score})
    except Exception as e:
        print(f" Benchmark Failed: {e}")

@sio.event
def disconnect():
    """Socket.IO disconnect handler: only logs that the connection was lost."""
    print("\n‚≠ï Connection lost. Reconnecting...")

def poller_loop():
    """Background loop: every 2 s, request work when idle and send a heartbeat.

    Runs forever. While connected it emits 'job:request_batch' if no batch is
    in progress and more than 1.0 s has passed since the last completed job,
    then emits a 'heartbeat' carrying the current time in milliseconds.
    """
    while True:
        try:
            if sio.connected:
                idle_for = time.time() - last_work_time
                if not is_working and idle_for > 1.0:
                    sio.emit('job:request_batch')
                sio.emit('heartbeat', {'lastInteraction': time.time() * 1000})
        except Exception as e:
            # Best-effort: a failed emit (e.g. the link dropping mid-call) must
            # not kill the poller, but it should not be invisible either.
            print(f"\nPoller error: {e}")
        # Sleep outside the try so a failing emit cannot turn this into a
        # busy loop that skips the delay.
        time.sleep(2.0)


In [None]:
# 5. EXECUTION LAYER
def main():
    """Connect to the swarm server and keep the worker alive until interrupted.

    Starts ``poller_loop`` in a daemon thread, then loops: connect if needed,
    block in ``sio.wait()``, and retry after 5 s on any connection error.
    A ``KeyboardInterrupt`` disconnects and exits the loop.
    """
    print("\nüåê Booting Ostrich-Legs Colab Worker...")

    # Format Auth Query per Phase 2 Server Spec
    auth_url = f"{SERVER_URL}?persistentId={DEVICE_ID}"

    poller = threading.Thread(target=poller_loop, daemon=True)
    poller.start()

    while True:
        try:
            if not sio.connected:
                print(f"üîå Dialing Master at {SERVER_URL}...")
                # Send the join code token in the auth payload
                sio.connect(
                    auth_url,
                    auth={'token': JOIN_CODE},
                    transports=['websocket', 'polling']
                )
            # Block whether we just connected or were already connected (e.g.
            # the cell was re-run); otherwise an already-connected client makes
            # this loop spin without ever waiting.
            sio.wait()
        except KeyboardInterrupt:
            print("\nüõë Graceful Shutdown Initiated.")
            if sio.connected:
                sio.disconnect()
            break
        except Exception as e:
            print(f"\n‚ö†Ô∏è Connection Retry Error: {e}")
            time.sleep(5)

if __name__ == '__main__':
    main()
