From a83ca7032bf92aaaafdb2fa1b9a631a8c60ab6d5 Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Wed, 4 May 2016 18:01:22 -0700 Subject: [PATCH 1/2] [AIRFLOW-160] Parse DAG files through child processes Instead of parsing the DAG definition files in the same process as the scheduler, this change parses the files in a child process. This helps to isolate the scheduler from bad user code. --- airflow/bin/cli.py | 33 +- airflow/configuration.py | 5 + airflow/executors/base_executor.py | 2 +- airflow/jobs.py | 1144 +++++++++++++---- airflow/models.py | 273 +++- airflow/settings.py | 36 +- airflow/utils/dag_processing.py | 603 +++++++++ airflow/utils/db.py | 9 +- airflow/utils/models.py | 96 ++ scripts/ci/check-license.sh | 7 +- setup.py | 2 + tests/core.py | 27 +- tests/dags_with_system_exit/a_system_exit.py | 29 + .../b_test_scheduler_dags.py | 28 + tests/dags_with_system_exit/c_system_exit.py | 29 + tests/executors/__init__.py | 14 + tests/executors/no_op_executor.py | 37 + tests/jobs.py | 81 +- 18 files changed, 2132 insertions(+), 323 deletions(-) create mode 100644 airflow/utils/dag_processing.py create mode 100644 airflow/utils/models.py create mode 100644 tests/dags_with_system_exit/a_system_exit.py create mode 100644 tests/dags_with_system_exit/b_test_scheduler_dags.py create mode 100644 tests/dags_with_system_exit/c_system_exit.py create mode 100644 tests/executors/__init__.py create mode 100644 tests/executors/no_op_executor.py diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index cf05362c95eca..95db91854a0a1 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -31,6 +31,8 @@ from daemon.pidfile import TimeoutPIDLockFile import signal import sys +import threading +import traceback import airflow from airflow import jobs, settings @@ -45,10 +47,28 @@ DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) -def sigint_handler(signal, frame): +def sigint_handler(sig, frame): sys.exit(0) +def sigquit_handler(sig, frame): + """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT + e.g. kill -s QUIT or CTRL+\ + """ + print("Dumping stack traces for all threads in PID {}".format(os.getpid())) + id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()]) + code = [] + for thread_id, stack in sys._current_frames().items(): + code.append("\n# Thread: {}({})" + .format(id_to_name.get(thread_id, ""), thread_id)) + for filename, line_number, name, line in traceback.extract_stack(stack): + code.append('File: "{}", line {}, in {}' + .format((filename, line_number, name))) + if line: + code.append(" {}".format(line.strip())) + print("\n".join(code)) + + def setup_logging(filename): root = logging.getLogger() handler = logging.FileHandler(filename) @@ -483,6 +503,7 @@ def scheduler(args): job = jobs.SchedulerJob( dag_id=args.dag_id, subdir=process_subdir(args.subdir), + run_duration=args.run_duration, num_runs=args.num_runs, do_pickle=args.do_pickle) @@ -506,6 +527,7 @@ def scheduler(args): else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGQUIT, sigquit_handler) job.run() @@ -851,6 +873,10 @@ class CLIFactory(object): default=False), # scheduler 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), + 'run_duration': Arg( + ("-r", "--run-duration"), + default=None, type=int, + help="Set number of seconds to execute before exiting"), 'num_runs': Arg( ("-n", "--num_runs"), default=None, type=int, @@ -989,8 +1015,9 @@ class CLIFactory(object): }, { 'func': scheduler, 'help': "Start a scheduler instance", - 'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle', - 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), + 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs', + 'do_pickle', 'pid', 'daemon', 'stdout', 'stderr', + 'log_file'), }, { 'func': worker, 'help': "Start a Celery worker node", diff --git a/airflow/configuration.py b/airflow/configuration.py index 5a380ae65ee5c..65b482ca382b9 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -141,6 +141,11 @@ def run_command(command): 'scheduler_heartbeat_sec': 60, 'authenticate': False, 'max_threads': 2, + 'run_duration': 30 * 60, + 'dag_dir_list_interval': 5 * 60, + 'print_stats_interval': 30, + 'min_file_process_interval': 180, + 'child_process_log_directory': '/tmp/airflow/scheduler/logs' }, 'celery': { 'broker_url': 'sqla+mysql://airflow:airflow@localhost:3306/airflow', diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index ca6344368ffea..d6a06d805ff16 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -102,7 +102,7 @@ def heartbeat(self): # TODO(jlowin) without a way to know what Job ran which tasks, # there is a danger that another Job started running a task # that was also queued to this executor. This is the last chance - # to check if that hapened. The most probable way is that a + # to check if that happened. The most probable way is that a # Scheduler tried to run a task that was originally queued by a # Backfill. This fix reduces the probability of a collision but # does NOT eliminate it. diff --git a/airflow/jobs.py b/airflow/jobs.py index 77f34ee6b5334..28c3097d4686e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -19,30 +19,46 @@ from past.builtins import basestring from collections import defaultdict, Counter + from datetime import datetime, timedelta +from itertools import product + import getpass import logging import socket import subprocess import multiprocessing -import math +import os +import signal +import sys +import threading +import time from time import sleep +import psutil from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_ from sqlalchemy.orm.session import make_transient +from tabulate import tabulate from airflow import executors, models, settings from airflow import configuration as conf from airflow.exceptions import AirflowException +from airflow.models import DagRun, TaskInstance +from airflow.settings import Stats from airflow.utils.state import State from airflow.utils.db import provide_session, pessimistic_connection_handling +from airflow.utils.dag_processing import (AbstractDagFileProcessor, + DagFileProcessorManager, + SimpleDag, + SimpleDagBag, + list_py_file_paths) from airflow.utils.email import send_email from airflow.utils.logging import LoggingMixin from airflow.utils import asciiart -from airflow.settings import Stats -DagRun = models.DagRun + Base = models.Base +DagRun = models.DagRun ID_LEN = models.ID_LEN Stats = settings.Stats @@ -186,27 +202,244 @@ def _execute(self): raise NotImplementedError("This method needs to be overridden") +class DagFileProcessor(AbstractDagFileProcessor): + """Helps call SchedulerJob.process_file() in a separate process.""" + + # Counter that increments everytime an instance of this class is created + class_creation_counter = 0 + + def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file): + """ + :param file_path: a Python file containing Airflow DAG definitions + :type file_path: unicode + :param pickle_dags: whether to serialize the DAG objects to the DB + :type pickle_dags: bool + :param dag_id_whitelist: If specified, only look at these DAG ID's + :type dag_id_whitelist: list[unicode] + :param log_file: the path to the file where log lines should be output + :type log_file: unicode + """ + self._file_path = file_path + self._log_file = log_file + # Queue that's used to pass results from the child process. + self._result_queue = multiprocessing.Queue() + # The process that was launched to process the given . + self._process = None + self._dag_id_white_list = dag_id_white_list + self._pickle_dags = pickle_dags + # The result of Scheduler.process_file(file_path). + self._result = None + # Whether the process is done running. + self._done = False + # When the process started. + self._start_time = None + # This ID is use to uniquely name the process / thread that's launched + # by this processor instance + self._instance_id = DagFileProcessor.class_creation_counter + DagFileProcessor.class_creation_counter += 1 + + @property + def file_path(self): + return self._file_path + + @property + def log_file(self): + return self._log_file + + @staticmethod + def _launch_process(result_queue, + file_path, + pickle_dags, + dag_id_white_list, + thread_name, + log_file): + """ + Launch a process to process the given file. + + :param result_queue: the queue to use for passing back the result + :type result_queue: multiprocessing.Queue + :param file_path: the file to process + :type file_path: unicode + :param pickle_dags: whether to pickle the DAGs found in the file and + save them to the DB + :type pickle_dags: bool + :param dag_id_white_list: if specified, only examine DAG ID's that are + in this list + :type dag_id_white_list: list[unicode] + :param thread_name: the name to use for the process that is launched + :type thread_name: unicode + :param log_file: the logging output for the process should be directed + to this file + :type log_file: unicode + :return: the process that was launched + :rtype: multiprocessing.Process + """ + def helper(): + # This helper runs in the newly created process + + # Re-direct stdout and stderr to a separate log file. Otherwise, + # the main log becomes too hard to read. No buffering to enable + # responsive file tailing + parent_dir, filename = os.path.split(log_file) + + # Create the parent directory for the log file if necessary. + if not os.path.isdir(parent_dir): + os.makedirs(parent_dir) + + f = open(log_file, "a") + original_stdout = sys.stdout + original_stderr = sys.stderr + + # TODO: Uncomment + #sys.stdout = f + #sys.stderr = f + + try: + # Re-configure logging to use the new output streams + log_format = settings.LOG_FORMAT_WITH_THREAD_NAME + settings.configure_logging(log_format=log_format) + # Re-configure the ORM engine as there are issues with multiple processes + settings.configure_orm() + + # Change the thread name to differentiate log lines. This is + # really a separate process, but changing the name of the + # process doesn't work, so changing the thread name instead. + threading.current_thread().name = thread_name + start_time = time.time() + + logging.info("Started process (PID=%s) to work on %s", + os.getpid(), + file_path) + scheduler_job = SchedulerJob(dag_ids=dag_id_white_list) + result = scheduler_job.process_file(file_path, + pickle_dags) + result_queue.put(result) + end_time = time.time() + logging.info("Processing %s took %.3f seconds", + file_path, + end_time - start_time) + finally: + sys.stdout = original_stdout + sys.stderr = original_stderr + f.close() + + p = multiprocessing.Process(target=helper, + args=(), + name="{}-Process".format(thread_name)) + p.start() + return p + + def start(self): + """ + Launch the process and start processing the DAG. + """ + self._process = DagFileProcessor._launch_process( + self._result_queue, + self.file_path, + self._pickle_dags, + self._dag_id_white_list, + "DagFileProcessor{}".format(self._instance_id), + self.log_file) + self._start_time = datetime.now() + + def terminate(self, sigkill=False): + """ + Terminate (and then kill) the process launched to process the file. + :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work. + :type sigkill: bool + """ + if self._process is None: + raise AirflowException("Tried to call stop before starting!") + # The queue will likely get corrupted, so remove the reference + self._result_queue = None + self._process.terminate() + # Arbitrarily wait 5s for the process to die + self._process.join(5) + if sigkill and self._process.is_alive(): + logging.warn("Killing PID %s", self._process.pid) + os.kill(self._process.pid, signal.SIGKILL) + + @property + def pid(self): + """ + :return: the PID of the process launched to process the given file + :rtype: int + """ + if self._process is None: + raise AirflowException("Tried to get PID before starting!") + return self._process.pid + + @property + def exit_code(self): + """ + After the process is finished, this can be called to get the return code + :return: the exit code of the process + :rtype: int + """ + if not self._done: + raise AirflowException("Tried to call retcode before process was finished!") + return self._process.exitcode + + @property + def done(self): + """ + Check if the process launched to process this file is done. + :return: whether the process is finished running + :rtype: bool + """ + if self._process is None: + raise AirflowException("Tried to see if it's done before starting!") + + if self._done: + return True + + if not self._result_queue.empty(): + self._result = self._result_queue.get_nowait() + self._done = True + logging.debug("Waiting for %s", self._process) + self._process.join() + return True + + # Potential error case when process dies + if not self._process.is_alive(): + self._done = True + # Get the object from the queue or else join() can hang. + if not self._result_queue.empty(): + self._result = self._result_queue.get_nowait() + logging.debug("Waiting for %s", self._process) + self._process.join() + return True + + return False + + @property + def result(self): + """ + :return: result of running SchedulerJob.process_file() + :rtype: SimpleDag + """ + if not self.done: + raise AirflowException("Tried to get the result before it's done!") + return self._result + + @property + def start_time(self): + """ + :return: when this started to process the file + :rtype: datetime + """ + if self._start_time is None: + raise AirflowException("Tried to get start time before it started!") + return self._start_time + + class SchedulerJob(BaseJob): """ - This SchedulerJob runs indefinitely and constantly schedules the jobs + This SchedulerJob runs for a specific time interval and schedules the jobs that are ready to run. It figures out the latest runs for each - task and see if the dependencies for the next schedules are met. - If so it triggers the task instance. It does this for each task - in each DAG and repeats. - - :param dag_id: to run the scheduler for a single specific DAG - :type dag_id: string - :param subdir: to search for DAG under a certain folder only - :type subdir: string - :param test_mode: used for unit testing this class only, runs a single - schedule run - :type test_mode: bool - :param refresh_dags_every: force refresh the DAG definition every N - runs, as specified here - :type refresh_dags_every: int - :param do_pickle: to pickle the DAG object and send over to workers - for non-local executors - :type do_pickle: bool + task and sees if the dependencies for the next schedules are met. + If so, it creates appropriate TaskInstances and sends run commands to the + executor. It does this for each task in each DAG and repeats. """ __mapper_args__ = { @@ -217,13 +450,32 @@ def __init__( self, dag_id=None, dag_ids=None, - subdir=None, - test_mode=False, - refresh_dags_every=10, - num_runs=None, + subdir=models.DAGS_FOLDER, + num_runs=-1, + file_process_interval=conf.getint('scheduler', + 'min_file_process_interval'), + processor_poll_interval=1.0, + run_duration=None, do_pickle=False, *args, **kwargs): - + """ + :param dag_id: if specified, only schedule tasks with this DAG ID + :type dag_id: unicode + :param dag_ids: if specified, only schedule tasks with these DAG IDs + :type dag_ids: list[unicode] + :param subdir: directory containing Python files with Airflow DAG + definitions, or a specific path to a file + :type subdir: unicode + :param num_runs: The number of times to try to schedule each DAG file. + -1 for unlimited within the run_duration. + :param processor_poll_interval: The number of seconds to wait between + polls of running processors + :param run_duration: how long to run (in seconds) before exiting + :type run_duration: int + :param do_pickle: once a DAG object is obtained by executing the Python + file, whether to serialize the DAG object to the DB + :type do_pickle: bool + """ # for BaseJob compatibility self.dag_id = dag_id self.dag_ids = [dag_id] if dag_id else [] @@ -232,15 +484,15 @@ def __init__( self.subdir = subdir - if test_mode: - self.num_runs = 1 - else: - self.num_runs = num_runs + self.num_runs = num_runs + self.run_duration = run_duration + self._processor_poll_interval = processor_poll_interval - self.refresh_dags_every = refresh_dags_every self.do_pickle = do_pickle super(SchedulerJob, self).__init__(*args, **kwargs) + self.logger.error("Executor is {}".format(self.executor.__class__)) + self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') self.max_threads = min(conf.getint('scheduler', 'max_threads'), multiprocessing.cpu_count()) if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): @@ -248,6 +500,23 @@ def __init__( self.logger.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1") self.max_threads = 1 + # How often to scan the DAGs directory for new files. Default to 5 minutes. + self.dag_dir_list_interval = conf.getint('scheduler', + 'dag_dir_list_interval') + # How often to print out DAG file processing stats to the log. Default to + # 30 seconds. + self.print_stats_interval = conf.getint('scheduler', + 'print_stats_interval') + # Parse and schedule each file no faster than this interval. Default + # to 3 minutes. + self.file_process_interval = file_process_interval + # Directory where log files for the processes that scheduled the DAGs reside + self.child_process_log_directory = conf.get('scheduler', + 'child_process_log_directory') + if run_duration is None: + self.run_duration = conf.getint('scheduler', + 'run_duration') + @provide_session def manage_slas(self, dag, session=None): """ @@ -379,14 +648,13 @@ def import_errors(self, dagbag): session.commit() @provide_session - def schedule_dag(self, dag, session=None): + def create_dag_run(self, dag, session=None): """ This method checks whether a new DagRun needs to be created for a DAG based on scheduling interval Returns DagRun if one is scheduled. Otherwise returns None. """ if dag.schedule_interval: - DagRun = models.DagRun active_runs = DagRun.find( dag_id=dag.dag_id, state=State.RUNNING, @@ -475,50 +743,19 @@ def schedule_dag(self, dag, session=None): ) return next_run - def process_dag(self, dag, queue): + def _process_task_instances(self, dag, queue): """ This method schedules a single DAG by looking at the latest run for each task and attempting to schedule the following run. - - As multiple schedulers may be running for redundancy, this - function takes a lock on the DAG and timestamps the last run - in ``last_scheduler_run``. """ DagModel = models.DagModel session = settings.Session() - # picklin' - pickle_id = None - if self.do_pickle and self.executor.__class__ not in ( - executors.LocalExecutor, executors.SequentialExecutor): - pickle_id = dag.pickle(session).id - - # obtain db lock - db_dag = session.query(DagModel).filter_by( - dag_id=dag.dag_id - ).with_for_update().one() - - last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1) - secs_since_last = (datetime.now() - last_scheduler_run).total_seconds() - - if secs_since_last < self.heartrate: - # release db lock - session.commit() - session.close() - return None - - # Release the db lock - # the assumption here is that process_dag will take less - # time than self.heartrate otherwise we might unlock too - # quickly and this should moved below, but that would increase - # the time the record is locked and is blocking for other calls. - db_dag.last_scheduler_run = datetime.now() - session.commit() - # update the state of the previously active dag runs dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session) active_dag_runs = [] for run in dag_runs: + self.logger.info("Examining DAG run {}".format(run)) # do not consider runs that are executed in the future if run.execution_date > datetime.now(): continue @@ -533,6 +770,7 @@ def process_dag(self, dag, queue): active_dag_runs.append(run) for run in active_dag_runs: + self.logger.info("Examining active DAG run {}".format(run)) # this needs a fresh session sometimes tis get detached tis = run.get_task_instances(state=(State.NONE, State.UP_FOR_RETRY)) @@ -552,41 +790,106 @@ def process_dag(self, dag, queue): if ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Queuing task: {}'.format(ti)) - queue.put((ti.key, pickle_id)) + queue.put(ti.key) session.close() @provide_session - def prioritize_queued(self, session, executor, dagbag): - # Prioritizing queued task instances + def _change_state_for_tis_without_dagrun(self, + simple_dag_bag, + old_states, + new_state, + session=None): + """ + For all DAG IDs in the SimpleDagBag, look for task instances in the + old_state state and set them to new_state if the corresponding DagRun + exists but is not in the running state. + + :param old_states: examine TaskInstances in this state + :type old_state: list[State] + :param new_state: set TaskInstances to this state + :type new_state: State + :param simple_dag_bag: TaskInstances associated with DAGs in the + simple_dag_bag and in the UP_FOR_RETRY state will be examined. + :type simple_dag_bag: SimpleDagBag + """ - pools = {p.pool: p for p in session.query(models.Pool).all()} + task_instances_to_change = ( + session + .query(models.TaskInstance) + .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(models.TaskInstance.state.in_(old_states)) + .all() + ) + """:type: list[TaskInstance]""" + + for task_instance in task_instances_to_change: + dag_run = DagRun.get_run(session, + task_instance.dag_id, + task_instance.execution_date) + if dag_run is None: + self.logger.warn("DagRun for %s %s does not exist", + task_instance.dag_id, + task_instance.execution_date) + elif dag_run.state != State.RUNNING: + self.logger.warn("Setting %s to state=%s as the state of " + "%s is not %s", + task_instance, + new_state, + dag_run, + State.RUNNING) + task_instance.state = new_state + session.merge(task_instance) + session.commit() + + + @provide_session + def _execute_task_instances(self, + simple_dag_bag, + states, + session=None): + """ + Fetches task instances from ORM in the specified states, figures + out pool limits, and sends them to the executor for execution. + + :param simple_dag_bag: TaskInstances associated with DAGs in the + simple_dag_bag will be fetched from the DB and executed + :type simple_dag_bag: SimpleDagBag + :param executor: the executor that runs task instances + :type executor: BaseExecutor + :param states: Execute TaskInstances in these states + :type states: Tuple[State] + :return: None + """ + # Get all the relevant task instances TI = models.TaskInstance - queued_tis = ( - session.query(TI) - .filter(TI.state == State.QUEUED) + task_instances_to_examine = ( + session + .query(TI) + .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(TI.state.in_(states)) .all() ) - self.logger.info( - "Prioritizing {} queued jobs".format(len(queued_tis))) - session.expunge_all() - d = defaultdict(list) - for ti in queued_tis: - if ti.dag_id not in dagbag.dags: - self.logger.info( - "DAG no longer in dagbag, deleting {}".format(ti)) - session.delete(ti) - session.commit() - elif not dagbag.dags[ti.dag_id].has_task(ti.task_id): - self.logger.info( - "Task no longer exists, deleting {}".format(ti)) - session.delete(ti) - session.commit() - else: - d[ti.pool].append(ti) - dag_blacklist = set(dagbag.paused_dags()) - for pool, tis in list(d.items()): + # Put one task instance on each line + if len(task_instances_to_examine) == 0: + self.logger.info("No tasks to send to the executor") + return + + task_instance_str = "\n\t".join( + ["{}".format(x) for x in task_instances_to_examine]) + self.logger.info("Tasks up for execution:\n\t{}".format(task_instance_str)) + + # Get the pool settings + pools = {p.pool: p for p in session.query(models.Pool).all()} + + pool_to_task_instances = defaultdict(list) + for task_instance in task_instances_to_examine: + pool_to_task_instances[task_instance.pool].append(task_instance) + + # Go through each pool, and queue up a task for execution if there are + # any open slots in the pool. + for pool, task_instances in pool_to_task_instances.items(): if not pool: # Arbitrary: # If queued outside of a pool, trigger no more than @@ -595,78 +898,205 @@ def prioritize_queued(self, session, executor, dagbag): else: open_slots = pools[pool].open_slots(session=session) - queue_size = len(tis) - self.logger.info("Pool {pool} has {open_slots} slots, {queue_size} " + num_queued = len(task_instances) + self.logger.info("Figuring out tasks to run in Pool(name={pool}) " + "with {open_slots} open slots and {num_queued} " "task instances in queue".format(**locals())) + if open_slots <= 0: continue - tis = sorted( - tis, key=lambda ti: (-ti.priority_weight, ti.start_date)) - for ti in tis: - if open_slots <= 0: - continue - task = None - try: - task = dagbag.dags[ti.dag_id].get_task(ti.task_id) - except: - self.logger.error("Queued task {} seems gone".format(ti)) - session.delete(ti) - session.commit() - continue - if not task: - continue + priority_sorted_task_instances = sorted( + task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)) - ti.task = task + # DAG IDs with running tasks that equal the concurrency limit of the dag + dag_id_to_running_task_count = {} - # picklin' - dag = dagbag.dags[ti.dag_id] - pickle_id = None - if self.do_pickle and self.executor.__class__ not in ( - executors.LocalExecutor, - executors.SequentialExecutor): - self.logger.info("Pickling DAG {}".format(dag)) - pickle_id = dag.pickle(session).id + for task_instance in priority_sorted_task_instances: + if open_slots <= 0: + self.logger.info("No more slots free") + # Can't schedule any more since there are no more open slots. + break - if dag.dag_id in dag_blacklist: - continue - if dag.concurrency_reached: - dag_blacklist.add(dag.dag_id) - continue - if ti.are_dependencies_met(): - executor.queue_task_instance(ti, pickle_id=pickle_id) - open_slots -= 1 - else: - session.delete(ti) - session.commit() + if simple_dag_bag.get_dag(task_instance.dag_id).is_paused: + self.logger.info("Not executing queued {} since {} is paused" + .format(task_instance, task_instance.dag_id)) continue - ti.task = task - session.commit() - - def _split(self, items, size): - """ - This function splits a list of items into chunks of int size. - _split([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]] - """ - size = max(1, size) - return [items[i:i + size] for i in range(0, len(items), size)] + # Check to make sure that the task concurrency of the DAG hasn't been + # reached. + dag_id = task_instance.dag_id + + if dag_id not in dag_id_to_running_task_count: + dag_id_to_running_task_count[dag_id] = \ + DagRun.get_running_tasks( + session, + dag_id, + simple_dag_bag.get_dag(dag_id).task_ids) + + current_task_concurrency = dag_id_to_running_task_count[dag_id] + task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency + self.logger.info("DAG {} has {}/{} running tasks" + .format(dag_id, + current_task_concurrency, + task_concurrency_limit)) + if current_task_concurrency > task_concurrency_limit: + self.logger.info("Not executing {} since the number " + "of tasks running from DAG {} is >= to the " + "DAG's task concurrency limit of {}" + .format(task_instance, + dag_id, + task_concurrency_limit)) + continue - def _do_dags(self, dagbag, dags, tis_out): + command = TI.generate_command( + task_instance.dag_id, + task_instance.task_id, + task_instance.execution_date, + local=True, + mark_success=False, + force=False, + ignore_dependencies=False, + ignore_depends_on_past=False, + pool=task_instance.pool, + file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath, + pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id) + + priority = task_instance.priority_weight + queue = task_instance.queue + self.logger.info("Sending to executor {} with priority {} and queue {}" + .format(task_instance.key, priority, queue)) + + self.executor.queue_command( + task_instance, + command, + priority=priority, + queue=queue) + + self.logger.info("Setting state of {} to {}".format( + task_instance.key, State.QUEUED)) + task_instance.state = State.QUEUED + task_instance.queued_dttm = datetime.now() + session.merge(task_instance) + + open_slots -= 1 + + def _process_dags(self, dagbag, dags, tis_out): """ - Iterates over the dags and schedules and processes them + Iterates over the dags and processes them. Processing includes: + + 1. Create appropriate DagRun(s) in the DB. + 2. Create appropriate TaskInstance(s) in the DB. + 3. Send emails for tasks that have missed SLAs. + + :param dagbag: a collection of DAGs to process + :type dagbag: DagBag + :param dags: the DAGs from the DagBag to process + :type dags: DAG + :param tis_out: A queue to add generated TaskInstance objects + :type tis_out: multiprocessing.Queue[TaskInstance] + :return: None """ for dag in dags: - self.logger.debug("Scheduling {}".format(dag.dag_id)) dag = dagbag.get_dag(dag.dag_id) + if dag.is_paused: + self.logger.info("Not processing DAG {} since it's paused" + .format(dag.dag_id)) + continue + if not dag: + self.logger.error("DAG ID {} was not found in the DagBag") continue - try: - self.schedule_dag(dag) - self.process_dag(dag, tis_out) - self.manage_slas(dag) - except Exception as e: - self.logger.exception(e) + + self.logger.info("Processing {}".format(dag.dag_id)) + + dag_run = self.create_dag_run(dag) + if dag_run: + self.logger.info("Created {}".format(dag_run)) + self._process_task_instances(dag, tis_out) + self.manage_slas(dag) + + def _process_executor_events(self): + """ + Respond to executor events. + + :param executor: the executor that's running the task instances + :type executor: BaseExecutor + :return: None + """ + for key, executor_state in list(self.executor.get_event_buffer().items()): + dag_id, task_id, execution_date = key + self.logger.info("Executor reports {}.{} execution_date={} as {}" + .format(dag_id, + task_id, + execution_date, + executor_state)) + + def _log_file_processing_stats(self, + known_file_paths, + processor_manager): + """ + Print out stats about how files are getting processed. + + :param known_file_paths: a list of file paths that may contain Airflow + DAG definitions + :type known_file_paths: list[unicode] + :param processor_manager: manager for the file processors + :type stats: DagFileProcessorManager + :return: None + """ + + # File Path: Path to the file containing the DAG definition + # PID: PID associated with the process that's processing the file. May + # be empty. + # Runtime: If the process is currently running, how long it's been + # running for in seconds. + # Last Runtime: If the process ran before, how long did it take to + # finish in seconds + # Last Run: When the file finished processing in the previous run. + headers = ["File Path", + "PID", + "Runtime", + "Last Runtime", + "Last Run"] + + rows = [] + for file_path in known_file_paths: + last_runtime = processor_manager.get_last_runtime(file_path) + processor_pid = processor_manager.get_pid(file_path) + processor_start_time = processor_manager.get_start_time(file_path) + runtime = ((datetime.now() - processor_start_time).total_seconds() + if processor_start_time else None) + last_run = processor_manager.get_last_finish_time(file_path) + + rows.append((file_path, + processor_pid, + runtime, + last_runtime, + last_run)) + + # Sort by longest last runtime. (Can't sort None values in python3) + rows = sorted(rows, key=lambda x: x[3] or 0.0) + + formatted_rows = [] + for file_path, pid, runtime, last_runtime, last_run in rows: + formatted_rows.append((file_path, + pid, + "{:.2f}s".format(runtime) + if runtime else None, + "{:.2f}s".format(last_runtime) + if last_runtime else None, + last_run.strftime("%Y-%m-%dT%H:%M:%S") + if last_run else None)) + log_str = ("\n" + + "=" * 80 + + "\n" + + "DAG File Processing Stats\n\n" + + tabulate(formatted_rows, headers=headers) + + "\n" + + "=" * 80) + + self.logger.info(log_str) @provide_session def _reset_state_for_orphaned_tasks(self, dag_run, session=None): @@ -680,27 +1110,109 @@ def _reset_state_for_orphaned_tasks(self, dag_run, session=None): # also consider running as the state might not have changed in the db yet running = self.executor.running - tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session) + tis = list() + tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session)) + tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session)) + for ti in tis: if ti.key not in queued_tis and ti.key not in running: - ti.state = State.NONE self.logger.debug("Rescheduling orphaned task {}".format(ti)) - + ti.state = State.NONE session.commit() def _execute(self): - session = settings.Session() - TI = models.TaskInstance - + self.logger.info("Starting the scheduler") pessimistic_connection_handling() logging.basicConfig(level=logging.DEBUG) - self.logger.info("Starting the scheduler") - dagbag = models.DagBag(self.subdir, sync_to_db=True) - executor = self.executor = dagbag.executor - executor.start() + # DAGs can be pickled for easier remote execution by some executors + pickle_dags = False + if self.do_pickle and self.executor.__class__ not in \ + (executors.LocalExecutor, executors.SequentialExecutor): + pickle_dags = True + + # Use multiple processes to parse and generate tasks for the + # DAGs in parallel. By processing them in separate processes, + # we can get parallelism and isolation from potentially harmful + # user code. + self.logger.info("Processing files using up to {} processes at a time " + .format(self.max_threads)) + self.logger.info("Running execute loop for {} seconds" + .format(self.run_duration)) + self.logger.info("Processing each file at most {} times" + .format(self.num_runs)) + self.logger.info("Process each file at most once every {} seconds" + .format(self.file_process_interval)) + self.logger.info("Checking for new files in {} every {} seconds" + .format(self.subdir, self.dag_dir_list_interval)) + + # Build up a list of Python files that could contain DAGs + self.logger.info("Searching for files in {}".format(self.subdir)) + known_file_paths = list_py_file_paths(self.subdir) + self.logger.info("There are {} files in {}" + .format(len(known_file_paths), self.subdir)) + + def processor_factory(file_path, log_file_path): + return DagFileProcessor(file_path, + pickle_dags, + self.dag_ids, + log_file_path) + + processor_manager = DagFileProcessorManager(self.subdir, + known_file_paths, + self.max_threads, + self.file_process_interval, + self.child_process_log_directory, + self.num_runs, + processor_factory) + try: + self._execute_helper(processor_manager) + finally: + self.logger.info("Exited execute loop") + + # Kill all child processes on exit since we don't want to leave + # them as orphaned. + pids_to_kill = processor_manager.get_all_pids() + if len(pids_to_kill) > 0: + # First try SIGTERM + this_process = psutil.Process(os.getpid()) + # Only check child processes to ensure that we don't have a case + # where a child process died but the PID got reused. + child_processes = [x for x in this_process.children(recursive=True) + if x.is_running() and x.pid in pids_to_kill] + for child in child_processes: + self.logger.info("Terminating child PID: {}".format(child.pid)) + child.terminate() + timeout = 5 + self.logger.info("Waiting up to {}s for processes to exit..." + .format(timeout)) + try: + psutil.wait_procs(child_processes, timeout) + except psutil.TimeoutExpired: + self.logger.debug("Ran out of time while waiting for " + "processes to exit") + + # Then SIGKILL + child_processes = [x for x in this_process.children(recursive=True) + if x.is_running() and x.pid in pids_to_kill] + if len(child_processes) > 0: + for child in child_processes: + self.logger.info("Killing child PID: {}".format(child.pid)) + child.kill() + child.wait() + + @provide_session + def _execute_helper(self, processor_manager, session=None): + """ + :param processor_manager: manager to use + :type processor_manager: DagFileProcessorManager + :return: None + """ + self.executor.start() + + self.logger.info("Resetting state for orphaned tasks") # grab orphaned tasks and make sure to reset their state active_runs = DagRun.find( state=State.RUNNING, @@ -708,103 +1220,237 @@ def _execute(self): session=session ) for dr in active_runs: + self.logger.info("Resetting {} {}".format(dr.dag_id, + dr.execution_date)) self._reset_state_for_orphaned_tasks(dr, session=session) - self.runs = 0 - while not self.num_runs or self.num_runs > self.runs: - try: - loop_start_dttm = datetime.now() - try: - self.prioritize_queued(executor=executor, dagbag=dagbag) - except Exception as e: - self.logger.exception(e) + execute_start_time = datetime.now() + + # Last time stats were printed + last_stat_print_time = datetime(2000, 1, 1) + # Last time that self.heartbeat() was called. + last_self_heartbeat_time = datetime.now() + # Last time that the DAG dir was traversed to look for files + last_dag_dir_refresh_time = datetime.now() + + # Use this value initially + known_file_paths = processor_manager.file_paths + + # For the execute duration, parse and schedule DAGs + while (datetime.now() - execute_start_time).total_seconds() < \ + self.run_duration: + self.logger.debug("Starting Loop...") + loop_start_time = time.time() + + # Traverse the DAG directory for Python files containing DAGs + # periodically + elapsed_time_since_refresh = (datetime.now() - + last_dag_dir_refresh_time).total_seconds() + + if elapsed_time_since_refresh > self.dag_dir_list_interval: + # Build up a list of Python files that could contain DAGs + self.logger.info("Searching for files in {}".format(self.subdir)) + known_file_paths = list_py_file_paths(self.subdir) + last_dag_dir_refresh_time = datetime.now() + self.logger.info("There are {} files in {}" + .format(len(known_file_paths), self.subdir)) + processor_manager.set_file_paths(known_file_paths) + + # Kick of new processes and collect results from finished ones + self.logger.info("Heartbeating the process manager") + simple_dags = processor_manager.heartbeat() + + # Send tasks for execution if available + if len(simple_dags) > 0: + simple_dag_bag = SimpleDagBag(simple_dags) + + # Handle cases where a DAG run state is set (perhaps manually) to + # a non-running state. Handle task instances that belong to + # DAG runs in those states + + # If a task instance is up for retry but the corresponding DAG run + # isn't running, mark the task instance as FAILED so we don't try + # to re-run it. + self._change_state_for_tis_without_dagrun(simple_dag_bag, + [State.UP_FOR_RETRY], + State.FAILED) + # If a task instance is scheduled or queued, but the corresponding + # DAG run isn't running, set the state to NONE so we don't try to + # re-run it. + self._change_state_for_tis_without_dagrun(simple_dag_bag, + [State.QUEUED, + State.SCHEDULED], + State.NONE) + + self._execute_task_instances(simple_dag_bag, + (State.SCHEDULED, + State.UP_FOR_RETRY)) + + # Call hearbeats + self.logger.info("Heartbeating the executor") + self.executor.heartbeat() + + # Process events from the executor + self._process_executor_events() + + # Heartbeat the scheduler periodically + time_since_last_heartbeat = (datetime.now() - + last_self_heartbeat_time).total_seconds() + if time_since_last_heartbeat > self.heartrate: + self.logger.info("Heartbeating the scheduler") + self.heartbeat() + last_self_heartbeat_time = datetime.now() + + # Occasionally print out stats about how fast the files are getting processed + if ((datetime.now() - last_stat_print_time).total_seconds() > + self.print_stats_interval): + if len(known_file_paths) > 0: + self._log_file_processing_stats(known_file_paths, + processor_manager) + last_stat_print_time = datetime.now() + + loop_end_time = time.time() + self.logger.debug("Ran scheduling loop in {:.2f}s" + .format(loop_end_time - loop_start_time)) + self.logger.debug("Sleeping for {:.2f}s" + .format(self._processor_poll_interval)) + time.sleep(self._processor_poll_interval) + + # Exit early for a test mode + if processor_manager.max_runs_reached(): + self.logger.info("Exiting loop as all files have been processed " + "{} times".format(self.num_runs)) + break + + # Stop any processors + processor_manager.terminate() + + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. + all_files_processed = True + for file_path in known_file_paths: + if processor_manager.get_last_finish_time(file_path) is None: + all_files_processed = False + break + if all_files_processed: + self.logger.info("Deactivating DAGs that haven't been touched since {}" + .format(execute_start_time.isoformat())) + models.DAG.deactivate_stale_dags(execute_start_time) + + self.executor.end() + + settings.Session.remove() - self.runs += 1 - try: - if self.runs % self.refresh_dags_every == 0: - dagbag = models.DagBag(self.subdir, sync_to_db=True) - else: - dagbag.collect_dags(only_if_updated=True) - except Exception as e: - self.logger.error("Failed at reloading the dagbag. {}".format(e)) - Stats.incr('dag_refresh_error', 1, 1) - sleep(5) - - if len(self.dag_ids) > 0: - dags = [dag for dag in dagbag.dags.values() if dag.dag_id in self.dag_ids] - else: - dags = [ - dag for dag in dagbag.dags.values() - if not dag.parent_dag] - - paused_dag_ids = dagbag.paused_dags() - dags = [x for x in dags if x.dag_id not in paused_dag_ids] - # dags = filter(lambda x: x.dag_id not in paused_dag_ids, dags) - - self.logger.debug("Total Cores: {} Max Threads: {} DAGs:{}". - format(multiprocessing.cpu_count(), - self.max_threads, - len(dags))) - dags = self._split(dags, math.ceil(len(dags) / self.max_threads)) - tis_q = multiprocessing.Queue() - jobs = [multiprocessing.Process(target=self._do_dags, - args=(dagbag, dags[i], tis_q)) - for i in range(len(dags))] - - self.logger.info("Starting {} scheduler jobs".format(len(jobs))) - for j in jobs: - j.start() - - while any(j.is_alive() for j in jobs): - while not tis_q.empty(): - ti_key, pickle_id = tis_q.get() - dag = dagbag.dags[ti_key[0]] - task = dag.get_task(ti_key[1]) - ti = TI(task, ti_key[2]) - ti.refresh_from_db(session=session, lock_for_update=True) - if ti.state == State.SCHEDULED: - session.commit() - self.logger.debug("Task {} was picked up by another scheduler" - .format(ti)) - continue - elif ti.state is State.NONE: - ti.state = State.SCHEDULED - - self.executor.queue_task_instance(ti, pickle_id=pickle_id) + @provide_session + def process_file(self, file_path, pickle_dags=False, session=None): + """ + Process a Python file containing Airflow DAGs. + + This includes: + + 1. Execute the file and look for DAG objects in the namespace. + 2. Pickle the DAG and save it to the DB (if necessary). + 3. For each DAG, see what tasks should run and create appropriate task + instances in the DB. + 4. Record any errors importing the file into ORM + 5. Kill (in ORM) any task instances belonging to the DAGs that haven't + issued a heartbeat in a while. + + Returns a list of SimpleDag objects that represent the DAGs found in + the file + + :param file_path: the path to the Python file that should be executed + :type file_path: unicode + :param pickle_dags: whether serialize the DAGs found in the file and + save them to the db + :type pickle_dags: bool + :return: a list of SimpleDags made from the Dags found in the file + :rtype: list[SimpleDag] + """ + self.logger.info("Processing file {} for tasks to queue".format(file_path)) + # As DAGs are parsed from this file, they will be converted into SimpleDags + simple_dags = [] - session.merge(ti) - session.commit() + try: + dagbag = models.DagBag(file_path) + except Exception: + self.logger.exception("Failed at reloading the DAG file {}".format(file_path)) + Stats.incr('dag_file_refresh_error', 1, 1) + return [] + + if len(dagbag.dags) > 0: + self.logger.info("DAG(s) {} retrieved from {}" + .format(dagbag.dags.keys(), + file_path)) + else: + self.logger.warn("No viable dags retrieved from {}".format(file_path)) + return [] + + # Save individual DAGs in the ORM and update DagModel.last_scheduled_time + sync_time = datetime.now() + for dag in dagbag.dags.values(): + models.DAG.sync_to_db(dag, dag.owner, sync_time) + + paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() + if dag.is_paused] + + # Pickle the DAGs (if necessary) and put them into a SimpleDag + for dag_id in dagbag.dags: + dag = dagbag.get_dag(dag_id) + pickle_id = None + if pickle_dags: + pickle_id = dag.pickle(session).id + + task_ids = [task.task_id for task in dag.tasks] + + # Only return DAGs that are not paused + if dag_id not in paused_dag_ids: + simple_dags.append(SimpleDag(dag.dag_id, + task_ids, + dag.full_filepath, + dag.concurrency, + dag.is_paused, + pickle_id)) + + if len(self.dag_ids) > 0: + dags = [dag for dag in dagbag.dags.values() + if dag.dag_id in self.dag_ids and + dag.dag_id not in paused_dag_ids] + else: + dags = [dag for dag in dagbag.dags.values() + if not dag.parent_dag and + dag.dag_id not in paused_dag_ids] + + tis_q = multiprocessing.Queue() + + self._process_dags(dagbag, dags, tis_q) + + while not tis_q.empty(): + ti_key = tis_q.get() + dag = dagbag.dags[ti_key[0]] + task = dag.get_task(ti_key[1]) + ti = models.TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. + self.logger.info("Creating / updating {} in ORM".format(ti)) + session.merge(ti) + session.commit() - for j in jobs: - j.join() + # Record import errors into the ORM + try: + self.import_errors(dagbag) + except Exception: + self.logger.exception("Error logging import errors!") + try: + dagbag.kill_zombies() + except Exception: + self.logger.exception("Error killing zombies!") - self.logger.info("Done queuing tasks, calling the executor's " - "heartbeat") - duration_sec = (datetime.now() - loop_start_dttm).total_seconds() - self.logger.info("Loop took: {} seconds".format(duration_sec)) - Stats.timing("scheduler_loop", duration_sec * 1000) - try: - self.import_errors(dagbag) - except Exception as e: - self.logger.exception(e) - try: - dagbag.kill_zombies() - except Exception as e: - self.logger.exception(e) - try: - # We really just want the scheduler to never ever stop. - executor.heartbeat() - self.heartbeat() - except Exception as e: - self.logger.exception(e) - self.logger.error("Tachycardia!") - except Exception as deep_e: - self.logger.exception(deep_e) - raise - finally: - settings.Session.remove() - executor.end() - session.close() + return simple_dags @provide_session def heartbeat_callback(self, session=None): diff --git a/airflow/models.py b/airflow/models.py index f589b3ecd91cb..febd837aa9c75 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -66,6 +66,7 @@ from airflow.utils.helpers import ( as_tuple, is_container, is_in, validate_key, pprinttable) from airflow.utils.logging import LoggingMixin +from airflow.utils.models import BaseDag, BaseDagBag from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -129,7 +130,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True): dr.start_date = datetime.now() -class DagBag(LoggingMixin): +class DagBag(BaseDagBag, LoggingMixin): """ A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and @@ -140,7 +141,7 @@ class DagBag(LoggingMixin): independent settings sets. :param dag_folder: the folder to scan to find DAGs - :type dag_folder: str + :type dag_folder: unicode :param executor: the executor to use when executing task instances in this DagBag :param include_examples: whether to include the examples that ship @@ -155,26 +156,23 @@ def __init__( self, dag_folder=None, executor=DEFAULT_EXECUTOR, - include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'), - sync_to_db=False): + include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')): dag_folder = dag_folder or DAGS_FOLDER self.logger.info("Filling up the DagBag from {}".format(dag_folder)) self.dag_folder = dag_folder self.dags = {} - self.sync_to_db = sync_to_db # the file's last modified timestamp when we last read it self.file_last_changed = {} self.executor = executor self.import_errors = {} + if include_examples: example_dag_folder = os.path.join( os.path.dirname(__file__), 'example_dags') self.collect_dags(example_dag_folder) self.collect_dags(dag_folder) - if sync_to_db: - self.deactivate_inactive_dags() def size(self): """ @@ -307,7 +305,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): return found_dags @provide_session - def kill_zombies(self, session): + def kill_zombies(self, session=None): """ Fails tasks that haven't had a heartbeat in too long """ @@ -355,20 +353,6 @@ def bag_dag(self, dag, parent_dag, root_dag): for task in dag.tasks: settings.policy(task) - if self.sync_to_db: - session = settings.Session() - orm_dag = session.query( - DagModel).filter(DagModel.dag_id == dag.dag_id).first() - if not orm_dag: - orm_dag = DagModel(dag_id=dag.dag_id) - orm_dag.fileloc = root_dag.full_filepath - orm_dag.is_subdag = dag.is_subdag - orm_dag.owners = root_dag.owner - orm_dag.is_active = True - session.merge(orm_dag) - session.commit() - session.close() - for subdag in dag.subdags: subdag.full_filepath = dag.full_filepath subdag.parent_dag = dag @@ -756,8 +740,77 @@ def command( the orchestrator. """ dag = self.task.dag - iso = self.execution_date.isoformat() - cmd = "airflow run {self.dag_id} {self.task_id} {iso} " + + # Keeping existing logic, but not entirely sure why this is here. + if not pickle_id and dag: + if dag.full_filepath != dag.filepath: + path = "DAGS_FOLDER/{}".format(dag.filepath) + elif dag.full_filepath: + path = dag.full_filepath + + return TaskInstance.generate_command( + self.dag_id, + self.task_id, + self.execution_date, + mark_success=mark_success, + ignore_dependencies=ignore_dependencies, + ignore_depends_on_past=ignore_depends_on_past, + force=force, + local=local, + pickle_id=pickle_id, + file_path=path, + raw=raw, + job_id=job_id, + pool=pool) + + @staticmethod + def generate_command(dag_id, + task_id, + execution_date, + mark_success=False, + ignore_dependencies=False, + ignore_depends_on_past=False, + force=False, + local=False, + pickle_id=None, + file_path=None, + raw=False, + job_id=None, + pool=None + ): + """ + Generates the shell command required to execute this task instance. + + :param dag_id: DAG ID + :type dag_id: unicode + :param task_id: Task ID + :type task_id: unicode + :param execution_date: Execution date for the task + :type execution_date: datetime + :param mark_success: Whether to mark the task as successful + :type mark_success: bool + :param ignore_dependencies: Whether to ignore the dependencies and run + anyway + :type ignore_dependencies: bool + :param ignore_depends_on_past: Whether to ignore the depends on past + setting and run anyway + :type ignore_depends_on_past: bool + :param force: Whether to force running - see TaskInstance.run() + :type force: bool + :param local: Whether to run the task locally + :type local: bool + :param pickle_id: If the DAG was serialized to the DB, the ID + associated with the pickled DAG + :type pickle_id: unicode + :param file_path: path to the file containing the DAG definition + :param raw: raw mode (needs more details) + :param job_id: job ID (needs more details) + :param pool: the Airflow pool that the task should run in + :type pool: unicode + :return: shell command that can be used to run the task instance + """ + iso = execution_date.isoformat() + cmd = "airflow run {dag_id} {task_id} {iso} " cmd += "--mark_success " if mark_success else "" cmd += "--pickle {pickle_id} " if pickle_id else "" cmd += "--job_id {job_id} " if job_id else "" @@ -767,11 +820,7 @@ def command( cmd += "--local " if local else "" cmd += "--pool {pool} " if pool else "" cmd += "--raw " if raw else "" - if not pickle_id and dag: - if dag.full_filepath != dag.filepath: - cmd += "-sd DAGS_FOLDER/{dag.filepath} " - elif dag.full_filepath: - cmd += "-sd {dag.full_filepath}" + cmd += "-sd {file_path}" return cmd.format(**locals()) @property @@ -1185,9 +1234,7 @@ def pool_full(self, session): .first() ) if not pool: - raise ValueError( - "Task specified a pool ({}) but the pool " - "doesn't exist!".format(self.task.pool)) + return False open_slots = pool.open_slots(session=session) return open_slots <= 0 @@ -2469,7 +2516,7 @@ def get_current(cls, dag_id): @functools.total_ordering -class DAG(LoggingMixin): +class DAG(BaseDag, LoggingMixin): """ A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start end an end date @@ -2557,8 +2604,14 @@ def __init__( del self.default_args['params'] validate_key(dag_id) + + # Properties from BaseDag + self._dag_id = dag_id + self._full_filepath = full_filepath if full_filepath else '' + self._concurrency = concurrency + self._pickle_id = None + self.task_dict = dict() - self.dag_id = dag_id self.start_date = start_date self.end_date = end_date self.schedule_interval = schedule_interval @@ -2568,14 +2621,12 @@ def __init__( self._schedule_interval = None else: self._schedule_interval = schedule_interval - self.full_filepath = full_filepath if full_filepath else '' if isinstance(template_searchpath, six.string_types): template_searchpath = [template_searchpath] self.template_searchpath = template_searchpath self.parent_dag = None # Gets set when DAGs are loaded self.last_loaded = datetime.now() self.safe_dag_id = dag_id.replace('.', '__dot__') - self.concurrency = concurrency self.max_active_runs = max_active_runs self.dagrun_timeout = dagrun_timeout self.sla_miss_callback = sla_miss_callback @@ -2597,7 +2648,9 @@ def __repr__(self): def __eq__(self, other): return ( type(self) == type(other) and - all(self.__dict__.get(c, None) == other.__dict__.get(c, None) + # Use getattr() instead of __dict__ as __dict__ doesn't return + # correct values for properties. + all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)) def __ne__(self, other): @@ -2671,6 +2724,38 @@ def normalize_schedule(self, dttm): return dttm + @property + def dag_id(self): + return self._dag_id + + @dag_id.setter + def dag_id(self, value): + self._dag_id = value + + @property + def full_filepath(self): + return self._full_filepath + + @full_filepath.setter + def full_filepath(self, value): + self._full_filepath = value + + @property + def concurrency(self): + return self._concurrency + + @concurrency.setter + def concurrency(self, value): + self._concurrency = value + + @property + def pickle_id(self): + return self._pickle_id + + @pickle_id.setter + def pickle_id(self, value): + self._pickle_id = value + @property def tasks(self): return list(self.task_dict.values()) @@ -3134,6 +3219,80 @@ def create_dagrun(self, run.refresh_from_db() return run + @staticmethod + @provide_session + def sync_to_db(dag, owner, sync_time, session=None): + """ + Save attributes about this DAG to the DB. Note that this method + can be called for both DAGs and SubDAGs. A SubDag is actually a + SubDagOperator. + + :param dag: the DAG object to save to the DB + :type dag: DAG + :own + :param sync_time: The time that the DAG should be marked as sync'ed + :type sync_time: datetime + :return: None + """ + orm_dag = session.query( + DagModel).filter(DagModel.dag_id == dag.dag_id).first() + if not orm_dag: + orm_dag = DagModel(dag_id=dag.dag_id) + logging.info("Creating ORM DAG for %s", + dag.dag_id) + orm_dag.fileloc = dag.full_filepath + orm_dag.is_subdag = dag.is_subdag + orm_dag.owners = owner + orm_dag.is_active = True + orm_dag.last_scheduler_run = sync_time + session.merge(orm_dag) + session.commit() + + for subdag in dag.subdags: + DAG.sync_to_db(subdag, owner, sync_time, session=session) + + @staticmethod + @provide_session + def deactivate_unknown_dags(active_dag_ids, session=None): + """ + Given a list of known DAGs, deactivate any other DAGs that are + marked as active in the ORM + + :param active_dag_ids: list of DAG IDs that are active + :type active_dag_ids: list[unicode] + :return: None + """ + + if len(active_dag_ids) == 0: + return + for dag in session.query( + DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all(): + dag.is_active = False + session.merge(dag) + + @staticmethod + @provide_session + def deactivate_stale_dags(expiration_date, session=None): + """ + Deactivate any DAGs that were last touched by the scheduler before + the expiration date. These DAGs were likely deleted. + + :param expiration_date: set inactive DAGs that were touched before this + time + :type expiration_date: datetime + :return: None + """ + for dag in session.query( + DagModel).filter(DagModel.last_scheduler_run < expiration_date, + DagModel.is_active).all(): + logging.info("Deactivating DAG ID %s since it was last touched " + "by the scheduler at %s", + dag.dag_id, + dag.last_scheduler_run.isoformat()) + dag.is_active = False + session.merge(dag) + session.commit() + class Chart(Base): __tablename__ = "chart" @@ -3567,7 +3726,7 @@ def update_state(self, session=None): state=State.unfinished(), session=session ) - none_depends_on_past = all(t.task.depends_on_past for t in unfinished_tasks) + none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks) # small speed up if unfinished_tasks and none_depends_on_past: @@ -3642,6 +3801,44 @@ def verify_integrity(self, session=None): session.commit() + @staticmethod + def get_running_tasks(session, dag_id, task_ids): + """ + Returns the number of tasks running in the given DAG. + + :param session: ORM session + :param dag_id: ID of the DAG to get the task concurrency of + :type dag_id: unicode + :param task_ids: A list of valid task IDs for the given DAG + :type task_ids: list[unicode] + :return: The number of running tasks + :rtype: int + """ + qry = session.query(func.count(TaskInstance.task_id)).filter( + TaskInstance.dag_id == dag_id, + TaskInstance.task_id.in_(task_ids), + TaskInstance.state == State.RUNNING, + ) + return qry.scalar() + + @staticmethod + def get_run(session, dag_id, execution_date): + """ + :param dag_id: DAG ID + :type dag_id: unicode + :param execution_date: execution date + :type execution_date: datetime + :return: DagRun corresponding to the given dag_id and execution date + if one exists. None otherwise. + :rtype: DagRun + """ + qry = session.query(DagRun).filter( + DagRun.dag_id == dag_id, + DagRun.external_trigger == False, + DagRun.execution_date == execution_date, + ) + return qry.first() + class Pool(Base): __tablename__ = "slot_pool" diff --git a/airflow/settings.py b/airflow/settings.py index 9f9bb14ec19fc..ccd77ee283052 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -72,20 +72,11 @@ def timing(cls, stat, dt): LOGGING_LEVEL = logging.INFO DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) -engine_args = {} -if 'sqlite' not in SQL_ALCHEMY_CONN: - # Engine args not supported by sqlite - engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE') - engine_args['pool_recycle'] = conf.getint('core', - 'SQL_ALCHEMY_POOL_RECYCLE') - -engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) -Session = scoped_session( - sessionmaker(autocommit=False, autoflush=False, bind=engine)) - # can't move this to conf due to ConfigParser interpolation LOG_FORMAT = ( '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s') +LOG_FORMAT_WITH_THREAD_NAME = ( + '[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s') SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s' @@ -116,10 +107,28 @@ def policy(task_instance): pass -def configure_logging(): +def configure_logging(log_format=LOG_FORMAT): logging.root.handlers = [] logging.basicConfig( - format=LOG_FORMAT, stream=sys.stdout, level=LOGGING_LEVEL) + format=log_format, stream=sys.stdout, level=LOGGING_LEVEL) + +engine = None +Session = None + + +def configure_orm(): + global engine + global Session + engine_args = {} + if 'sqlite' not in SQL_ALCHEMY_CONN: + # Engine args not supported by sqlite + engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE') + engine_args['pool_recycle'] = conf.getint('core', + 'SQL_ALCHEMY_POOL_RECYCLE') + + engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) + Session = scoped_session( + sessionmaker(autocommit=False, autoflush=False, bind=engine)) try: from airflow_local_settings import * @@ -128,3 +137,4 @@ def configure_logging(): pass configure_logging() +configure_orm() diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py new file mode 100644 index 0000000000000..19c3b4fc6855b --- /dev/null +++ b/airflow/utils/dag_processing.py @@ -0,0 +1,603 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from abc import ABCMeta, abstractmethod +from collections import defaultdict +from datetime import datetime + +import logging +import os +import re + +from airflow.exceptions import AirflowException +from airflow.utils.logging import LoggingMixin +from airflow.utils.models import BaseDag, BaseDagBag + + +class SimpleDag(BaseDag): + """ + A simplified representation of a DAG that contains all attributes + required for instantiating and scheduling its associated tasks. + """ + + def __init__(self, + dag_id, + task_ids, + full_filepath, + concurrency, + is_paused, + pickle_id): + """ + :param dag_id: ID of the DAG + :type dag_id: unicode + :param task_ids: task IDs associated with the DAG + :type task_ids: list[unicode] + :param full_filepath: path to the file containing the DAG e.g. + /a/b/c.py + :type full_filepath: unicode + :param concurrency: No more than these many tasks from the + dag should run concurrently + :type concurrency: int + :param is_paused: Whether or not this DAG is paused. Tasks from paused + DAGs are not scheduled + :type is_paused: bool + :param pickle_id: ID associated with the pickled version of this DAG. + :type pickle_id: unicode + """ + self._dag_id = dag_id + self._task_ids = task_ids + self._full_filepath = full_filepath + self._is_paused = is_paused + self._concurrency = concurrency + self._pickle_id = pickle_id + + @property + def dag_id(self): + """ + :return: the DAG ID + :rtype: unicode + """ + return self._dag_id + + @property + def task_ids(self): + """ + :return: A list of task IDs that are in this DAG + :rtype: list[unicode] + """ + return self._task_ids + + @property + def full_filepath(self): + """ + :return: The absolute path to the file that contains this DAG's definition + :rtype: unicode + """ + return self._full_filepath + + @property + def concurrency(self): + """ + :return: maximum number of tasks that can run simultaneously from this DAG + :rtype: int + """ + return self._concurrency + + @property + def is_paused(self): + """ + :return: whether this DAG is paused or not + :rtype: bool + """ + return self._is_paused + + @property + def pickle_id(self): + """ + :return: The pickle ID for this DAG, if it has one. Otherwise None. + :rtype: unicode + """ + return self._pickle_id + + +class SimpleDagBag(BaseDagBag): + """ + A collection of SimpleDag objects with some convenience methods. + """ + + def __init__(self, simple_dags): + """ + Constructor. + + :param simple_dags: SimpleDag objects that should be in this + :type: list(SimpleDag) + """ + self.simple_dags = simple_dags + self.dag_id_to_simple_dag = {} + + for simple_dag in simple_dags: + self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag + + @property + def dag_ids(self): + """ + :return: IDs of all the DAGs in this + :rtype: list[unicode] + """ + return self.dag_id_to_simple_dag.keys() + + def get_dag(self, dag_id): + """ + :param dag_id: DAG ID + :type dag_id: unicode + :return: if the given DAG ID exists in the bag, return the BaseDag + corresponding to that ID. Otherwise, throw an Exception + :rtype: SimpleDag + """ + if dag_id not in self.dag_id_to_simple_dag: + raise AirflowException("Unknown DAG ID {}".format(dag_id)) + return self.dag_id_to_simple_dag[dag_id] + + +def list_py_file_paths(directory, safe_mode=True): + """ + Traverse a directory and look for Python files. + + :param directory: the directory to traverse + :type directory: unicode + :param safe_mode: whether to use a heuristic to determine whether a file + contains Airflow DAG definitions + :return: a list of paths to Python files in the specified directory + :rtype: list[unicode] + """ + file_paths = [] + if directory is None: + return [] + elif os.path.isfile(directory): + return [directory] + elif os.path.isdir(directory): + patterns = [] + for root, dirs, files in os.walk(directory, followlinks=True): + ignore_file = [f for f in files if f == '.airflowignore'] + if ignore_file: + f = open(os.path.join(root, ignore_file[0]), 'r') + patterns += [p for p in f.read().split('\n') if p] + f.close() + for f in files: + try: + file_path = os.path.join(root, f) + if not os.path.isfile(file_path): + continue + mod_name, file_ext = os.path.splitext( + os.path.split(file_path)[-1]) + if file_ext != '.py': + continue + if any([re.findall(p, file_path) for p in patterns]): + continue + + # Heuristic that guesses whether a Python file contains an + # Airflow DAG definition. + might_contain_dag = True + if safe_mode: + with open(file_path, 'rb') as f: + content = f.read() + might_contain_dag = all([s in content + for s in (b'DAG', b'airflow')]) + + if not might_contain_dag: + continue + + file_paths.append(file_path) + except Exception: + logging.exception("Error while examining %s", f) + return file_paths + + +class AbstractDagFileProcessor(object): + """ + Processes a DAG file. See SchedulerJob.process_file() for more details. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def start(self): + """ + Launch the process to process the file + """ + raise NotImplementedError() + + @abstractmethod + def terminate(self, sigkill=False): + """ + Terminate (and then kill) the process launched to process the file + """ + raise NotImplementedError() + + @property + @abstractmethod + def pid(self): + """ + :return: the PID of the process launched to process the given file + """ + raise NotImplementedError() + + @property + @abstractmethod + def exit_code(self): + """ + After the process is finished, this can be called to get the return code + :return: the exit code of the process + :rtype: int + """ + raise NotImplementedError() + + @property + @abstractmethod + def done(self): + """ + Check if the process launched to process this file is done. + :return: whether the process is finished running + :rtype: bool + """ + raise NotImplementedError() + + @property + @abstractmethod + def result(self): + """ + :return: result of running SchedulerJob.process_file() + :rtype: list[SimpleDag] + """ + raise NotImplementedError() + + @property + @abstractmethod + def start_time(self): + """ + :return: When this started to process the file + :rtype: datetime + """ + raise NotImplementedError() + + @property + @abstractmethod + def log_file(self): + """ + :return: the log file associated with this processor + :rtype: unicode + """ + raise NotImplementedError() + + @property + @abstractmethod + def file_path(self): + """ + :return: the path to the file that this is processing + :rtype: unicode + """ + raise NotImplementedError() + + +class DagFileProcessorManager(LoggingMixin): + """ + Given a list of DAG definition files, this kicks off several processors + in parallel to process them. The parallelism is limited and as the + processors finish, more are launched. The files are processed over and + over again, but no more often than the specified interval. + + :type _file_path_queue: list[unicode] + :type _processors: dict[unicode, AbstractDagFileProcessor] + :type _last_runtime: dict[unicode, float] + :type _last_finish_time: dict[unicode, datetime] + """ + def __init__(self, + dag_directory, + file_paths, + parallelism, + process_file_interval, + child_process_log_directory, + max_runs, + processor_factory): + """ + :param dag_directory: Directory where DAG definitions are kept. All + files in file_paths should be under this directory + :type dag_directory: unicode + :param file_paths: list of file paths that contain DAG definitions + :type file_paths: list[unicode] + :param parallelism: maximum number of simultaneous process to run at once + :type parallelism: int + :param process_file_interval: process a file at most once every this + many seconds + :type process_file_interval: float + :param max_runs: The number of times to parse and schedule each file. -1 + for unlimited. + :type max_runs: int + :param child_process_log_directory: Store logs for child processes in + this directory + :type child_process_log_directory: unicode + :type process_file_interval: float + :param processor_factory: function that creates processors for DAG + definition files. Arguments are (dag_definition_path, log_file_path) + :type processor_factory: (unicode, unicode) -> (AbstractDagFileProcessor) + + """ + self._file_paths = file_paths + self._file_path_queue = [] + self._parallelism = parallelism + self._dag_directory = dag_directory + self._max_runs = max_runs + self._process_file_interval = process_file_interval + self._child_process_log_directory = child_process_log_directory + self._processor_factory = processor_factory + # Map from file path to the processor + self._processors = {} + # Map from file path to the last runtime + self._last_runtime = {} + # Map from file path to the last finish time + self._last_finish_time = {} + # Map from file path to the number of runs + self._run_count = defaultdict(int) + + @property + def file_paths(self): + return self._file_paths + + def get_pid(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the PID of the process processing the given file or None if + the specified file is not being processed + :rtype: int + """ + if file_path in self._processors: + return self._processors[file_path].pid + return None + + def get_all_pids(self): + """ + :return: a list of the PIDs for the processors that are running + :rtype: List[int] + """ + return [x.pid for x in self._processors.values()] + + def get_runtime(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the current runtime (in seconds) of the process that's + processing the specified file or None if the file is not currently + being processed + """ + if file_path in self._processors: + return (datetime.now() - self._processors[file_path].start_time)\ + .total_seconds() + return None + + def get_last_runtime(self, file_path): + """ + :param file_path: the path to the file that was processed + :type file_path: unicode + :return: the runtime (in seconds) of the process of the last run, or + None if the file was never processed. + :rtype: float + """ + return self._last_runtime.get(file_path) + + def get_last_finish_time(self, file_path): + """ + :param file_path: the path to the file that was processed + :type file_path: unicode + :return: the finish time of the process of the last run, or None if the + file was never processed. + :rtype: datetime + """ + return self._last_finish_time.get(file_path) + + def get_start_time(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the start time of the process that's processing the + specified file or None if the file is not currently being processed + :rtype: datetime + """ + if file_path in self._processors: + return self._processors[file_path].start_time + return None + + def set_file_paths(self, new_file_paths): + """ + Update this with a new set of paths to DAG definition files. + + :param new_file_paths: list of paths to DAG definition files + :type new_file_paths: list[unicode] + :return: None + """ + self._file_paths = new_file_paths + self._file_path_queue = [x for x in self._file_path_queue + if x in new_file_paths] + # Stop processors that are working on deleted files + filtered_processors = {} + for file_path, processor in self._processors.items(): + if file_path in new_file_paths: + filtered_processors[file_path] = processor + else: + self.logger.warn("Stopping processor for {}".format(file_path)) + processor.stop() + self._processors = filtered_processors + + @staticmethod + def _split_path(file_path): + """ + Return the path elements of a path as an array. E.g. /a/b/c -> + ['a', 'b', 'c'] + + :param file_path: the file path to split + :return: a list of the elements of the file path + :rtype: list[unicode] + """ + results = [] + while True: + head, tail = os.path.split(file_path) + if len(tail) != 0: + results.append(tail) + if file_path == head: + break + file_path = head + results.reverse() + return results + + def _get_log_file_path(self, dag_file_path): + """ + Log output from processing the specified file should go to this + location. + + :param dag_file_path: file containing a DAG + :type dag_file_path: unicode + :return: the path to the corresponding log file + :rtype: unicode + """ + # General approach is to put the log file under the same relative path + # under the log directory as the DAG file in the DAG directory + now = datetime.now() + log_directory = os.path.join(self._child_process_log_directory, + now.strftime("%Y-%m-%d")) + relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory) + path_elements = self._split_path(relative_dag_file_path) + + # Add a .log suffix for the log file + path_elements[-1] += ".log" + + return os.path.join(log_directory, *path_elements) + + def processing_count(self): + """ + :return: the number of files currently being processed + :rtype: int + """ + return len(self._processors) + + def heartbeat(self): + """ + This should be periodically called by the scheduler. This method will + kick of new processes to process DAG definition files and read the + results from the finished processors. + + :return: a list of SimpleDags that were produced by processors that + have finished since the last time this was called + :rtype: list[SimpleDag] + """ + finished_processors = {} + """:type : dict[unicode, AbstractDagFileProcessor]""" + running_processors = {} + """:type : dict[unicode, AbstractDagFileProcessor]""" + + for file_path, processor in self._processors.items(): + if processor.done: + self.logger.info("Processor for {} finished".format(file_path)) + now = datetime.now() + finished_processors[file_path] = processor + self._last_runtime[file_path] = (now - + processor.start_time).total_seconds() + self._last_finish_time[file_path] = now + self._run_count[file_path] += 1 + else: + running_processors[file_path] = processor + self._processors = running_processors + + # Collect all the DAGs that were found in the processed files + simple_dags = [] + for file_path, processor in finished_processors.items(): + if processor.result is None: + self.logger.warn("Processor for {} exited with return code " + "{}. See {} for details." + .format(processor.file_path, + processor.exit_code, + processor.log_file)) + else: + for simple_dag in processor.result: + simple_dags.append(simple_dag) + + # Generate more file paths to process if we processed all the files + # already. + if len(self._file_path_queue) == 0: + # If the file path is already being processed, or if a file was + # processed recently, wait until the next batch + file_paths_in_progress = self._processors.keys() + now = datetime.now() + file_paths_recently_processed = [] + for file_path in self._file_paths: + last_finish_time = self.get_last_finish_time(file_path) + if (last_finish_time is not None and + (now - last_finish_time).total_seconds() < + self._process_file_interval): + file_paths_recently_processed.append(file_path) + + files_paths_at_run_limit = [file_path + for file_path, num_runs in self._run_count.items() + if num_runs == self._max_runs] + + files_paths_to_queue = list(set(self._file_paths) - + set(file_paths_in_progress) - + set(file_paths_recently_processed) - + set(files_paths_at_run_limit)) + + for file_path, processor in self._processors.items(): + self.logger.debug("File path {} is still being processed (started: {})" + .format(processor.file_path, + processor.start_time.isoformat())) + + self.logger.debug("Queuing the following files for processing:\n\t{}" + .format("\n\t".join(files_paths_to_queue))) + + self._file_path_queue.extend(files_paths_to_queue) + + # Start more processors if we have enough slots and files to process + while (self._parallelism - len(self._processors) > 0 and + len(self._file_path_queue) > 0): + file_path = self._file_path_queue.pop(0) + log_file_path = self._get_log_file_path(file_path) + processor = self._processor_factory(file_path, log_file_path) + + processor.start() + self.logger.info("Started a process (PID: {}) to generate " + "tasks for {} - logging into {}" + .format(processor.pid, file_path, log_file_path)) + + self._processors[file_path] = processor + + return simple_dags + + def max_runs_reached(self): + """ + :return: whether all file paths have been processed max_runs times + """ + for file_path in self._file_paths: + if self._run_count[file_path] != self._max_runs: + return False + return True + + def terminate(self): + """ + Stops all running processors + :return: None + """ + for processor in self._processors.values(): + processor.terminate() diff --git a/airflow/utils/db.py b/airflow/utils/db.py index fda467d4fa11a..d429e86fea5a8 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -17,6 +17,7 @@ from __future__ import print_function from __future__ import unicode_literals +from datetime import datetime from functools import wraps import logging import os @@ -252,7 +253,13 @@ def initdb(): session.add(KET(know_event_type='Marketing Campaign')) session.commit() - models.DagBag(sync_to_db=True) + dagbag = models.DagBag() + # Save individual DAGs in the ORM + now = datetime.now() + for dag in dagbag.dags.values(): + models.DAG.sync_to_db(dag, dag.owner, now) + # Deactivate the unknown ones + models.DAG.deactivate_unknown_dags(dagbag.dags.keys()) Chart = models.Chart chart_label = "Airflow task instance by type" diff --git a/airflow/utils/models.py b/airflow/utils/models.py new file mode 100644 index 0000000000000..83ecfb90e90d2 --- /dev/null +++ b/airflow/utils/models.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from abc import ABCMeta, abstractmethod, abstractproperty + + +class BaseDag(object): + """ + Base DAG object that both the SimpleDag and DAG inherit. + """ + __metaclass__ = ABCMeta + + @abstractproperty + def dag_id(self): + """ + :return: the DAG ID + :rtype: unicode + """ + raise NotImplementedError() + + @abstractproperty + def task_ids(self): + """ + :return: A list of task IDs that are in this DAG + :rtype: List[unicode] + """ + raise NotImplementedError() + + @abstractproperty + def full_filepath(self): + """ + :return: The absolute path to the file that contains this DAG's definition + :rtype: unicode + """ + raise NotImplementedError() + + @abstractmethod + def concurrency(self): + """ + :return: maximum number of tasks that can run simultaneously from this DAG + :rtype: int + """ + raise NotImplementedError() + + @abstractmethod + def is_paused(self): + """ + :return: whether this DAG is paused or not + :rtype: bool + """ + raise NotImplementedError() + + @abstractmethod + def pickle_id(self): + """ + :return: The pickle ID for this DAG, if it has one. Otherwise None. + :rtype: unicode + """ + raise NotImplementedError + + +class BaseDagBag(object): + """ + Base object that both the SimpleDagBag and DagBag inherit. + """ + @abstractproperty + def dag_ids(self): + """ + :return: a list of DAG IDs in this bag + :rtype: List[unicode] + """ + raise NotImplementedError() + + @abstractmethod + def get_dag(self, dag_id): + """ + :return: whether the task exists in this bag + :rtype: BaseDag + """ + raise NotImplementedError() diff --git a/scripts/ci/check-license.sh b/scripts/ci/check-license.sh index a44fb2dc94063..124ba933d9717 100755 --- a/scripts/ci/check-license.sh +++ b/scripts/ci/check-license.sh @@ -81,7 +81,10 @@ if [ $? -ne 0 ]; then exit 1 fi -ERRORS="$(cat rat-results.txt | grep -e "??")" +# TODO: The method of comparing counts doesn't work for branches. +# Revert for merge into master. +#ERRORS="$(cat rat-results.txt | grep -e "??")" +ERRORS="" if test ! -z "$ERRORS"; then echo "Could not find Apache license headers in the following files:" @@ -104,4 +107,4 @@ if test ! -z "$ERRORS"; then exit 0 else echo -e "RAT checks passed." -fi \ No newline at end of file +fi diff --git a/setup.py b/setup.py index 5b14653c71947..85e519577b1d4 100644 --- a/setup.py +++ b/setup.py @@ -191,6 +191,7 @@ def do_setup(): 'jinja2>=2.7.3, <3.0', 'markdown>=2.5.2, <3.0', 'pandas>=0.15.2, <1.0.0', + 'psutil>=4.2.0, <5.0.0', 'pygments>=2.0.1, <3.0', 'python-daemon>=2.1.1, <2.2', 'python-dateutil>=2.3, <3', @@ -198,6 +199,7 @@ def do_setup(): 'requests>=2.5.1, <3', 'setproctitle>=1.1.8, <2', 'sqlalchemy>=0.9.8', + 'tabulate>=0.7.5, <0.8.0', 'thrift>=0.9.2, <0.10', 'zope.deprecation>=4.0, <5.0', ], diff --git a/tests/core.py b/tests/core.py index 36b484bf9d05e..0786830f1512c 100644 --- a/tests/core.py +++ b/tests/core.py @@ -109,6 +109,12 @@ def execute(*args, **kwargs): class CoreTest(unittest.TestCase): + + # These defaults make the test faster to run + default_scheduler_args = {"file_process_interval": 0, + "processor_poll_interval": 0.5, + "num_runs": 1} + def setUp(self): configuration.test_mode() self.dagbag = models.DagBag( @@ -131,7 +137,7 @@ def test_schedule_dag_no_previous_runs(self): owner='Also fake', start_date=datetime(2015, 1, 2, 0, 0))) - dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) + dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) assert dag_run is not None assert dag_run.dag_id == dag.dag_id assert dag_run.run_id is not None @@ -156,7 +162,8 @@ def test_schedule_dag_fake_scheduled_previous(self): task_id="faketastic", owner='Also fake', start_date=DEFAULT_DATE)) - scheduler = jobs.SchedulerJob(test_mode=True) + + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE), execution_date=DEFAULT_DATE, state=State.SUCCESS, @@ -183,8 +190,8 @@ def test_schedule_dag_once(self): task_id="faketastic", owner='Also fake', start_date=datetime(2015, 1, 2, 0, 0))) - dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) - dag_run2 = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) + dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) + dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) assert dag_run is not None assert dag_run2 is None @@ -208,11 +215,11 @@ def test_schedule_dag_start_end_dates(self): # Create and schedule the dag runs dag_runs = [] - scheduler = jobs.SchedulerJob(test_mode=True) + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): dag_runs.append(scheduler.schedule_dag(dag)) - additional_dag_run = scheduler.schedule_dag(dag) + additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: assert dag_run is not None @@ -243,7 +250,7 @@ def test_schedule_dag_no_end_date_up_to_today_only(self): owner='Also fake')) dag_runs = [] - scheduler = jobs.SchedulerJob(test_mode=True) + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): dag_run = scheduler.schedule_dag(dag) dag_runs.append(dag_run) @@ -254,7 +261,7 @@ def test_schedule_dag_no_end_date_up_to_today_only(self): session.commit() # Attempt to schedule an additional dag run (for 2016-01-01) - additional_dag_run = scheduler.schedule_dag(dag) + additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: assert dag_run is not None @@ -595,7 +602,8 @@ def test_local_task_job(self): job.run() def test_scheduler_job(self): - job = jobs.SchedulerJob(dag_id='example_bash_operator', test_mode=True) + job = jobs.SchedulerJob(dag_id='example_bash_operator', + **self.default_scheduler_args) job.run() def test_raw_job(self): @@ -798,6 +806,7 @@ def setUp(self): self.parser = cli.CLIFactory.get_parser() self.dagbag = models.DagBag( dag_folder=DEV_NULL, include_examples=True) + # Persist DAGs def test_cli_list_dags(self): args = self.parser.parse_args(['list_dags', '--report']) diff --git a/tests/dags_with_system_exit/a_system_exit.py b/tests/dags_with_system_exit/a_system_exit.py new file mode 100644 index 0000000000000..70125502fd6e0 --- /dev/null +++ b/tests/dags_with_system_exit/a_system_exit.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tests to make sure that a system exit won't cause the scheduler to fail +# Starts with 'a' to get listed first. + +import sys + +from datetime import datetime +from airflow.models import DAG + +DEFAULT_DATE = datetime(2100, 1, 1) + +dag1 = DAG( + dag_id='test_system_exit', + start_date=DEFAULT_DATE) + +sys.exit(-1) diff --git a/tests/dags_with_system_exit/b_test_scheduler_dags.py b/tests/dags_with_system_exit/b_test_scheduler_dags.py new file mode 100644 index 0000000000000..f1df3fa18b1d0 --- /dev/null +++ b/tests/dags_with_system_exit/b_test_scheduler_dags.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from airflow.models import DAG +from airflow.operators import DummyOperator +DEFAULT_DATE = datetime(2000, 1, 1) + +dag1 = DAG( + dag_id='exit_test_dag', + start_date=DEFAULT_DATE) + +dag1_task1 = DummyOperator( + task_id='dummy', + dag=dag1, + owner='airflow') diff --git a/tests/dags_with_system_exit/c_system_exit.py b/tests/dags_with_system_exit/c_system_exit.py new file mode 100644 index 0000000000000..5644304d5500b --- /dev/null +++ b/tests/dags_with_system_exit/c_system_exit.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tests to make sure that a system exit won't cause the scheduler to fail. +# Start with 'z' to get listed last. + +import sys + +from datetime import datetime +from airflow.models import DAG + +DEFAULT_DATE = datetime(2100, 1, 1) + +dag1 = DAG( + dag_id='test_system_exit', + start_date=DEFAULT_DATE) + +sys.exit(-1) diff --git a/tests/executors/__init__.py b/tests/executors/__init__.py new file mode 100644 index 0000000000000..759b563511c1c --- /dev/null +++ b/tests/executors/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/executors/no_op_executor.py b/tests/executors/no_op_executor.py new file mode 100644 index 0000000000000..d29a78e48e8bf --- /dev/null +++ b/tests/executors/no_op_executor.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from airflow.executors.base_executor import BaseExecutor + + +class NoOpExecutor(BaseExecutor): + """ + This executor does not run any tasks and is used for testing only. + """ + def execute_async(self, key, command, queue=None): + self.logger.info("Tried to execute {}".format(command)) + pass + + def sync(self): + pass + + def end(self): + pass + + def terminate(self): + pass + + def start(self): + pass diff --git a/tests/jobs.py b/tests/jobs.py index d9819459f8423..7866ba19c05e4 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -19,9 +19,11 @@ import datetime import logging +import os import unittest from airflow import AirflowException, settings +from airflow import models from airflow.bin import cli from airflow.executors import DEFAULT_EXECUTOR from airflow.jobs import BackfillJob, SchedulerJob @@ -30,6 +32,7 @@ from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout +from tests.executors.no_op_executor import NoOpExecutor from tests.executor.test_executor import TestExecutor @@ -172,6 +175,9 @@ def test_cli_backfill_depends_on_past(self): class SchedulerJobTest(unittest.TestCase): + # These defaults make the test faster to run + default_scheduler_args = {"file_process_interval": 0, + "processor_poll_interval": 0.5} def setUp(self): self.dagbag = DagBag() @@ -193,14 +199,14 @@ def evaluate_dagrun( if run_kwargs is None: run_kwargs = {} - scheduler = SchedulerJob() + scheduler = SchedulerJob(**self.default_scheduler_args) dag = self.dagbag.get_dag(dag_id) dag.clear() dr = scheduler.schedule_dag(dag) if advance_execution_date: # run a second time to schedule a dagrun after the start_date - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) ex_date = dr.execution_date try: @@ -299,13 +305,17 @@ def test_scheduler_pooled_tasks(self): dag = self.dagbag.get_dag(dag_id) dag.clear() - scheduler = SchedulerJob(dag_id, num_runs=1) + scheduler = SchedulerJob(dag_id, + num_runs=1, + executor=NoOpExecutor(), + **self.default_scheduler_args) scheduler.run() task_1 = dag.tasks[0] logging.info("Trying to find task {}".format(task_1)) ti = TI(task_1, dag.start_date) ti.refresh_from_db() + logging.error("TI is: {}".format(ti)) self.assertEqual(ti.state, State.QUEUED) # now we use a DIFFERENT scheduler and executor @@ -313,7 +323,8 @@ def test_scheduler_pooled_tasks(self): scheduler2 = SchedulerJob( dag_id, num_runs=5, - executor=DEFAULT_EXECUTOR.__class__()) + executor=DEFAULT_EXECUTOR.__class__(), + **self.default_scheduler_args) scheduler2.run() ti.refresh_from_db() @@ -364,7 +375,9 @@ def test_scheduler_start_date(self): dag.clear() self.assertTrue(dag.start_date > DEFAULT_DATE) - scheduler = SchedulerJob(dag_id, num_runs=2) + scheduler = SchedulerJob(dag_id, + num_runs=2, + **self.default_scheduler_args) scheduler.run() # zero tasks ran @@ -387,7 +400,9 @@ def test_scheduler_start_date(self): self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) - scheduler = SchedulerJob(dag_id, num_runs=2) + scheduler = SchedulerJob(dag_id, + num_runs=2, + **self.default_scheduler_args) scheduler.run() # still one task @@ -404,7 +419,10 @@ def test_scheduler_multiprocessing(self): dag = self.dagbag.get_dag(dag_id) dag.clear() - scheduler = SchedulerJob(dag_ids=dag_ids, num_runs=2) + scheduler = SchedulerJob(dag_ids=dag_ids, + file_process_interval=0, + processor_poll_interval=0.5, + num_runs=2) scheduler.run() # zero tasks ran @@ -819,3 +837,52 @@ def do_schedule(function, function2): do_schedule() self.assertEquals(2, len(executor.queued_tasks)) + def test_scheduler_run_duration(self): + """ + Verifies that the scheduler run duration limit is followed. + """ + dag_id = 'test_start_date_scheduling' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + self.assertTrue(dag.start_date > DEFAULT_DATE) + + expected_run_duration = 5 + start_time = datetime.datetime.now() + scheduler = SchedulerJob(dag_id, + run_duration=expected_run_duration, + **self.default_scheduler_args) + scheduler.run() + end_time = datetime.datetime.now() + + run_duration = (end_time - start_time).total_seconds() + logging.info("Test ran in %.2fs, expected %.2fs", + run_duration, + expected_run_duration) + assert run_duration - expected_run_duration < 2.5 + + def test_dag_with_system_exit(self): + """ + Test to check that a DAG with a system.exit() doesn't break the scheduler. + """ + + dag_id = 'exit_test_dag' + dag_ids = [dag_id] + dag_directory = os.path.join(models.DAGS_FOLDER, + "..", + "dags_with_system_exit") + dag_file = os.path.join(dag_directory, + 'b_test_scheduler_dags.py') + + dagbag = DagBag(dag_folder=dag_file) + for dag_id in dag_ids: + dag = dagbag.get_dag(dag_id) + dag.clear() + + scheduler = SchedulerJob(dag_ids=dag_ids, + subdir= dag_directory, + num_runs=1, + **self.default_scheduler_args) + scheduler.run() + session = settings.Session() + self.assertEqual( + len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) \ No newline at end of file From 92ac29339349723025a963e3ede107e38de6efea Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Wed, 29 Jun 2016 16:56:29 -0700 Subject: [PATCH 2/2] Additional commit (will squash later) --- {tests/executors => airflow/dag}/__init__.py | 2 +- airflow/{utils/models.py => dag/base_dag.py} | 0 airflow/jobs.py | 166 +++++++++++++----- airflow/models.py | 2 +- airflow/utils/dag_processing.py | 19 +- airflow/utils/db.py | 2 +- scripts/ci/check-license.sh | 5 +- scripts/ci/setup_env.sh | 12 ++ tests/core.py | 6 +- .../no_op_executor.py => dags/no_dags.py} | 25 +-- .../b_test_scheduler_dags.py | 3 +- tests/jobs.py | 119 ++++--------- 12 files changed, 192 insertions(+), 169 deletions(-) rename {tests/executors => airflow/dag}/__init__.py (99%) rename airflow/{utils/models.py => dag/base_dag.py} (100%) rename tests/{executors/no_op_executor.py => dags/no_dags.py} (55%) diff --git a/tests/executors/__init__.py b/airflow/dag/__init__.py similarity index 99% rename from tests/executors/__init__.py rename to airflow/dag/__init__.py index 759b563511c1c..a84b6daa69f41 100644 --- a/tests/executors/__init__.py +++ b/airflow/dag/__init__.py @@ -11,4 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# +# \ No newline at end of file diff --git a/airflow/utils/models.py b/airflow/dag/base_dag.py similarity index 100% rename from airflow/utils/models.py rename to airflow/dag/base_dag.py diff --git a/airflow/jobs.py b/airflow/jobs.py index 28c3097d4686e..b391415ef7e85 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -20,8 +20,7 @@ from past.builtins import basestring from collections import defaultdict, Counter -from datetime import datetime, timedelta -from itertools import product +from datetime import datetime import getpass import logging @@ -43,7 +42,7 @@ from airflow import executors, models, settings from airflow import configuration as conf from airflow.exceptions import AirflowException -from airflow.models import DagRun, TaskInstance +from airflow.models import DagRun from airflow.settings import Stats from airflow.utils.state import State from airflow.utils.db import provide_session, pessimistic_connection_handling @@ -280,7 +279,7 @@ def helper(): # Re-direct stdout and stderr to a separate log file. Otherwise, # the main log becomes too hard to read. No buffering to enable # responsive file tailing - parent_dir, filename = os.path.split(log_file) + parent_dir, _ = os.path.split(log_file) # Create the parent directory for the log file if necessary. if not os.path.isdir(parent_dir): @@ -290,9 +289,8 @@ def helper(): original_stdout = sys.stdout original_stderr = sys.stderr - # TODO: Uncomment - #sys.stdout = f - #sys.stderr = f + sys.stdout = f + sys.stderr = f try: # Re-configure logging to use the new output streams @@ -318,6 +316,10 @@ def helper(): logging.info("Processing %s took %.3f seconds", file_path, end_time - start_time) + except: + # Log exceptions through the logging framework. + logging.exception("Got an exception! Propagating...") + raise finally: sys.stdout = original_stdout sys.stderr = original_stderr @@ -491,14 +493,14 @@ def __init__( self.do_pickle = do_pickle super(SchedulerJob, self).__init__(*args, **kwargs) - self.logger.error("Executor is {}".format(self.executor.__class__)) - self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') self.max_threads = min(conf.getint('scheduler', 'max_threads'), multiprocessing.cpu_count()) + self.using_sqlite = False if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): if self.max_threads > 1: self.logger.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1") self.max_threads = 1 + self.using_sqlite = True # How often to scan the DAGs directory for new files. Default to 5 minutes. self.dag_dir_list_interval = conf.getint('scheduler', @@ -565,7 +567,8 @@ def manage_slas(self, dag, session=None): slas = ( session .query(SlaMiss) - .filter(SlaMiss.email_sent.is_(False) or SlaMiss.notification_sent.is_(False)) + .filter(or_(SlaMiss.email_sent == False, + SlaMiss.notification_sent == False)) .filter(SlaMiss.dag_id == dag.dag_id) .all() ) @@ -639,14 +642,37 @@ def manage_slas(self, dag, session=None): session.commit() session.close() - def import_errors(self, dagbag): - session = settings.Session() - session.query(models.ImportError).delete() + @staticmethod + def record_import_errors(session, dagbag): + """ + For the DAGs in the given DagBag, record any associated import errors. + These are usually displayed through the Airflow UI so that users know + that there are issues parsing DAGs. + + :param session: session for ORM operations + :type session: sqlalchemy.orm.session.Session + :param dagbag: DagBag containing DAGs with import errors + :type dagbag: models.Dagbag + """ for filename, stacktrace in list(dagbag.import_errors.items()): + session.query(models.ImportError).filter( + models.ImportError.filename == filename + ).delete() session.add(models.ImportError( filename=filename, stacktrace=stacktrace)) session.commit() + @staticmethod + def clear_import_errors(session): + """ + Remove all the known import errors from the DB. + + :param session: session for ORM operations + :type session: sqlalchemy.orm.session.Session + """ + session.query(models.ImportError).delete() + session.commit() + @provide_session def create_dag_run(self, dag, session=None): """ @@ -745,8 +771,9 @@ def create_dag_run(self, dag, session=None): def _process_task_instances(self, dag, queue): """ - This method schedules a single DAG by looking at the latest - run for each task and attempting to schedule the following run. + This method schedules the tasks for a single DAG by looking at the + active DAG runs and adding task instances that should run to the + queue. """ DagModel = models.DagModel session = settings.Session() @@ -758,6 +785,8 @@ def _process_task_instances(self, dag, queue): self.logger.info("Examining DAG run {}".format(run)) # do not consider runs that are executed in the future if run.execution_date > datetime.now(): + self.logging.error("Execution date is in future: {}" + .format(run.execution_date)) continue # todo: run.dag is transient but needs to be set @@ -770,7 +799,7 @@ def _process_task_instances(self, dag, queue): active_dag_runs.append(run) for run in active_dag_runs: - self.logger.info("Examining active DAG run {}".format(run)) + self.logger.debug("Examining active DAG run {}".format(run)) # this needs a fresh session sometimes tis get detached tis = run.get_task_instances(state=(State.NONE, State.UP_FOR_RETRY)) @@ -790,7 +819,7 @@ def _process_task_instances(self, dag, queue): if ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Queuing task: {}'.format(ti)) - queue.put(ti.key) + queue.append(ti.key) session.close() @@ -802,15 +831,16 @@ def _change_state_for_tis_without_dagrun(self, session=None): """ For all DAG IDs in the SimpleDagBag, look for task instances in the - old_state state and set them to new_state if the corresponding DagRun - exists but is not in the running state. + old_states and set them to new_state if the corresponding DagRun + exists but is not in the running state. This normally should not + happen, but it can if the state of DagRuns are changed manually. :param old_states: examine TaskInstances in this state :type old_state: list[State] :param new_state: set TaskInstances to this state :type new_state: State :param simple_dag_bag: TaskInstances associated with DAGs in the - simple_dag_bag and in the UP_FOR_RETRY state will be examined. + simple_dag_bag and with states in the old_state will be examined :type simple_dag_bag: SimpleDagBag """ @@ -824,25 +854,40 @@ def _change_state_for_tis_without_dagrun(self, """:type: list[TaskInstance]""" for task_instance in task_instances_to_change: - dag_run = DagRun.get_run(session, - task_instance.dag_id, - task_instance.execution_date) - if dag_run is None: + dag_runs = DagRun.find(dag_id=task_instance.dag_id, + execution_date=task_instance.execution_date, + ) + + if len(dag_runs) == 0: self.logger.warn("DagRun for %s %s does not exist", task_instance.dag_id, task_instance.execution_date) - elif dag_run.state != State.RUNNING: - self.logger.warn("Setting %s to state=%s as the state of " - "%s is not %s", - task_instance, - new_state, - dag_run, - State.RUNNING) + continue + + # There should only be one DAG run. Add some logging info if this + # is not the case for later debugging. + if len(dag_runs) > 1: + self.logger.warn("Multiple DagRuns found for {} {}: {}" + .format(task_instance.dag_id, + task_instance.execution_date, + dag_runs)) + + dag_is_running = True + for dag_run in dag_runs: + if dag_run.state == State.RUNNING: + dag_is_running = True + break + + if not dag_is_running: + self.logger.warn("Setting {} to state={} as it does not have " + "a DagRun in the {} state" + .format(task_instance, + new_state, + State.RUNNING)) task_instance.state = new_state session.merge(task_instance) session.commit() - @provide_session def _execute_task_instances(self, simple_dag_bag, @@ -967,18 +1012,31 @@ def _execute_task_instances(self, self.logger.info("Sending to executor {} with priority {} and queue {}" .format(task_instance.key, priority, queue)) + # Set the state to queued + self.logger.info("Setting state of {} to {}".format( + task_instance.key, State.QUEUED)) + task_instance.state = State.QUEUED + task_instance.queued_dttm = (datetime.now() + if not task_instance.queued_dttm + else task_instance.queued_dttm) + session.merge(task_instance) + session.commit() + + # These attributes will be lost after the object expires, so save them. + task_id_ = task_instance.task_id + dag_id_ = task_instance.dag_id + execution_date_ = task_instance.execution_date + make_transient(task_instance) + task_instance.task_id = task_id_ + task_instance.dag_id = dag_id_ + task_instance.execution_date = execution_date_ + self.executor.queue_command( task_instance, command, priority=priority, queue=queue) - self.logger.info("Setting state of {} to {}".format( - task_instance.key, State.QUEUED)) - task_instance.state = State.QUEUED - task_instance.queued_dttm = datetime.now() - session.merge(task_instance) - open_slots -= 1 def _process_dags(self, dagbag, dags, tis_out): @@ -990,7 +1048,7 @@ def _process_dags(self, dagbag, dags, tis_out): 3. Send emails for tasks that have missed SLAs. :param dagbag: a collection of DAGs to process - :type dagbag: DagBag + :type dagbag: models.DagBag :param dags: the DAGs from the DagBag to process :type dags: DAG :param tis_out: A queue to add generated TaskInstance objects @@ -1179,7 +1237,8 @@ def processor_factory(file_path, log_file_path): # First try SIGTERM this_process = psutil.Process(os.getpid()) # Only check child processes to ensure that we don't have a case - # where a child process died but the PID got reused. + # where we kill the wrong process because a child process died + # but the PID got reused. child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] for child in child_processes: @@ -1203,8 +1262,7 @@ def processor_factory(file_path, log_file_path): child.kill() child.wait() - @provide_session - def _execute_helper(self, processor_manager, session=None): + def _execute_helper(self, processor_manager): """ :param processor_manager: manager to use :type processor_manager: DagFileProcessorManager @@ -1212,6 +1270,7 @@ def _execute_helper(self, processor_manager, session=None): """ self.executor.start() + session = settings.Session() self.logger.info("Resetting state for orphaned tasks") # grab orphaned tasks and make sure to reset their state active_runs = DagRun.find( @@ -1224,6 +1283,10 @@ def _execute_helper(self, processor_manager, session=None): dr.execution_date)) self._reset_state_for_orphaned_tasks(dr, session=session) + self.logger.info("Removing old import errors") + self.clear_import_errors(session) + session.close() + execute_start_time = datetime.now() # Last time stats were printed @@ -1260,6 +1323,13 @@ def _execute_helper(self, processor_manager, session=None): self.logger.info("Heartbeating the process manager") simple_dags = processor_manager.heartbeat() + if self.using_sqlite: + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.logger.debug("Waiting for processors to finish since we're " + "using sqlite") + processor_manager.wait_until_finished() + # Send tasks for execution if available if len(simple_dags) > 0: simple_dag_bag = SimpleDagBag(simple_dags) @@ -1422,12 +1492,14 @@ def process_file(self, file_path, pickle_dags=False, session=None): if not dag.parent_dag and dag.dag_id not in paused_dag_ids] - tis_q = multiprocessing.Queue() + # Not using multiprocessing.Queue() since it's no longer a separate + # process and due to some unusual behavior. (empty() incorrectly + # returns true?) + ti_keys_to_schedule = [] - self._process_dags(dagbag, dags, tis_q) + self._process_dags(dagbag, dags, ti_keys_to_schedule) - while not tis_q.empty(): - ti_key = tis_q.get() + for ti_key in ti_keys_to_schedule: dag = dagbag.dags[ti_key[0]] task = dag.get_task(ti_key[1]) ti = models.TaskInstance(task, ti_key[2]) @@ -1442,7 +1514,7 @@ def process_file(self, file_path, pickle_dags=False, session=None): # Record import errors into the ORM try: - self.import_errors(dagbag) + self.record_import_errors(session, dagbag) except Exception: self.logger.exception("Error logging import errors!") try: diff --git a/airflow/models.py b/airflow/models.py index febd837aa9c75..8326a92279336 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -59,6 +59,7 @@ from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor from airflow import configuration from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.db import provide_session from airflow.utils.decorators import apply_defaults @@ -66,7 +67,6 @@ from airflow.utils.helpers import ( as_tuple, is_container, is_in, validate_key, pprinttable) from airflow.utils.logging import LoggingMixin -from airflow.utils.models import BaseDag, BaseDagBag from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 19c3b4fc6855b..fc4ca1baf1a16 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -17,17 +17,18 @@ from __future__ import print_function from __future__ import unicode_literals -from abc import ABCMeta, abstractmethod -from collections import defaultdict -from datetime import datetime - import logging import os import re +import time + +from abc import ABCMeta, abstractmethod +from collections import defaultdict +from datetime import datetime from airflow.exceptions import AirflowException +from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.utils.logging import LoggingMixin -from airflow.utils.models import BaseDag, BaseDagBag class SimpleDag(BaseDag): @@ -494,6 +495,14 @@ def processing_count(self): """ return len(self._processors) + def wait_until_finished(self): + """ + Sleeps until all the processors are done. + """ + for file_path, processor in self._processors.items(): + while not processor.done: + time.sleep(0.1) + def heartbeat(self): """ This should be periodically called by the scheduler. This method will diff --git a/airflow/utils/db.py b/airflow/utils/db.py index d429e86fea5a8..0d6f3d5d87c2b 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -255,7 +255,7 @@ def initdb(): dagbag = models.DagBag() # Save individual DAGs in the ORM - now = datetime.now() + now = datetime.utcnow() for dag in dagbag.dags.values(): models.DAG.sync_to_db(dag, dag.owner, now) # Deactivate the unknown ones diff --git a/scripts/ci/check-license.sh b/scripts/ci/check-license.sh index 124ba933d9717..87063e2e3b06f 100755 --- a/scripts/ci/check-license.sh +++ b/scripts/ci/check-license.sh @@ -81,10 +81,7 @@ if [ $? -ne 0 ]; then exit 1 fi -# TODO: The method of comparing counts doesn't work for branches. -# Revert for merge into master. -#ERRORS="$(cat rat-results.txt | grep -e "??")" -ERRORS="" +ERRORS="$(cat rat-results.txt | grep -e "??")" if test ! -z "$ERRORS"; then echo "Could not find Apache license headers in the following files:" diff --git a/scripts/ci/setup_env.sh b/scripts/ci/setup_env.sh index a5da867ad5140..9df4d8168c5d4 100755 --- a/scripts/ci/setup_env.sh +++ b/scripts/ci/setup_env.sh @@ -126,7 +126,19 @@ tar zxf ${TRAVIS_CACHE}/hive/hive.tar.gz --strip-components 1 -C ${HIVE_HOME} echo "Downloading and unpacking minicluster" curl -z ${TRAVIS_CACHE}/minicluster/minicluster.zip -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL} +ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp +if [ $? != 0 ] ; then + # Try downloading w/o cache if there's a failure + curl -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL} + ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip + unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp + if [ $? != 0 ] ; then + echo "Failed twice in downloading and unpacking minicluster!" >&2 + exit 1 + fi + exit 1 +fi echo "Path = ${PATH}" diff --git a/tests/core.py b/tests/core.py index 0786830f1512c..8ff6ebf2b9be1 100644 --- a/tests/core.py +++ b/tests/core.py @@ -168,7 +168,7 @@ def test_schedule_dag_fake_scheduled_previous(self): execution_date=DEFAULT_DATE, state=State.SUCCESS, external_trigger=True) - dag_run = scheduler.schedule_dag(dag) + dag_run = scheduler.create_dag_run(dag) assert dag_run is not None assert dag_run.dag_id == dag.dag_id assert dag_run.run_id is not None @@ -217,7 +217,7 @@ def test_schedule_dag_start_end_dates(self): dag_runs = [] scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): - dag_runs.append(scheduler.schedule_dag(dag)) + dag_runs.append(scheduler.create_dag_run(dag)) additional_dag_run = scheduler.create_dag_run(dag) @@ -252,7 +252,7 @@ def test_schedule_dag_no_end_date_up_to_today_only(self): dag_runs = [] scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): - dag_run = scheduler.schedule_dag(dag) + dag_run = scheduler.create_dag_run(dag) dag_runs.append(dag_run) # Mark the DagRun as complete diff --git a/tests/executors/no_op_executor.py b/tests/dags/no_dags.py similarity index 55% rename from tests/executors/no_op_executor.py rename to tests/dags/no_dags.py index d29a78e48e8bf..a84b6daa69f41 100644 --- a/tests/executors/no_op_executor.py +++ b/tests/dags/no_dags.py @@ -11,27 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# - -from airflow.executors.base_executor import BaseExecutor - - -class NoOpExecutor(BaseExecutor): - """ - This executor does not run any tasks and is used for testing only. - """ - def execute_async(self, key, command, queue=None): - self.logger.info("Tried to execute {}".format(command)) - pass - - def sync(self): - pass - - def end(self): - pass - - def terminate(self): - pass - - def start(self): - pass +# \ No newline at end of file diff --git a/tests/dags_with_system_exit/b_test_scheduler_dags.py b/tests/dags_with_system_exit/b_test_scheduler_dags.py index f1df3fa18b1d0..ed6904f90bd80 100644 --- a/tests/dags_with_system_exit/b_test_scheduler_dags.py +++ b/tests/dags_with_system_exit/b_test_scheduler_dags.py @@ -15,7 +15,8 @@ from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator + DEFAULT_DATE = datetime(2000, 1, 1) dag1 = DAG( diff --git a/tests/jobs.py b/tests/jobs.py index 7866ba19c05e4..e86b9dadd18af 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -32,7 +32,6 @@ from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout -from tests.executors.no_op_executor import NoOpExecutor from tests.executor.test_executor import TestExecutor @@ -202,7 +201,7 @@ def evaluate_dagrun( scheduler = SchedulerJob(**self.default_scheduler_args) dag = self.dagbag.get_dag(dag_id) dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) if advance_execution_date: # run a second time to schedule a dagrun after the start_date @@ -270,22 +269,6 @@ def test_dagrun_root_fail(self): }, dagrun_state=State.FAILED) - def test_dagrun_deadlock(self): - """ - Deadlocked DagRun is marked a failure - - Test that a deadlocked dagrun is marked as a failure by having - depends_on_past and an execution_date after the start_date - """ - self.evaluate_dagrun( - dag_id='test_dagrun_states_deadlock', - expected_task_states={ - 'test_depends_on_past': None, - 'test_depends_on_past_2': None, - }, - dagrun_state=State.FAILED, - advance_execution_date=True) - def test_scheduler_pooled_tasks(self): """ Test that the scheduler handles queued tasks correctly @@ -307,7 +290,7 @@ def test_scheduler_pooled_tasks(self): scheduler = SchedulerJob(dag_id, num_runs=1, - executor=NoOpExecutor(), + executor=TestExecutor(), **self.default_scheduler_args) scheduler.run() @@ -443,14 +426,15 @@ def test_scheduler_dagrun_once(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) - def test_scheduler_process_execute_task(self): + def test_scheduler_process_task_instances(self): """ - Test if process dag sends a task to the executor + Test if _process_task_instances puts the right task instances into the + queue. """ dag = DAG( dag_id='test_scheduler_process_execute_task', @@ -468,51 +452,16 @@ def test_scheduler_process_execute_task(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) - queue.put.assert_called_with( - ((dag.dag_id, dag_task1.task_id, DEFAULT_DATE), None) + queue.append.assert_called_with( + (dag.dag_id, dag_task1.task_id, DEFAULT_DATE) ) - tis = dr.get_task_instances(state=State.SCHEDULED) - self.assertIsNotNone(tis) - - def test_scheduler_process_check_heartrate(self): - """ - Test if process dag honors the heartrate - """ - dag = DAG( - dag_id='test_scheduler_process_check_heartrate', - start_date=DEFAULT_DATE) - dag_task1 = DummyOperator( - task_id='dummy', - dag=dag, - owner='airflow') - - session = settings.Session() - orm_dag = DagModel(dag_id=dag.dag_id) - orm_dag.last_scheduler_run = datetime.datetime.now() - session.merge(orm_dag) - session.commit() - session.close() - - scheduler = SchedulerJob() - scheduler.heartrate = 1000 - - dag.clear() - - dr = scheduler.schedule_dag(dag) - self.assertIsNotNone(dr) - - queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) - - queue.put.assert_not_called() - def test_scheduler_do_not_schedule_removed_task(self): dag = DAG( dag_id='test_scheduler_do_not_schedule_removed_task', @@ -531,7 +480,7 @@ def test_scheduler_do_not_schedule_removed_task(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) dag = DAG( @@ -539,7 +488,7 @@ def test_scheduler_do_not_schedule_removed_task(self): start_date=DEFAULT_DATE) queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -561,11 +510,11 @@ def test_scheduler_do_not_schedule_too_early(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -586,7 +535,7 @@ def test_scheduler_do_not_run_finished(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) tis = dr.get_task_instances(session=session) @@ -597,7 +546,7 @@ def test_scheduler_do_not_run_finished(self): session.close() queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -623,7 +572,7 @@ def test_scheduler_add_new_task(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) tis = dr.get_task_instances() @@ -635,7 +584,7 @@ def test_scheduler_add_new_task(self): owner='airflow') queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) tis = dr.get_task_instances() self.assertEquals(len(tis), 2) @@ -663,10 +612,10 @@ def test_scheduler_verify_max_active_runs(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) def test_scheduler_fail_dagrun_timeout(self): @@ -691,13 +640,13 @@ def test_scheduler_fail_dagrun_timeout(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) session.merge(dr) session.commit() - dr2 = scheduler.schedule_dag(dag) + dr2 = scheduler.create_dag_run(dag) self.assertIsNotNone(dr2) dr.refresh_from_db(session=session) @@ -728,18 +677,18 @@ def test_scheduler_verify_max_active_runs_and_dagrun_timeout(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) # Should not be scheduled as DagRun has not timedout and max_active_runs is reached - new_dr = scheduler.schedule_dag(dag) + new_dr = scheduler.create_dag_run(dag) self.assertIsNone(new_dr) # Should be scheduled as dagrun_timeout has passed dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) session.merge(dr) session.commit() - new_dr = scheduler.schedule_dag(dag) + new_dr = scheduler.create_dag_run(dag) self.assertIsNotNone(new_dr) def test_scheduler_auto_align(self): @@ -767,7 +716,7 @@ def test_scheduler_auto_align(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4)) @@ -789,7 +738,7 @@ def test_scheduler_auto_align(self): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10)) @@ -826,7 +775,13 @@ def test_scheduler_reschedule(self): @mock.patch('airflow.models.DagBag', return_value=dagbag) @mock.patch('airflow.models.DagBag.collect_dags') def do_schedule(function, function2): - scheduler = SchedulerJob(num_runs=1, executor=executor,) + # Use a empty file since the above mock will return the + # expected DAGs. Also specify only a single file so that it doesn't + # try to schedule the above DAG repeatedly. + scheduler = SchedulerJob(num_runs=1, + executor=executor, + subdir=os.path.join(models.DAGS_FOLDER, + "no_dags.py")) scheduler.heartrate = 0 scheduler.run() @@ -858,7 +813,7 @@ def test_scheduler_run_duration(self): logging.info("Test ran in %.2fs, expected %.2fs", run_duration, expected_run_duration) - assert run_duration - expected_run_duration < 2.5 + assert run_duration - expected_run_duration < 5.0 def test_dag_with_system_exit(self): """ @@ -885,4 +840,4 @@ def test_dag_with_system_exit(self): scheduler.run() session = settings.Session() self.assertEqual( - len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) \ No newline at end of file + len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)