In [2]:
!pip install websockets

Collecting websockets
  Downloading websockets-14.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading websockets-14.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (168 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/168.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m163.8/168.2 kB[0m [31m5.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m168.2/168.2 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: websockets
Successfully installed websockets-14.1


In [5]:
import asyncio
import websockets
import json
import nest_asyncio

# Patch asyncio so an event loop that is already running (as in a notebook
# kernel) can be re-entered.
# NOTE(review): this cell only uses top-level `await`, so the patch may be
# unnecessary here — confirm before removing.
nest_asyncio.apply()

# Maps a task type (e.g. "a") to the websocket of the worker registered for it.
WORKER_NODES = {}

# Register workers
async def register_worker(websocket):
    """Connection handler: register one worker, then hold its connection open.

    The handler reads messages until it sees a valid registration of the form
    ``{"type": "register", "task_type": "<non-empty string>"}`` and stores the
    connection in ``WORKER_NODES`` under that task type. Messages that are not
    valid JSON, are not JSON objects, or are not registrations are ignored.

    After registering, the handler deliberately stops reading from the socket:
    ``assign_tasks`` collects task results with ``recv()`` on the same
    connection, and websockets allows only one coroutine to read from a
    connection at a time. The handler then simply waits for the connection to
    close, because returning from a handler closes the connection.

    When the connection ends, the worker is removed from ``WORKER_NODES``
    (unless another connection has since taken over that task type).

    Args:
        websocket: The server-side connection passed in by ``websockets.serve``.
    """
    task_type = None
    try:
        while task_type is None:
            message = await websocket.recv()
            try:
                data = json.loads(message)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                print("Ignoring malformed message from worker.")
                continue
            if not isinstance(data, dict) or data.get("type") != "register":
                continue
            requested_type = data.get("task_type")
            if not isinstance(requested_type, str) or not requested_type:
                print("Ignoring registration without a valid task type.")
                continue
            task_type = requested_type
            WORKER_NODES[task_type] = websocket
            print(f"Worker registered for task type: {task_type}")

        # Keep the handler (and therefore the connection) alive without
        # consuming the task results that assign_tasks() is waiting for.
        await websocket.wait_closed()
    except websockets.ConnectionClosed:
        # The worker went away before or during registration.
        pass
    finally:
        # Only drop the entry if it still points at this connection.
        if task_type is not None and WORKER_NODES.get(task_type) is websocket:
            del WORKER_NODES[task_type]
        print("Worker disconnected.")

# Assign tasks
async def assign_tasks(task_sequence):
    """Send each task to the worker registered for its type and collect replies.

    Tasks are processed strictly in order, one at a time: a task is sent as
    ``{"type": "task", "task": <task>}`` and the next message received from
    that worker is taken as its result.

    Args:
        task_sequence: Iterable of task names. The first character of each
            name selects the worker (e.g. ``"a1"`` goes to the worker
            registered for ``"a"``).

    Returns:
        list: The results received, in dispatch order. Tasks that were empty,
        had no registered worker, or whose worker disconnected contribute no
        entry.
    """
    results = []
    for task in task_sequence:
        if not task:
            # An empty name has no first character to route on.
            print("Skipping empty task.")
            continue

        task_type = task[0]  # e.g., 'a' from 'a1'
        worker = WORKER_NODES.get(task_type)
        if worker is None:
            print(f"No worker for task type {task_type}")
            continue

        try:
            await worker.send(json.dumps({"type": "task", "task": task}))
            result = await worker.recv()
        except websockets.ConnectionClosed:
            print(f"Worker for {task_type} disconnected.")
            # Forget the dead connection so later tasks of this type are not
            # sent to it again; leave the entry alone if it was replaced.
            if WORKER_NODES.get(task_type) is worker:
                del WORKER_NODES[task_type]
            continue

        print(f"Result received: {result}")
        results.append(result)
    return results

async def wait_for_workers(timeout=10):
    """Wait until at least one worker has registered.

    Polls ``WORKER_NODES`` once per second, printing a progress line before
    each sleep.

    Args:
        timeout: Maximum number of one-second polls. Must be an integer,
            since it is passed to ``range``.

    Returns:
        bool: True as soon as ``WORKER_NODES`` is non-empty, otherwise False
        once the polls are exhausted.
    """
    for _ in range(timeout):
        if WORKER_NODES:
            return True
        print("Waiting for workers to register...")
        await asyncio.sleep(1)
    # Check one last time: a worker may have registered during the final
    # sleep, and with timeout=0 the loop body never runs at all.
    return bool(WORKER_NODES)

async def master_main(task_sequence=None, host="0.0.0.0", port=8000, worker_timeout=10):
    """Run the master: serve registrations, wait for workers, dispatch tasks.

    Starts a websocket server whose connections are handled by
    ``register_worker``, waits for a worker via ``wait_for_workers``, then
    runs ``assign_tasks`` and prints the collected results. The server is shut
    down when this coroutine returns.

    Args:
        task_sequence: Task names to dispatch in order. Defaults to the demo
            sequence ``a1, a2, c1, b1, c2, a3, b2, b3, a4, c3, b4``.
        host: Interface to listen on. The default ``"0.0.0.0"`` binds to all
            interfaces.
        port: TCP port to listen on.
        worker_timeout: Value passed to ``wait_for_workers`` as ``timeout``.

    Returns:
        None. Progress and results are reported with ``print``.
    """
    if task_sequence is None:
        task_sequence = ["a1", "a2", "c1", "b1", "c2", "a3", "b2", "b3", "a4", "c3", "b4"]

    print("Master started, waiting for workers...")
    async with websockets.serve(register_worker, host, port):
        workers_ready = await wait_for_workers(timeout=worker_timeout)
        if not workers_ready:
            print("No workers registered. Exiting...")
            return

        results = await assign_tasks(task_sequence)
        print("All tasks completed. Results:", results)

# Start the master. Top-level `await` relies on the notebook (IPython) running
# the coroutine; a plain Python script would need asyncio.run() instead.
await master_main()


Master started, waiting for workers...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
Waiting for workers to register...
No workers registered. Exiting...


In [None]:
import asyncio
import websockets
import json
import nest_asyncio

# Patch asyncio so an event loop that is already running (as in a notebook
# kernel) can be re-entered.
# NOTE(review): this cell only uses top-level `await`, so the patch may be
# unnecessary here — confirm before removing.
nest_asyncio.apply()

async def worker_main(task_type, server_ip, server_port=8000):
    """Connect to the master, announce a task type, and answer task messages.

    The worker opens ``ws://<server_ip>:<server_port>``, sends a
    ``{"type": "register", "task_type": ...}`` message, and then replies to
    every incoming ``{"type": "task", "task": ...}`` message with the string
    ``"<task>_completed"``. Messages of any other type are ignored.

    Any exception raised while connecting or serving (including a malformed
    message) ends the worker; it is printed rather than propagated.

    Args:
        task_type: Task type this worker registers for.
        server_ip: Host name or IP address of the master.
        server_port: Port of the master.
    """
    endpoint = f"ws://{server_ip}:{server_port}"
    try:
        async with websockets.connect(endpoint) as connection:
            # Announce which task type this worker handles.
            await connection.send(
                json.dumps({"type": "register", "task_type": task_type})
            )
            print(f"Worker registered for task type: {task_type}")

            # Serve tasks until the connection ends.
            async for raw_message in connection:
                payload = json.loads(raw_message)
                if payload["type"] != "task":
                    continue
                task_name = payload["task"]
                print(f"Executing task: {task_name}")
                await connection.send(f"{task_name}_completed")
    except Exception as error:
        print(f"Error in worker: {error}")

# Input server details
# Server address: an empty answer falls back to the local machine.
server_ip = input("Enter the server IP (default 127.0.0.1): ").strip() or "127.0.0.1"

# Server port: an empty answer keeps the default; anything that is not an
# integer in the valid TCP port range 1-65535 is rejected in favour of the default.
server_port_input = input("Enter the server port (default 8000): ").strip()
server_port = 8000  # Default port
if server_port_input:
    try:
        requested_port = int(server_port_input)
    except ValueError:
        requested_port = None
    if requested_port is not None and 1 <= requested_port <= 65535:
        server_port = requested_port
    else:
        print("Invalid port number! Using default port 8000.")

# Task type: keep asking until something is entered, because a worker
# registered under an empty type would be sent to the server with no usable key.
task_type = input("Enter task type (e.g., a, b, c): ").strip()
while not task_type:
    print("Task type cannot be empty.")
    task_type = input("Enter task type (e.g., a, b, c): ").strip()

# Run the worker
# Start the worker with the values entered above. Top-level `await` relies on
# the notebook (IPython) running the coroutine; a plain Python script would
# need asyncio.run() instead.
await worker_main(task_type, server_ip, server_port)