Fix to make expand task params as optional #182

Closed
wants to merge 1 commit into from

@ifulia ifulia commented Feb 22, 2024

Make the expand task params optional, so that not every task has to provide them. Currently, if a task in the YAML has no expand field, generating the DAG from YAML fails. Also, using expand creates a mapped task in Airflow, which does not work when you want to use a branch operator to skip certain tasks based on a condition. This PR fixes that by making expand optional.

@matveykortsev
Contributor

Hey @ifulia, I don't think this is actually a fix.
It's already possible to make it optional:
https://github.com/ajbosco/dag-factory/blob/aad7600f3df56f6c5cc2c0aa304816f43eb29a32/dagfactory/dagbuilder.py#L579-L589
These checks are made in the DAG builder, before get_expand_partial_kwargs, which you edited.
Here are examples with expand and without expand, both currently running in my production:

dag_name:
  description: ""
  schedule_interval: "50 9 * * *"
  template_searchpath: ['/opt/airflow/dags/folder']
  tasks:
    task_1:
      operator: operator.Operator
      conn_id: 'conn_id'
    task_2:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: python_callable
      python_callable_file: /opt/airflow/plugins/utils.py
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: prometheus_task
      python_callable_file: /opt/airflow/plugins/utils.py
      partial:
        op_kwargs:
          conn_id: "conn_id"
          prometheus_url: "{{ conn.PROMETHEUS }}"
          pg_host: "{{ conn.PG.host }}"
          pg_schema: "{{ conn.PG.schema }}"
          pg_login: "{{ conn.PG.login }}"
          pg_password: "{{ conn.PG.password }}"
      expand:
        op_args:
          task_2.output
      dependencies:
        - task_2

dag_name:
  template_searchpath: ['/opt/airflow/dags/sql']
  schedule_interval: "0 5 * * MON"
  tasks:
    task_1:
      sql: 'insert_editor_event_week.sql'
      operator: path_to_sql_operator.Operator
      sql_truncate: 'alter.sql'
      conn_id: 'conn_id'
      params:
        table: 'table_name'
        delete_condition: 'timestamp'

@ifulia
Author

ifulia commented Feb 23, 2024

@matveykortsev
If you use neither partial nor expand, it works. The problem appears when you use only partial.

Here is what I am using; I get an error when the DAG is created.

dag_name:
  default_args:
    owner: 'example_owner'
    retries: 1
    start_date: 2018-01-01
    retry_delay_sec: 300
  description: ""
  schedule_interval: "50 9 * * *"
  tasks:
    task_1:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: start
      python_callable_file: /opt/airflow/dags/create_env.py
    task_2:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: join
      python_callable_file: /opt/airflow/dags/create_env.py
      partial:
        op_kwargs:
          env_name: "test"
      dependencies: [task_1]

The error I get is:

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/dagfactory/dagbuilder.py", line 589, in make_task
    ) = utils.get_expand_partial_kwargs(task_params)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/dagfactory/utils.py", line 235, in get_expand_partial_kwargs
    for expand_key, expand_value in task_params["expand"].items():
                                    ~~~~~~~~~~~^^^^^^^^^^
KeyError: 'expand'
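
The traceback shows task_params["expand"] being indexed directly, which raises KeyError when a task sets only partial. As a minimal sketch of the defensive pattern this PR is after, the snippet below uses a hypothetical stand-in function, split_expand_partial; it is not dag-factory's get_expand_partial_kwargs, and its name, signature, and return shape are assumptions made for illustration.

```python
from typing import Any, Dict, Tuple


def split_expand_partial(
    task_params: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
    """Separate 'expand' and 'partial' from the remaining task params.

    Hypothetical stand-in, not dag-factory's get_expand_partial_kwargs.
    """
    remaining = dict(task_params)  # shallow copy; the caller's dict is untouched
    # pop() with a default avoids KeyError when a key is absent;
    # "or {}" also covers a key that is present but empty (None).
    expand_kwargs = dict(remaining.pop("expand", None) or {})
    partial_kwargs = dict(remaining.pop("partial", None) or {})
    return remaining, expand_kwargs, partial_kwargs


# A task that sets only 'partial', as in the YAML above.
params = {
    "python_callable_name": "join",
    "partial": {"op_kwargs": {"env_name": "test"}},
}
remaining, expand_kwargs, partial_kwargs = split_expand_partial(params)
print(expand_kwargs)   # {}
print(partial_kwargs)  # {'op_kwargs': {'env_name': 'test'}}
```

With this shape, a missing expand key yields an empty mapping instead of an exception, and the caller can decide what to do with a partial-only task.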

@ifulia
Author

ifulia commented Feb 26, 2024

@matveykortsev @ajbosco Could you please help merge this PR?

@ifulia ifulia marked this pull request as draft February 27, 2024 18:08
@ifulia ifulia marked this pull request as ready for review February 27, 2024 18:08
@ifulia
Author

ifulia commented Feb 27, 2024

@ajbosco @matveykortsev Could you please help review and merge this? It is blocking our use case, where we want to use only partial params. Let me know if you have any concerns.

@matveykortsev
Contributor

matveykortsev commented Feb 27, 2024

I still don’t understand the use case for partial without expand. In that case you can just pass the params directly to the operator.

@ifulia
Author

ifulia commented Feb 27, 2024

Ah, I didn't realize that you can pass them directly as params. I thought the only way to pass params was through partial and/or expand. This solves the problem. Thanks @matveykortsev
Closing this PR.
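
To make the resolution concrete: a task that sets only partial carries the same information as one that lists those keys directly on the task. The sketch below mirrors the task_2 YAML as a plain Python dict and uses a hypothetical helper, hoist_partial, to produce the direct form; the helper is an illustration only and is not part of dag-factory.

```python
from copy import deepcopy
from typing import Any, Dict


def hoist_partial(task_conf: Dict[str, Any]) -> Dict[str, Any]:
    """Move keys from 'partial' to the task level when no 'expand' is set.

    Hypothetical helper for illustration; not part of dag-factory.
    """
    conf = deepcopy(task_conf)
    if "expand" in conf:
        # Mapped task: 'partial' is meaningful here, so leave it alone.
        return conf
    for key, value in (conf.pop("partial", None) or {}).items():
        # Keep an existing task-level value rather than overwrite it.
        conf.setdefault(key, value)
    return conf


# task_2 from the failing YAML, written as a dict.
task_2 = {
    "operator": "airflow.operators.python.PythonOperator",
    "python_callable_name": "join",
    "python_callable_file": "/opt/airflow/dags/create_env.py",
    "partial": {"op_kwargs": {"env_name": "test"}},
    "dependencies": ["task_1"],
}

direct = hoist_partial(task_2)
print(direct["op_kwargs"])  # {'env_name': 'test'}
```

In YAML terms, this corresponds to placing op_kwargs directly under task_2 instead of nesting it under partial.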

@ifulia ifulia closed this Feb 27, 2024