In [1]:
import ray
import torch
import time
import os

def get_gpu_names():
    """Return one synthetic label per GPU reported by the Ray cluster.

    Connects to the running cluster (``address='auto'``) if Ray has not been
    initialized yet, then walks ``ray.nodes()``. For a node whose ``Resources``
    entry reports ``GPU: n``, the labels ``"<NodeName>_GPU0"`` ..
    ``"<NodeName>_GPU<n-1>"`` are produced, in node order.

    Returns:
        list[str]: the generated labels; empty when no node reports GPUs.
    """
    if not ray.is_initialized():
        ray.init(address='auto', ignore_reinit_error=True)

    labels = []
    for node_info in ray.nodes():
        host = node_info['NodeName']
        # The GPU count may be reported as a float; truncate it to an int.
        gpu_count = int(node_info.get('Resources', {}).get('GPU', 0))
        # range() of a non-positive count is empty, so GPU-less nodes add nothing.
        labels.extend(f"{host}_GPU{index}" for index in range(gpu_count))
    return labels

def get_custom_gpu_names():
    """Return the sorted names of cluster resources whose key contains ``"_GPU"``.

    Connects to the running cluster (``address='auto'``) if Ray has not been
    initialized yet, then filters the keys of ``ray.cluster_resources()``.

    Returns:
        list[str]: matching resource names in ascending alphabetical order.
    """
    if not ray.is_initialized():
        ray.init(address='auto', ignore_reinit_error=True)

    cluster_resources = ray.cluster_resources()
    return sorted(name for name in cluster_resources if "_GPU" in name)

def select_gpu(gpu_name):
    """Restrict the current process to the GPU encoded in ``gpu_name``.

    The text after the last ``"_GPU"`` in ``gpu_name`` (e.g. the ``0`` in
    ``"node1_GPU0"``) is parsed as an integer and written to the
    ``CUDA_VISIBLE_DEVICES`` environment variable.

    Args:
        gpu_name: label of the form ``"<node>_GPU<index>"``.

    Raises:
        ValueError: if the trailing part of ``gpu_name`` is not an integer.
    """
    print(f'selecting {gpu_name}')
    # rpartition keeps everything after the final "_GPU" marker.
    index_text = gpu_name.rpartition("_GPU")[2]
    device_index = int(index_text)
    os.environ["CUDA_VISIBLE_DEVICES"] = str(device_index)

@ray.remote
class CustomGPU:
    """Ray actor that pins its process to one GPU and reports its free memory."""

    def __init__(self, gpu_name):
        """Select the GPU named by ``gpu_name`` (``"<node>_GPU<index>"``)."""
        # select_gpu sets CUDA_VISIBLE_DEVICES for this actor process.
        select_gpu(gpu_name)

    def get_free_memory(self):
        """Return the free memory of CUDA device 0 in GB (bytes / 1024**3).

        The value is also cached on the actor as ``self.free_memory``.
        """
        # mem_get_info returns (free_bytes, total_bytes); only the first is used.
        free_bytes = torch.cuda.mem_get_info(0)[0]
        self.free_memory = free_bytes / 1024 / 1024 / 1024  # GB
        return self.free_memory

def find_top_k_gpu(k=1):
    """Rank the cluster's GPUs by free memory and return the best ``k`` labels.

    Each GPU label from ``get_gpu_names()`` is probed by a short-lived
    ``CustomGPU`` actor scheduled on the owning node. GPUs whose probe fails
    are reported on stdout and left out of the ranking.

    Args:
        k: number of labels to return. ``k == 0`` returns every successfully
            probed GPU, still sorted by free memory (largest first).

    Returns:
        list[str]: GPU labels sorted by descending free memory, truncated to
        the first ``k`` entries unless ``k == 0``.
    """
    print(f"Finding top {k} GPU...")
    # check if ray is initialized
    if not ray.is_initialized():
        ray.init(address='auto', ignore_reinit_error=True)
    # gpu_names = get_custom_gpu_names()
    gpu_names = get_gpu_names()
    print('all GPUs:', gpu_names)
    gpu_free_memory = []
    for gpu_name in gpu_names:
        try:
            node_name = gpu_name.split('_GPU')[0]
            # actor = CustomGPU.options(resources={gpu_name: 0.01}).remote()
            # Request a sliver of the node's "node:<name>" resource so the
            # probe is scheduled on the machine that owns this GPU.
            actor = CustomGPU.options(resources={f"node:{node_name}": 0.01}, num_cpus=1).remote(gpu_name)
            try:
                free_memory = ray.get(actor.get_free_memory.remote())
            finally:
                # Always release the probe actor, even when the query fails;
                # otherwise it keeps its CPU reservation and CUDA context.
                ray.kill(actor)
            gpu_free_memory.append((gpu_name, free_memory))
            print(f"GPU: {gpu_name}, Free memory: {free_memory:.2f} GB")
        except Exception as e:
            # Best effort: one unreachable GPU must not abort the whole scan.
            print(f"Error checking {gpu_name}: {e}")
    # sort by free memory, largest first
    gpu_free_memory.sort(key=lambda x: x[1], reverse=True)
    gpu_names = [gpu_name for gpu_name, _ in gpu_free_memory]
    if k == 0:
        return gpu_names
    return gpu_names[:k]

