Support heterogeneous computing? #5440

Open · tinylambda opened this issue Apr 4, 2019 · 1 comment


tinylambda commented Apr 4, 2019

I am working on a heterogeneous computing setup, that is:

10 GPU servers for GPU-intensive loads, where we analyze the video part of an MP4;
2 CPU servers for CPU-intensive loads, where we analyze the audio part of an MP4.

I want to submit the video tasks to the GPU servers and the audio tasks to the CPU servers.
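
Roughly, the static routing I have in mind would look like this (the app instance name and the task module paths are illustrative, not my exact project layout):

# Route each task type to its own queue so the right class of server picks it up.
# 'app' is the Celery application; the module paths below are placeholders.
app.conf.task_routes = {
    'sigprocess.tasks.analyze_video': {'queue': 'video_analysis'},  # GPU servers
    'sigprocess.tasks.analyze_audio': {'queue': 'audio_analysis'},  # CPU servers
}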

So I use one dedicated worker just for coordinating tasks:
celery -A sigprocess worker -l info -P eventlet -c 1000 -Q coordinator

I tried group; I can get the final results, but it submits both kinds of tasks to a single server for each MP4 file.
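
What I expected was to be able to pin each signature in the group to its own queue, roughly like this (analyze_video and analyze_audio are illustrative task names):

from celery import group

# .set(queue=...) pins each signature to a queue; without it, both
# signatures are published to whatever queue the caller defaults to.
job = group(
    analyze_video.s('/path/to/file.mp4').set(queue='video_analysis'),
    analyze_audio.s('/path/to/file.mp4').set(queue='audio_analysis'),
)
result = job.apply_async()
print(result.get())  # blocks until both parts are done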

I then tried to submit the two kinds of tasks to two different queues:
celery -A sigprocess worker -l info -P prefork -c 1 -Q video_analysis
celery -A sigprocess worker -l info -P prefork -c 1 -Q audio_analysis
using a multiprocessing.dummy.Pool with apply_async(queue='queue_name') and waiting for the results:

import multiprocessing.dummy
import random
import time

from celery import shared_task

@shared_task(bind=True, base=SigTask, ignore_result=False)
def do_task(self, *args, **kwargs):
    # Simulate a variable-length analysis step.
    i = random.randint(0, 10)
    print(f"Sleeping {i} seconds")
    time.sleep(i)
    print("Sleeping done")
    return i

def worker(i):
    # Route even jobs to the audio queue, odd jobs to the video queue,
    # then block until the remote task finishes.
    print(f"Doing work {i}!!!")
    if i % 2 == 0:
        result = do_task.apply_async(queue="audio_analysis")
    else:
        result = do_task.apply_async(queue="video_analysis")
    return result.get()

@shared_task(bind=True, base=CoordinatorTask, ignore_result=False)
def sig_task(self, *args, **kwargs):
    # Fan out both jobs from a thread pool and wait for the results.
    with multiprocessing.dummy.Pool() as pool:
        results = pool.map(worker, range(2))
        return results

(SigTask and CoordinatorTask are custom base classes from my project.)

I found that the two tasks execute successfully on the two Celery workers, but I cannot read the results from the coordinator; it just gets stuck on result.get().
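
For comparison, my understanding is that a chord expresses the same fan-out/fan-in shape without blocking on get() inside a task; a sketch, again with illustrative task names and a hypothetical combine_results callback task:

from celery import chord

# Fan out the two analyses to their queues; the callback runs once
# both results are ready, so no task ever blocks on get().
header = [
    analyze_video.s('/path/to/file.mp4').set(queue='video_analysis'),
    analyze_audio.s('/path/to/file.mp4').set(queue='audio_analysis'),
]
chord(header)(combine_results.s().set(queue='coordinator'))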

Any suggestions on this issue?

Thanks.

thedrow (Member) commented Apr 4, 2019

There really aren't enough details for me to go on.
Please create a minimal test case for me to test.
