credentials error with distributed #90

Open · rabernat opened this issue Mar 12, 2018 · 18 comments

@rabernat
Contributor

I am trying to use gcsfs via distributed in pangeo-data/pangeo#150. I have uncovered what seems like a serialization bug.

This works from my notebook (the token appears to be cached):

fs = gcsfs.GCSFileSystem(project='pangeo-181919')
fs.buckets

It returns the four buckets: ['pangeo', 'pangeo-data', 'pangeo-data-private', 'zarr_store_test'].

Now I create a distributed cluster and client and use it to run the same command:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=False)
client = Client(cluster)
client.run(lambda : fs.buckets)

I get the following error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-25-3de328a517c7> in <module>()
----> 1 client.run(lambda : fs.buckets)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in run(self, function, *args, **kwargs)
   1906          '192.168.0.101:9000': 'running}
   1907         """
-> 1908         return self.sync(self._run, function, *args, **kwargs)
   1909 
   1910     @gen.coroutine

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    601             return future
    602         else:
--> 603             return sync(self.loop, func, *args, **kwargs)
    604 
    605     def __repr__(self):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in f()
    235             yield gen.moment
    236             thread_state.asynchronous = True
--> 237             result[0] = yield make_coro()
    238         except Exception as exc:
    239             logger.exception(exc)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1067                             exc_info = None
   1068                     else:
-> 1069                         yielded = self.gen.send(value)
   1070 
   1071                     if stack_context._state.contexts is not orig_stack_contexts:

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in _run(self, function, *args, **kwargs)
   1860                 results[key] = resp['result']
   1861             elif resp['status'] == 'error':
-> 1862                 six.reraise(*clean_exception(**resp))
   1863         raise gen.Return(results)
   1864 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

<ipython-input-25-3de328a517c7> in <lambda>()
----> 1 client.run(lambda : fs.buckets)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in buckets()
    449     def buckets(self):
    450         """Return list of available project buckets."""
--> 451         return [b["name"] for b in self._list_buckets()["items"]]
    452 
    453     @classmethod

<decorator-gen-128> in _list_buckets()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _list_buckets()
    568         items = []
    569         page = self._call(
--> 570             'get', 'b/', project=self.project
    571         )
    572 

<decorator-gen-123> in _call()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _call()
    429             try:
    430                 time.sleep(2**retry - 1)
--> 431                 r = meth(self.base + path, params=kwargs, json=json)
    432                 validate_response(r, path)
    433                 break

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/requests/sessions.py in get()
    519 
    520         kwargs.setdefault('allow_redirects', True)
--> 521         return self.request('GET', url, **kwargs)
    522 
    523     def options(self, url, **kwargs):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py in request()
    195         request_headers = headers.copy() if headers is not None else {}
    196 
--> 197         self.credentials.before_request(
    198             self._auth_request, method, url, request_headers)
    199 

AttributeError: 'AuthorizedSession' object has no attribute 'credentials'
@martindurant
Member

How is the authorization happening in this case?

@rabernat
Contributor Author

I originally called fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser'). From then on, I didn't need to authenticate any more.

I am also trying to use a service account json token, but that doesn't work (see #89). These two issues are coupled.

@martindurant
Member

OK, makes sense. Yes, the browser method creates a token, which is cached in the file ~/.gcs_tokens; you will need to distribute this file to all nodes to use this method.

@rabernat
Contributor Author

There are no nodes. It's a local, threaded cluster.

@martindurant
Member

Can you explicitly try token='cache'?
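
(Roughly like this, reusing the project name from above:)

import gcsfs

# Explicitly load the token cached in ~/.gcs_tokens by the earlier browser
# login, instead of letting gcsfs fall back to google_default credentials.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')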

@rabernat
Contributor Author

It works with token='cache'

@martindurant
Member

Excellent! So it seems that in the default case (token=None), google_default must be returning some form of non-working credentials. I'm not sure what we can do to check that the credentials are valid, as there could well be valid credentials that do not have bucket_list privilege. Maybe try to list some known public bucket?

@rabernat
Contributor Author

Isn't that what the check=True option is for in gcsfs.mapping.GCSMap?

@martindurant
Member

That check has higher demands: it lists the buckets to ensure the given bucket exists and tries to write to it too.
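
A sketch of what invoking that check looks like (the bucket/path is illustrative, and this assumes the GCSMap(root, gcs=..., check=...) signature):

import gcsfs
from gcsfs.mapping import GCSMap

fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')

# check=True lists the buckets to confirm the bucket exists and attempts a
# small write, so it requires both list and write permissions.
store = GCSMap('pangeo-data/some/path', gcs=fs, check=True)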

@martindurant
Member

For example, gcs.ls('genomics-public-data/references') should always work, as this bucket is in Google's set of public reference datasets; this could be a "check" option on the GCSFileSystem instance, run for each auth type. Thoughts?
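
A minimal sketch of such a check, using the public path mentioned above (the helper name here is made up):

def credentials_look_valid(gcs):
    """Try to list a public dataset that any working credentials can read."""
    try:
        gcs.ls('genomics-public-data/references')
        return True
    except Exception:
        return False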

@jgerardsimcock

jgerardsimcock commented Apr 10, 2018

@martindurant Can you provide some references to the best way to distribute .gcs_tokens to worker nodes?

Is there something like fsspec/s3fs#28 for gcsfs?

@martindurant
Member

The general assumption was that each worker would perform its own authentication, so you would need the google-default login correctly set up (via the CLI or Google config files in special locations), a metadata service all workers can reach, a local token file on each worker, or the special gcs cache file on each worker. The token= parameter must be in the storage options passed to the backend and match the type of auth desired.
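
For example, with a dask read call (the bucket path here is hypothetical; the point is that token= travels in the storage options):

import dask.dataframe as dd

# Each worker builds its own GCSFileSystem from these storage options, so
# token='cache' requires the cache file to be present locally on each worker.
df = dd.read_csv('gcs://pangeo-data/some-folder/*.csv',
                 storage_options={'token': 'cache'})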

As you can see in #91, many workers attempting to authenticate at the same time can sometimes cause problems. In such a case, you may wish to distribute the token directly, assuming the network is secure. This is not documented, but #91 (comment) should work.
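
A rough sketch of that idea (this assumes fs.session.credentials is reachable on the client and that GCSFileSystem accepts a ready-made credentials object as token=; only do this on a trusted network):

import gcsfs

# Authenticate once on the client...
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser')

# ...then pass the resulting credentials to the workers via storage options,
# so they never have to run their own browser/metadata authentication.
creds = fs.session.credentials
storage_options = {'project': 'pangeo-181919', 'token': creds}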

@jgerardsimcock

@martindurant Thank you! If/when I get this working, would you accept a PR with documentation?

@martindurant
Member

Of course!

@delgadom

delgadom commented Apr 11, 2018

@martindurant Per-worker browser-based authentication is a tough route, especially when used in connection with dask-kubernetes or a pangeo-style setup. I've been experimenting with json-based authentication, using some combination of client.upload and mapping subprocess calls to mount the fuse directories manually before starting an analysis, but every time a worker dies and is rebooted automatically by the kubernetes engine I get a giant stream of FileNotFound errors. Any suggestions on doing this at worker setup time?

@delgadom

If distributed + gcsfs were able to recognize the setup of the fuse system as a required worker dependency, that would do the trick. Is there some hook we could customize to do this manually?

@martindurant
Member

Once you have done browser authentication you will have a ~/.gcs_tokens file, which you can distribute to the workers and then use with the token='cache' method. The automatic way to ensure the file appears in the right place for every worker pod will depend on your kubernetes configuration.
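
One way to push the file out from the notebook is with client.run, as used earlier in this issue (the scheduler address is a placeholder, and this assumes the workers do not share a filesystem with the client and that the network is trusted):

import os
from dask.distributed import Client

client = Client('scheduler-address:8786')

# Read the tokens cached locally by the browser login...
with open(os.path.expanduser('~/.gcs_tokens'), 'rb') as f:
    token_bytes = f.read()

# ...and write the same file on every worker, after which the workers can use
# gcsfs.GCSFileSystem(project=..., token='cache').
def write_tokens(data=token_bytes):
    path = os.path.expanduser('~/.gcs_tokens')
    with open(path, 'wb') as f:
        f.write(data)
    return path

client.run(write_tokens)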

@delgadom

OK, thanks.
