-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modify add_license_url
DAG to use batched_update
#4370
Conversation
30556b5
to
ca08682
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good! Only a few comments, nothing blocking. Thanks for the helpful testing instructions, so glad we can leverage the batched update for this 🚀
), | ||
# Merge existing metadata with the new license_url | ||
"update_query": f"SET meta_data = ({Json(license_url_dict)}::jsonb || meta_data), updated_on = now()", | ||
"update_timeout": 259200, # 3 days in seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could maybe be better expressed as:
"update_timeout": 259200, # 3 days in seconds | |
"update_timeout": 60 * 60 * 24 * 3, # 3 days in seconds |
trigger = TriggerDagRunOperator.partial( | ||
task_id="trigger_batched_update", | ||
trigger_dag_id=BATCHED_UPDATE_DAG_ID, | ||
wait_for_completion=True, | ||
execution_timeout=timedelta(hours=5), | ||
max_active_tis_per_dag=1, | ||
retries=0, | ||
).expand(conf=get_confs(licenses, batch_size="{{ params.batch_size }}")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we could do map_index_template
here? Similar to this:
@task(map_index_template="{{ task.op_kwargs['upstream_table_name'] }}") |
That way we could see the licenses as part of the mapped index/task name!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried it in the previous DAG version as suggested in the Airflow documentation, but an upstream issue prevented it from working with the parameters of the task: apache/airflow#29366.
Do you see a workaround here?
Edit: I must have been doing something wrong before because now I made it work 😄
Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the detailed testing instructions! They worked well locally.
I added a non-blocking suggestion for code clarity inline.
@task | ||
def get_confs(licenses, batch_size: int) -> list[dict]: | ||
if not licenses: | ||
raise AirflowSkipException("No config required.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds confusing. What does "No config required." mean here? Should it be the opposite, "License config required."?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no licenses to backfill, then the DAG stops here. There is no need to create a set of configurations for the batched_update
DAG. I rephrased it; I hope it's clearer now!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely clearer :) Thank you!
) | ||
report_completion(updated, query) | ||
updated >> report_failed_license_pairs() | ||
licenses = get_licenses(query) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think it would be easier to understand the code flow if the query is moved inside get_licenses
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, the previous version of the DAG reused this query for two tasks. Now this can be in there as you suggest 👍
Thanks for the reviews and suggestions folks! |
Fixes
Fixes #4348 by @krysal
Description
As expressed in the title, this PR changes the
add_license_url DAG
to trigger onebatched_update
DAG run per group of licenses found to backfill. It also installs python-tabulate to improve the format of Slack messages and logs, so numbers are more easily readable, and the license group is clearly identified with its mapped tasks index number.Testing Instructions
just built
just catalog/pgcli
batched_update
in the Airflow UIadd_license_url
DAG and wait for it to finishmeta_data->license_url
fieldChecklist
Update index.md
).main
) or a parent feature branch.just catalog/generate-docs
for catalogPRs) or the media properties generator (
just catalog/generate-docs media-props
for the catalog or
just api/generate-docs
for the API) where applicable.Developer Certificate of Origin
Developer Certificate of Origin