27 commits
0ed36a1
[AIRFLOW-52] Fix bottlenecks when working with many tasks
jlowin May 7, 2016
0e5fb90
Fix corner case with joining processes/queues (#1473)
jlowin May 6, 2016
ff3a855
AIRFLOW-52 Warn about overwriting tasks in a DAG
jlowin May 9, 2016
edc718b
Handle queued tasks from multiple jobs/executors
jlowin Apr 13, 2016
77c7bc4
Add logic to lock DB and avoid race condition
jlowin May 6, 2016
4a5f4a0
AIRFLOW-92 Avoid unneeded upstream_failed session closes apache/incub…
bolkedebruin May 10, 2016
563be13
Fix: Don't treat premature tasks as could_not_run tasks
r39132 May 12, 2016
1d0d868
[AIRFLOW-52] 1.7.1 version bump and changelog
aoen May 19, 2016
16cf31d
1.7.1.1
mistercrunch May 21, 2016
408b7e6
Bump version to unblock pypi release
aoen May 21, 2016
f352925
Upgrade funcsigs to 1.0.2
Jun 16, 2017
86d251f
Loosen dependency on gunicorn
Jun 16, 2017
313ecd8
Support configuration key to toggle using IPs or hostnames
Jun 23, 2017
5586353
Merge pull request #7 from lyft/patch_socket
astahlman Jun 26, 2017
c6de0bc
Remove symlink to docs directory
Jul 19, 2017
ba2d0d0
Merge pull request #8 from lyft/disable-doc-build
astahlman Jul 20, 2017
f6f1eef
[DATA-4816] Take Alembic migrations from upstream
Aug 23, 2017
30c4ab9
Merge pull request #10 from lyft/data-4816-only-schema-upgrades
astahlman Aug 23, 2017
6b99ccd
Upgrade flask to 0.12.1
Sep 8, 2017
6b3a7fc
Merge pull request #11 from lyft/upgrade-flask
astahlman Sep 9, 2017
5d6cc80
Upgrade flask-wtf to 0.14
Sep 12, 2017
e338f65
Merge pull request #12 from lyft/upgrade-flask-wtf
astahlman Sep 12, 2017
05dc69d
[DATA-4839] Do not delete queued TIs from invisible DAGs
Sep 13, 2017
7d1108f
Merge pull request #13 from lyft/data-4839-ignore-phantom-tis
astahlman Sep 13, 2017
2158773
[DATA-4839] Ignore presto_query_logs DAG during 1.8 migration
Sep 19, 2017
50e17e3
Merge pull request #14 from lyft/whitelist-presto-query-log-dag
astahlman Sep 19, 2017
1d0aeb6
set logging level to debug and add more logging info for is_queueable
Sep 20, 2017
238 changes: 212 additions & 26 deletions CHANGELOG.txt
@@ -1,28 +1,214 @@
AIRFLOW 1.7.0, 2016-03-28
AIRFLOW 1.7.1, 2016-05-19
-------------------------

- Add MySQL->GCS, GCS->BQ operators
- Support pause/unpause DAGs in CLI
- SLA_miss_callbacks
- Pause newly minted DAGs by default if configured to do so
- Fixes related to solving the "infinite retry problem"
- Handle SKIPPED tasks as SUCCESS when depends_on_past=True
- Pass JSON task params via the airflow test CLI command
- Extend FERNET key encryption to VARIABLES and to the connection's OTHER field
- Add GCSHook for Google Cloud Storage
- Parameterize DagBag import timeouts
- Add startup scripts for upstart based systems
- Add template support in Qubole operator
- Add SubDag documentation and examples
- Improve kerberos integration for hdfs hook
- Add SSL support for SMTP
- Add BigQuery and Google Cloud Storage operators
- Set effective user of (web)hdfs hook
- Add support for three-legged OAuth for Google connections
- Add Docker operator
- Add "search_scope" as a configuration variable for LDAP
- Add super user and profiler to ldap
- Disable concurrent executors for sqlite backend
- Implement GitHub Enterprise Authentication
- Fix invalid syntax in SSHHook
- Upgrade flask admin
- Fix: Don't treat premature tasks as could_not_run tasks
- AIRFLOW-92 Avoid unneeded upstream_failed session closes apache/incubator-airflow#1485
- Add logic to lock DB and avoid race condition
- Handle queued tasks from multiple jobs/executors
- AIRFLOW-52 Warn about overwriting tasks in a DAG
- Fix corner case with joining processes/queues (#1473)
- [AIRFLOW-52] Fix bottlenecks when working with many tasks
- Add columns to toggle extra detail in the connection list view.
- Log the number of errors when importing DAGs
- Log dagbag metrics
- Deduplicate messages in queue into Statsd (#1406)
- Clean up issue template (#1419)
- correct missed arg.foreground to arg.daemon in cli
- Reinstate imports for github enterprise auth
- Use os.execvp instead of subprocess.Popen for the webserver
- Revert from using "--foreground" to "--daemon"
- Implement a Cloudant hook
- Add missing args to `airflow clear`
- Fixed a bug in the scheduler: num_runs used where runs intended
- Add multiprocessing support to the scheduler
- Partial fix to make sure next_run_date cannot be None
- Support list/get/set variables in the CLI
- Properly handle BigQuery booleans in BigQuery hook.
- Added the ability to view XCom variables in webserver
- Change DAG.tasks from a list to a dict
- Add support for zipped dags
- Stop creating hook on instantiating of S3 operator
- Use subquery in views to find running DAGs
- Prevent DAGs from being reloaded on every scheduler iteration
- Add a missing word to docs
- Document the parameters of `DbApiHook`
- added oracle operator with existing oracle hook
- Add PyOpenSSL to Google cloud gcp_api.
- Remove executor error unit test
- Add DAG inference, deferral, and context manager
- Don't return error when writing files to Google cloud storage.
- Fix GCS logging for gcp_api.
- Ensure attr is in scope for error message
- Fixing misnamed PULL_REQUEST_TEMPLATE
- Extract non_pooled_task_slot_count into a configuration param
- Update plugins.rst for clarity on the example (#1309)
- Fix s3 logging issue
- Add twitter feed example dag
- Github ISSUE_TEMPLATE & PR_TEMPLATE cleanup
- Reduce logger verbosity
- Adding a PR Template
- Add Lucid to list of users
- Fix usage of asciiart
- Use session instead of outdated main_session for are_dependencies_met
- Fix celery flower port allocation
- Fix for missing edit actions due to flask-admin upgrade
- Fix typo in comment in prioritize_queued method
- Add HipchatOperator
- Include all example dags in backfill unit test
- Make sure skipped jobs are actually skipped
- Fixing a broken example dag, example_skip_dag.py
- Add consistent and thorough signal handling and logging
- Allow Operators to specify SKIPPED status internally
- Update docstring for executor trap unit test
- Doc: explain the usage of Jinja templating for templated params
- Don't schedule runs before the DAG's start_date
- Fix infinite retries with pools, with test
- Fix handling of deadlocked jobs
- Show only Airflow's deprecation warnings
- Set DAG_FOLDER for unit tests
- Missing comma in setup.py
- Deprecate *args and **kwargs in BaseOperator
- Raise deep scheduler exceptions to force a process restart.
- Change inconsistent example DAG owners
- Fix module path of send_email_smtp in configuration
- added Gentner Lab to list of users
- Increase timeout time for unit test
- Fix reading strings from conf
- CHORE - Remove Trailing Spaces
- Fix SSHExecuteOperator crash when using a custom ssh port
- Add note about airflow components to template
- Rewrite BackfillJob logic for clarity
- Add unit tests
- Fix miscellaneous bugs and clean up code
- Fix logic for determining DagRun states
- Make SchedulerJob not run EVERY queued task
- Improve BackfillJob handling of queued/deadlocked tasks
- Introduce ignore_depends_on_past parameters
- Use Popen with CeleryExecutor
- Rename user table to users to avoid conflict with postgres
- Beware of negative pool slots.
- Add support for calling_format from boto to S3_Hook
- Add pypi meta data and sync version number
- Set dags_are_paused_at_creation's default value to True
- Resurface S3Log class eaten by rebase/push -f
- Add missing session.commit() at end of initdb
- Validate that subdag tasks have pool slots available, and test
- Use urlparse for remote GCS logs, and add unit tests
- Make webserver worker timeout configurable
- Fixed scheduling for @once interval
- Use psycopg2's API for serializing postgres cell values
- Make the provide_session decorator more robust
- update link to Lyft's website
- use num_shards instead of partitions to be consistent with batch ingestion
- Add documentation links to README
- Update docs with separate configuration section
- Fix airflow.utils deprecation warning code being Python 3 incompatible
- Extract dbapi cell serialization into its own method
- Set Postgres autocommit as supported only if server version is < 7.4
- Use refactored utils module in unit test imports
- Add changelog for 1.7.0
- Use LocalExecutor on Travis if possible
- remove unused logging, errno, MiniHiveCluster imports
- remove extra import of logging lib
- Fix required gcloud version
- Refactoring utils into smaller submodules
- Properly measure number of task retry attempts
- Add function to get configuration as dict, plus unit tests
- Merge branch 'master' into hivemeta_sasl
- Add wiki link to README.md
- [hotfix] make email.Utils > email.utils for py3
- Add the missing "Date" header to the warning e-mails
- Check name of SubDag class instead of class itself
- [hotfix] removing repo_token from .coveralls.yml
- Set the service_name in coveralls.yml
- Fixes #1223
- Update Airflow docs for remote logging
- Add unit tests for trapping Executor errors
- Make sure Executors properly trap errors
- Fix HttpOpSensorTest to use fake request session
- Linting
- Add an example on pool usage in the documentation
- Add two methods to bigquery hook's base cursor: run_table_upsert, which adds a table or updates an existing table; and run_grant_dataset_view_access, which grants view access to a given dataset for a given table.
- Tasks reference upstream and downstream tasks using strings instead of references
- Fix typos in models.py
- Fix broken links in documentation
- [hotfix] fixing the Scheduler CLI to make dag_id optional
- Update link to Common Pitfalls wiki page in README
- Allow disabling periodic committing when inserting rows with DbApiHook
- added Glassdoor to "who uses airflow"
- Fix typo preventing from launching webserver
- Documentation badge
- Fixing ISSUE_TEMPLATE name to include .md suffix
- Adding an ISSUE_TEMPLATE to ensure that issues are adequately defined
- Linting & debugging
- Refactoring the CLI to be data-driven
- Updating the Bug Reporting protocol in the Contributing.md file
- Fixing the docs
- clean up references to old session
- remove session reference
- resolve conflict
- clear xcom data when task instance starts
- replace main_session with @provide_session
- Add extras to installation.rst
- Changes to Contributing to reflect more closely the current state of development.
- Modifying README to link to the wiki committer list
- docs: fixes a spelling mistake in default config
- Set KillMode to 'control-group' for webservice.service
- Set KillMode to 'control-group' for worker.service
- Linting
- Fix WebHdfsSensor
- Adding more licenses to pass checks
- fixing landscape's config
- [hotfix] typo that made it in master
- [hotfix] fixing landscape requirement detection
- Make testing on hive conditional
- Merge remote-tracking branch 'upstream/master' into minicluster
- Update README.md
- Throwing in a few license to pass the build
- Adding a reqs.txt for landscape.io
- Pointing to a reqs file
- Some linting
- Adding a .landscape.yml file
- badge for pypi version
- Add license and ignore for sql and csv
- Use correct connection id
- Use correct table name
- Provide data for ci tests
- new badge for showing staleness of reqs
- removing requirements.txt as it is uni-dimensional
- Make it work on py3
- Remove decode for logging
- Also keep py2 compatible
- More py3 fixes
- Convert to bytes for py3 compat
- Make sure to be py3 compatible
- Use unicodecsv to make it py3 compatible
- Replace tab with spaces; remove unused import
- Merge remote-tracking branch 'upstream/master'
- Support decimal types in MySQL to GCS
- Make sure to write binary as string can be unicode
- Ignore metastore
- More impyla fixes
- Test HivemetaStore if python 2
- Allow users to set hdfs_namenode_principal in HDFSHook config
- Add tests for Hiveserver2 and fix some issues from impyla
- Merge branch 'impyla' into minicluster
- This patch allows for testing of hive operators and hooks. Sasl is used (NoSasl in connection string is not possible). Tests have been adjusted.
- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True
- fix bigquery hook
- version cap for gcp_api
- Fix typo when returning VerticaHook
- Adding fernet key to use it as part of stdout commands
- Adding support for ssl parameters. (picking up from jthomas123)
- more detail in error message.
- make sure paths don't conflict bc of trailing /
- change gcs_hook to self.hook
- refactor remote log read/write and add GCS support
- Only use multipart upload in S3Hook if file is large enough
- Merge branch 'airbnb/master'
- Add GSSAPI SASL to HiveMetaStoreHook.
- Add warning for deprecated setting
- Use kerberos_service_name = 'hive' as standard instead of 'impala'.
- Use GSSAPI instead of KERBEROS and provide backwards compatibility
- ISSUE-1123 Use impyla instead of pyhs2
- set celery_executor to use queue name as exchange
2 changes: 1 addition & 1 deletion airflow/__init__.py
@@ -20,7 +20,7 @@
`airflow.www.login`
"""
from builtins import object
__version__ = "1.7.0"
__version__ = "1.7.1.2"

import logging
import os
21 changes: 21 additions & 0 deletions airflow/bin/airflow
@@ -1,9 +1,30 @@
#!/usr/bin/env python
import logging
import os
import socket
import requests
from airflow import configuration
from airflow.bin.cli import CLIFactory


def get_private_ip(name=''):
    r = requests.get("http://169.254.169.254/latest/meta-data/local-ipv4")
    return str(r.text)


def getfqdn(name=''):
    return get_private_ip()


should_patch_socket = False
try:
    should_patch_socket = configuration.getboolean('lyft', 'prefer_ip_over_hostname')
except configuration.AirflowConfigException:
    pass  # Default to False if not configured

if should_patch_socket:
    logging.info("Using IP addresses instead of hostnames.")
    socket.gethostname = socket.getfqdn = getfqdn

if __name__ == '__main__':

    if configuration.get("core", "security") == 'kerberos':
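The patch above keys the behavior off a single boolean in Airflow's configuration: when enabled, every socket.gethostname()/socket.getfqdn() call returns the instance's private IP fetched from the EC2 metadata service (169.254.169.254). A minimal airflow.cfg sketch that would enable it — the section and key names come straight from the getboolean call above; the rest of the file is assumed:

[lyft]
# When true, Airflow components advertise their EC2 private IP
# instead of a hostname. Omitting the key falls back to hostnames.
prefer_ip_over_hostname = True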
33 changes: 24 additions & 9 deletions airflow/executors/base_executor.py
@@ -30,10 +30,11 @@ def start(self): # pragma: no cover
        """
        pass

    def queue_command(self, key, command, priority=1, queue=None):
    def queue_command(self, task_instance, command, priority=1, queue=None):
        key = task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.logger.info("Adding to queue: {}".format(command))
            self.queued_tasks[key] = (command, priority, queue)
            self.queued_tasks[key] = (command, priority, queue, task_instance)

    def queue_task_instance(
            self,
@@ -54,7 +55,7 @@
            pool=pool,
            pickle_id=pickle_id)
        self.queue_command(
            task_instance.key,
            task_instance,
            command,
            priority=task_instance.task.priority_weight_total,
            queue=task_instance.task.queue)
@@ -67,9 +68,6 @@ def sync(self):
        pass

    def heartbeat(self):
        # Calling child class sync method
        self.logger.debug("Calling the {} sync method".format(self.__class__))
        self.sync()

        # Triggering new jobs
        if not self.parallelism:
@@ -86,10 +84,27 @@
            key=lambda x: x[1][1],
            reverse=True)
        for i in range(min((open_slots, len(self.queued_tasks)))):
            key, (command, priority, queue) = sorted_queue.pop(0)
            self.running[key] = command
            key, (command, _, queue, ti) = sorted_queue.pop(0)
            # TODO(jlowin) without a way to know what Job ran which tasks,
            # there is a danger that another Job started running a task
            # that was also queued to this executor. This is the last chance
            # to check if that happened. The most probable way is that a
            # Scheduler tried to run a task that was originally queued by a
            # Backfill. This fix reduces the probability of a collision but
            # does NOT eliminate it.
            self.queued_tasks.pop(key)
            self.execute_async(key, command=command, queue=queue)
            ti.refresh_from_db()
            if ti.state != State.RUNNING:
                self.running[key] = command
                self.execute_async(key, command=command, queue=queue)
            else:
                self.logger.debug(
                    'Task is already running, not sending to '
                    'executor: {}'.format(key))

        # Calling child class sync method
        self.logger.debug("Calling the {} sync method".format(self.__class__))
        self.sync()

    def change_state(self, key, state):
        self.running.pop(key)
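The heartbeat change above carries the race-condition fix: queued_tasks now stores the TaskInstance alongside the command, so the executor can re-read its state from the database at the last possible moment and decline to launch anything another job (for example, a scheduler racing a backfill) has already started. A condensed sketch of that dispatch step, using State from airflow.utils.state as the diff does — an illustration of the pattern, not the full method:

from airflow.utils.state import State

def dispatch_one(executor, key, command, queue, ti):
    executor.queued_tasks.pop(key)
    ti.refresh_from_db()  # last-chance check against the DB
    if ti.state != State.RUNNING:
        executor.running[key] = command
        executor.execute_async(key, command=command, queue=queue)
    else:
        # Another job owns this run; do not double-execute.
        executor.logger.debug(
            'Task is already running, not sending to '
            'executor: {}'.format(key))

Note also that the child-class sync() call moves from the top of heartbeat() to the bottom, so completed-task states are polled after new work is dispatched rather than before.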
1 change: 1 addition & 0 deletions airflow/executors/celery_executor.py
@@ -95,3 +95,4 @@ def end(self, synchronous=False):
                    async.state not in celery_states.READY_STATES
                    for async in self.tasks.values()]):
                time.sleep(5)
        self.sync()
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
@@ -73,4 +73,4 @@ def end(self):
        [self.queue.put((None, None)) for w in self.workers]
        # Wait for commands to finish
        self.queue.join()

        self.sync()
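The celery and local executor changes are the same one-line addition: end() now finishes with self.sync(), so results that arrive between the last heartbeat and shutdown can still be reported before the process exits. A hypothetical subclass sketch showing where the call fits in the executor contract (_wait_for_workers is an invented placeholder for the backend-specific drain step):

from airflow.executors.base_executor import BaseExecutor

class MyExecutor(BaseExecutor):
    def end(self):
        # Block until all in-flight commands have finished
        # (backend-specific; placeholder helper).
        self._wait_for_workers()
        # One final poll so terminal task states are recorded
        # before the executor goes away.
        self.sync()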