Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import subprocess
import sys
from datetime import datetime

from builtins import input
import argparse
Expand All @@ -13,7 +14,7 @@
from airflow import jobs, settings, utils
from airflow.configuration import conf
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagBag, TaskInstance, DagPickle
from airflow.models import DagBag, TaskInstance, DagPickle, DagRun
from airflow.utils import AirflowException

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
Expand Down Expand Up @@ -74,6 +75,21 @@ def backfill(args):
ignore_dependencies=args.ignore_dependencies)


def trigger_dag(args):
    """
    Create an externally-triggered DagRun for the given dag_id.

    Uses the current time as the execution date and marks the run as
    externally triggered so the scheduler can tell it apart from its
    own scheduled runs.

    :param args: parsed CLI namespace with ``dag_id`` and optional ``run_id``
    """
    session = settings.Session()
    # TODO: verify dag_id actually exists before creating the run
    dag_id = args.dag_id
    run_id = args.run_id or None
    execution_date = datetime.now()
    trigger = DagRun(
        dag_id=dag_id,
        run_id=run_id,
        execution_date=execution_date,
        external_trigger=True)
    session.add(trigger)
    session.commit()
    # Fix: return the connection to the pool instead of leaking the session
    session.close()


def run(args):

utils.pessimistic_connection_handling()
Expand Down Expand Up @@ -436,6 +452,14 @@ def get_parser():
"-c", "--no_confirm", help=ht, action="store_true")
parser_clear.set_defaults(func=clear)

ht = "Trigger a DAG"
parser_trigger_dag = subparsers.add_parser('trigger_dag', help=ht)
parser_trigger_dag.add_argument("dag_id", help="The id of the dag to run")
parser_trigger_dag.add_argument(
    "-r", "--run_id",
    # Fix: typo in user-facing help text ("indentify" -> "identify")
    help="Helps to identify this run")
parser_trigger_dag.set_defaults(func=trigger_dag)

ht = "Run a single task instance"
parser_run = subparsers.add_parser('run', help=ht)
parser_run.add_argument("dag_id", help="The id of the dag to run")
Expand Down
77 changes: 68 additions & 9 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
from time import sleep

from sqlalchemy import Column, Integer, String, DateTime, func, Index
from sqlalchemy import Column, Integer, String, DateTime, func, Index, and_
from sqlalchemy.orm.session import make_transient

from airflow import executors, models, settings, utils
Expand Down Expand Up @@ -215,8 +215,8 @@ def __init__(
self.num_runs = 1
else:
self.num_runs = num_runs
self.refresh_dags_every = refresh_dags_every
self.do_pickle = do_pickle
self.refresh_dags_every = refresh_dags_every
self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)

self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
Expand Down Expand Up @@ -329,6 +329,38 @@ def import_errors(self, dagbag):
filename=filename, stacktrace=stacktrace))
session.commit()


def schedule_dag(self, dag):
    """
    Create the next scheduled DagRun for ``dag`` if one is due.

    Looks up the most recent non-externally-triggered run. If none
    exists, a run is scheduled at the DAG's start_date; otherwise one
    schedule_interval after the last scheduled run.

    :param dag: the DAG to (possibly) schedule a new run for
    :raises Exception: if no next run date can be determined
    """
    DagRun = models.DagRun
    session = settings.Session()
    qry = session.query(func.max(DagRun.execution_date)).filter(and_(
        DagRun.dag_id == dag.dag_id,
        # == False is required: SQLAlchemy overloads == into SQL
        DagRun.external_trigger == False
    ))
    last_scheduled_run = qry.scalar()
    if not last_scheduled_run or last_scheduled_run <= datetime.now():
        if last_scheduled_run:
            next_run_date = last_scheduled_run + dag.schedule_interval
        else:
            # Fix: .get() instead of [] so a missing start_date falls
            # through to the explicit, informative exception below
            # rather than raising an opaque KeyError
            next_run_date = dag.default_args.get('start_date')
        if not next_run_date:
            raise Exception('no next_run_date defined!')
        next_run = DagRun(
            dag_id=dag.dag_id,
            run_id='scheduled',
            execution_date=next_run_date,
            state='active',
            external_trigger=False
        )
        session.add(next_run)
        session.commit()
    # Fix: close the session so the connection is returned to the pool
    session.close()



