Conversation
This is some really exciting work! I took a look at your first commit; I think that would probably be the most ideal route. Ultimately, that was what we had discussed early on, I believe: circumventing the python-based provider data ingestion steps entirely and only doing the operations in SQL. The license table you've set up is probably something we'd want to hard-code for all SQL-only loads, and I wonder what other cleaning operations/checks we could standardize in SQL-land which would match what we currently have in python-land too. Maintaining that parity will probably be the most difficult piece going forward, but it's worth it if that is the best way of doing bulk imports IMO 🙂
Thanks @AetherUnbound! To help me make sure that all of the data quality / processing steps in python get moved to SQL here, I made this sheet to try to list out where cleaning is happening in the catalog and API repos. Maybe it would help with WordPress/openverse#244 as well. I wonder if you and/or @krysal would have a second to take a look at it and give me feedback? I haven't finished the API part, but it's a start there. I'm also thinking about the performance issues that @obulat raised, and the impact of having so many "TOASTed" fields in such a large table. I'm wondering how to navigate the trade-offs between complete data (e.g. repeating the provider name for each and every tag in the intermediate table) and performance of the load. More to learn there!
Whew, so much of this SQL magic is way over my head 🤯 I'm excited to see you're making awesome progress on this though!! Please feel free to ping us if there's anything you'd like us to look at or test specifically as you chug along 😄 🚋 🔬
```diff
@@ -72,7 +72,7 @@ def create_loading_table(
     columns_definition = f"{create_column_definitions(loading_table_columns)}"
     table_creation_query = dedent(
         f"""
-        CREATE TABLE public.{load_table}(
+        CREATE UNLOGGED TABLE public.{load_table}(
```
Verrrrrry cool 😮
Yay!!! Right???
TIL! Would this ever be an issue in the future if we use log-based replication? Is that something the catalogue would ever need? Maybe not something we need to worry about if we think we might move towards parquet or some other data storage than a relational DB?
According to the documentation, the biggest downsides to an unlogged table are that they 1) are not crash resistant and will be truncated on an unclean shutdown and 2) are not replicated. Since this is a transient table (and we don't do replication anyway), we should be able to recover it if postgres shuts down by re-running this task. I don't think there's any additional concern about having this one be unlogged, even if we don't move to some other data storage.
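To make that recovery story concrete, here is a minimal sketch of how a transient unlogged table can simply be dropped and recreated when the task re-runs. The helper name and the column definition are hypothetical, not the repo's actual code; it just mirrors the `dedent` f-string pattern from the diff above.

```python
from textwrap import dedent


def recreate_loading_table_query(load_table: str, columns_sql: str) -> str:
    # An unlogged table may be truncated after an unclean shutdown, so the
    # task can idempotently drop and recreate it on retry.
    return dedent(
        f"""
        DROP TABLE IF EXISTS public.{load_table};
        CREATE UNLOGGED TABLE public.{load_table}({columns_sql});
        """
    )


query = recreate_loading_table_query(
    "provider_image_data", "foreign_identifier varchar(3000)"
)
```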
Yeah, great question, @sarayourfriend , I was kind of concerned at first when I saw what a big difference it made, but then... well, what @AetherUnbound said. :)
```python
# Tried importing common.loader.paths.STAGING_DIRECTORY but it didn't work in
# local environment, TODO: find a better place for this process.
DATA_DIR = Path(__file__).parents[4]
```
Ah, we no longer have `STAGING_DIRECTORY` there! Best place for it might be `OUTPUT_DIR`? Although would we want to make sure this gets cleaned up, or have it stick around for the lifetime of the container (e.g. however long it is until the next deployment)? I see you have a note about getting this into S3, would that be more resilient?
Yeah, I'm not sure. I did a project once that would download tables via an API (kind of like the Catalog of Life data) and store them on S3 under a dated prefix, just because I was a hard-core data hoarder, and then Redshift would pretty much only have the current day's data, except for very special cases where if we wanted to know when something changed it wasn't impossible to go back through the archive on S3. But I think that might be overkill for here. (?) Maybe it would make sense to have something like a `last_inaturalist_data_load/` prefix in the bucket where the tsv files normally go, where the raw source files could go? And yeah, changing this to `OUTPUT_DIR` sounds great.
I could see that working! Especially with the cadence this DAG will run at, putting this where we'd store the other data (either TSV or otherwise) sounds like a good call 🙂
```python
        f"No download from Catalog of Life. {DATA_DIR}/{local_zip_file} exists."
    )
else:
    with requests.get(COL_URL, stream=True) as r:
```
Ooo, this is a new request pattern! Unfortunately we don't have a way to handle streaming yet in the `DelayedRequester` machinery. Something to think about down the line maybe!
Totally! I thought about trying to use `DelayedRequester`, and I think it does take `kwargs`, so technically it could just pass along the `stream=True`, but then it wouldn't have the rest of the pattern. It seems like it's more designed for a bunch of smaller requests than the one big one, but maybe I should at least be more explicit about retries? Or maybe it would be better to break this into smaller tasks and let airflow handle it?
I think this is solid as is, because even with `DelayedRequester`'s retries we'd still need to change a LOT in order to get the context manager + `stream` to work. Something to look at again if it's a pattern we notice though!
It looks like this would be possible outside the context manager, but we would need to ensure the connection is closed once the operation is complete:

```python
r = None
try:
    r = self.delayed_requester.get(COL_URL, stream=True)
    with open(DATA_DIR / local_zip_file, "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
finally:
    # A streaming response holds its connection open until explicitly closed.
    if r is not None:
        r.close()
```
```sql
(1, 'Animals'),
(47118, 'Spider'),
(47124, 'Flowers'),
(47126, 'Plants'),
(47157, 'Butterflies and Moths'),
(47158, 'Insect'),
(47167, 'Mushroom'),
(47434, 'Grass'),
(47604, 'Daisy'),
(48662, 'Monarch Butterfly');
```
Is there any concern these `taxon_ids` could change? Would it be better to map the Latin to the English here and then link that to IDs via a join?
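A sketch of what that could look like, seeding on scientific names instead of hard-coded IDs. The table and column names are borrowed from the `inaturalist.col_name_usage` join elsewhere in this PR; the seed pairs themselves are illustrative, not the real mapping.

```python
from textwrap import dedent

# Illustrative seed pairs: scientific (Latin) name -> display title.
SEED_TITLES = [
    ("Animalia", "Animals"),
    ("Danaus plexippus", "Monarch Butterfly"),
]

values = ",\n        ".join(
    f"('{latin}', '{title}')" for latin, title in SEED_TITLES
)
query = dedent(
    f"""
    WITH seed (scientificname, title) AS (VALUES
        {values}
    )
    SELECT n.id, seed.title
    FROM seed
    INNER JOIN inaturalist.col_name_usage n
        ON n.scientificname = seed.scientificname;
    """
)
```

This way, if iNaturalist ever renumbers a taxon, the seed list keeps working as long as the scientific name is stable.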
That is so much a better idea!!!
```sql
FROM inaturalist.col_name_usage n
INNER JOIN inaturalist.col_vernacular v on v.taxonid = n.id
where length(n.id) <= 10
group by cast(md5(n.scientificname) as uuid)
```
Simple SQL literacy question, could this just be `group by 1`?
This is such a great question! Yeah, I feel like there are some dialects or configurations where that doesn't work and others where it's more performant, but I think our postgres is one of the latter, so yeah, I'll totally change it.
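As a quick sanity check of the equivalence, here is a small demo in SQLite (chosen only because it runs anywhere; Postgres accepts the same ordinal form). The table and data are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE names (scientificname TEXT, vernacular TEXT);
    INSERT INTO names VALUES
        ('Danaus plexippus', 'Monarch Butterfly'),
        ('Danaus plexippus', 'Monarch'),
        ('Bellis perennis', 'Daisy');
    """
)

# Grouping by the expression and grouping by its ordinal position produce
# the same result; the ordinal form avoids repeating the expression.
by_expr = conn.execute(
    "SELECT upper(scientificname), count(*) FROM names"
    " GROUP BY upper(scientificname) ORDER BY 1"
).fetchall()
by_ordinal = conn.execute(
    "SELECT upper(scientificname), count(*) FROM names"
    " GROUP BY 1 ORDER BY 1"
).fetchall()
```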
Nice! I was mostly thinking about not having to declare the same functions on the column, but it'd be even better if that is more performant 🏎️
```sql
    ,'|')) as TAGS,
taxa_enriched.title,
LICENSE_CODES.license_url_metadata as META_DATA,
taxa_enriched.tags,
```
The tag enrichment is exactly the kind of behavior I was thinking we'd need to have SQL equivalents for 😅 This is great for now, but definitely as we start to have more SQL-only ingestions we'll want some mechanism for abstracting this! Maybe a set of defined functions?
Yeah... I worry about navigating costs of context switching with user defined functions in SQL, particularly for functions that will be run many times over a table, but that might just be something I need to learn to do better in postgres. dbt lets you write jinja macros (e.g. pivot) for this kind of thing so that you're always running something in SQL, but your code can still be DRY.
And I guess for this one in particular, I have a bit of a fantasy that we will make a separate `tags` table (e.g. with `media_type, provider, identifier, tag_name` and maybe `authority` if that makes sense) and stop trying to handle them in JSON at all.
But yeah, at a high level, I totally agree that this is not ideal for the longer term. I'm adding it to the list at the top of the PR for now.
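As a rough illustration of that normalized shape (entirely hypothetical; the column set is just the one floated above), flattening the JSON tags column into rows would look something like:

```python
import json

# A hypothetical denormalized record with a JSON tags column, as the
# image table stores it today.
record = {
    "identifier": "abc-123",
    "provider": "inaturalist",
    "media_type": "image",
    "tags": json.dumps(
        [
            {"name": "Monarch Butterfly", "provider": "inaturalist"},
            {"name": "Insect", "provider": "inaturalist"},
        ]
    ),
}

# One row per tag, ready for a separate tags table
# (media_type, provider, identifier, tag_name).
tag_rows = [
    (record["media_type"], record["provider"], record["identifier"], t["name"])
    for t in json.loads(record["tags"])
]
```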
openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
I'm tracking my test runs here. I haven't done a full load to
But, on this test run, airflow completely crashed before the DB had a chance to run out of space. :/ Maybe you can test it locally again and/or offer advice @AetherUnbound?
Ah okay, that's all helpful! I had assumed that it was the Airflow disk space being filled up because of the TSVs, but on second glance that seems incorrect since we're loading directly from PG->PG now! The production database has plenty of space so that shouldn't be a problem 😄 It's odd that Airflow went down on that last run. Would you be able to try another and see what happens? If the scheduler crashes again, I'd love to see the last few lines of output of
Yeah, I can give it a try this weekend, it would just have to wait until then, because I need to kind of leave my computer alone for a couple of days while it's running locally. I did try
I just started reviewing this, and I have to say, this is such an incredible work of investigation and improvement! I haven't read the whole thread or code yet, but observing the following makes it very promising:
I ran the DAG as is and I got 14 rows in the
It used to be all in one container! But now it's spread across
Yeah, this has been one of the key challenges of this work. I've been downloading four separate zipped files (
No apologies necessary at all, and I'm psyched to hear/read your thoughts @krysal!
I added comments with more details on testing. And then, it occurred to me that I sometimes comment out the post-ingestion parts of the dag for testing, so that I can compare the raw inaturalist and catalog of life tables against the target image table. Maybe I'll take a look at how to set up a runtime variable in airflow to skip those steps for testing. Hmmmm... Overkill? Helpful? What do you think @krysal and @AetherUnbound?
Ah, good question! While that would certainly be possible, I think it's okay to just add that as an additional comment to the most recent testing documentation you added 🙂
I started looking at how to use a runtime parameter today, just because I was curious, and in the process realized that I had commented out the code to delete the catalog of life downloads. :/ So this uses the parameter for both the catalog of life files and the inaturalist db schema. But it also raises the question -- like, catalog of life could be its own dag in some ways. Anyway, I don't know, I can roll back this commit. I have some concerns about side effects given that I had to set it at the dag level rather than the task level. Or maybe instead I should move the COL file removal into its own separate post-ingestion task, so that the parameter can just be for the ShortCircuit step rather than the whole dag... ???
Ooo, very cool! I think your current implementation looks great - defaulting to prod behavior (removing the files) was all I wanted to assure, and it seems that's the case 🙂
I finished a full local run with data downloaded on 1/1/2023. Some details and visualizations are here.
Reflections:
That info is so helpful, thank you @rwidom!! I'm feeling good about this based on your data, and your changes to the tests look great! Note that there are likely to be some conflicts with #939 because of the executor changes. I think I'm going to merge that PR shortly so if it's OK with you I'll also rebase this PR and address any merge conflicts that might come up with that one.
Thank you for all your continued efforts on this PR! It's exciting to think how much we'll be increasing our catalog's data size 💥
Okay, I think I've done the merge correctly and gotten this in line with main! I was able to run the DAG with the test data successfully 🙂 Please give it a double-check @rwidom in case I missed anything!
After taking the time to review this in detail, I have to say this is impressive! Seeing all the parts involved, like importing from the S3 CSV files, the communication between different tasks in the DAG, the use of SQL templates, and even more (I won't be able to name them all)! Seriously, huge kudos for transforming iNaturalist into an almost purely SQL-load DAG provider 🎉
I left a few comments on the fields used that are quick to fix, so I don't want to hold onto this PR any longer! Just being able to finish a DAG run is amazing 😄 and, as you already noted, we can work on improvements in separate issues/PRs. For example, one of the things we introduced a while ago was standardized filetypes; it would be nice to port that to SQL and avoid having mixed `jpg`, `jpeg`, and `JPG`, but it's not super necessary right now.
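For illustration, that filetype standardization could look something like the sketch below in Python terms. The alias table here is hypothetical (the catalog's real mapping may differ); a SQL port would express the same mapping, perhaps as a `CASE` expression in the transform query.

```python
from typing import Optional

# Hypothetical alias table; the catalog's real mapping may differ.
FILETYPE_ALIASES = {"jpeg": "jpg", "tif": "tiff"}


def standardize_filetype(raw: Optional[str]) -> Optional[str]:
    """Map filetype variants (e.g. 'JPG', 'jpeg', '.TIFF') to one canonical value."""
    if raw is None:
        return None
    cleaned = raw.strip().lower().lstrip(".")
    return FILETYPE_ALIASES.get(cleaned, cleaned)
```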
openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
openverse_catalog/dags/providers/provider_api_scripts/inaturalist.py
..._catalog/dags/providers/provider_csv_load_scripts/inaturalist/transformed_table.template.sql
```sql
COALESCE(INATURALIST.OBSERVERS.LOGIN, INATURALIST.PHOTOS.OBSERVER_ID::text)
    as CREATOR,
```
The name of the observer will be preferred over their username.
```diff
-COALESCE(INATURALIST.OBSERVERS.LOGIN, INATURALIST.PHOTOS.OBSERVER_ID::text)
-    as CREATOR,
+COALESCE(INATURALIST.OBSERVERS.NAME, INATURALIST.OBSERVERS.LOGIN)
+    as CREATOR,
```
Ooh! Great catch! I think I must have had a left join to observers in a prior version and maybe I was worried about some names being truncated or something? I don't know. Hopefully my fix doesn't over-complicate things.
Co-authored-by: Krystle Salazar <krystle.salazar@automattic.com>
Related to
This relates to WordPress/openverse#1456 in that my understanding is that we have not yet completed a full load of iNaturalist data, and if we were going to start loading the data incrementally, we would probably want a full load to start from. In practice, it also runs the cleaning and upsert steps on smaller batches, rather than all at once across 120+ million records.
It also integrates Catalog of Life data to address WordPress/openverse#1452.
Description
The existing iNaturalist dag essentially follows these steps: load normalized data from S3 to PostgreSQL, pull integrated JSON out in batches of 10,000 joined records, and then follow the rest of the steps in any provider dag to reload the JSON into PostgreSQL.
The initial load of normalized data takes on the order of 10-15 minutes for approximately 120,000,000 records in `inaturalist.photos`, and somewhat fewer in the other inaturalist tables. However, the dag has timed out after several days on the step where the joined data is pulled out of PostgreSQL.
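The batching described above can be sketched as a simple range generator. This is a hypothetical helper for illustration, mirroring the 10,000-record batches the existing DAG uses, not the repo's actual implementation.

```python
from typing import Iterator, Tuple


def batch_ranges(total_records: int, batch_size: int = 10_000) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) id ranges covering total_records rows."""
    for start in range(1, total_records + 1, batch_size):
        yield start, min(start + batch_size - 1, total_records)


ranges = list(batch_ranges(25_000))
```

Each (start, end) pair would parameterize one join/extract query, so a failure only loses one batch instead of the whole multi-day pull.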
This PR moves all of the processing to SQL. The latest local run timed out after 24 hours, with 87,943,312 records loaded to `public.image`. I increased the timeout to 48 hours, but/and we could consider longer. I estimate that it should complete within 36 hours. More on that on the "chart 12-14-2022" tab in this google sheet. The workbook also contains visualizations for other test runs during this development process.

Issues I will make and add to comments for future work
Testing Instructions
I increased the disk image size in my local docker to 192 GB. You should be able to run the whole thing with a very small sample of test data in the repo, and I made some changes to `tests/dags/providers/provider_api_scripts/resources/inaturalist/pull_sample_records.py` to make it easier/faster to pull a test sample of your choice. And then poke around for data quality/formatting stuff. Instructions to download the inaturalist data.

I could really use a second set of eyes to double-check that I have implemented all of the python data cleaning steps (initial attempt on the "image table ddl" tab here) in the SQL. After running the large majority of the dataset, the log for the python cleaning step looks promising to me, but without more on WordPress/openverse#1331, a lot of this has to happen manually.
Developer Certificate of Origin