Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Mar 21, 2016
1 parent d40a7d3 commit fbac2bf
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 75 deletions.
1 change: 1 addition & 0 deletions .landscape.yml
Expand Up @@ -8,5 +8,6 @@ pylint:
disable:
- cyclic-import
- invalid-name
- super-on-old-class
options:
docstring-min-length: 10
58 changes: 29 additions & 29 deletions airflow/bin/cli.py
Expand Up @@ -14,20 +14,20 @@

import airflow
from airflow import jobs, settings, utils
from airflow import configuration
from airflow import configuration as conf
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun
from airflow.utils import AirflowException, State

DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

# Common help text across subcommands
mark_success_help = "Mark jobs as succeeded without running them"
subdir_help = "File location or directory from which to look for the dag"


def process_subdir(subdir):
dags_folder = configuration.get("core", "DAGS_FOLDER")
dags_folder = conf.get("core", "DAGS_FOLDER")
dags_folder = os.path.expanduser(dags_folder)
if subdir:
if "DAGS_FOLDER" in subdir:
Expand Down Expand Up @@ -78,7 +78,7 @@ def backfill(args):
mark_success=args.mark_success,
include_adhoc=args.include_adhoc,
local=args.local,
donot_pickle=(args.donot_pickle or configuration.getboolean('core', 'donot_pickle')),
donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
ignore_dependencies=args.ignore_dependencies,
pool=args.pool)

Expand Down Expand Up @@ -139,7 +139,7 @@ def run(args):
utils.pessimistic_connection_handling()

# Setting up logging
log_base = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
Expand Down Expand Up @@ -229,16 +229,16 @@ def run(args):
executor.end()

# store logs remotely
remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

# deprecated as of March 2016
if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'):
if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
warnings.warn(
'The S3_LOG_FOLDER configuration key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your configuration still works but please '
'The S3_LOG_FOLDER conf key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
'update airflow.cfg to ensure future compatibility.',
DeprecationWarning)
remote_base = configuration.get('core', 'S3_LOG_FOLDER')
remote_base = conf.get('core', 'S3_LOG_FOLDER')

if os.path.exists(filename):
# read log and remove old logs to get just the latest additions
Expand Down Expand Up @@ -367,8 +367,8 @@ def webserver(args):
print(settings.HEADER)

