Fix/dataset ingest fixes #183
Conversation
….5.1. Changes to discover pipeline to fix bugs.
@@ -42,60 +43,59 @@ def get_files_to_process(ti):
    """Get files from S3 produced by the discovery task.
    Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks.
    """
    payload = ti.xcom_pull(task_ids=f"{group_kwgs['group_id']}.discover_from_s3")
    payloads_xcom = payload.pop("payload", [])
    dynamic_group_id = ti.task_id.split(".")[0]
The group ID gets assigned and overwritten automatically when the subdag is used with an .expand() or .chain() call. This change should allow the task to xcom_pull from the correct task which precedes it.
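A minimal sketch of that idea, assuming an Airflow TaskFlow task; the group and task names here are illustrative, not the exact ones in discover_group.py:

from airflow.decorators import task

@task
def get_files_to_process(ti=None):
    # Sketch: derive the enclosing group's id from this task instance's own
    # task_id (e.g. "discover_group.get_files_to_process"), so the xcom_pull
    # targets the discover task inside the same, possibly renamed or expanded,
    # group rather than a hardcoded group id.
    dynamic_group_id = ti.task_id.split(".")[0]
    payload = ti.xcom_pull(task_ids=f"{dynamic_group_id}.discover_from_s3")
    return payload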
@task_group
The decorated TaskFlow grouping adds an interface to support different upstream/downstream definitions.
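As a rough illustration of the decorator (placeholder task bodies; only the subdag_discover name comes from this thread):

from airflow.decorators import task, task_group

@task
def discover_from_s3(event: dict) -> dict:
    # placeholder for the real discovery handler
    return {"payload": [event]}

@task
def get_files_to_process(discovered: dict) -> list:
    # placeholder downstream step
    return discovered.get("payload", [])

@task_group
def subdag_discover(event: dict):
    # The decorated group acts like a single node when wiring the DAG, so it
    # can sit in chain() calls or, in newer Airflow versions, be mapped with
    # .expand(event=...).
    get_files_to_process(discover_from_s3(event))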
start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)
end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag)
This last task is a no-op, and keeps getting skipped. I'm not sure why. The DAG still gets marked as a success or failure as appropriate, but I think there's a trigger rule nuance I'm missing.
I think NONE_FAILED would be enough? There were only two direct upstream tasks, right? It should handle the skipped one as well - unless there is a scenario where both direct upstreams are skipped?
I haven't tried it myself yet, but @amarouane-ABDELHAK thinks ONE_SUCCESS is the correct trigger here.
NONE_FAILED can work too; I suggested ONE_SUCCESS because we expect only one task to be successful at any given time.
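For reference, the rules discussed here differ mainly in how they treat skipped upstreams; a small self-contained sketch of the end task with the rule the diff moves to (dag_id and wiring are illustrative):

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="trigger_rule_sketch", schedule=None, catchup=False) as dag:
    # ONE_SUCCESS: fires as soon as any direct upstream succeeds, regardless
    #   of failed or skipped siblings.
    # NONE_FAILED: fires when no upstream failed; successes and skips both
    #   count, including the case where every upstream was skipped.
    # NONE_FAILED_MIN_ONE_SUCCESS: like NONE_FAILED, but additionally requires
    #   at least one upstream success.
    end = EmptyOperator(
        task_id="end",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )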
@task
def extract_discovery_items(**kwargs):
    ti = kwargs.get("ti")
    discovery_items = ti.dag_run.conf.get("discovery_items")
    print(discovery_items)
    return discovery_items
In datasets (or normal discovery inputs) with multiple objects, we need to pass each input separately. This seemed easier than refactoring the discovery handler to support it. We should probably include some version of this in the normal discover pipeline.
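A sketch of what that per-item fan-out could look like; the .expand() wiring is an assumption about how the mapped group is wired, not a quote from this PR:

# Hypothetical wiring inside the DAG definition, assuming the
# extract_discovery_items task from the diff above and a mappable
# subdag_discover task group: each item in discovery_items becomes its own
# mapped group run, so the discovery handler only ever sees one input.
discovery_items = extract_discovery_items()
subdag_discover.expand(event=discovery_items)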
Doesn't subdag_discover require more keys than what we have in discovery_items? For example, collection?
    payloads_xcom = payload.pop("payload", [])
    dynamic_group_id = ti.task_id.split(".")[0]
    payload = ti.xcom_pull(task_ids=f"{dynamic_group_id}.discover_from_s3")
    payloads_xcom = payload[0].get("payload", [])
The veda_dataset_pipeline DAG is working as expected (caveats in a different issue), but now I am no longer able to trigger veda_discover. I.e.
{
    "collection": "OMI_trno2-COG-deleteme",
    "bucket": "veda-data-store-staging",
    "datetime_range": "year",
    "discovery": "s3",
    "filename_regex": "^(.*).tif$",
    "prefix": "OMI_trno2-COG/"
}
results in
AIRFLOW_CTX_DAG_RUN_ID=manual__2024-07-09T19:58:28+00:00
[2024-07-09, 19:58:59 UTC] {{taskinstance.py:1768}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow_multi_dagrun/operators.py", line 28, in execute
for conf in self.python_callable(*self.op_args, **self.op_kwargs):
File "/usr/local/airflow/dags/veda_data_pipeline/groups/discover_group.py", line 48, in get_files_to_process
payloads_xcom = payload[0].get("payload", [])
KeyError: 0
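The KeyError: 0 suggests the pulled value is a single dict here rather than a list, so payload[0] does a dict lookup with key 0. A shape-tolerant guard could look roughly like this (a sketch with a hypothetical helper name, not the fix that landed):

def _extract_payload(xcom_value):
    # xcom_pull can return a bare dict (single task id) or a list of dicts
    # (list of task ids / mapped task); normalize before reading "payload".
    if isinstance(xcom_value, list):
        xcom_value = xcom_value[0] if xcom_value else {}
    return (xcom_value or {}).get("payload", [])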
I haven't looked at the change, but in a quick test in SIT the error seems to have moved a few lines down in discover_group:
[2024-07-12, 14:27:24 UTC] {{taskinstance.py:1768}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow_multi_dagrun/operators.py", line 28, in execute
for conf in self.python_callable(*self.op_args, **self.op_kwargs):
File "/usr/local/airflow/dags/veda_data_pipeline/groups/discover_group.py", line 57, in get_files_to_process
**payload[0],
This got into some obscure Airflow details, so I wanted to split this into a second PR before it goes into the ingest_dag branch. This PR also fixes bugs that are present on dev already, so we'll need to crosswalk some of it regardless.