From fbac2bff2beb804e82cc9aea8fc3f42db5a3595e Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Sun, 20 Mar 2016 15:03:39 -0700 Subject: [PATCH] Linting --- .landscape.yml | 1 + airflow/bin/cli.py | 58 ++++++------- airflow/configuration.py | 28 +++--- .../auth/backends/github_enterprise_auth.py | 11 +-- airflow/contrib/hooks/bigquery_hook.py | 86 ++++++++++++++----- airflow/contrib/hooks/ftp_hook.py | 3 +- .../operators/bigquery_check_operator.py | 9 +- 7 files changed, 121 insertions(+), 75 deletions(-) diff --git a/.landscape.yml b/.landscape.yml index 151de6f26060b..46604394f8e65 100644 --- a/.landscape.yml +++ b/.landscape.yml @@ -8,5 +8,6 @@ pylint: disable: - cyclic-import - invalid-name + - super-on-old-class options: docstring-min-length: 10 diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 6e6c0cb5dce09..a37dfe2850d76 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -14,12 +14,12 @@ import airflow from airflow import jobs, settings, utils -from airflow import configuration +from airflow import configuration as conf from airflow.executors import DEFAULT_EXECUTOR from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun from airflow.utils import AirflowException, State -DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER')) +DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) # Common help text across subcommands mark_success_help = "Mark jobs as succeeded without running them" @@ -27,7 +27,7 @@ def process_subdir(subdir): - dags_folder = configuration.get("core", "DAGS_FOLDER") + dags_folder = conf.get("core", "DAGS_FOLDER") dags_folder = os.path.expanduser(dags_folder) if subdir: if "DAGS_FOLDER" in subdir: @@ -78,7 +78,7 @@ def backfill(args): mark_success=args.mark_success, include_adhoc=args.include_adhoc, local=args.local, - donot_pickle=(args.donot_pickle or configuration.getboolean('core', 'donot_pickle')), + donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')), ignore_dependencies=args.ignore_dependencies, pool=args.pool) @@ -139,7 +139,7 @@ def run(args): utils.pessimistic_connection_handling() # Setting up logging - log_base = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER')) + log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args) if not os.path.exists(directory): os.makedirs(directory) @@ -229,16 +229,16 @@ def run(args): executor.end() # store logs remotely - remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER') + remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') # deprecated as of March 2016 - if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'): + if not remote_base and conf.get('core', 'S3_LOG_FOLDER'): warnings.warn( - 'The S3_LOG_FOLDER configuration key has been replaced by ' - 'REMOTE_BASE_LOG_FOLDER. Your configuration still works but please ' + 'The S3_LOG_FOLDER conf key has been replaced by ' + 'REMOTE_BASE_LOG_FOLDER. 
Your conf still works but please ' 'update airflow.cfg to ensure future compatibility.', DeprecationWarning) - remote_base = configuration.get('core', 'S3_LOG_FOLDER') + remote_base = conf.get('core', 'S3_LOG_FOLDER') if os.path.exists(filename): # read log and remove old logs to get just the latest additions @@ -367,8 +367,8 @@ def webserver(args): print(settings.HEADER) from airflow.www.app import cached_app - app = cached_app(configuration) - workers = args.workers or configuration.get('webserver', 'workers') + app = cached_app(conf) + workers = args.workers or conf.get('webserver', 'workers') if args.debug: print( "Starting the web server on port {0} and host {1}.".format( @@ -402,15 +402,15 @@ def serve_logs(args): flask_app = flask.Flask(__name__) @flask_app.route('/log/') - def serve_logs(filename): - log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER')) + def serve_logs(filename): # noqa + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) return flask.send_from_directory( log, filename, mimetype="application/json", as_attachment=False) WORKER_LOG_SERVER_PORT = \ - int(configuration.get('celery', 'WORKER_LOG_SERVER_PORT')) + int(conf.get('celery', 'WORKER_LOG_SERVER_PORT')) flask_app.run( host='0.0.0.0', port=WORKER_LOG_SERVER_PORT) @@ -436,7 +436,7 @@ def worker(args): sp.kill() -def initdb(args): +def initdb(args): # noqa print("DB: " + repr(settings.engine.url)) utils.initdb() print("Done.") @@ -454,18 +454,18 @@ def resetdb(args): print("Bail.") -def upgradedb(args): +def upgradedb(args): # noqa print("DB: " + repr(settings.engine.url)) utils.upgradedb() -def version(args): +def version(args): # noqa print(settings.HEADER + " v" + airflow.__version__) def flower(args): - broka = configuration.get('celery', 'BROKER_URL') - args.port = args.port or configuration.get('celery', 'FLOWER_PORT') + broka = conf.get('celery', 'BROKER_URL') + args.port = args.port or conf.get('celery', 'FLOWER_PORT') port = '--port=' + args.port api = '' if args.broker_api: @@ -474,7 +474,7 @@ def flower(args): sp.wait() -def kerberos(args): +def kerberos(args): # noqa print(settings.HEADER) import airflow.security.kerberos @@ -659,22 +659,22 @@ def get_parser(): parser_webserver = subparsers.add_parser('webserver', help=ht) parser_webserver.add_argument( "-p", "--port", - default=configuration.get('webserver', 'WEB_SERVER_PORT'), + default=conf.get('webserver', 'WEB_SERVER_PORT'), type=int, help="Set the port on which to run the web server") parser_webserver.add_argument( "-w", "--workers", - default=configuration.get('webserver', 'WORKERS'), + default=conf.get('webserver', 'WORKERS'), type=int, help="Number of workers to run the webserver on") parser_webserver.add_argument( "-k", "--workerclass", - default=configuration.get('webserver', 'WORKER_CLASS'), + default=conf.get('webserver', 'WORKER_CLASS'), choices=['sync', 'eventlet', 'gevent', 'tornado'], help="The worker class to use for gunicorn") parser_webserver.add_argument( "-hn", "--hostname", - default=configuration.get('webserver', 'WEB_SERVER_HOST'), + default=conf.get('webserver', 'WEB_SERVER_HOST'), help="Set the hostname on which to run the web server") ht = "Use the server that ships with Flask in debug mode" parser_webserver.add_argument( @@ -743,12 +743,12 @@ def get_parser(): parser_worker.add_argument( "-q", "--queues", help="Comma delimited list of queues to serve", - default=configuration.get('celery', 'DEFAULT_QUEUE')) + default=conf.get('celery', 'DEFAULT_QUEUE')) parser_worker.add_argument( "-c", 
"--concurrency", type=int, help="The number of worker processes", - default=configuration.get('celery', 'celeryd_concurrency')) + default=conf.get('celery', 'celeryd_concurrency')) parser_worker.set_defaults(func=worker) ht = "Serve logs generate by worker" @@ -770,10 +770,10 @@ def get_parser(): parser_kerberos = subparsers.add_parser('kerberos', help=ht) parser_kerberos.add_argument( "-kt", "--keytab", help="keytab", - nargs='?', default=configuration.get('kerberos', 'keytab')) + nargs='?', default=conf.get('kerberos', 'keytab')) parser_kerberos.add_argument( "principal", help="kerberos principal", - nargs='?', default=configuration.get('kerberos', 'principal')) + nargs='?', default=conf.get('kerberos', 'principal')) parser_kerberos.set_defaults(func=kerberos) ht = "Render a task instance's template(s)" diff --git a/airflow/configuration.py b/airflow/configuration.py index f08f9602ca80e..98661723b7695 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -19,7 +19,7 @@ class AirflowConfigException(Exception): try: from cryptography.fernet import Fernet -except: +except ImportError: pass @@ -227,7 +227,8 @@ def run_command(command): # Expose the configuration file in the web server expose_config = true -# Set to true to turn on authentication : http://pythonhosted.org/airflow/installation.html#web-authentication +# Set to true to turn on authentication: +# http://pythonhosted.org/airflow/installation.html#web-authentication authenticate = False # Filter the list of dags by owner name (requires authentication to be enabled) @@ -322,8 +323,10 @@ def run_command(command): checkpoint = False # Failover timeout in milliseconds. -# When checkpointing is enabled and this option is set, Mesos waits until the configured timeout for -# the MesosExecutor framework to re-register after a failover. Mesos shuts down running tasks if the +# When checkpointing is enabled and this option is set, Mesos waits +# until the configured timeout for +# the MesosExecutor framework to re-register after a failover. Mesos +# shuts down running tasks if the # MesosExecutor framework fails to re-register within this timeframe. # failover_timeout = 604800 @@ -399,10 +402,11 @@ def __init__(self, defaults, *args, **kwargs): self.is_validated = False def _validate(self): - if self.get("core", "executor") != 'SequentialExecutor' \ - and "sqlite" in self.get('core', 'sql_alchemy_conn'): + if ( + self.get("core", "executor") != 'SequentialExecutor' and + "sqlite" in self.get('core', 'sql_alchemy_conn')): raise AirflowConfigException("error: cannot use sqlite with the {}". - format(self.get('core', 'executor'))) + format(self.get('core', 'executor'))) self.is_validated = True @@ -508,11 +512,9 @@ def parameterized_config(template): f.write(parameterized_config(TEST_CONFIG)) if not os.path.isfile(AIRFLOW_CONFIG): - """ - These configuration options are used to generate a default configuration - when it is missing. The right way to change your configuration is to alter - your configuration file, not this code. - """ + # These configuration options are used to generate a default configuration + # when it is missing. The right way to change your configuration is to alter + # your configuration file, not this code. 
logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG) with open(AIRFLOW_CONFIG, 'w') as f: f.write(parameterized_config(DEFAULT_CONFIG)) @@ -549,7 +551,7 @@ def has_option(section, key): def remove_option(section, option): return conf.remove_option(section, option) -def set(section, option, value): +def set(section, option, value): # noqa return conf.set(section, option, value) ######################## diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index ba04babc809b7..cf918086ae9ea 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -1,11 +1,11 @@ # Copyright 2015 Matthew Pelland (matt@pelland.io) -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,10 +14,7 @@ import logging import flask_login -from flask_login import ( - login_user, current_user, - logout_user, login_required -) +from flask_login import login_user from flask import url_for, redirect, request diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 1f7477172ad6b..4a2a0b59f5172 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -97,6 +97,7 @@ def get_pandas_df(self, bql, parameters=None): else: return gbq_parse_data(schema, []) + class BigQueryPandasConnector(GbqConnector): """ This connector behaves identically to GbqConnector (from Pandas), except @@ -111,6 +112,7 @@ def __init__(self, project_id, service, reauth=False): self.reauth = reauth self.service = service + class BigQueryConnection(object): """ BigQuery does not have a notion of a persistent connection. Thus, these @@ -135,7 +137,8 @@ def cursor(self): return BigQueryCursor(*self._args, **self._kwargs) def rollback(self): - raise NotImplemented("BigQueryConnection does not have transactions") + raise NotImplementedError( + "BigQueryConnection does not have transactions") class BigQueryBaseCursor(object): @@ -183,7 +186,7 @@ def run_query( if destination_dataset_table: assert '.' in destination_dataset_table, ( 'Expected destination_dataset_table in the format of ' - '.. Got: {}'.format(destination_dataset_table) + '.
. Got: {}').format(destination_dataset_table) destination_dataset, destination_table = \ destination_dataset_table.split('.', 1) configuration['query'].update({ @@ -203,7 +206,7 @@ def run_query( return self.run_with_configuration(configuration) - def run_extract( + def run_extract( # noqa self, source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=',', print_header=True): @@ -215,7 +218,8 @@ def run_extract( For more details about these parameters. - :param source_project_dataset_table: The dotted .
BigQuery table to use as the source data.
+        :param source_project_dataset_table: The dotted <dataset>.<table>
+ BigQuery table to use as the source data. :type source_project_dataset_table: string :param destination_cloud_storage_uris: The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). Follows @@ -231,7 +235,9 @@ def run_extract( :param print_header: Whether to print a header for a CSV file extract. :type print_header: boolean """ - source_project, source_dataset, source_table = self._split_project_dataset_table_input('source_project_dataset_table', source_project_dataset_table) + source_project, source_dataset, source_table = \ + self._split_project_dataset_table_input( + 'source_project_dataset_table', source_project_dataset_table) configuration = { 'extract': { 'sourceTable': { @@ -267,10 +273,12 @@ def run_copy(self, For more details about these parameters. - :param source_project_dataset_tables: One or more dotted (.).
+        :param source_project_dataset_tables: One or more dotted
+            (<project>.)<dataset>.<table>
             BigQuery tables to use as the source data. Use a list if there are
             multiple source tables.
-            If <project> is not included, project will be the project defined in the connection json.
+            If <project> is not included, project will be the project defined
+            in the connection json.
         :type source_project_dataset_tables: list|string
         :param destination_project_dataset_table: The destination BigQuery
             table. Format is: <project>.<dataset>.<table>
@@ -280,21 +288,29 @@ def run_copy(self, :param create_disposition: The create disposition if the table doesn't exist. :type create_disposition: string """ - source_project_dataset_tables = [source_project_dataset_tables] if not isinstance(source_project_dataset_tables, list) else source_project_dataset_tables + source_project_dataset_tables = ( + [source_project_dataset_tables] + if not isinstance(source_project_dataset_tables, list) + else source_project_dataset_tables) source_project_dataset_tables_fixup = [] for source_project_dataset_table in source_project_dataset_tables: - source_project, source_dataset, source_table = self._split_project_dataset_table_input('source_project_dataset_table', source_project_dataset_table) + source_project, source_dataset, source_table = \ + self._split_project_dataset_table_input( + 'source_project_dataset_table', source_project_dataset_table) source_project_dataset_tables_fixup.append({ 'projectId': source_project, 'datasetId': source_dataset, 'tableId': source_table }) - assert 3 == len(destination_project_dataset_table.split('.')), \ - 'Expected destination_project_dataset_table in the format of ..
. Got: {}'.format(destination_project_dataset_table)
+        assert 3 == len(destination_project_dataset_table.split('.')), (
+            'Expected destination_project_dataset_table in the format of '
+            '<project>.<dataset>.<table>
. ' + 'Got: {}').format(destination_project_dataset_table) - destination_project, destination_dataset, destination_table = destination_project_dataset_table.split('.', 2) + destination_project, destination_dataset, destination_table = \ + destination_project_dataset_table.split('.', 2) configuration = { 'copy': { 'createDisposition': create_disposition, @@ -326,8 +342,10 @@ def run_load(self, For more details about these parameters. - :param destination_project_dataset_table: The dotted (.).
BigQuery table to load data into.
-            If <project> is not included, project will be the project defined in the connection json.
+        :param destination_project_dataset_table:
+            The dotted (<project>.)<dataset>.<table>
BigQuery table to load data into. + If is not included, project will be the project defined in + the connection json. :type destination_project_dataset_table: string :param schema_fields: The schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load @@ -347,7 +365,9 @@ def run_load(self, :param field_delimiter: The delimiter to use when loading from a CSV. :type field_delimiter: string """ - destination_project, destination_dataset, destination_table = self._split_project_dataset_table_input('destination_project_dataset_table', destination_project_dataset_table) + destination_project, destination_dataset, destination_table = \ + self._split_project_dataset_table_input( + 'destination_project_dataset_table', destination_project_dataset_table) configuration = { 'load': { @@ -382,8 +402,9 @@ def _split_project_dataset_table_input(self, var_name, project_dataset_table): :return: (project, dataset, table) tuple """ table_split = project_dataset_table.split('.') - assert len(table_split) == 2 or len(table_split) == 3, \ - 'Expected {var} in the format of (.
, got {input}'.format(var=var_name, input=project_dataset_table)
+        assert len(table_split) == 2 or len(table_split) == 3, (
+            'Expected {var} in the format of (<project.)<dataset>.<table>
, ' + 'got {input}').format(var=var_name, input=project_dataset_table) if len(table_split) == 2: logging.info('project not included in {var}: {input}; using project "{project}"'.format(var=var_name, input=project_dataset_table, project=self.project_id)) @@ -426,7 +447,8 @@ def run_with_configuration(self, configuration): # Check if job had errors. if 'errorResult' in job['status']: - raise Exception('BigQuery job failed. Final error was: %s', job['status']['errorResult']) + raise Exception( + 'BigQuery job failed. Final error was: %s', job['status']['errorResult']) return job_id @@ -453,7 +475,8 @@ def get_tabledata(self, dataset_id, table_id, :param dataset_id: the dataset ID of the requested table. :param table_id: the table ID of the requested table. :param max_results: the maximum results to return. - :param page_token: page token, returned from a previous call, identifying the result set. + :param page_token: page token, returned from a previous call, + identifying the result set. :param start_index: zero based index of the starting row to read. :return: map containing the requested rows. """ @@ -464,9 +487,13 @@ def get_tabledata(self, dataset_id, table_id, optional_params['pageToken'] = page_token if start_index: optional_params['startIndex'] = start_index - return self.service.tabledata() \ - .list(projectId=self.project_id, datasetId=dataset_id, tableId=table_id, **optional_params) \ + return ( + self.service.tabledata() + .list( + projectId=self.project_id, datasetId=dataset_id, + tableId=table_id, **optional_params) .execute() + ) class BigQueryCursor(BigQueryBaseCursor): @@ -542,7 +569,14 @@ def next(self): if self.all_pages_loaded: return None - query_results = self.service.jobs().getQueryResults(projectId=self.project_id, jobId=self.job_id, pageToken=self.page_token).execute() + query_results = ( + self.service.jobs() + .getQueryResults( + projectId=self.project_id, + jobId=self.job_id, + pageToken=self.page_token) + .execute() + ) if 'rows' in query_results and query_results['rows']: self.page_token = query_results.get('pageToken') @@ -551,7 +585,10 @@ def next(self): rows = query_results['rows'] for dict_row in rows: - typed_row = [_bq_cast(vs['v'], col_types[idx]) for idx, vs in enumerate(dict_row['f'])] + typed_row = ([ + _bq_cast(vs['v'], col_types[idx]) + for idx, vs in enumerate(dict_row['f']) + ]) self.buffer.append(typed_row) if not self.page_token: @@ -620,6 +657,7 @@ def setoutputsize(self, size, column=None): """ Does nothing by default """ pass + def _bind_parameters(operation, parameters): """ Helper method that binds parameters to a SQL query. """ # inspired by MySQL Python Connector (conversion.py) @@ -633,6 +671,7 @@ def _bind_parameters(operation, parameters): string_parameters[name] = str(value) return operation % string_parameters + def _escape(s): """ Helper method that escapes parameters to a SQL query. """ e = s @@ -643,6 +682,7 @@ def _escape(s): e = e.replace('"', '\\"') return e + def _bq_cast(string_field, bq_type): """ Helper method that casts a BigQuery row to the appropriate data types. 
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py index e8135d97ffda8..841b959265645 100644 --- a/airflow/contrib/hooks/ftp_hook.py +++ b/airflow/contrib/hooks/ftp_hook.py @@ -6,7 +6,7 @@ from past.builtins import basestring -def mlsd(conn, path="", facts=[]): +def mlsd(conn, path="", facts=None): ''' BACKPORT FROM PYTHON3 FTPLIB @@ -21,6 +21,7 @@ def mlsd(conn, path="", facts=[]): including a variable number of "facts" depending on the server and whether "facts" argument has been provided. ''' + facts = facts or [] if facts: conn.sendcmd("OPTS MLST " + ";".join(facts) + ";") if path: diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py index 1650395f9d361..69b69c37f36da 100644 --- a/airflow/contrib/operators/bigquery_check_operator.py +++ b/airflow/contrib/operators/bigquery_check_operator.py @@ -50,6 +50,7 @@ def __init__( def get_db_hook(self): return BigQueryHook(bigquery_conn_id=self.bigquery_conn_id) + class BigQueryValueCheckOperator(ValueCheckOperator): """ Performs a simple value check using sql code. @@ -63,12 +64,15 @@ def __init__( self, sql, pass_value, tolerance=None, bigquery_conn_id='bigquery_default', *args, **kwargs): - super(BigQueryValueCheckOperator, self).__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, *args, **kwargs) + super(BigQueryValueCheckOperator, self).__init__( + sql=sql, pass_value=pass_value, tolerance=tolerance, + *args, **kwargs) self.bigquery_conn_id = bigquery_conn_id def get_db_hook(self): return BigQueryHook(bigquery_conn_id=self.bigquery_conn_id) + class BigQueryIntervalCheckOperator(IntervalCheckOperator): """ Checks that the values of metrics given as SQL expressions are within @@ -97,7 +101,8 @@ def __init__( bigquery_conn_id='bigquery_default', *args, **kwargs): super(BigQueryIntervalCheckOperator, self).__init__( - table=table, metrics_thresholds=metrics_thresholds, date_filter_column=date_filter_column, days_back=days_back, + table=table, metrics_thresholds=metrics_thresholds, + date_filter_column=date_filter_column, days_back=days_back, *args, **kwargs) self.bigquery_conn_id = bigquery_conn_id