from airflow.www.app import cached_app
app = cached_app(configuration)
workers = args.workers or configuration.get('webserver', 'workers')
app = cached_app(conf)
workers = args.workers or conf.get('webserver', 'workers')
if args.debug:
print(
"Starting the web server on port {0} and host {1}.".format(
Expand Down Expand Up @@ -402,15 +402,15 @@ def serve_logs(args):
flask_app = flask.Flask(__name__)

@flask_app.route('/log/<path:filename>')
def serve_logs(filename):
log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
def serve_logs(filename): # noqa
log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
return flask.send_from_directory(
log,
filename,
mimetype="application/json",
as_attachment=False)
WORKER_LOG_SERVER_PORT = \
int(configuration.get('celery', 'WORKER_LOG_SERVER_PORT'))
int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
flask_app.run(
host='0.0.0.0', port=WORKER_LOG_SERVER_PORT)

Expand All @@ -436,7 +436,7 @@ def worker(args):
sp.kill()


def initdb(args):
def initdb(args): # noqa
print("DB: " + repr(settings.engine.url))
utils.initdb()
print("Done.")
Expand All @@ -454,18 +454,18 @@ def resetdb(args):
print("Bail.")


def upgradedb(args):
def upgradedb(args): # noqa
print("DB: " + repr(settings.engine.url))
utils.upgradedb()


def version(args):
def version(args): # noqa
print(settings.HEADER + " v" + airflow.__version__)


def flower(args):
broka = configuration.get('celery', 'BROKER_URL')
args.port = args.port or configuration.get('celery', 'FLOWER_PORT')
broka = conf.get('celery', 'BROKER_URL')
args.port = args.port or conf.get('celery', 'FLOWER_PORT')
port = '--port=' + args.port
api = ''
if args.broker_api:
Expand All @@ -474,7 +474,7 @@ def flower(args):
sp.wait()


def kerberos(args):
def kerberos(args): # noqa
print(settings.HEADER)

import airflow.security.kerberos
Expand Down Expand Up @@ -659,22 +659,22 @@ def get_parser():
parser_webserver = subparsers.add_parser('webserver', help=ht)
parser_webserver.add_argument(
"-p", "--port",
default=configuration.get('webserver', 'WEB_SERVER_PORT'),
default=conf.get('webserver', 'WEB_SERVER_PORT'),
type=int,
help="Set the port on which to run the web server")
parser_webserver.add_argument(
"-w", "--workers",
default=configuration.get('webserver', 'WORKERS'),
default=conf.get('webserver', 'WORKERS'),
type=int,
help="Number of workers to run the webserver on")
parser_webserver.add_argument(
"-k", "--workerclass",
default=configuration.get('webserver', 'WORKER_CLASS'),
default=conf.get('webserver', 'WORKER_CLASS'),
choices=['sync', 'eventlet', 'gevent', 'tornado'],
help="The worker class to use for gunicorn")
parser_webserver.add_argument(
"-hn", "--hostname",
default=configuration.get('webserver', 'WEB_SERVER_HOST'),
default=conf.get('webserver', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server")
ht = "Use the server that ships with Flask in debug mode"
parser_webserver.add_argument(
Expand Down Expand Up @@ -743,12 +743,12 @@ def get_parser():
parser_worker.add_argument(
"-q", "--queues",
help="Comma delimited list of queues to serve",
default=configuration.get('celery', 'DEFAULT_QUEUE'))
default=conf.get('celery', 'DEFAULT_QUEUE'))
parser_worker.add_argument(
"-c", "--concurrency",
type=int,
help="The number of worker processes",
default=configuration.get('celery', 'celeryd_concurrency'))
default=conf.get('celery', 'celeryd_concurrency'))
parser_worker.set_defaults(func=worker)

ht = "Serve logs generate by worker"
Expand All @@ -770,10 +770,10 @@ def get_parser():
parser_kerberos = subparsers.add_parser('kerberos', help=ht)
parser_kerberos.add_argument(
"-kt", "--keytab", help="keytab",
nargs='?', default=configuration.get('kerberos', 'keytab'))
nargs='?', default=conf.get('kerberos', 'keytab'))
parser_kerberos.add_argument(
"principal", help="kerberos principal",
nargs='?', default=configuration.get('kerberos', 'principal'))
nargs='?', default=conf.get('kerberos', 'principal'))
parser_kerberos.set_defaults(func=kerberos)

ht = "Render a task instance's template(s)"
Expand Down
28 changes: 15 additions & 13 deletions airflow/configuration.py
Expand Up @@ -19,7 +19,7 @@ class AirflowConfigException(Exception):

try:
from cryptography.fernet import Fernet
except:
except ImportError:
pass


Expand Down Expand Up @@ -227,7 +227,8 @@ def run_command(command):
# Expose the configuration file in the web server
expose_config = true
# Set to true to turn on authentication : http://pythonhosted.org/airflow/installation.html#web-authentication
# Set to true to turn on authentication:
# http://pythonhosted.org/airflow/installation.html#web-authentication
authenticate = False
# Filter the list of dags by owner name (requires authentication to be enabled)
Expand Down Expand Up @@ -322,8 +323,10 @@ def run_command(command):
checkpoint = False
# Failover timeout in milliseconds.
# When checkpointing is enabled and this option is set, Mesos waits until the configured timeout for
# the MesosExecutor framework to re-register after a failover. Mesos shuts down running tasks if the
# When checkpointing is enabled and this option is set, Mesos waits
# until the configured timeout for
# the MesosExecutor framework to re-register after a failover. Mesos
# shuts down running tasks if the
# MesosExecutor framework fails to re-register within this timeframe.
# failover_timeout = 604800
Expand Down Expand Up @@ -399,10 +402,11 @@ def __init__(self, defaults, *args, **kwargs):
self.is_validated = False

def _validate(self):
if self.get("core", "executor") != 'SequentialExecutor' \
and "sqlite" in self.get('core', 'sql_alchemy_conn'):
if (
self.get("core", "executor") != 'SequentialExecutor' and
"sqlite" in self.get('core', 'sql_alchemy_conn')):
raise AirflowConfigException("error: cannot use sqlite with the {}".
format(self.get('core', 'executor')))
format(self.get('core', 'executor')))

self.is_validated = True

Expand Down Expand Up @@ -508,11 +512,9 @@ def parameterized_config(template):
f.write(parameterized_config(TEST_CONFIG))

if not os.path.isfile(AIRFLOW_CONFIG):
"""
These configuration options are used to generate a default configuration
when it is missing. The right way to change your configuration is to alter
your configuration file, not this code.
"""
# These configuration options are used to generate a default configuration
# when it is missing. The right way to change your configuration is to alter
# your configuration file, not this code.
logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
with open(AIRFLOW_CONFIG, 'w') as f:
f.write(parameterized_config(DEFAULT_CONFIG))
Expand Down Expand Up @@ -549,7 +551,7 @@ def has_option(section, key):
def remove_option(section, option):
return conf.remove_option(section, option)

def set(section, option, value):
def set(section, option, value): # noqa
return conf.set(section, option, value)

########################
Expand Down
11 changes: 4 additions & 7 deletions airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -1,11 +1,11 @@
# Copyright 2015 Matthew Pelland (matt@pelland.io)
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -14,10 +14,7 @@
import logging

import flask_login
from flask_login import (
login_user, current_user,
logout_user, login_required
)
from flask_login import login_user

from flask import url_for, redirect, request

Expand Down

0 comments on commit fbac2bf

Please sign in to comment.