Merge pull request #2781 from bolkedebruin/AIRFLOW-1802
bolkedebruin committed Nov 27, 2017
2 parents d8115e9 + f1ab56c commit d990531
Showing 53 changed files with 1,104 additions and 395 deletions.
9 changes: 5 additions & 4 deletions airflow/api/common/experimental/mark_tasks.py
@@ -12,16 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime

from airflow.jobs import BackfillJob
from airflow.models import DagRun, TaskInstance
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State

from sqlalchemy import or_


def _create_dagruns(dag, execution_dates, state, run_id_template):
"""
Infers from the dates which dag runs need to be created and does so.
@@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
dr = dag.create_dagrun(
run_id=run_id_template.format(date.isoformat()),
execution_date=date,
start_date=datetime.datetime.utcnow(),
start_date=timezone.utcnow(),
external_trigger=False,
state=state,
)
@@ -67,7 +67,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
:param commit: Commit tasks to be altered to the database
:return: list of tasks that have been created and updated
"""
assert isinstance(execution_date, datetime.datetime)
assert timezone.is_localized(execution_date)

# microseconds are supported by the database, but is not handled
# correctly by airflow on e.g. the filesystem and in other places
@@ -185,6 +185,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,

return tis_altered


def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
"""
Set the state of a dag run and all task instances associated with the dag
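The changes above swap `datetime.datetime.utcnow()` for `timezone.utcnow()` and replace the plain `isinstance` check with `timezone.is_localized(execution_date)`, so `mark_tasks` only ever works with timezone-aware datetimes. A minimal sketch of what such helpers could look like follows; it is an illustration in stdlib Python, not the actual `airflow.utils.timezone` implementation:

```python
# Illustrative sketch only, not the real airflow.utils.timezone module.
import datetime

utc = datetime.timezone.utc


def utcnow():
    """Current time as a timezone-aware datetime pinned to UTC."""
    return datetime.datetime.now(tz=utc)


def is_localized(value):
    """True if `value` carries usable timezone information (is "aware")."""
    return value.tzinfo is not None and value.utcoffset() is not None


assert is_localized(utcnow())                         # aware: passes
assert not is_localized(datetime.datetime.utcnow())   # naive: rejected by the new check
```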
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/trigger_dag.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json

from airflow.exceptions import AirflowException
from airflow.models import DagRun, DagBag
from airflow.utils import timezone
from airflow.utils.state import State


@@ -29,9 +29,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
dag = dagbag.get_dag(dag_id)

if not execution_date:
execution_date = datetime.datetime.utcnow()
execution_date = timezone.utcnow()

assert isinstance(execution_date, datetime.datetime)
assert timezone.is_localized(execution_date)
execution_date = execution_date.replace(microsecond=0)

if not run_id:
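`trigger_dag` now defaults a missing `execution_date` to `timezone.utcnow()` and asserts that any caller-supplied value is localized. Enforcing this at the API boundary matters because Python refuses to compare or subtract naive and aware datetimes; a short standalone illustration (not Airflow code):

```python
import datetime

aware = datetime.datetime.now(tz=datetime.timezone.utc)
naive = datetime.datetime.utcnow()

try:
    aware - naive
except TypeError as exc:
    # "can't subtract offset-naive and offset-aware datetimes"
    print(exc)
```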
10 changes: 7 additions & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -54,6 +54,10 @@ logging_config_class =
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = utc

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = SequentialExecutor
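The new `default_timezone` option declares which timezone Airflow should assume when it is handed a naive datetime. A rough sketch of how a setting like this could be applied, using the standard-library `zoneinfo` module purely for illustration (the real Airflow code may resolve the option differently):

```python
# Illustration only: attach the configured default timezone to naive datetimes.
import datetime
from zoneinfo import ZoneInfo  # assumed for illustration, Python 3.9+

DEFAULT_TIMEZONE = "Europe/Amsterdam"  # e.g. the configured default_timezone value


def make_aware(value, tz_name=DEFAULT_TIMEZONE):
    """Attach the default timezone to naive datetimes; leave aware ones untouched."""
    if value.tzinfo is not None:
        return value
    return value.replace(tzinfo=ZoneInfo(tz_name))


print(make_aware(datetime.datetime(2017, 11, 27, 12, 0)))
# 2017-11-27 12:00:00+01:00
```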
@@ -364,12 +368,12 @@ authenticate = False

[ldap]
# set this to ldaps://<your.ldap.server>:<port>
uri =
uri =
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter =
data_profiler_filter =
superuser_filter =
data_profiler_filter =
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
80 changes: 41 additions & 39 deletions airflow/jobs.py
@@ -28,13 +28,15 @@
import sys
import threading
import time
import datetime

from collections import defaultdict
from datetime import datetime
from past.builtins import basestring
from sqlalchemy import (
Column, Integer, String, DateTime, func, Index, or_, and_, not_)
Column, Integer, String, func, Index, or_, and_, not_)
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from sqlalchemy_utc import UtcDateTime
from tabulate import tabulate
from time import sleep

@@ -46,7 +48,7 @@
from airflow.settings import Stats
from airflow.task_runner import get_task_runner
from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
from airflow.utils import asciiart
from airflow.utils import asciiart, timezone
from airflow.utils.dag_processing import (AbstractDagFileProcessor,
DagFileProcessorManager,
SimpleDag,
@@ -76,9 +78,9 @@ class BaseJob(Base, LoggingMixin):
dag_id = Column(String(ID_LEN),)
state = Column(String(20))
job_type = Column(String(30))
start_date = Column(DateTime())
end_date = Column(DateTime())
latest_heartbeat = Column(DateTime())
start_date = Column(UtcDateTime())
end_date = Column(UtcDateTime())
latest_heartbeat = Column(UtcDateTime())
executor_class = Column(String(500))
hostname = Column(String(500))
unixname = Column(String(1000))
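`BaseJob`'s `DateTime` columns become `UtcDateTime` from the `sqlalchemy_utc` package imported at the top of the file. A brief sketch of that column type in isolation; the model below is made up for illustration and assumes SQLAlchemy 1.4+ plus the sqlalchemy-utc package:

```python
import datetime

from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base
from sqlalchemy_utc import UtcDateTime

Base = declarative_base()


class DemoJob(Base):  # hypothetical model, not Airflow's BaseJob
    __tablename__ = "demo_job"
    id = Column(Integer, primary_key=True)
    # Unlike a plain DateTime column, this is meant to hold timezone-aware
    # values and keep them normalized to UTC regardless of the backend.
    start_date = Column(UtcDateTime())


job = DemoJob(start_date=datetime.datetime.now(tz=datetime.timezone.utc))
```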
@@ -100,22 +102,22 @@ def __init__(
self.hostname = socket.getfqdn()
self.executor = executor
self.executor_class = executor.__class__.__name__
self.start_date = datetime.utcnow()
self.latest_heartbeat = datetime.utcnow()
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
self.heartrate = heartrate
self.unixname = getpass.getuser()
super(BaseJob, self).__init__(*args, **kwargs)

def is_alive(self):
return (
(datetime.utcnow() - self.latest_heartbeat).seconds <
(timezone.utcnow() - self.latest_heartbeat).seconds <
(conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
)

@provide_session
def kill(self, session=None):
job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
job.end_date = datetime.utcnow()
job.end_date = timezone.utcnow()
try:
self.on_kill()
except:
@@ -165,14 +167,14 @@ def heartbeat(self):
if job.latest_heartbeat:
sleep_for = max(
0,
self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds())
self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds())

sleep(sleep_for)

# Update last heartbeat time
with create_session() as session:
job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
job.latest_heartbeat = datetime.utcnow()
job.latest_heartbeat = timezone.utcnow()
session.merge(job)
session.commit()

@@ -194,7 +196,7 @@ def run(self):
self._execute()

# Marking the success in the DB
self.end_date = datetime.utcnow()
self.end_date = timezone.utcnow()
self.state = State.SUCCESS
session.merge(self)
session.commit()
@@ -399,7 +401,7 @@ def start(self):
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id))
self._start_time = datetime.utcnow()
self._start_time = timezone.utcnow()

def terminate(self, sigkill=False):
"""
@@ -615,16 +617,16 @@ def manage_slas(self, dag, session=None):
TI.execution_date == sq.c.max_ti,
).all()

ts = datetime.utcnow()
ts = timezone.utcnow()
SlaMiss = models.SlaMiss
for ti in max_tis:
task = dag.get_task(ti.task_id)
dttm = ti.execution_date
if task.sla:
dttm = dag.following_schedule(dttm)
while dttm < datetime.utcnow():
while dttm < timezone.utcnow():
following_schedule = dag.following_schedule(dttm)
if following_schedule + task.sla < datetime.utcnow():
if following_schedule + task.sla < timezone.utcnow():
session.merge(models.SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
@@ -772,9 +774,9 @@ def create_dag_run(self, dag, session=None):
for dr in active_runs:
if (
dr.start_date and dag.dagrun_timeout and
dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = datetime.utcnow()
dr.end_date = timezone.utcnow()
timedout_runs += 1
session.commit()
if len(active_runs) - timedout_runs >= dag.max_active_runs:
@@ -799,9 +801,9 @@ def create_dag_run(self, dag, session=None):
# don't do scheduler catchup for dag's that don't have dag.catchup = True
if not dag.catchup:
# The logic is that we move start_date up until
# one period before, so that datetime.utcnow() is AFTER
# one period before, so that timezone.utcnow() is AFTER
# the period end, and the job can be created...
now = datetime.utcnow()
now = timezone.utcnow()
next_start = dag.following_schedule(now)
last_start = dag.previous_schedule(now)
if next_start <= now:
@@ -847,7 +849,7 @@ def create_dag_run(self, dag, session=None):
)

# don't ever schedule in the future
if next_run_date > datetime.utcnow():
if next_run_date > timezone.utcnow():
return

# this structure is necessary to avoid a TypeError from concatenating
@@ -870,11 +872,11 @@ def create_dag_run(self, dag, session=None):
if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
return

if next_run_date and period_end and period_end <= datetime.utcnow():
if next_run_date and period_end and period_end <= timezone.utcnow():
next_run = dag.create_dagrun(
run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
execution_date=next_run_date,
start_date=datetime.utcnow(),
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False
)
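With aware execution dates, `isoformat()` now includes a UTC offset, so run ids built from `DagRun.ID_PREFIX + next_run_date.isoformat()` carry the offset as well. A tiny illustration, assuming the prefix is the usual `scheduled__` string:

```python
import datetime

next_run_date = datetime.datetime(2017, 11, 27, tzinfo=datetime.timezone.utc)
print("scheduled__" + next_run_date.isoformat())
# scheduled__2017-11-27T00:00:00+00:00
```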
@@ -894,7 +896,7 @@ def _process_task_instances(self, dag, queue, session=None):
for run in dag_runs:
self.log.info("Examining DAG run %s", run)
# don't consider runs that are executed in the future
if run.execution_date > datetime.utcnow():
if run.execution_date > timezone.utcnow():
self.log.error(
"Execution date is in future: %s",
run.execution_date
@@ -1231,7 +1233,7 @@ def _change_state_for_executable_task_instances(self, task_instances,
# set TIs to queued state
for task_instance in tis_to_set_to_queued:
task_instance.state = State.QUEUED
task_instance.queued_dttm = (datetime.utcnow()
task_instance.queued_dttm = (timezone.utcnow()
if not task_instance.queued_dttm
else task_instance.queued_dttm)
session.merge(task_instance)
@@ -1468,7 +1470,7 @@ def _log_file_processing_stats(self,
last_runtime = processor_manager.get_last_runtime(file_path)
processor_pid = processor_manager.get_pid(file_path)
processor_start_time = processor_manager.get_start_time(file_path)
runtime = ((datetime.utcnow() - processor_start_time).total_seconds()
runtime = ((timezone.utcnow() - processor_start_time).total_seconds()
if processor_start_time else None)
last_run = processor_manager.get_last_finish_time(file_path)

@@ -1585,34 +1587,34 @@ def _execute_helper(self, processor_manager):
self.log.info("Resetting orphaned tasks for active dag runs")
self.reset_state_for_orphaned_tasks()

execute_start_time = datetime.utcnow()
execute_start_time = timezone.utcnow()

# Last time stats were printed
last_stat_print_time = datetime(2000, 1, 1)
last_stat_print_time = datetime.datetime(2000, 1, 1, tzinfo=timezone.utc)
# Last time that self.heartbeat() was called.
last_self_heartbeat_time = datetime.utcnow()
last_self_heartbeat_time = timezone.utcnow()
# Last time that the DAG dir was traversed to look for files
last_dag_dir_refresh_time = datetime.utcnow()
last_dag_dir_refresh_time = timezone.utcnow()

# Use this value initially
known_file_paths = processor_manager.file_paths

# For the execute duration, parse and schedule DAGs
while (datetime.utcnow() - execute_start_time).total_seconds() < \
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
self.log.debug("Starting Loop...")
loop_start_time = time.time()

# Traverse the DAG directory for Python files containing DAGs
# periodically
elapsed_time_since_refresh = (datetime.utcnow() -
elapsed_time_since_refresh = (timezone.utcnow() -
last_dag_dir_refresh_time).total_seconds()

if elapsed_time_since_refresh > self.dag_dir_list_interval:
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
known_file_paths = list_py_file_paths(self.subdir)
last_dag_dir_refresh_time = datetime.utcnow()
last_dag_dir_refresh_time = timezone.utcnow()
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
processor_manager.set_file_paths(known_file_paths)

@@ -1662,20 +1664,20 @@ def _execute_helper(self, processor_manager):
self._process_executor_events(simple_dag_bag)

# Heartbeat the scheduler periodically
time_since_last_heartbeat = (datetime.utcnow() -
time_since_last_heartbeat = (timezone.utcnow() -
last_self_heartbeat_time).total_seconds()
if time_since_last_heartbeat > self.heartrate:
self.log.info("Heartbeating the scheduler")
self.heartbeat()
last_self_heartbeat_time = datetime.utcnow()
last_self_heartbeat_time = timezone.utcnow()

# Occasionally print out stats about how fast the files are getting processed
if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
if ((timezone.utcnow() - last_stat_print_time).total_seconds() >
self.print_stats_interval):
if len(known_file_paths) > 0:
self._log_file_processing_stats(known_file_paths,
processor_manager)
last_stat_print_time = datetime.utcnow()
last_stat_print_time = timezone.utcnow()

loop_end_time = time.time()
self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
@@ -2049,7 +2051,7 @@ def _get_dag_run(self, run_date, session=None):
run = run or self.dag.create_dagrun(
run_id=run_id,
execution_date=run_date,
start_date=datetime.utcnow(),
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False,
session=session
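Throughout `jobs.py` the scheduler's bookkeeping (heartbeats, stats printing, the run-duration check) now subtracts two aware UTC datetimes instead of naive ones; the arithmetic itself is unchanged. A short standalone example of the pattern, with names invented for illustration:

```python
import datetime
import time


def utcnow():
    return datetime.datetime.now(tz=datetime.timezone.utc)


loop_start = utcnow()
time.sleep(0.1)
print("elapsed: %.3fs" % (utcnow() - loop_start).total_seconds())

# Sentinels used for "never happened yet" must also be aware, mirroring
# datetime.datetime(2000, 1, 1, tzinfo=timezone.utc) in the diff above.
sentinel = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
print((utcnow() - sentinel).total_seconds() > 0)  # True
```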