def find_eligible_gpu(gpu_names, n_gpu=4, free_memory_threshold=10):
    """Find the first ``n_gpu`` GPUs with more free memory than a threshold.

    GPUs are probed in the order given, each by a short-lived ``CustomGPU``
    actor scheduled on the owning node. The search stops as soon as enough
    eligible GPUs have been collected.

    Args:
        gpu_names: iterable of labels of the form ``"<node>_GPU<index>"``.
        n_gpu: number of eligible GPUs required.
        free_memory_threshold: minimum free memory in GB (exclusive).

    Returns:
        list[str] | None: the eligible labels once ``n_gpu`` have been found,
        or ``None`` when the candidates run out first. Callers must handle
        the ``None`` case.
    """
    print(f'finding {n_gpu} GPUs with free memory greater than {free_memory_threshold} GB')
    # find all GPUs with free memory greater than the threshold in unit of GB
    eligible_gpu = []
    for gpu_name in gpu_names:
        try:
            node_name = gpu_name.split('_GPU')[0]
            # actor = CustomGPU.options(resources={gpu_name: 0.01}).remote()
            # Request a sliver of the node's "node:<name>" resource so the
            # probe is scheduled on the machine that owns this GPU.
            actor = CustomGPU.options(resources={f"node:{node_name}": 0.01}, num_cpus=1).remote(gpu_name)
            try:
                free_memory = ray.get(actor.get_free_memory.remote())
            finally:
                # Release the probe actor whether or not the query succeeded;
                # a lingering actor keeps its CPU reservation and holds a CUDA
                # context on the GPU that was just measured.
                ray.kill(actor)
        except Exception as e:
            # Best effort: skip GPUs that cannot be probed.
            print(f"Error checking {gpu_name}: {e}")
            continue
        if free_memory > free_memory_threshold:
            eligible_gpu.append(gpu_name)
            print(f"Found eligible GPU: {gpu_name}, Free memory: {free_memory:.2f} GB")
        if len(eligible_gpu) >= n_gpu:
            return eligible_gpu
    return None

def my_task(sleep_seconds=5, matrix_size=1000):
    """Run a small demo workload and return the shape of the result.

    Prints a start timestamp, sleeps, multiplies two random square matrices,
    then prints an end timestamp. The tensors are created with the default
    ``torch.randn`` arguments (no explicit device is requested).

    Args:
        sleep_seconds: how long to sleep before the computation. Defaults to
            the original fixed delay of 5 seconds.
        matrix_size: side length of the two square operands. Defaults to the
            original 1000.

    Returns:
        torch.Size: shape of the product, ``(matrix_size, matrix_size)``.
    """
    print(f"start task at {time.strftime('%H:%M:%S', time.localtime())}")
    time.sleep(sleep_seconds)
    a = torch.randn(matrix_size, matrix_size)
    b = torch.randn(matrix_size, matrix_size)
    result = torch.matmul(a, b).shape
    print(f"end task at {time.strftime('%H:%M:%S', time.localtime())}")
    return result

@ray.remote
class worker:
    """Ray actor that binds its process to one GPU and runs the demo task."""

    def __init__(self, gpu_name):
        """Select the GPU named by ``gpu_name`` (``"<node>_GPU<index>"``)."""
        # select_gpu sets CUDA_VISIBLE_DEVICES for this actor process.
        select_gpu(gpu_name)

    def task(self):
        """Execute ``my_task`` inside this actor and return its result."""
        outcome = my_task()
        return outcome

if __name__ == "__main__":
    # Demo driver: pick n_workers GPUs with enough free memory and run one
    # worker actor on each of them concurrently.
    #
    # To connect to an explicit cluster instead of address='auto':
    # ray_address = '10.15.0.16:6379'
    # ray.init(address=ray_address, ignore_reinit_error=True)
    # To rank GPUs by free memory instead:
    # sorted_gpu_names = find_top_k_gpu(k=0)

    n_workers = 4
    free_memory_threshold = 10  # GB
    try:
        gpu_names = get_gpu_names()
        eligible_gpu = find_eligible_gpu(gpu_names, n_gpu=n_workers, free_memory_threshold=free_memory_threshold)
        print(f"Eligible GPUs: {eligible_gpu}")
        # find_eligible_gpu returns None when too few GPUs qualify, so guard
        # before calling len() on the result.
        if eligible_gpu is not None and len(eligible_gpu) == n_workers:
            actors = []
            for gpu in eligible_gpu:
                node_name = gpu.split('_GPU')[0]
                # Pin each worker to the node that owns its GPU.
                actors.append(worker.options(resources={f"node:{node_name}": 0.01}).remote(gpu))
            # Submit every task first, then wait, so the workers run in parallel.
            results = ray.get([actor.task.remote() for actor in actors])
            print(results)
        else:
            print(f"Could not find {n_workers} eligible GPUs; no workers started.")
    finally:
        # shutdown ray even if probing or a worker task failed
        ray.shutdown()

2025-03-18 13:46:15,959	INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 10.11.140.31:6379...
2025-03-18 13:46:15,977	INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


finding 4 GPUs with free memory greater than 10 GB
[36m(CustomGPU pid=7290)[0m selecting 10.11.140.31_GPU0
Found eligible GPU: 10.11.140.31_GPU0, Free memory: 23.36 GB
Found eligible GPU: 10.11.140.31_GPU1, Free memory: 23.42 GB
Found eligible GPU: 10.11.140.64_GPU0, Free memory: 17.82 GB
[36m(CustomGPU pid=1877866, ip=10.11.140.64)[0m selecting 10.11.140.64_GPU1[32m [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)[0m
Found eligible GPU: 10.11.140.64_GPU1, Free memory: 23.43 GB
Eligible GPUs: ['10.11.140.31_GPU0', '10.11.140.31_GPU1', '10.11.140.64_GPU0', '10.11.140.64_GPU1']
[36m(worker pid=7295)[0m start task at 13:46:26
[36m(worker pid=7295)[0m end task at 13:46:32
[36m(worker pid=1877955, ip=10.11.140.64)[0m selecting 10.11.140.64_GPU1[32m [repeated 4x across cluster][0m
[36m(w