def process_dag(self, dag, executor):
"""
This method schedules a single DAG by looking at the latest
Expand Down Expand Up @@ -392,6 +424,7 @@ def process_dag(self, dag, executor):
if task.adhoc:
continue
if task.task_id not in ti_dict:
# TODO: Check whether this needs to be changed with DagRun refactoring
# Brand new task, let's get started
ti = TI(task, task.start_date)
ti.refresh_from_db()
Expand All @@ -415,13 +448,15 @@ def process_dag(self, dag, executor):
# in self.prioritize_queued
continue
else:
# Trying to run the next schedule
next_schedule = (
ti.execution_date + task.schedule_interval)
if (
ti.task.end_date and
next_schedule > ti.task.end_date):
# Checking whether there is a DagRun for which a task
# needs to be created
qry = session.query(func.min(models.DagRun.execution_date)).filter(
and_(models.DagRun.dag_id == dag.dag_id,
models.DagRun.execution_date > ti.execution_date))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have the state of the DAG run in DagRun, we could get a list of active runs outside of the tasks for loop, and attempt to schedule all tasks for all active job run.

This would solve confusing limitations documented on "The Scheduler" around never filling in holes and only moving forward from the last execution of each task.

next_schedule = qry.scalar()
if not next_schedule:
continue

ti = TI(
task=task,
execution_date=next_schedule,
Expand All @@ -430,6 +465,29 @@ def process_dag(self, dag, executor):
if ti.is_queueable(flag_upstream_failed=True):
logging.debug('Queuing next run: ' + str(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)

# Checking state of active DagRuns
active_runs = session.query(models.DagRun).filter(
models.DagRun.dag_id == dag.dag_id,
models.DagRun.state == 'active'
).all()
for run in active_runs:
logging.info("Checking state for " + str(run))
task_instances = session.query(TI).filter(
TI.dag_id == run.dag_id,
TI.execution_date == run.execution_date
).all()
if len(task_instances) == len(dag.tasks):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can a dag have branches with tasks that are never executed within one run? If yes, then this check would not be sufficient.

task_states = [ti.state for ti in task_instances]
if 'failed' in task_states:
logging.info(str(run) + 'is failed')
run.state = 'failed'
if set(task_states) == set(['success']):
logging.info(str(run) + 'is successful')
run.state = 'success'
else:
logging.info('not all tasks are finished')

# Releasing the lock
logging.debug("Unlocking DAG (scheduler_lock)")
db_dag = (
Expand Down Expand Up @@ -543,6 +601,7 @@ def signal_handler(signum, frame):
if not dag or (dag.dag_id in paused_dag_ids):
continue
try:
self.schedule_dag(dag)
self.process_dag(dag, executor)
self.manage_slas(dag)
except Exception as e:
Expand Down
32 changes: 32 additions & 0 deletions airflow/migrations/versions/19054f4ff36_add_dagrun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add DagRun

Revision ID: 19054f4ff36
Revises: 338e90f54d61
Create Date: 2015-10-12 09:55:52.475712

"""

# revision identifiers, used by Alembic.
revision = '19054f4ff36'
down_revision = '338e90f54d61'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Create the dag_run table, keyed on (dag_id, execution_date)."""
    # Column order is significant for the created table; keep it stable.
    table_args = [
        sa.Column('dag_id', sa.String(length=250), nullable=False),
        sa.Column('execution_date', sa.DateTime(), nullable=False),
        sa.Column('state', sa.String(length=50), nullable=True),
        sa.Column('run_id', sa.String(length=250), nullable=True),
        sa.Column('external_trigger', sa.Boolean(), nullable=True),
        sa.PrimaryKeyConstraint('dag_id', 'execution_date'),
    ]
    op.create_table('dag_run', *table_args)


def downgrade():
    """Drop the dag_run table created in upgrade()."""
    op.drop_table('dag_run')
26 changes: 25 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ def is_queueable(self, flag_upstream_failed=False):
path to add the feature
:type flag_upstream_failed: boolean
"""
if self.execution_date > datetime.now() - self.task.schedule_interval:
if self.execution_date > datetime.now():
return False
elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
return False
Expand Down Expand Up @@ -2463,6 +2463,30 @@ def get_many(
return query.all()


class DagRun(Base):
    """
    DagRun describes an instance of a Dag. It can be created
    by the scheduler (for regular runs) or by an external trigger
    """
    __tablename__ = "dag_run"

    # Composite primary key: one run per DAG per execution date.
    dag_id = Column(String(ID_LEN), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    # Run state as a string (e.g. 'active', 'success', 'failed').
    state = Column(String(50))
    # Optional identifier supplied by the caller to label this run.
    run_id = Column(String(ID_LEN))
    # True when created via the trigger_dag CLI rather than the scheduler.
    external_trigger = Column(Boolean, default=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may want to add a state here, so that the scheduler could completely disregard DAGs that are fully processed. I'm not sure whether it should just be boolean or if having more states would help.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use a string to be able to mark failed dags as well.


def __repr__(self):
    """Human-readable representation used in scheduler log messages."""
    # Fix: removed an unreachable second `return str((...))` that sat
    # after this return statement (dead code).
    return (
        '<DagRun {dag_id} @ {execution_date}: {run_id}, '
        'externally triggered: {external_trigger}>'.format(
            dag_id=self.dag_id,
            execution_date=self.execution_date,
            run_id=self.run_id,
            external_trigger=self.external_trigger))


class Pool(Base):
__tablename__ = "slot_pool"

Expand Down