
Airflow 1.10.10 + DAG SERIALIZATION = fails to start manually the DAG's operators #10155

Closed
ozw1z5rd opened this issue Aug 4, 2020 · 9 comments
Labels
area:serialization kind:bug

Comments

ozw1z5rd commented Aug 4, 2020

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration:
  • OS (e.g. from /etc/os-release):
    NAME="CentOS Linux"
    VERSION="7 (Core)"
    ID="centos"
    ID_LIKE="rhel fedora"
    VERSION_ID="7"
    PRETTY_NAME="CentOS Linux 7 (Core)"
    ANSI_COLOR="0;31"
    CPE_NAME="cpe:/o:centos:centos:7"
    HOME_URL="https://www.centos.org/"
    BUG_REPORT_URL="https://bugs.centos.org/"

    CENTOS_MANTISBT_PROJECT="CentOS-7"
    CENTOS_MANTISBT_PROJECT_VERSION="7"
    REDHAT_SUPPORT_PRODUCT="centos"
    REDHAT_SUPPORT_PRODUCT_VERSION="7"

What happened:

When DAG serialisation is active and I manually start the operators, the first one works fine, but the next one fails with this error:

Could not queue task instance for execution, dependencies not met: Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'failed': Decimal('0'), 'upstream_failed': Decimal('0'), 'done': 0L, 'total': 1}, upstream_task_ids=set([u'query']

Setting DAG serialisation to false, the problem does not arise.

Please note: the scheduler works fine.

What you expected to happen:

I expected to be able to manually start all of the DAG's tasks, from the first one to the last.

The code is not able to correctly find the status of the task that comes before the one I'm restarting.
If I start the first operator, everything works fine.

You can reproduce it by following these steps (a minimal DAG along these lines is sketched below):

  1. enable DAG serialisation
  2. put the DAG on pause (so that the scheduler won't touch it)
  3. start the 1st operator and wait until it completes successfully
  4. start the 2nd operator....

op1 >> op2
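
For concreteness, a minimal DAG of this shape (the task ids and dag_id here are illustrative, not taken from the report) would be:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='serialization_repro',  # illustrative dag_id
    default_args={'owner': 'Airflow', 'start_date': days_ago(1)},
    schedule_interval='0 0 * * *',  # keep the DAG paused so only manual runs happen
)

op1 = DummyOperator(task_id='op1', dag=dag)
op2 = DummyOperator(task_id='op2', dag=dag)

# op2 uses the default 'all_success' trigger rule, so it waits on op1
op1 >> op2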

Anything else we need to know:

This happens every time.
MySQL 5.7.x, Python 2.7

ozw1z5rd added the kind:bug label Aug 4, 2020
potiuk (Member) commented Aug 4, 2020

Interesting one!

kaxil (Member) commented Aug 4, 2020

Can you share your DAG please, @ozw1z5rd?

kaxil (Member) commented Aug 4, 2020

And also I would strongly suggest upgrading to Python 3

ozw1z5rd (Author) commented Aug 4, 2020

You must enable DAG serialisation to replicate my issue; with serialisation disabled, there is no issue on the company's system.

These are my settings (from the pilot installation):

min_serialized_dag_update_interval = 15
store_dag_code = True
max_num_rendered_ti_fields_per_task = 0 # this avoids the deadlock problem, which seems to affect only the MySQL engine
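
For completeness, these options (together with the flag that actually turns serialisation on in 1.10.10) live under [core] in airflow.cfg; a sketch of the relevant fragment, assuming otherwise default settings:

[core]
store_serialized_dags = True
min_serialized_dag_update_interval = 15
store_dag_code = True
max_num_rendered_ti_fields_per_task = 0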

Any DAG is affected; my tests were on this specific one:

from builtins import range
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='example_sequence_restart', 
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example']
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

# [START howto_operator_bash]
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
# [END howto_operator_bash]

run_this >> run_this_last

task = BashOperator(
    task_id='start',
    bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    dag=dag,
)
task >> run_this

# [START howto_operator_bash_template]
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
# [END howto_operator_bash_template]

also_run_this >> run_this_last

I have to say that after the database migration I changed the database a bit:

  • dag_tag
    changed the constraint to
    CONSTRAINT dag_tag_ibfk_1 FOREIGN KEY (dag_id) REFERENCES dag (dag_id) on delete cascade

  • rendered_task_instance_fields
    changed execution_date from timestamp to timestamp(6):
    execution_date timestamp(6)

  • task_fail
    changed execution_date to timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)

Before these changes (mostly the one on rendered_task_instance_fields) I was unable to manually trigger the same task twice and have both executions complete without errors. One completed, the other was unable to do the insert into rendered_task_instance_fields:

IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'PARTITIONADD-partition_add-2020-07-28 17:17:13' for key 'PRIMARY'")
[SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)]
[parameters: ('PARTITIONADD', 'partition_add', datetime.datetime(2020, 7, 28, 17, 17, 13, 315192), '{"hql": "\\n             ALTER TABLE unifieddata_cat.transient_ww_eventsjson\\n             ADD IF NOT EXISTS PARTITION( country = \'{country}\',year ... (158 characters truncated) ... e_url": "http://httpfs-preprod.hd.docomodigital.com:14000", "hdfs_path_pattern": "/Vault/Docomodigital/Preproduction/rawEvents/{country}/2020/07/28"}')]
(Background on this error at: http://sqlalche.me/e/gkpj)

After the change on execution_date everything worked fine.

potiuk (Member) commented Aug 4, 2020

Definitely, Python 3 is one of the best choices you can make now, @ozw1z5rd!

ozw1z5rd (Author) commented Aug 4, 2020

And also I would strongly suggest upgrading to Python 3

Yes, I agree. However, we need to convert our customization code to Python 3, so for the next few months, whether we like it or not, Python 2.7 will still stay with us.

potiuk (Member) commented Aug 4, 2020

Yes, I agree. However, we need to convert our customization code to Python 3, so for the next few months, whether we like it or not, Python 2.7 will still stay with us.

<advertisement>Do you need help with making the move faster :)? Maybe we can help :D </advertisement>

chenzuoli commented Aug 9, 2020

I got this error too, and fixed it:
vim airflow/serialization/serialized_objects.py +582
replace:
dag.task_dict[task_id]._upstream_task_ids.add(task_id) # pylint: disable=protected-access
with:
dag.task_dict[task_id]._upstream_task_ids.add(task.task_id) # pylint: disable=protected-access
The operator gets the wrong upstream_task_ids; adding the right upstream task id makes it work fine.
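
For illustration only (a simplified sketch, not the actual Airflow source), the effect of that line is that each downstream task records its own id as its upstream instead of the real upstream's id, so the trigger-rule check never sees the upstream's success:

# Simplified sketch of the deserialization wiring, assuming a mapping of
# upstream task id -> downstream task ids (task names are illustrative).
downstream_map = {'query': ['report'], 'report': []}
upstream_ids = {tid: set() for tid in downstream_map}

for upstream_id, downstream_task_ids in downstream_map.items():
    for task_id in downstream_task_ids:
        # Buggy version: upstream_ids[task_id].add(task_id)
        #   -> the downstream task lists itself as its only upstream, which
        #      never counts as a success when that task is being started.
        # Fixed version (what the patch above does via task.task_id):
        upstream_ids[task_id].add(upstream_id)

print(upstream_ids)  # {'query': set(), 'report': {'query'}}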

kaxil (Member) commented Aug 9, 2020

This has been fixed in 1.10.11 - #8775

potiuk added this to the Airflow 1.10.11 milestone Aug 9, 2020
potiuk closed this as completed Aug 9, 2020