Feature/earthbeam dynamic task mapping #48
base: main
'{{ ds_nodash }}', '{{ ts_nodash }}'
)

raw_to_s3_kwargs = list_files_task.output.map(lambda raw_dir: {'local_filepath': raw_dir['local_filepath'] })
@rlittle08 Why not just update this line to the following:
raw_to_s3_kwargs = list_files_task.output.map(lambda raw_dir: {
'local_filepath': raw_dir['local_filepath'],
's3_conn_id': s3_conn_id,
's3_destination_key': s3_raw_filepath,
'remove_local_filepath': False,
})
And then remove the need for the static_kwargs argument?
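For reference, here is a plain-Python sketch of what that lambda would produce for each mapped element, with no Airflow involved. The helper name `build_raw_to_s3_kwargs`, the sample file paths, and the connection ID are made up for illustration; in the DAG the list would come from `list_files_task.output`, and the resulting dicts would presumably be handed to the mapped operator.

```python
def build_raw_to_s3_kwargs(raw_dir, s3_conn_id, s3_raw_filepath):
    """Combine one mapped element with the values that are constant across all mapped tasks."""
    return {
        'local_filepath': raw_dir['local_filepath'],  # varies per file
        's3_conn_id': s3_conn_id,                     # constant
        's3_destination_key': s3_raw_filepath,        # constant
        'remove_local_filepath': False,               # constant
    }


# Hypothetical stand-in for list_files_task.output.
raw_dirs = [
    {'local_filepath': '/tmp/raw/file_a.csv'},
    {'local_filepath': '/tmp/raw/file_b.csv'},
]

# Equivalent of mapping the lambda over every upstream element.
raw_to_s3_kwargs = [
    build_raw_to_s3_kwargs(raw_dir, s3_conn_id='example_s3', s3_raw_filepath='raw/example')
    for raw_dir in raw_dirs
]
```

Because the constants are captured when the mapping function is defined, every generated dict is complete on its own, which is why a separate static_kwargs argument may no longer be needed.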
This might work. I thought I had some issue with passing static values as dynamic args, but I can't remember the details, so I think this is worth a shot.
'input_file': input_file['local_filepath'],
'output_dir': edfi_api_client.url_join(
    em_output_dir,
    input_file['ds_nodash_string'],
Why are we passing ds_nodash and ts_nodash strings across these methods? These date stamps are already added when a user uses EarthbeamDAG.build_local_raw_dir().
Can you help me with this? I'm not sure how to correctly construct output_dir here.
Introduces build_dynamic_tenant_year_taskgroup, for running dynamic task mapping in earthbeam. This allows you to run multiple instances of earthmover, etc. if you receive multiple files.
Some things that definitely need review:
- dynamicPythonOperator. I added this to allow for some kwargs to be static, some dynamic. But there is likely a better way to accomplish this.
- evaluate_kwargs and handling of kwargs in general. This probably overlaps with Jay's work to make a configurable factory. We need to be flexible to multiple different structures of earthmover kwargs, depending on the configured template. I sort of have that here, but the context from here is hard coded to include one input_file, when we likely need to be flexible to multiple input parameters and other earthmover kwarg constructions?

I have been testing with this refactored DAG in texas - https://github.com/edanalytics/stadium_txexchange/blob/feature/refactor_dynamic_earthbeam/airflow/dags/refactor_dynamic_earthbeam_dags.py
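
On the open question about a context that is hard coded to one input_file: one possible direction, sketched here in plain Python, is to treat the kwargs as templates and fill them from a context dict that can carry any number of keys. This is not the PR's evaluate_kwargs; `render_kwargs`, the placeholder syntax, and every key and value below are hypothetical.

```python
def render_kwargs(kwarg_templates, context):
    """Recursively fill '{name}' placeholders in string values from `context`.

    Non-string leaves (bools, numbers, None) are returned unchanged.
    Caveat: str.format treats doubled braces as escapes, so Jinja-style
    '{{ ... }}' templates would need different handling.
    """
    if isinstance(kwarg_templates, dict):
        return {key: render_kwargs(value, context) for key, value in kwarg_templates.items()}
    if isinstance(kwarg_templates, (list, tuple)):
        return [render_kwargs(value, context) for value in kwarg_templates]
    if isinstance(kwarg_templates, str):
        return kwarg_templates.format(**context)
    return kwarg_templates


# Hypothetical earthmover-style kwargs with more than one input parameter.
templates = {
    'parameters': {'INPUT_FILE': '{input_file}', 'API_YEAR': '{api_year}'},
    'output_dir': '/tmp/out/{tenant_code}',
    'force': True,
}

# Hypothetical context for a single mapped task instance.
context = {
    'input_file': '/tmp/raw/file_a.csv',
    'api_year': 2024,
    'tenant_code': 'tenant_a',
}

rendered = render_kwargs(templates, context)
```

With this shape, adding another input parameter only means adding a key to the context and a placeholder to the template, rather than changing the rendering code.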