GCSFileSystem() hangs when called from multiple processes #379

Open
JackKelly opened this issue Apr 26, 2021 · 44 comments

@JackKelly

JackKelly commented Apr 26, 2021

What happened:
In the last two versions of gcsfs (versions 2021.04.0 and 0.8.0), calling gcsfs.GCSFileSystem() from multiple processes hangs without any error messages if gcsfs.GCSFileSystem() has been called previously in the same Python interpreter session.

This bug was not present in gcsfs version 0.7.2 (with fsspec 0.8.7). All the code examples below work perfectly with gcsfs version 0.7.2 (with fsspec 0.8.7).

Minimal Complete Verifiable Example:

The examples below assume gcsfs version 2021.04.0 (with fsspec 2021.04.0) or gcsfs version 0.8.0 (with fsspec 0.9.0) is installed.

Create a fresh conda environment: conda create --name test_gcsfs python=3.8 gcsfs ipykernel

The last block of this code hangs:

from concurrent.futures import ProcessPoolExecutor
import gcsfs

# This line works fine!  (And it's fine to repeat this line multiple times.)
gcs = gcsfs.GCSFileSystem() 

# This block hangs, with no error messages:
with ProcessPoolExecutor() as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)

But, if we don't do gcs = gcsfs.GCSFileSystem(), then the code works fine. The next code example works perfectly if run in a fresh Python interpreter. The only difference from the previous example is that I've removed gcs = gcsfs.GCSFileSystem().

from concurrent.futures import ProcessPoolExecutor
import gcsfs

# This works fine:
with ProcessPoolExecutor() as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)

Likewise, running the ProcessPoolExecutor block multiple times works the first time but hangs on subsequent attempts:

from concurrent.futures import ProcessPoolExecutor
import gcsfs

def process_pool():
    with ProcessPoolExecutor(max_workers=1) as executor:
        for i in range(8):
            future = executor.submit(gcsfs.GCSFileSystem)

# The first attempt works fine:
process_pool()

# This second attempt hangs:
process_pool()

Anything else we should know:

Thank you so much for all your hard work on gcsfs - it's a hugely useful tool! Sorry to be reporting a bug!

I tested all this code in a Jupyter Lab notebook.

This issue might be related to this Stack Overflow issue: https://stackoverflow.com/questions/66283634/use-gcsfilesystem-with-multiprocessing

Environment:

  • Dask version: Not installed
  • Python version: 3.8
  • Operating System: Ubuntu 20.10
  • Install method: conda, from conda-forge, using a fresh conda environment.
@martindurant
Member

Please try changing your subprocess execution method to spawn or forkserver https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
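
A sketch of that change applied to the first example (in a fresh interpreter; note that set_start_method can only be called once per session, and scripts need the usual if __name__ == '__main__' guard):

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import gcsfs

# 'spawn' (or 'forkserver') starts workers in a fresh interpreter instead of
# forking one that already holds gcsfs's event loop and IO thread.
multiprocessing.set_start_method('spawn')

gcs = gcsfs.GCSFileSystem()

with ProcessPoolExecutor() as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)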

@JackKelly
Author

Wow - thank you for the very quick reply! You're right, adding the line multiprocessing.set_start_method('forkserver') or multiprocessing.set_start_method('spawn') fixes the issue in my minimal example...

...Let me check if this also fixes the issue in my 'real' code project (loading data from a Zarr store in parallel from a PyTorch DataLoader)....

Is this expected behaviour?

@martindurant
Member

Fork is known to cause problems when there are any other threads or event loops already active. I thought there was a guard in the code to give a reasonable error in this case, but it seems not.

@JackKelly
Author

Unfortunately setting the start method to either spawn or forkserver doesn't work for my PyTorch code.

Here's the error when I add multiprocessing.set_start_method('spawn') at the start of the code:

------------------------------------------------------------------------
RuntimeError                           Traceback (most recent call last)
~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _try_get_data(self, timeout)
    985         try:
--> 986             data = self._data_queue.get(timeout=timeout)
    987             return (True, data)

~/miniconda3/envs/predict_pv_yield/lib/python3.8/multiprocessing/queues.py in get(self, block, timeout)
    106                     timeout = deadline - time.monotonic()
--> 107                     if not self._poll(timeout):
    108                         raise Empty

~/miniconda3/envs/predict_pv_yield/lib/python3.8/multiprocessing/connection.py in poll(self, timeout)
    256         self._check_readable()
--> 257         return self._poll(timeout)
    258 

~/miniconda3/envs/predict_pv_yield/lib/python3.8/multiprocessing/connection.py in _poll(self, timeout)
    423     def _poll(self, timeout):
--> 424         r = wait([self], timeout)
    425         return bool(r)

~/miniconda3/envs/predict_pv_yield/lib/python3.8/multiprocessing/connection.py in wait(object_list, timeout)
    930             while True:
--> 931                 ready = selector.select(timeout)
    932                 if ready:

~/miniconda3/envs/predict_pv_yield/lib/python3.8/selectors.py in select(self, timeout)
    414         try:
--> 415             fd_event_list = self._selector.poll(timeout)
    416         except InterruptedError:

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/_utils/signal_handling.py in handler(signum, frame)
     65         # Python can still get and update the process status successfully.
---> 66         _error_if_any_worker_fails()
     67         if previous_handler is not None:

RuntimeError: DataLoader worker (pid 67567) exited unexpectedly with exit code 1. Details are lost due to multiprocessing. Rerunning with num_workers=0 may give better error trace.

The above exception was the direct cause of the following exception:

RuntimeError                           Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in __next__(self)
    515             if self._sampler_iter is None:
    516                 self._reset()
--> 517             data = self._next_data()
    518             self._num_yielded += 1
    519             if self._dataset_kind == _DatasetKind.Iterable and \

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _next_data(self)
   1180 
   1181             assert not self._shutdown and self._tasks_outstanding > 0
-> 1182             idx, data = self._get_data()
   1183             self._tasks_outstanding -= 1
   1184             if self._dataset_kind == _DatasetKind.Iterable:

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _get_data(self)
   1146         else:
   1147             while True:
-> 1148                 success, data = self._try_get_data()
   1149                 if success:
   1150                     return data

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _try_get_data(self, timeout)
    997             if len(failed_workers) > 0:
    998                 pids_str = ', '.join(str(w.pid) for w in failed_workers)
--> 999                 raise RuntimeError('DataLoader worker (pid(s) {}) exited unexpectedly'.format(pids_str)) from e
   1000             if isinstance(e, queue.Empty):
   1001                 return (False, None)

RuntimeError: DataLoader worker (pid(s) 67567) exited unexpectedly

Here's the error when I add multiprocessing.set_start_method('forkserver') at the start of the code:

------------------------------------------------------------------------
Empty                                  Traceback (most recent call last)
~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _try_get_data(self, timeout)
    985         try:
--> 986             data = self._data_queue.get(timeout=timeout)
    987             return (True, data)

~/miniconda3/envs/predict_pv_yield/lib/python3.8/multiprocessing/queues.py in get(self, block, timeout)
    107                     if not self._poll(timeout):
--> 108                         raise Empty
    109                 elif not self._poll():

Empty: 

The above exception was the direct cause of the following exception:

RuntimeError                           Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in __next__(self)
    515             if self._sampler_iter is None:
    516                 self._reset()
--> 517             data = self._next_data()
    518             self._num_yielded += 1
    519             if self._dataset_kind == _DatasetKind.Iterable and \

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _next_data(self)
   1180 
   1181             assert not self._shutdown and self._tasks_outstanding > 0
-> 1182             idx, data = self._get_data()
   1183             self._tasks_outstanding -= 1
   1184             if self._dataset_kind == _DatasetKind.Iterable:

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _get_data(self)
   1146         else:
   1147             while True:
-> 1148                 success, data = self._try_get_data()
   1149                 if success:
   1150                     return data

~/miniconda3/envs/predict_pv_yield/lib/python3.8/site-packages/torch/utils/data/dataloader.py in _try_get_data(self, timeout)
    997             if len(failed_workers) > 0:
    998                 pids_str = ', '.join(str(w.pid) for w in failed_workers)
--> 999                 raise RuntimeError('DataLoader worker (pid(s) {}) exited unexpectedly'.format(pids_str)) from e
   1000             if isinstance(e, queue.Empty):
   1001                 return (False, None)

RuntimeError: DataLoader worker (pid(s) 67640, 67641, 67642, 67643, 67644, 67645, 67646, 67647) exited unexpectedly

@martindurant
Member

I'm afraid those tracebacks are not coming from the child process, so they don't contain any useful information.

@martindurant
Member

Discussion on fork and event loops: https://bugs.python.org/issue21998
Note that threads suffer from this separately. You should, at the very minimum, clear fsspec's instance cache before launching subprocesses:

gcs = fsspec.filesystem("gcs", ...)
# use gcs
gcs.clear_instance_cache()
# call subprocesses

and do not pass the instance gcs to the child processes.
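
Roughly, applied to the ProcessPoolExecutor example from the issue description (a sketch, assuming the default 'fork' start method; credentials and other arguments omitted):

from concurrent.futures import ProcessPoolExecutor

import fsspec
import gcsfs

gcs = fsspec.filesystem("gcs")  # use gcs in the parent process here
gcs.clear_instance_cache()      # drop cached instances before forking

with ProcessPoolExecutor() as executor:
    for i in range(8):
        # each child builds its own filesystem; the parent's gcs is never passed in
        future = executor.submit(gcsfs.GCSFileSystem)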

@JackKelly
Author

Thanks loads for the additional help!

Unfortunately, PyTorch doesn't log error messages from the child processes. I've tried (briefly) and I can't seem to peer into the child processes (at least, not without hacking the PyTorch library itself).

I've tried setting the start method using torch.multiprocessing.set_start_method and/or using dataloader.multiprocessing_context = 'spawn' but these still result in the same error messages above :(

Right now, the only fix I'm aware of for my code is to use gcsfs version 0.7.2.

Do you know what might have changed within gcsfs between version 0.7.2 and version 0.8.0 to cause this to stop working in 0.8.0? Version 0.7.2 works fine for the code above :)

@martindurant
Member

You might also try v 2021.04.0.

@JackKelly
Author

Unfortunately this is broken for me in the two most recent versions of gcsfs (i.e. it's broken in both 0.8.0 and 2021.04.0).

@martindurant
Member

To be sure, if you don't use gcs/fsspec in the main process, all is well?

@martindurant
Member

Pytorch claims to use spawn, so if you can make code that causes a crash with spawn, I'll have something specific to fix.
The fsspec tests try HTTPFileSystem (another one that is async) with both spawn and forkserver.

@JackKelly
Author

To be sure, if you don't use gcs/fsspec in the main process, all is well?

That's almost the case in the minimal code snippet at the top of this thread (although the code hangs the second time we run the ProcessPoolExecutor block).

In my PyTorch code, I haven't yet tried removing calls to gcsfs.GCSFileSystem() from the main process (but that shouldn't be too much work).

Pytorch claims to use spawn, so if you can make code that causes a crash with spawn, I'll have something specific to fix.

OK, sounds good. Yeah, I think that perhaps my minimal code snippet isn't perfectly capturing the issue I'm experiencing with PyTorch + gcsfs. I'll need to dig through the PyTorch library code to try to come up with a better minimal code snippet... unfortunately that'll take a little while as the rest of this week is full of meetings :(

@martindurant
Member

That's almost the case in the minimal code snippet at the top of this thread (although the code hangs the second time we run the ProcessPoolExecutor block).

Right, but only with fork - which is known to be troublesome, and which we attempt to warn the user against. It is interesting that it works once and then not - I'm not sure what that means.

@JackKelly
Author

Right, but only with fork

Yeah, exactly.

I've had a quick look at the PyTorch library code and tried to create a simple minimal example. I've failed so far! TBH, the PyTorch multiprocessing code looks too complex for me to wrap my head round any time soon. So, instead, I'm going to push ahead with my machine learning research using gcsfs 0.7.2 (which works perfectly for my needs!). If this ML research pans out and we build a production service then I may revisit this issue to try to help fix it! Sorry, I appreciate that's not ideal for gcsfs!

For reference, here's the PyTorch loop which constructs the worker processes:
https://github.com/pytorch/pytorch/blob/2f598b53ddfbd2dbbaddb76a0f30018a713f3c7a/torch/utils/data/dataloader.py#L898

@JackKelly
Author

JackKelly commented May 4, 2021

Quick update:

I've just tried opening my Zarr file on Google Cloud like this:

import xarray as xr
dataset = xr.open_zarr('gs://<bucket>/<path>')

instead of like this:

    gcs = gcsfs.GCSFileSystem(access='read_only')
    store = gcsfs.GCSMap(root='<bucket>/<path>', gcs=gcs)
    dataset = xr.open_zarr(store, consolidated=True)

I've also tried updating all my other Python packages.

Unfortunately, neither of these changes fixes the crash when using gcsfs 2021.4.0 with a PyTorch multi-process data loader. But, as before, this isn't fatal for my work, because I can continue using gcsfs 0.7.2 and all is well :)

@martindurant
Member

dataset = xr.open_zarr('gs://<bucket>/<path>')

This is essentially the same as before; but you are not using multiprocessing now?

@JackKelly
Author

Ah, sorry for not being explicit: I'm still using multiprocessing (via PyTorch), and the code still hangs if I use multiprocessing with gcsfs 2021.4.0 or 0.8, sorry.

In terms of next steps... would one or both of these actions be useful?

  1. Whilst it's probably beyond me (!) to create a minimal code example demonstrating this bug without PyTorch (because that would require me to understand PyTorch's innards well enough to re-create the essential parts of the PyTorch code!), I should be able to make a minimal code example with PyTorch and Zarr. Would that be useful?
  2. And/or I could post on the PyTorch GitHub issue queue to ask them to help.

@JackKelly
Author

Hi @martindurant,

Re-reading this issue, I'm starting to wonder if I'm doing something stupid in my code :)

In my code, every sub-process does this when the sub-process starts up:

gcs = gcsfs.GCSFileSystem(access='read_only')
store = gcsfs.GCSMap(root=filename, gcs=gcs)
dataset = xr.open_zarr(store, consolidated=consolidated)
gcs.clear_instance_cache()

Is that wrong?! Should only the parent process run the code above, and then pass a copy of dataset into each child process?!

@martindurant
Member

Maybe call gcs.clear_instance_cache() before the block instead of at the end, or include skip_instance_cache=True in the constructor; but this still doesn't clear the reference to the loop and thread. You could do that with

fsspec.asyn.iothread[0] = None
fsspec.asyn.loop[0] = None

and that is what any fork-detecting code should be doing.
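
One way to arrange that in user code (a sketch of the idea) is an os.register_at_fork hook, so every forked child starts with clean references:

import os

import fsspec.asyn

def _reset_fsspec_loop():
    # Discard the IO thread and event loop inherited from the parent process.
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

os.register_at_fork(after_in_child=_reset_fsspec_loop)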

@JackKelly
Author

Oooh... some good news! I've got my multiprocessing code working with PyTorch and gcsfs 2021.4.0 :) I've made quite a few changes so I need to do some more tinkering to figure out exactly which change fixed the issue! (Thanks for all your help btw!)

@martindurant
Member

Very glad to hear it

@JackKelly
Author

I'm pretty sure it's the trick you suggested in your comment above which fixed the issue :)

    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

I run that in every process. To be even more specific, I run this in every process:

from pathlib import Path
from typing import Union

import fsspec.asyn
import gcsfs
import xarray as xr

def open_zarr_on_gcp(filename: Union[str, Path]) -> xr.Dataset:
    """Lazily opens the Zarr store on Google Cloud Storage (GCS)."""
    gcs = gcsfs.GCSFileSystem(
        access='read_only',
        skip_instance_cache=True  # Why skip_instance_cache?  See https://github.com/dask/gcsfs/issues/379#issuecomment-839929801
    )

    # Clear references to the loop and thread.
    # See https://github.com/dask/gcsfs/issues/379#issuecomment-839929801
    # Only relevant for fsspec >= 0.9.0
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

    store = gcsfs.GCSMap(root=filename, gcs=gcs)
    return xr.open_zarr(store)

@JackKelly
Author

JackKelly commented May 13, 2021

I've done a few more experiments (in the hopes that this might be of use to other people in a similar situation; or maybe useful to help understand what's going on!)

It turns out that fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None needs to be run in every worker process. It's not sufficient to just do this in the parent process.

It doesn't matter if the code does fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None before or after gcs = gcsfs.GCSFileSystem().

When using fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None, it's no longer necessary to do skip_instance_cache=True or gcs.clear_instance_cache().

Each worker process has to open the Zarr store. If I try lazily opening the Zarr store in the main process and passing this object into each worker process then fsspec throws an error saying it's not thread safe. That's fine, it's no problem for my code to open the Zarr store in each worker process.
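
For reference, another way to make sure the reset runs in every worker is PyTorch's DataLoader worker_init_fn hook (a sketch, not necessarily what my code above does; my_dataset is a placeholder):

import fsspec.asyn
from torch.utils.data import DataLoader

def worker_init_fn(worker_id):
    # Runs once in each worker process, before any data loading:
    # drop the loop/IO-thread references inherited from the parent.
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

# loader = DataLoader(my_dataset, num_workers=8, worker_init_fn=worker_init_fn)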

@martindurant
Member

OK, that more or less confirms what I thought. I still don't know how the code can be in a place to set those to None after fork and before any instance tries to use the defunct objects. Or maybe we can simply document this situation?

@JackKelly
Author

Yeah, I agree - I think it's sufficient to just document this somewhere. Happy to have a go at drafting a paragraph or two if you can recommend where best to document this! (But also more than happy for someone else to write it!)

@martindurant
Member

I suppose it should mainly be documented in fsspec along with the rest of the async docs, but also mentioned here in gcsfs (which has far less documentation in general).

@ShethR

ShethR commented Jul 20, 2021

Downgrading to gcsfs==0.7.2 and fsspec==0.8.0 worked for me.

@martindurant
Member

@ShethR : did you try avoiding fork, as discussed above?

@ShethR

ShethR commented Jul 20, 2021

No I did not! I used the default fork method.
Just for added information, I am using torch.multiprocessing instead of native multiprocessing. But I guess they both should have similar behavior.

@martindurant
Member

i guess they both should have similar behavior.

They will be executing the same low-level OS calls, yes. Fork is a long-standing, highly efficient, but problematic mechanism.

@lhoestq

lhoestq commented May 19, 2022

Same issue for HTTPFileSystem, and fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None also does the job.

I still don't know how the code can be in a place to set those to None after fork and before any instance tries to use the defunct objects.

It's been a few months since the last message; do you have a better idea of how it could be done?

@martindurant
Member

Again, I would urge everyone simply not to do this! I'll comment shortly in fsspec/filesystem_spec#963 on what needs to happen.

@swt2c

swt2c commented May 19, 2022

Again, I would urge everyone simply not to do this! I'll comment shortly in fsspec/filesystem_spec#963 on what needs to happen.

There are situations where fork() is useful, though, such as sharing a large read-only object between multiple processes without having to copy it / duplicate memory usage. It sure would be nice if fsspec was fork-friendly. :-)

@martindurant
Member

It sure would be nice if fsspec was fork-friendly

In practice, it's hard, and there's a reason that dask, for instance, doesn't do this. Following the prescription in the linked thread, I think it would be fair for fsspec to require the calling code to make sure that fork safety requirements are met, rather than attempt to automatically detect the change of PID and arrange the appropriate discarding of objects itself.

There are actually many python constructs that are (surprisingly) not fork safe, such as a requests Session or an open file object - never mind anything to do with async or threads.

sharing a large read-only object between multiple processes

I intend on working on (single node) shared memory for dask distributed this summer, if that helps.

@frankShih

I am facing the same issue when using concurrent.futures.ThreadPoolExecutor.
Any possible solutions?

@martindurant
Member

@frankShih , please provide a reproducer. gcsfs is routinely used from multiple threads, particularly by dask.

@fg91

fg91 commented Feb 22, 2023

I recommend this blog post if you want to understand why using the fork start method in combination with multi-threading is generally a bad idea.

@SaravananSathyanandhaQC

SaravananSathyanandhaQC commented May 26, 2023

I came across the same issue - I'm running FastAPI on Gunicorn with Uvicorn workers, 4 workers running. What's the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize in case that's relevant.

@fg91

fg91 commented May 26, 2023

I came across the same issue - I'm running FastAPI on Gunicorn with Uvicorn workers, 4 workers running. What's the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize in case that's relevant.

You could try this (assuming that you fork the worker processes causing deadlocks):

import os
import fsspec.asyn

os.register_at_fork(
    after_in_child=fsspec.asyn.reset_lock,
)

@SaravananSathyanandhaQC

I came across the same issue - I'm running FastAPI on Gunicorn with Uvicorn workers, 4 workers running. What's the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize in case that's relevant.

You could try this (assuming that you fork the worker processes causing deadlocks):

os.register_at_fork(
    after_in_child=fsspec.asyn.reset_lock,
)

No difference unfortunately. I'm using gunicorn with --preload, so putting that line at the start of my app.py should get picked up before the fork happens, but I'm not certain how gunicorn handles its forking.

@martindurant
Member

The best solution is not to use gcsfs in the parent process before fork, sorry :|

You may wish to try the nascent GCS support in rfsspec, which does not use an IO thread. It has not been tested against fork at all.

@fg91

fg91 commented May 28, 2023

I came across the same issue - I'm running FastAPI on Gunicorn with Uvicorn workers, 4 workers running. What's the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize in case that's relevant.

You could try this (assuming that you fork the worker processes causing deadlocks):

os.register_at_fork(
    after_in_child=fsspec.asyn.reset_lock,
)

No difference unfortunately. I'm using gunicorn with --preload so putting that line in at the start of my app.py should get picked up before the fork happens, but I'm not certain about how gunicorn handles its forking

If you are not sure whether the registered hook gets picked up before the fork you could also just directly call fsspec.asyn.reset_lock() at the beginning of your code. This should release the lock that is causing the deadlock.

@SaravananSathyanandhaQC

I came across the same issue - I'm running FastAPI on Gunicorn with Uvicorn workers, 4 workers running. What's the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize in case that's relevant.

You could try this (assuming that you fork the worker processes causing deadlocks):

os.register_at_fork(
    after_in_child=fsspec.asyn.reset_lock,
)

No difference unfortunately. I'm using gunicorn with --preload so putting that line in at the start of my app.py should get picked up before the fork happens, but I'm not certain about how gunicorn handles its forking

If you are not sure whether the registered hook gets picked up before the fork you could also just directly call fsspec.asyn.reset_lock() at the beginning of your code. This should release the lock that is causing the deadlock.

This did the trick for me! As you say, I had to find the right place to put it (there was some ThreadPoolExecutor code pre-fork as well which was messing with things). By making sure I called reset_lock (in the parent process) after fsspec.filesystem(...) had been called in the parent, things worked nicely in all the forked workers.
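
In code, that ordering looks roughly like this (a sketch; the module layout under gunicorn --preload is illustrative):

import fsspec
import fsspec.asyn

# In the pre-fork parent process (e.g. a module imported under gunicorn --preload):
fs = fsspec.filesystem("gs")  # any fsspec use in the parent happens first
# ... use fs ...
fsspec.asyn.reset_lock()      # then release the lock before the workers are forked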

@arnavmehta7

Hey guys, did anyone figure out how to use it in threads?

I am getting this error:

  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 81, in sync
    raise RuntimeError("Loop is not running")
RuntimeError: Loop is not running

I have the following block in "each" thread, and new threads might spawn:

fsspec.asyn.iothread[0] = None
