Add DAG for ccMixter #3479
Conversation
Full-stack documentation: https://docs.openverse.org/_preview/3479

Please note that GitHub Pages takes a little time to deploy newly pushed code; if the links above don't work or you see old versions, wait 5 minutes and try again. You can check the GitHub Pages deployment action list to see the current status of the deployments.
I'm excited for this one, this looks great! The ingested data looks good to me. The blocking request is for the invalid control characters I encountered during testing -- another case of bad JSON we need to handle 😓 The workarounds here look great, though.
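The control-character workaround mentioned above could be sketched roughly like this (a hedged illustration, not the PR's exact code; the function and pattern names are assumptions):

```python
import json
import re

# Strip unescaped ASCII control characters -- except \t, \n, and \r,
# which can appear as legitimate whitespace between JSON tokens -- so
# that json.loads stops choking on them.
CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")

def clean_and_parse(raw_text: str):
    cleaned = CONTROL_CHARS.sub("", raw_text)
    return json.loads(cleaned)

print(clean_and_parse('{"title": "bad\x07value"}'))  # {'title': 'badvalue'}
```

Dropping the characters outright loses a little data, but for stray control bytes inside string values it is usually the least invasive fix.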
I also noticed an `upload_num_scores` field in the response; could we use this for popularity scores? This isn't documented super clearly, but the API docs indicate it is the number of ratings. It appears that on ccMixter, "ratings" are thumbs-ups/votes to recommend. For example, `upload_num_scores` for this record is 6, and you can see the 6 👍 on the page.

To use this for a popularity metric we'd need to record it as a field in the `meta_data` column and then add that field name to the metrics table definition here.
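Recording the field could look something like the sketch below (a hypothetical helper under the assumptions stated in the comment; the function name and record shape are mine, not the DAG's):

```python
# Record ccMixter's `upload_num_scores` (the count of thumbs-up
# "ratings") in the record's meta_data so it could later feed a
# popularity metric.
def build_meta_data(upload: dict) -> dict:
    meta_data = {}
    num_scores = upload.get("upload_num_scores")
    if num_scores is not None:
        meta_data["upload_num_scores"] = num_scores
    return meta_data

print(build_meta_data({"upload_num_scores": 6}))  # {'upload_num_scores': 6}
```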
    response_json = json.loads(cleaned_json)
except json.JSONDecodeError as e:
    logger.warning(f"Could not get response_json.\n{e}")
    response_json = None
Just noting that returning `None` will cause the request to retry until failure -- though that may indeed be what we want.
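The retry-until-failure behavior being described could be sketched like this (a minimal illustration with assumed names, mirroring the recursion visible in the traceback later in the thread, not the actual `DelayedRequester` code):

```python
# A requester that treats a None response_json as a bad response and
# recurses until the retry budget is exhausted.
class RetriesExceeded(Exception):
    pass

def get_response_json(fetch, retries: int = 2):
    if retries < 0:
        raise RetriesExceeded("Retries exceeded")
    response_json = fetch()
    if response_json is None:
        # The same undecodable payload comes back each time, so every
        # retry fails the same way until retries run out.
        return get_response_json(fetch, retries - 1)
    return response_json
```

Calling this with a `fetch` that always returns `None` burns through every retry and then raises, which matches the `retries=0` → `retries=-1` → "No retries remaining" sequence in the log below.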
Ideally there is no point in retrying the request, because it will be the same response and raise the same error until it exhausts all tries. I just used this as the minimum change to `DelayedRequester` so that it can work around ccMixter's issues.
I ran into this case in my tests. For confirmation: do we want it to fail if the JSON can't be decoded, or do we want it to continue? In the first case, would that mean the DAG requires modifications?
I would think it makes more sense for the DAG to fail when the JSON is corrupt, because then it's not a problem with a specific record (which we could ignore and move past) but rather with hundreds of records (an entire page's worth).
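The fail-fast alternative being argued for could look roughly like this (a hedged sketch, not the PR's final code; `parse_page` is a hypothetical name):

```python
import json
import logging

logger = logging.getLogger(__name__)

def parse_page(cleaned_json: str) -> dict:
    try:
        return json.loads(cleaned_json)
    except json.JSONDecodeError:
        logger.warning("Corrupt JSON for an entire page; failing fast.")
        # Re-raise so the task fails immediately with the real decode
        # error, rather than retrying a payload that will never parse.
        raise
```

The difference from the merged workaround is only what happens in the `except` branch: raising surfaces the real error at once, while returning `None` defers failure to the retry machinery.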
@krysal did you run into the DAG retrying till failure? If so, could you share the stack trace (and the page/offset it was at)? I'd very much like to iron out all known edge cases.
It ended up with `"offset": 26800`. There isn't much more info in the stack trace:
[2023-12-15, 02:06:51 UTC] {cc_mixter.py:49} WARNING - JSON had bad octals, substitutions were made.
[2023-12-15, 02:06:51 UTC] {cc_mixter.py:58} WARNING - Could not get response_json.
Expecting ',' delimiter: line 1 column 114472 (char 114471)
[2023-12-15, 02:06:51 UTC] {requester.py:151} WARNING - Bad response_json: None
[2023-12-15, 02:06:51 UTC] {requester.py:152} WARNING - Retrying:
_get_response_json(
https://ccmixter.org/api/query/,
{'format': 'json', 'limit': 100, 'offset': 26800},
retries=0)
[2023-12-15, 02:06:52 UTC] {cc_mixter.py:49} WARNING - JSON had bad octals, substitutions were made.
[2023-12-15, 02:06:52 UTC] {cc_mixter.py:58} WARNING - Could not get response_json.
Expecting ',' delimiter: line 1 column 114472 (char 114471)
[2023-12-15, 02:06:52 UTC] {requester.py:151} WARNING - Bad response_json: None
[2023-12-15, 02:06:52 UTC] {requester.py:152} WARNING - Retrying:
_get_response_json(
https://ccmixter.org/api/query/,
{'format': 'json', 'limit': 100, 'offset': 26800},
retries=-1)
[2023-12-15, 02:06:52 UTC] {requester.py:141} ERROR - No retries remaining. Failure.
[2023-12-15, 02:06:52 UTC] {media.py:233} INFO - Writing 31 lines from buffer to disk.
[2023-12-15, 02:06:52 UTC] {provider_data_ingester.py:505} INFO - Committed 26631 records
[2023-12-15, 02:06:52 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
providers.provider_api_scripts.provider_data_ingester.IngestionError: Retries exceeded
query_params: {"format": "json", "limit": 100, "offset": 26800}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 192, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 209, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/catalog/dags/providers/factory_utils.py", line 55, in pull_media_wrapper
data = ingester.ingest_records()
File "/opt/airflow/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py", line 269, in ingest_records
raise error from ingestion_error
File "/opt/airflow/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py", line 231, in ingest_records
batch, should_continue = self.get_batch(query_params)
File "/opt/airflow/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py", line 392, in get_batch
response_json = self.get_response_json(query_params)
File "/opt/airflow/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py", line 413, in get_response_json
return self.delayed_requester.get_response_json(
File "/opt/airflow/catalog/dags/common/requester.py", line 159, in get_response_json
response_json = self.get_response_json(
File "/opt/airflow/catalog/dags/common/requester.py", line 159, in get_response_json
response_json = self.get_response_json(
File "/opt/airflow/catalog/dags/common/requester.py", line 159, in get_response_json
response_json = self.get_response_json(
[Previous line repeated 1 more time]
File "/opt/airflow/catalog/dags/common/requester.py", line 142, in get_response_json
raise RetriesExceeded("Retries exceeded")
common.requester.RetriesExceeded: Retries exceeded
[2023-12-15, 02:06:52 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=cc_mixter_workflow, task_id=ingest_data.pull_audio_data, execution_date=20231215T020212, start_date=20231215T020218, end_date=20231215T020652
[2023-12-15, 02:06:52 UTC] {slack.py:320} INFO -
*DAG*: `cc_mixter_workflow`
*Task*: `ingest_data.pull_audio_data`
*Logical Date*: 2023-12-15T02:02:12Z
*Log*: http://localhost:8080/log?execution_date=2023-12-15T02%3A02%3A12%2B00%3A00&task_id=ingest_data.pull_audio_data&dag_id=cc_mixter_workflow&map_index=-1
*Exception*: Retries exceeded
*Exception Type*: `common.requester.RetriesExceeded`
[2023-12-15, 02:06:52 UTC] {base.py:73} INFO - Using connection ID 'slack_alerts' for task execution.
[2023-12-15, 02:06:52 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 97 for task ingest_data.pull_audio_data (Retries exceeded; 335)
[2023-12-15, 02:06:52 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-12-15, 02:06:53 UTC] {taskinstance.py:2778} INFO - 2 downstream tasks scheduled from follow-on schedule check
Co-authored-by: Staci Mullins <63313398+stacimc@users.noreply.github.com>
Fantastic 🚀
Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR: @krysal

Excluding weekend days, this PR was ready for review 6 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekdays.

@dhruvkb, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
This looks great! I ran it and the new rows all looked correct. I don't have any blockers, just asking for confirmation of the expected behavior for the case when it fails. Otherwise, excellent work with the workarounds and tests!
Co-authored-by: Krystle Salazar <krystle.salazar@automattic.com>
This looks great! Records load as expected, and wow those workarounds are unique 😅 I have a few thoughts but nothing blocking.
Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>
😱 I'm going to merge this on a Friday evening!
Fixes
Fixes #1770 by @obulat
Description

This PR adds the DAG for ccMixter (finally!). It includes ~~two~~ three workarounds for ccMixter's shenanigans.

This is my first time writing a DAG so please review a little more carefully and don't hold back on the criticisms. Thanks!
Testing Instructions

Run the `cc_mixter_workflow` DAG.

Checklist
Developer Certificate of Origin