Error if joblib used within Executor.map (PicklingError) #465

Closed
stsievert opened this Issue Aug 27, 2016 · 13 comments

stsievert commented Aug 27, 2016

I have a function I would like to run over many different inputs. I'm using Executor.map, which provides the natural interface for this.

Inside the function I map over those inputs, I call another function many times; in my case this is to obtain an average performance. The natural interface for that inner loop is a joblib Parallel/delayed for-loop. See the code below for an example of what I'm describing.

My first thought for getting around this was to follow the example in the docs for doubly-mapped functions. However, I'd rather not complicate my function definitions further and have to pass Executor.map into repeated_tests below.

I have verified that the example in the docs that uses distributed.joblib with the joblib interface works.

import time

import numpy as np
import joblib
from joblib import Parallel, delayed
import distributed.joblib   # registers the 'distributed' backend with joblib
from distributed import Executor

host = '144.92.142.180:8786'   # scheduler address
e = Executor(host)             # connection to the scheduler

def model_performance(x):
    """ returns 0 or 1 depending on whether the model succeeded or failed """
    time.sleep(2)
    return np.random.randint(2)

def repeated_tests(x, n=10):
    """ calls model_performance n times to get average performance """
    y = Parallel(n_jobs=-1)(delayed(model_performance)(x)
                            for _ in range(n))
    return sum(y) / n

with joblib.parallel_backend('distributed', scheduler_host=host):
    model_params = [1, 2, 3, 4]

    # normal
    a = map(repeated_tests, model_params)
    print(list(a))

    # dask.distributed
    b = e.map(repeated_tests, model_params)
    b = e.gather(b)
    print(b)

The error message I get is below; the full stack trace can be found in a gist.

PicklingError: Can't pickle <function model_performance at 0x7f0fac1c8950>: attribute lookup model_performance on __main__ failed

mrocklin commented Aug 28, 2016

Yeah, you're implicitly trying to pickle up a connection to the scheduler, which will be tricky. At first glance I see three options:

  1. Use normal, non-distributed joblib within repeated_tests. Maybe start your workers with fewer threads to give joblib some more parallelism space (a rough sketch follows this list).
  2. Open up an Executor within repeated_tests() or use the with joblib.parallel_backend('distributed', ...) context manager within repeated_tests(). You'll have to pass the scheduler host information to that function. It's a bit unconventional to have tasks in workers spin up short-lived Executors, but it should be ok.
  3. Just use the Executor to submit all the work up front:
a = [[e.submit(model_performance, x, pure=False) for i in range(n)]
     for x in model_params]
b = e.gather(a)
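
For completeness, a rough sketch of option 1 (keeping joblib local to each worker, so there is no scheduler connection to pickle); the backend='threading' choice here is just one possibility:

from joblib import Parallel, delayed

def repeated_tests(x, n=10):
    # plain joblib parallelism inside the worker; nothing distributed is serialized
    y = Parallel(n_jobs=2, backend='threading')(
        delayed(model_performance)(x) for _ in range(n))
    return sum(y) / n

b = e.gather(e.map(repeated_tests, model_params))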

stsievert commented Aug 28, 2016

use the with joblib.parallel_backend('distributed', ...) context manager within repeated_tests()

This is the solution I tried first, but I hit another bug when I do -- that's why I moved parallel_backend to the outside. When I modify repeated_tests to look like the code below, I get a CancelledError on repeated_tests (full stack trace).

def repeated_tests(x, n=10):
    with joblib.parallel_backend('distributed', scheduler_host='144.92.142.180:8786'):
        y = Parallel(n_jobs=-1)(delayed(model_performance)(x)
                                for _ in range(n))
    return sum(y) / n

It looks like joblib doesn't recognize distributed as a joblib backend (it throws a KeyError for 'distributed'), even though I've verified that I am running joblib 0.10.0.

I've verified that other distributed code works; the script can talk to the scheduler, which has workers with cores available. The example in the docs that switches the joblib backend to distributed still passes.

mrocklin commented Aug 28, 2016

You'll probably have to re-import distributed.joblib inside the function each time, to trigger the code that tells the joblib library on that worker that the distributed backend is available.

stsievert commented Aug 28, 2016

Tested: I import distributed.joblib inside repeated_tests as below, but I still see the same bug as in my first attempt.

def repeated_tests(x, n=10):
    import distributed.joblib
    with joblib.parallel_backend('distributed', scheduler_host=host):
        y = Parallel()(delayed(model_performance)(x) for _ in range(n))
    return sum(y) / n

mrocklin commented Aug 29, 2016

Does option 3 work for you? I hesitate to dive too much into how Joblib works to debug this.

stsievert commented Aug 29, 2016

Yes, it works for me. I would still like joblib to work inside a function passed to Executor.map, though.

One issue I ran into was that all the pseudo-random numbers were the same. To get around this, I changed the seed inside model_performance to be i_repeat * some_model_param.

def model_performance(x, seed=42):
    np.random.seed(seed)
    return np.random.randint(9)

b = [[e.submit(model_performance, x, seed=i*x) for i in range(4)] for x in [1, 2, 3, 4]]
b = e.gather(b)

mrocklin commented Sep 1, 2016

I would still like joblib to work inside a function passed to Executor.map, though.

I would too. My first guess would be that something within Joblib isn't thread-safe, though I don't know. You'll probably have to inquire on the Joblib issue tracker. Using dask executors recursively within dask workers is definitely fine, so I'm not sure how else to assist here.

One issue I ran into was that all the pseudo-random numbers were the same. To get around this, I changed the seed inside model_performance to be i_repeat * some_model_param.

Yup, Python random numbers can be weird. This is one of those things you need to be careful about when computing in parallel. Using explicit seeds, or numpy.random.RandomState() objects, is a good idea.
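
For example, a rough sketch of what I mean (the per-task RandomState and the seed formula here are just illustrative):

import numpy as np

def model_performance(x, seed=42):
    rng = np.random.RandomState(seed)   # independent random stream per task
    return rng.randint(2)

futures = [[e.submit(model_performance, x, seed=1000 * x + i) for i in range(4)]
           for x in [1, 2, 3, 4]]
results = e.gather(futures)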

Is there anything else to do for this issue? I'd like to close it if not.

stsievert commented Sep 1, 2016

No more here -- I think that's it. I would like to see this issue revisited in the future.

Using joblib within a distributed function call would be nice (I want to stress that), but the solution above works; it's just a little longer in practice.

mrocklin commented Sep 1, 2016

Yeah, to be clear, I agree that it would be nice, but this is out of my control and I don't see anything going wrong on the Dask side. I recommend checking in with the Joblib developers and verifying that joblib can be run safely from multiple threads.

stsievert commented Sep 4, 2016

This issue is alleviated for me by pd.pivot_table and DataFrame.pivot_table, which make it easy to convert a list of result dictionaries (as a DataFrame) into the appropriate 2D pandas data frame.

import numpy as np
import pandas as pd

def test_model(n, p):
    # ... compute `error` for this (n, p) pair ...
    return {'n': n, 'p': p, 'error': error}

errors = [test_model(n, p) for n in range(5) for p in range(5) for repeat in range(5)]
errors = pd.DataFrame(errors)
df = pd.pivot_table(errors, index='n', columns='p', values='error')
# df is a 2D data frame with index n, columns p, and values error;
# the results can now be visualized easily (e.g., with seaborn)

pd.pivot_table even has an optional parameter aggfunc that specifies how the list of values sharing the same index/column pair is aggregated: each cell in the data frame ends up as something like aggfunc([errors with n=1, p=1]). By default aggfunc is np.mean, which is exactly what I want.
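
For instance, passing aggfunc explicitly (np.mean here just restates the default):

df = pd.pivot_table(errors, index='n', columns='p', values='error',
                    aggfunc=np.mean)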

stsievert commented Nov 22, 2017

I ran into this issue again with a more complex task, and it's resolved with distributed.get_client.

This task involved some adaptive job submission, where the values passed to a function are not known in advance. That is, it's something like:

import numpy as np
from distributed import Client, get_client

def g(x):
    return np.random.rand()

def f(space, k=0):
    client = get_client()          # get the client from within the worker task
    jobs = client.map(g, space)
    y = client.gather(jobs)

    m = np.min(y)
    if k == 2:
        return m
    new_space = np.logspace(m / 2, m * 2, num=len(space))
    return f(new_space, k=k + 1)

if __name__ == "__main__":
    client = Client()
    jobs = client.map(f, [[1, 2, 3],
                          [4, 5, 6],
                          [7, 8, 9]])
    out = client.gather(jobs)
    print(out)

If get_client didn't exist, I could expand the recursive function into a for-loop and use future.add_done_callback.

mrocklin commented Nov 22, 2017

Note that when using get_client you may also want to use secede and rejoin. You can also use dask.compute again within the function if that's easier (it will do everything correctly).
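
A rough sketch of that pattern, reusing g from above (illustrative only):

from distributed import get_client, secede, rejoin

def f(space):
    client = get_client()
    futures = client.map(g, space)
    secede()                  # give up this worker thread while we wait on other tasks
    results = client.gather(futures)
    rejoin()                  # re-acquire a slot in the worker's thread pool
    return min(results)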

Note that add_done_callback just computes on the client side. It isn't the best for building distributed computations.

From your comment I believe that everything is ok and this is just for future users, is this correct @stsievert?

stsievert commented Nov 22, 2017

you may also want to use secede and rejoin. You can also use dask.compute again

Thanks for the tip! I have long-running jobs, so it's very practical. I see why secede is useful now (and have filed #1583). I've switched over to using compute; it's the simplest.

Note that add_done_callback just computes on the client side. It isn't the best for building distributed computations.

Ah. Thanks.

From your comment I believe that everything is ok and this is just for future users

@mrocklin correct. I've updated the notes in my blog post too.
