[batch] adds requester pays config to hailtop.fs.open
#13795
b4ece33 to da24ff8
Looking good, just a few organizational comments
...     requester_pays_config=('my-project', ['my-bucket', 'bucket-2'])
... ) as f:
...     for line in f:
...         print(line.strip())
I feel like this should only support `requester_pays_project`, since you can't `open` from more than one bucket.
AFAICT the only method where it would be possible to specify multiple buckets would be `copy`. It's perhaps worth asking, though, whether anyone would ever want to specify two different requester pays projects when copying between two buckets. That seems kind of weird, and it leads to more annoying bookkeeping than if we just had a `Dict[str, FS]`.
good point; checked in with @danking and he said it's not inconceivable that someone would want to put together one big requester pays config and just pass the same object into multiple invocations of `open`, so hopefully this latest change reduces the bookkeeping enough that it's not too big of an issue to support the `Tuple[str, List[str]]` case of the type.
hashable_config = (
    requester_pays_config
    if requester_pays_config is None or isinstance(requester_pays_config, str)
    else f"{requester_pays_config[0]}:{','.join(requester_pays_config[1])}"
I think this will produce different keys for `("project", ["bucket1", "bucket2"])` vs `("project", ["bucket2", "bucket1"])`. If this interpolation stems from lists not being hashable, you can construct a `Tuple[str, frozenset]`, which should be hashable.
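A minimal sketch of the frozenset suggestion, not the PR's actual code: the `Config` and `HashableConfig` aliases and the `to_hashable` helper are hypothetical names used only for illustration. It shows that a `(project, frozenset(buckets))` key is hashable and does not depend on bucket order.

```python
from typing import FrozenSet, List, Tuple, Union

# Hypothetical aliases for this sketch; the PR's real type names may differ.
Config = Union[None, str, Tuple[str, List[str]]]
HashableConfig = Union[None, str, Tuple[str, FrozenSet[str]]]


def to_hashable(config: Config) -> HashableConfig:
    """Return a hashable cache key that ignores bucket order."""
    if config is None or isinstance(config, str):
        # None and plain project strings are already hashable.
        return config
    project, buckets = config
    return (project, frozenset(buckets))


key_a = to_hashable(("project", ["bucket1", "bucket2"]))
key_b = to_hashable(("project", ["bucket2", "bucket1"]))
same_key = key_a == key_b  # both orderings map to one dictionary key
```

By contrast, the interpolated string key in the snippet above would differ between the two orderings.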
ooo good catch, updated to use a frozenset
except KeyError:
    requester_pays_fses[hashable_config] = cls(
        gcs_kwargs={"gcs_requester_pays_configuration": requester_pays_config}
    )
Maybe it's the introduction of the hashable stuff, but the bookkeeping around the `requester_pays_fses` is starting to feel a little messy. I wonder if, instead of keeping a `dict` around and using this utility method, we could subclass `defaultdict`?
i definitely like the approach of subclassing `DefaultDict` better haha, updated to do that
3943d78 to e823430
Sorry for the delay on this, this looks great! Just a couple more organizational nits.
hail/python/hailtop/batch/backend.py
Outdated
@@ -160,7 +157,12 @@ def __init__(self,
flags += f' -v {gsa_key_file}:/gsa-key/key.json'

self._extra_docker_run_flags = flags
self.__fs = RouterAsyncFS()
self.__requester_pays_fses = gcs_requester_pays_fs_cache(fs_constructor=RouterAsyncFS)
Instead of having this field in both subclasses and requiring an abstract property, you can pass a `GCSRequesterPaysFSCache` to `super().__init__`.
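To illustrate the shape of this suggestion, here is a small sketch in which the base class owns the cache field and each subclass only hands its own cache to `super().__init__`. All class names and the plain `dict` standing in for the cache are hypothetical; the real backends in this PR take more arguments.

```python
from typing import Any, Dict, Optional


class SketchBackend:
    """Hypothetical base class that owns the shared cache field."""

    def __init__(self, requester_pays_fses: Dict[Optional[str], Any]) -> None:
        self._requester_pays_fses = requester_pays_fses

    def fs_for(self, config: Optional[str]) -> Any:
        # Shared lookup logic lives once, in the base class.
        return self._requester_pays_fses[config]


class SyncSketchBackend(SketchBackend):
    def __init__(self) -> None:
        # The subclass only decides which cache to pass up.
        super().__init__({None: "sync-fs"})


class AsyncSketchBackend(SketchBackend):
    def __init__(self) -> None:
        super().__init__({None: "async-fs"})
```

No abstract property is needed, because the base class can rely on the field it set itself.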
good call, updated!
class GCSRequesterPaysFSCache(DefaultDict[MaybeGCSRequesterPaysConfiguration, FS]):
    def __init__(self, *args: Any, fs_constructor: Type[FS], default_kwargs: Dict[str, Any] = {}, **kwargs: Any) -> None:
Is the only use of the `*args` here the default factory in the `gcs_requester_pays_fs_cache` function? If so, can we restrict it to just that argument and add more as necessary down the line? That makes for nicer type checking.
On a similar note, I don't see `**kwargs` used. Can we remove it until we need it?
removed!
class GCSRequesterPaysFSCache(DefaultDict[MaybeGCSRequesterPaysConfiguration, FS]):
    def __init__(self, *args: Any, fs_constructor: Type[FS], default_kwargs: Dict[str, Any] = {}, **kwargs: Any) -> None:
        self._fs_constructor = fs_constructor
        self[None] = fs_constructor(**default_kwargs)
I feel like a cache shouldn't eagerly construct elements. Can we delete this and let the normal behavior deal with this case?
updated!
kwargs_2 = {"gcs_requester_pays_configuration": config_2}
fses = gcs_requester_pays_fs_cache(cls)
assert attrgetter(kwargs_field)(fses[None]) == {}
assert attrgetter(kwargs_field)(fses[config_1]) == kwargs_1
Interesting, I've never seen `attrgetter` before. I think I would prefer, though, adding a `_gcs_kwargs` property to both `RouterFS` and `RouterAsyncFS` and accessing that property directly, so that if I mess something up, the linters will tell me that these attribute accesses will fail without my having to run this test.
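A short sketch of the trade-off described here, using hypothetical `SketchAsyncFS` and `SketchFS` classes rather than the real `RouterAsyncFS` and `RouterFS`. `operator.attrgetter` resolves a dotted string path only at runtime, while a property gives static tools a real attribute to check.

```python
from operator import attrgetter
from typing import Any, Dict, Optional


class SketchAsyncFS:
    """Hypothetical async filesystem that stores its GCS kwargs."""

    def __init__(self, gcs_kwargs: Optional[Dict[str, Any]] = None) -> None:
        self._gcs_kwargs = dict(gcs_kwargs or {})


class SketchFS:
    """Hypothetical sync wrapper around the async filesystem."""

    def __init__(self, gcs_kwargs: Optional[Dict[str, Any]] = None) -> None:
        self.afs = SketchAsyncFS(gcs_kwargs)

    @property
    def _gcs_kwargs(self) -> Dict[str, Any]:
        # Same name as on the async class, so callers need no string path.
        return self.afs._gcs_kwargs


fs = SketchFS({"gcs_requester_pays_configuration": "my-project"})

# String-based access: a typo in the path only fails when this line runs.
via_string = attrgetter("afs._gcs_kwargs")(fs)

# Property access: a misspelled attribute can be flagged by a linter.
via_property = fs._gcs_kwargs
```

Both forms return the same dictionary; the difference is only in when a mistake is detected.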
added!
assert attrgetter(kwargs_field)(default_kwargs_fses[None]) == kwargs_1

test_with_cls(RouterFS, "afs._gcs_kwargs")
test_with_cls(RouterAsyncFS, "_gcs_kwargs")
I feel like we can instead make `test_with_cls` the actual test method and use `pytest.mark.parametrize` with the two classes.
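As a sketch of this suggestion, assuming both classes expose a `_gcs_kwargs` attribute: the helper becomes the test itself and pytest runs it once per class. The two `Sketch*` classes and the test name are hypothetical placeholders for the real filesystems and test.

```python
from typing import Any, Dict, Optional

import pytest


class SketchAsyncFS:
    """Hypothetical stand-in for the async filesystem class."""

    def __init__(self, gcs_kwargs: Optional[Dict[str, Any]] = None) -> None:
        self._gcs_kwargs = dict(gcs_kwargs or {})


class SketchFS:
    """Hypothetical stand-in for the sync filesystem class."""

    def __init__(self, gcs_kwargs: Optional[Dict[str, Any]] = None) -> None:
        self._gcs_kwargs = dict(gcs_kwargs or {})


# pytest collects this as two test cases, one per class.
@pytest.mark.parametrize("cls", [SketchFS, SketchAsyncFS])
def test_gcs_kwargs_round_trip(cls) -> None:
    kwargs = {"gcs_requester_pays_configuration": "my-project"}
    assert cls(gcs_kwargs=kwargs)._gcs_kwargs == kwargs
    assert cls()._gcs_kwargs == {}
```

A failure is then reported against the specific class parameter instead of against one combined test.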
updated!
bc81b1a to c3e8e04
One last thing. We should no longer need to apply nest_asyncio in this constructor.
hail/python/hailtop/batch/backend.py
Outdated
self._requester_pays_fses: Dict[GCSRequesterPaysConfiguration, RouterAsyncFS] = {}
def __init__(self, requester_pays_fses: GCSRequesterPaysFSCache):
    import nest_asyncio  # pylint: disable=import-outside-toplevel
    nest_asyncio.apply()
I think this `nest_asyncio` was deleted in main. Is this still necessary?
ooo good catch, thank you!
c3e8e04 to 287220a
Closes #13567.