FERC EQR Archiver #43
Conversation

Like, a global 3-user throttle? Oy vey. I think we could specifically limit the # of concurrent connections to

diff --git a/src/pudl_archiver/__init__.py b/src/pudl_archiver/__init__.py
index 4eb4439..cf9d20c 100644
--- a/src/pudl_archiver/__init__.py
+++ b/src/pudl_archiver/__init__.py
@@ -47,13 +47,19 @@ async def archive_dataset(
await archiver.create_archive()
+DATASETS_LIMITED_CONCURRENCY = {"ferc_eqr"}
+
+
async def archive_datasets(
datasets: list[str],
sandbox: bool = True,
initialize: bool = False,
dry_run: bool = True,
+ conns_per_host: int = 20, # could be parsed as a CLI option
):
"""A CLI for the PUDL Zenodo Storage system."""
+ if set(datasets).intersection(DATASETS_LIMITED_CONCURRENCY):
+ conns_per_host = 1
if sandbox:
upload_key = os.environ["ZENODO_SANDBOX_TOKEN_UPLOAD"]
publish_key = os.environ["ZENODO_SANDBOX_TOKEN_PUBLISH"]
@@ -61,7 +67,7 @@ async def archive_datasets(
upload_key = os.environ["ZENODO_TOKEN_UPLOAD"]
publish_key = os.environ["ZENODO_TOKEN_PUBLISH"]
- connector = aiohttp.TCPConnector(limit_per_host=20, force_close=True)
+ connector = aiohttp.TCPConnector(limit_per_host=conns_per_host, force_close=True)
async with aiohttp.ClientSession(
connector=connector, raise_for_status=False
) as session:
@@ -75,7 +81,6 @@ async def archive_datasets(
publish_key,
testing=sandbox,
)
tasks.append(
archive_dataset(
dataset,
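The same per-dataset throttle could also be expressed with an `asyncio.Semaphore` instead of the connector setting. A stdlib-only sketch (function names here are illustrative, not the PR's actual code):

```python
import asyncio

async def fetch(sem: asyncio.Semaphore, url: str) -> str:
    # The semaphore caps how many downloads are in flight at once,
    # which is how we'd respect FERC's 3-concurrent-user limit.
    async with sem:
        await asyncio.sleep(0)  # stand-in for the real HTTP request
        return url

async def download_all(urls: list[str], limit: int = 1) -> list[str]:
    sem = asyncio.Semaphore(limit)
    # gather preserves input order even though work is concurrent
    return await asyncio.gather(*(fetch(sem, u) for u in urls))

results = asyncio.run(download_all(["2013.zip", "2014.zip", "2015.zip"], limit=1))
```

The `aiohttp.TCPConnector(limit_per_host=...)` approach in the diff has the advantage of being enforced at the connection-pool level for the whole session, rather than per call site.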
Re: the file size, if we were to upload individual files to Zenodo instead of downloading the whole dataset and then uploading everything, would that work? I don't think that would be too disruptive a modification to how the archiver works, and it looks like each file is significantly smaller than 14 GB - e.g. the 2013 file is 1G.
Hey, that was quick! Of course there are, well, the other issues you mentioned 😓 - but overall this looks good. One non-blocking code structure comment.
I'm trying to figure out what testing for this might look like - any thoughts @zschira ?
Testing the individual archivers is definitely an important/unanswered question. This archiver follows the general pattern of having a method that gets the URL for/downloads a single file, so we could always mock that out. In general I'd like to have testing that actually checks that the file names and partitions are structured the way PUDL expects, so we could catch issues like the one I found.
As for the rate limiting and file size issues, I like the idea of being able to set the number of concurrent connections per dataset. I think this will be useful in tuning the rate limiting more generally. I also think downloading one file at a time seems like a good approach. We could probably even do something where we download a file, go through the whole deposition update process (uploads/deletes and update datapackage), but just don't publish the new version, and continue this until we're through the whole process. This might be an easy approach without having to change very much.
Sounds like we should make a separate ticket for "streaming deposition update in archiver" - I can do that. In terms of testing the actual data: how did you find that issue? Could we basically download a small subset of the data for each dataset and validate that? Seems like we'd want to run it through the whole ETL flow, so we could try something like this:

I don't actually know the details of what the nightly build does, but is this similar to that?
So far I've tested this very coarsely by just printing the URLs to the terminal and spot-checking that everything looks as expected while developing the code. In terms of long-term testing of the URL inputs, I could see a few options working here. The `download_hyperlinks()` method by default only generates hyperlinks that exist on the page, so testing is not an issue for those methods. This leaves `download_zipfile()` and `download_file()` as the two methods that take a URL as input.
@jdangerx this is, from my understanding, basically what the nightly builds are doing. My thoughts have been that once things are stabilized we would just create new production archives every time we run the automated archiver, and then kick off a nightly build using these new archives. We could then investigate any failures that arise during the nightly builds. We could certainly create sandbox archives first instead - I don't have particularly strong feelings, but it felt like maybe an unnecessary step if failures shouldn't arise very often. Outside of this, though, I think having a way to test the interfaces during unit/integration tests would be really helpful. Technically we could probably do this now, since the archiver does depend on PUDL, so we could devise some tests that use it. Eventually, when we get to pulling the…
Even just having the raw EQR archived somewhere (AWS open data...) that doesn't have a global 3-user limitation will dramatically improve data accessibility 😄 @zschira I think once an archiver is up and running and working well, we should probably use it to create new production archives on a schedule, while also being conservative about ensuring that the archiver fails if something unexpected happens, so we don't end up creating polluted (irrevocable) archives. My guess is that for many of our datasets, almost every new archive will cause the nightly builds to fail, so if we're going to attempt an auto-PR to switch to a new archive, doing it one dataset at a time will be necessary. Snapshotting and versioning the data has value (and is relatively easy) and should make it easy for us to switch over to a new set of DOIs once a quarter or once a month, address any new brokenness, and cut a release.
Update: this PR is either waiting on the resolution of issue #44 or on the development of an interim workaround. In the meantime, I'll hold off on merging. |
I guess one potential workaround is that instead of running this archiver on a GitHub runner, we spin up a GCP instance on a schedule (kicked off by a GitHub action on a schedule?) and run it there instead, with plenty of disk and a fast, reliable net connection. Maybe just monthly or quarterly if it ends up being a bear. 🐻
@@ -34,6 +41,22 @@ class Config:
        arbitrary_types_allowed = True


class FileWrapper(io.BytesIO):
When `aiohttp` times out while uploading a file, it will then close the file, which leads to an error when we retry the upload. I created this wrapper so `aiohttp` can't close the file. It doesn't seem like the best solution, but it was easy, and maybe it's good enough?
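The wrapper described above might look something like this (a minimal sketch of the approach, not necessarily the PR's exact implementation; the `actually_close` name is an assumption):

```python
import io


class FileWrapper(io.BytesIO):
    """In-memory file whose close() is a no-op, so aiohttp's cleanup
    after a timed-out upload can't invalidate the buffer before a retry."""

    def __init__(self, file_content: bytes):
        super().__init__(file_content)

    def close(self) -> None:
        # Swallow the close() that aiohttp issues on timeout.
        pass

    def actually_close(self) -> None:
        # Really release the buffer once all retries are finished.
        super().close()
```

After a failed upload, the retry can seek back to 0 and read the same buffer again; the caller is responsible for calling `actually_close()` when done.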
The discussion on this issue implies that there's some sort of custom Payload we could pass in, and that this is the preferred method by the aiohttp maintainer. But it's been a whole major version since then, and the documentation doesn't mention this at all.
This issue uses a "monkeypatch the `.close()` function" approach, which seems equally janky but a little shorter. 🤷
This existing option is good enough for me, though we should add some more documentation about this somewhat surprising behavior.
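For reference, the monkeypatch variant amounts to shadowing the bound `close()` at the instance level (a stdlib-only sketch, not the linked issue's exact code):

```python
import io

# A buffer standing in for the file object handed to aiohttp.
buf = io.BytesIO(b"payload")

# Instance attributes win over non-data descriptors, so this no-op
# lambda shadows BytesIO.close for this one object only.
buf.close = lambda: None

buf.close()        # aiohttp's cleanup call is now harmless
data = buf.read()  # the buffer is still readable for the retry
```

Both approaches have the same drawback: someone has to remember to really close the file later, which is part of why this behavior deserves documentation.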
Yeah, I found an issue where the `aiohttp` maintainers seemed quite convinced that they should take ownership of a file, and that callers should reopen the file, which would require a lot of re-engineering on our part, so we're gonna have to live with a hack, I guess.
# Leave immediately after generating changes if dry_run
resources = {}
changed = False
async for name, resource in self.downloader.download_all_resources():
Handles one file at a time now
This looks good, but I want to make sure we think through the `apply_changes` interface a bit, and I wanted to flag the `tmp_dir` lifecycle question.
The FileWrapper is fine... I tried to look at how aiohttp wants us to do this, and couldn't figure it out with a few minutes of searching, so I gave up.
Finally I have a small suggestion for the test that (I think) means you get to skip making a whole new test class.
src/pudl_archiver/orchestrator.py (Outdated)
if action:
    changed = True
    await self._apply_changes(action, name, resource)
A single change is a tuple of `(action, name, resource)`, right? If that's the case, it'd be nice not to split up the components of a change when we call `_apply_change`:
add_or_update = self._generate_change(name, resource)  # maybe we could make a Change type but just a tuple seems fine too
if self.dry_run:
    continue
if add_or_update:
    changed = True
    await self._apply_change(add_or_update)
deletes = self._get_files_to_delete(resources)
...
Added a class to encapsulate a change that I think makes things nicer
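A class encapsulating a change might look roughly like this (the names `DepositionAction` and `DepositionChange` are illustrative guesses, not necessarily what landed in the PR):

```python
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from typing import Optional


class DepositionAction(Enum):
    CREATE = auto()
    UPDATE = auto()
    DELETE = auto()


@dataclass
class DepositionChange:
    """Bundle action, name, and resource so they travel together."""

    action_type: DepositionAction
    name: str
    # A DELETE has no local file, so the resource path is optional.
    resource: Optional[Path] = None


change = DepositionChange(
    DepositionAction.CREATE, "eqr-2013.zip", Path("/tmp/eqr-2013.zip")
)
```

With this, `_apply_change(change)` takes one argument, and callers can't accidentally pass a mismatched `(action, name, resource)` triple.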
src/pudl_archiver/orchestrator.py (Outdated)

await self._upload_file(
    _UploadSpec(source=resource.local_path, dest=name)
)
case _DepositionAction.UPDATE:
Hm, wonder if we could encode a delete as an update with no resource, since an update is a delete + an upload anyways.
Or, I guess, an update could be encoded as a delete then a create.
Lmk what you think of my change. I could see it being a bit confusing that an `Update` triggers both `if` statements, but it does remove redundant code.
tests/unit/archive_base_test.py (Outdated)

self.call_count += 1
return Path(f"path{self.call_count-1}")

tmpdir_mock = mocker.Mock(side_effect=TmpDir())
non-blocking: would this work for the same purpose?
tmpdir_mock = mocker.Mock(side_effect=[f"path{i}" for i in range(5)])
I think if you wanted an infinitely incrementing path counter you could do this, too:
import itertools
tmpdir_mock = mocker.Mock(side_effect=(f"path{i}" for i in itertools.count(0)))
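To see why this works: when `side_effect` is an iterable, each call to the mock returns the next item, so a generator gives a fresh value per call. A self-contained demo with plain `unittest.mock` (no pytest-mock needed):

```python
import itertools
from unittest.mock import Mock

# Each call to the mock pulls the next value from the generator.
tmpdir_mock = Mock(side_effect=(f"path{i}" for i in itertools.count(0)))

first = tmpdir_mock()
second = tmpdir_mock()
```

This replaces the hand-rolled `TmpDir` callable class with one line, at the cost of slightly less obvious debugging if the generator is exhausted (a finite list raises `StopIteration` once consumed).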
Thanks! Didn't realize you could just pass an iterator as a side effect
New class looks great! Tiny nit: now that `_apply_changes` only takes one `change`, maybe we should rename 🤷
This script archives FERC EQR data from two sources: 2002-2013 and 2014-present. Technically, the code should work: it produces the expected URLs for each resource and sends them to the `download_zipfile()` method. However, two issues will frustrate easy use of this archiver and should be dealt with more creatively. This issue is blocked by #44.