From 535798dc538c01e2ed1ccf11cc11d0c44bf349af Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:25:01 -0400 Subject: [PATCH 01/29] explicitly import object for class definitions --- airflow/__init__.py | 1 + airflow/executors/base_executor.py | 1 + airflow/executors/celery_executor.py | 1 + airflow/hooks/base_hook.py | 1 + airflow/plugins_manager.py | 1 + airflow/utils.py | 3 ++- airflow/www/utils.py | 1 + 7 files changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow/__init__.py b/airflow/__init__.py index 70dacdefa85d1..d338dfcbd570b 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -4,6 +4,7 @@ in their PYTHONPATH. airflow_login should be based off the `airflow.www.login` """ +from builtins import object __version__ = "1.3.0" import logging diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index ab110f8320f66..bff5f3b92ffc9 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -1,3 +1,4 @@ +from builtins import object import logging from airflow.utils import State diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 5b475144f4001..7c614aff5d627 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -1,3 +1,4 @@ +from builtins import object import logging import subprocess import time diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py index 4265b2e20e77c..96f0923530066 100644 --- a/airflow/hooks/base_hook.py +++ b/airflow/hooks/base_hook.py @@ -1,3 +1,4 @@ +from builtins import object import logging import random diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index f3843ae7b6efd..0bbd16c391879 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -1,3 +1,4 @@ +from builtins import object import imp import inspect import logging diff --git a/airflow/utils.py b/airflow/utils.py index 08a283b172675..d28eef40fde2a 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import object from copy import copy from datetime import datetime, timedelta from email.mime.text import MIMEText @@ -445,7 +446,7 @@ class AirflowTaskTimeout(Exception): pass -class timeout: +class timeout(object): """ To be used in a ``with`` block and timeout its content. """ diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 03a00e695c8b1..b7e313969bfe0 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -1,3 +1,4 @@ +from builtins import object from cgi import escape from cStringIO import StringIO as IO import gzip From 9885555b8deb838d1193f6143eb95a04d75cfe2d Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:29:20 -0400 Subject: [PATCH 02/29] import str from builtins --- airflow/configuration.py | 1 + airflow/example_dags/example_bash_operator.py | 1 + airflow/example_dags/example_python_operator.py | 1 + airflow/executors/local_executor.py | 1 + airflow/executors/sequential_executor.py | 1 + airflow/hooks/dbapi_hook.py | 1 + airflow/hooks/http_hook.py | 1 + airflow/hooks/jdbc_hook.py | 1 + airflow/hooks/presto_hook.py | 1 + airflow/jobs.py | 1 + airflow/models.py | 1 + airflow/operators/check_operator.py | 1 + airflow/operators/hive_stats_operator.py | 1 + airflow/operators/python_operator.py | 1 + airflow/operators/sensors.py | 1 + airflow/utils.py | 3 ++- airflow/www/app.py | 1 + airflow/www/utils.py | 1 + 18 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index fbb2a7172151c..8e6f7b0814749 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -1,4 +1,5 @@ from ConfigParser import ConfigParser +from builtins import str import errno import logging import os diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 53ede85042a7d..3892c3d6d6846 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -1,3 +1,4 @@ +from builtins import str from airflow.operators import BashOperator, DummyOperator from airflow.models import DAG from datetime import datetime, timedelta diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 61d7ad7ae4346..459f73c8281e4 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import str from airflow.operators import PythonOperator from airflow.models import DAG from datetime import datetime, timedelta diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 54519d372c848..722656d838df2 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -1,3 +1,4 @@ +from builtins import str import logging import multiprocessing import subprocess diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index d1c0839d9f885..a071f1525ead0 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -1,3 +1,4 @@ +from builtins import str import logging import subprocess diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index b491123c10956..374912da6f9d3 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -1,3 +1,4 @@ +from builtins import str from datetime import datetime import numpy import logging diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py index 4ad0cd28528d9..d2a954454e83f 100644 --- a/airflow/hooks/http_hook.py +++ b/airflow/hooks/http_hook.py @@ -1,3 +1,4 @@ +from builtins import str import logging import requests diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index af83c49130655..1f9275f4bd70c 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -1,3 +1,4 @@ +from builtins import str __author__ = 'janomar' import logging diff --git a/airflow/hooks/presto_hook.py b/airflow/hooks/presto_hook.py index f2eabf581e6bb..076bc9716eb0b 100644 --- a/airflow/hooks/presto_hook.py +++ b/airflow/hooks/presto_hook.py @@ -1,3 +1,4 @@ +from builtins import str from pyhive import presto from pyhive.exc import DatabaseError diff --git a/airflow/jobs.py b/airflow/jobs.py index dc05762f584cf..d9f9e56b446ce 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1,3 +1,4 @@ +from builtins import str from collections import defaultdict from datetime import datetime import getpass diff --git a/airflow/models.py b/airflow/models.py index 0be5eaa42bfd1..a37ace8f2b13e 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import str import copy from datetime import datetime, timedelta import getpass diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index 7618e7486dea6..d2237925a724a 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -1,3 +1,4 @@ +from builtins import str import logging from airflow.utils import AirflowException diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index 80232570dabf0..65719921b03a9 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -1,3 +1,4 @@ +from builtins import str from collections import OrderedDict import json import logging diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 857dc825fcd2c..79b01d2a4f06b 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -1,3 +1,4 @@ +from builtins import str from datetime import datetime import logging diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index ba54c194d675f..4270af8a0f4e3 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import str from datetime import datetime import logging from urlparse import urlparse diff --git a/airflow/utils.py b/airflow/utils.py index d28eef40fde2a..9fb5296f98ce3 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import str from builtins import object from copy import copy from datetime import datetime, timedelta @@ -355,7 +356,7 @@ def ask_yesno(question): def send_email(to, subject, html_content): SMTP_MAIL_FROM = conf.get('smtp', 'SMTP_MAIL_FROM') - if isinstance(to, unicode) or isinstance(to, str): + if isinstance(to, str) or isinstance(to, str): if ',' in to: to = to.split(',') elif ';' in to: diff --git a/airflow/www/app.py b/airflow/www/app.py index 9cb2905408f33..57ffc2c07044e 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import str import copy from datetime import datetime, timedelta import dateutil.parser diff --git a/airflow/www/utils.py b/airflow/www/utils.py index b7e313969bfe0..f65a9d8f94caa 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -1,3 +1,4 @@ +from builtins import str from builtins import object from cgi import escape from cStringIO import StringIO as IO From b9519773d94cb49b75c0f950ae52a83aa34ecb14 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:29:51 -0400 Subject: [PATCH 03/29] import object explicitly --- airflow/models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/models.py b/airflow/models.py index a37ace8f2b13e..54e74a2971b74 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1,5 +1,6 @@ from __future__ import print_function from builtins import str +from builtins import object import copy from datetime import datetime, timedelta import getpass From 4c7c20c4da448cc1ae5eb584fe52d29b27a30035 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:30:00 -0400 Subject: [PATCH 04/29] unicode -> str --- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index 54e74a2971b74..870ea825f5bda 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -277,7 +277,7 @@ def __repr__(self): return self.username def get_id(self): - return unicode(self.id) + return str(self.id) class Connection(Base): From 34f02dc1360dcd7857f23bc7b92c02c119365848 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:31:12 -0400 Subject: [PATCH 05/29] import range --- airflow/example_dags/example_bash_operator.py | 1 + airflow/example_dags/example_python_operator.py | 1 + airflow/executors/base_executor.py | 1 + airflow/executors/local_executor.py | 1 + 4 files changed, 4 insertions(+) diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 3892c3d6d6846..f3eb31182ae3e 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -1,4 +1,5 @@ from builtins import str +from builtins import range from airflow.operators import BashOperator, DummyOperator from airflow.models import DAG from datetime import datetime, timedelta diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 459f73c8281e4..7b7033a844ae4 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -1,5 +1,6 @@ from __future__ import print_function from builtins import str +from builtins import range from airflow.operators import PythonOperator from airflow.models import DAG from datetime import datetime, timedelta diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index bff5f3b92ffc9..c69dd192978f5 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -1,3 +1,4 @@ +from builtins import range from builtins import object import logging diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 722656d838df2..01494c800e7b8 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -1,4 +1,5 @@ from builtins import str +from builtins import range import logging import multiprocessing import subprocess From 7f41a66acef3df2130cf26d286d906a31b7cfe93 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:31:59 -0400 Subject: [PATCH 06/29] xrange -> range --- airflow/executors/local_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 01494c800e7b8..93e9fcbcd5f68 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -52,7 +52,7 @@ def start(self): self.result_queue = multiprocessing.Queue() self.workers = [ LocalWorker(self.queue, self.result_queue) - for i in xrange(self.parallelism) + for i in range(self.parallelism) ] for w in self.workers: From faeda2d8bfe4f0d6291614c695e1c263b9967f73 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:32:46 -0400 Subject: [PATCH 07/29] import input --- airflow/bin/cli.py | 3 ++- airflow/utils.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index ee4066e713f86..9b79cdbbda2fb 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -1,5 +1,6 @@ #!/usr/bin/env python from __future__ import print_function +from builtins import input import argparse import dateutil.parser from datetime import datetime @@ -309,7 +310,7 @@ def initdb(args): def resetdb(args): print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN')) - if raw_input( + if input( "This will drop existing tables if they exist. " "Proceed? (y/n)").upper() == "Y": logging.basicConfig(level=settings.LOGGING_LEVEL, diff --git a/airflow/utils.py b/airflow/utils.py index 9fb5296f98ce3..72bbfcde95503 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -1,5 +1,6 @@ from __future__ import print_function from builtins import str +from builtins import input from builtins import object from copy import copy from datetime import datetime, timedelta @@ -344,7 +345,7 @@ def ask_yesno(question): done = False print(question) while not done: - choice = raw_input().lower() + choice = input().lower() if choice in yes: return True elif choice in no: From e94fce1a7aca3c95c8c2c8e2cc10fa2a84fdd877 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:33:15 -0400 Subject: [PATCH 08/29] import basestring --- airflow/hooks/dbapi_hook.py | 1 + airflow/hooks/hive_hooks.py | 1 + airflow/jobs.py | 1 + airflow/models.py | 1 + 4 files changed, 4 insertions(+) diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index 374912da6f9d3..46a50d350ca65 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -1,4 +1,5 @@ from builtins import str +from past.builtins import basestring from datetime import datetime import numpy import logging diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 799e4e515b3b5..97d6a05a4d722 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -1,4 +1,5 @@ from __future__ import print_function +from past.builtins import basestring import csv import logging import subprocess diff --git a/airflow/jobs.py b/airflow/jobs.py index d9f9e56b446ce..91fca295b7b02 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1,4 +1,5 @@ from builtins import str +from past.builtins import basestring from collections import defaultdict from datetime import datetime import getpass diff --git a/airflow/models.py b/airflow/models.py index 870ea825f5bda..7cdba70e02514 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1,5 +1,6 @@ from __future__ import print_function from builtins import str +from past.builtins import basestring from builtins import object import copy from datetime import datetime, timedelta From 35807bd857718e3e1685256ee7e2fed33f509921 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:33:55 -0400 Subject: [PATCH 09/29] import zip --- airflow/hooks/hive_hooks.py | 1 + airflow/models.py | 10 +++++----- airflow/operators/check_operator.py | 1 + airflow/operators/hive_stats_operator.py | 1 + airflow/operators/s3_to_hive_operator.py | 1 + 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 97d6a05a4d722..d394996e62bcc 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -1,4 +1,5 @@ from __future__ import print_function +from builtins import zip from past.builtins import basestring import csv import logging diff --git a/airflow/models.py b/airflow/models.py index 7cdba70e02514..a4f9c4e426e9f 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -167,7 +167,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): self.file_last_changed[filepath] = dttm return - for dag in m.__dict__.values(): + for dag in list(m.__dict__.values()): if isinstance(dag, DAG): dag.full_filepath = filepath dag.is_subdag = False @@ -249,7 +249,7 @@ def collect_dags( pass def deactivate_inactive_dags(self): - active_dag_ids = [dag.dag_id for dag in self.dags.values()] + active_dag_ids = [dag.dag_id for dag in list(self.dags.values())] session = settings.Session() for dag in session.query( DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all(): @@ -966,7 +966,7 @@ def render_templates(self): elif isinstance(content, dict): result = { k: rt(v, jinja_context) - for k, v in content.items()} + for k, v in list(content.items())} else: raise AirflowException("Type not supported for templating") setattr(task, attr, result) @@ -1260,7 +1260,7 @@ def __deepcopy__(self, memo): self._upstream_list = sorted(self._upstream_list, key=lambda x: x.task_id) self._downstream_list = sorted(self._downstream_list, key=lambda x: x.task_id) - for k, v in self.__dict__.items(): + for k, v in list(self.__dict__.items()): if k not in ('user_defined_macros', 'params'): setattr(result, k, copy.deepcopy(v, memo)) @@ -1766,7 +1766,7 @@ def __deepcopy__(self, memo): cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result - for k, v in self.__dict__.items(): + for k, v in list(self.__dict__.items()): if k not in ('user_defined_macros', 'params'): setattr(result, k, copy.deepcopy(v, memo)) diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index d2237925a724a..36fcceef662f3 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -1,3 +1,4 @@ +from builtins import zip from builtins import str import logging diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index 65719921b03a9..49024a5938b7a 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -1,4 +1,5 @@ from builtins import str +from builtins import zip from collections import OrderedDict import json import logging diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 17e82273680c3..8a187a832ae07 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -1,3 +1,4 @@ +from builtins import zip import logging from tempfile import NamedTemporaryFile From d3e6abfcb9c1cce161b8e412da710d36ccce7011 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:39:09 -0400 Subject: [PATCH 10/29] import next --- airflow/operators/s3_to_hive_operator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 8a187a832ae07..20d23e781e3d8 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -1,3 +1,4 @@ +from builtins import next from builtins import zip import logging from tempfile import NamedTemporaryFile From 1a10bad404d15aba51590f4991f288dcc0defdeb Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:39:18 -0400 Subject: [PATCH 11/29] import chr --- airflow/operators/mssql_to_hive.py | 1 + airflow/operators/mysql_to_hive.py | 1 + 2 files changed, 2 insertions(+) diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py index 1be3a80cdc5f8..90da11ef68937 100644 --- a/airflow/operators/mssql_to_hive.py +++ b/airflow/operators/mssql_to_hive.py @@ -1,3 +1,4 @@ +from builtins import chr from collections import OrderedDict import unicodecsv as csv import logging diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py index 69442e18d25cb..a5e4e6746b971 100644 --- a/airflow/operators/mysql_to_hive.py +++ b/airflow/operators/mysql_to_hive.py @@ -1,3 +1,4 @@ +from builtins import chr from collections import OrderedDict import csv import logging From be08c4c179fce4c454bee5540c1a1b89d6edc81b Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:42:06 -0400 Subject: [PATCH 12/29] import standard library and create aliases --- airflow/configuration.py | 2 ++ airflow/hooks/S3_hook.py | 2 ++ airflow/operators/sensors.py | 2 ++ airflow/www/utils.py | 2 ++ 4 files changed, 8 insertions(+) diff --git a/airflow/configuration.py b/airflow/configuration.py index 8e6f7b0814749..ba7877d5857a9 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -1,4 +1,6 @@ from ConfigParser import ConfigParser +from future import standard_library +standard_library.install_aliases() from builtins import str import errno import logging diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index 2045cde6c8049..6fba101fccb50 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -1,3 +1,5 @@ +from future import standard_library +standard_library.install_aliases() import logging import json import re diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 4270af8a0f4e3..d634d4eb32083 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -1,4 +1,6 @@ from __future__ import print_function +from future import standard_library +standard_library.install_aliases() from builtins import str from datetime import datetime import logging diff --git a/airflow/www/utils.py b/airflow/www/utils.py index f65a9d8f94caa..89cd0fbaebff0 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -1,3 +1,5 @@ +from future import standard_library +standard_library.install_aliases() from builtins import str from builtins import object from cgi import escape From 66dbdbc97c05365d0df8598253c65ee6a6311d47 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:42:38 -0400 Subject: [PATCH 13/29] import configparser --- airflow/configuration.py | 2 +- airflow/hooks/S3_hook.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index ba7877d5857a9..8595e4caf36a6 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -1,7 +1,7 @@ -from ConfigParser import ConfigParser from future import standard_library standard_library.install_aliases() from builtins import str +from configparser import ConfigParser import errno import logging import os diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index 6fba101fccb50..f442bcc4fd51d 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -4,8 +4,8 @@ import json import re import fnmatch -import ConfigParser from urlparse import urlparse +import configparser import boto from boto.s3.connection import S3Connection @@ -30,7 +30,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None): :param profile: profile name in AWS type config file :type profile: str """ - Config = ConfigParser.ConfigParser() + Config = configparser.ConfigParser() if Config.read(config_file_name): sections = Config.sections() else: From 8272bb3de85ad0184ee187af0e6e199c2841c5d0 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:42:59 -0400 Subject: [PATCH 14/29] urlparse -> urllib.parse --- airflow/hooks/S3_hook.py | 2 +- airflow/operators/sensors.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index f442bcc4fd51d..c87cc106c7406 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -4,8 +4,8 @@ import json import re import fnmatch -from urlparse import urlparse import configparser +from urllib.parse import urlparse import boto from boto.s3.connection import S3Connection diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index d634d4eb32083..de4863e633973 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -4,7 +4,7 @@ from builtins import str from datetime import datetime import logging -from urlparse import urlparse +from urllib.parse import urlparse from time import sleep from airflow import hooks, settings From a559493bfa54d7a685e8e66139c52f0f2dd8e1bb Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:44:13 -0400 Subject: [PATCH 15/29] Use list to allow modification in-place --- airflow/executors/celery_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 7c614aff5d627..6d86a57740191 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -65,7 +65,7 @@ def execute_async(self, key, command, queue=DEFAULT_QUEUE): def sync(self): logging.debug( "Inquiring about {} celery task(s)".format(len(self.tasks))) - for key, async in self.tasks.items(): + for key, async in list(self.tasks.items()): state = async.state if self.last_state[key] != state: if state == celery_states.SUCCESS: From 05f1eb6449222e3430057faf259232ae5b707510 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:45:47 -0400 Subject: [PATCH 16/29] use list to allow indexing --- airflow/hooks/hive_hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index d394996e62bcc..94fd8b02a1786 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -301,7 +301,7 @@ def max_partition(self, schema, table_name, field=None, filter=None): if not parts: return None elif len(parts[0]) == 1: - field = parts[0].keys()[0] + field = list(parts[0].keys())[0] elif not field: raise AirflowException( "Please specify the field you want the max " From 403fa31ec88b95afb5bac67febd1db40006250bb Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:46:39 -0400 Subject: [PATCH 17/29] use list to be safe --- airflow/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 91fca295b7b02..b9d7f09870393 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -422,7 +422,7 @@ def prioritize_queued(self, session, executor, dagbag): else: d[ti.pool].append(ti) - for pool, tis in d.items(): + for pool, tis in list(d.items()): open_slots = pools[pool].open_slots(session=session) if open_slots > 0: tis = sorted( From 89111adf059483780955f4176f536607d3e559a6 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:46:59 -0400 Subject: [PATCH 18/29] use list to modify in-place --- airflow/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index b9d7f09870393..b25499c18f024 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -590,7 +590,7 @@ def _execute(self): # Triggering what is ready to get triggered while tasks_to_run: - for key, ti in tasks_to_run.items(): + for key, ti in list(tasks_to_run.items()): ti.refresh_from_db() if ti.state == State.SUCCESS and key in tasks_to_run: succeeded.append(key) From f4675ecfdfb0ca40a7a34e1edfc348fdb771786b Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:47:21 -0400 Subject: [PATCH 19/29] use list to be safe --- airflow/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index b25499c18f024..e59b4bbff436f 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -609,7 +609,7 @@ def _execute(self): executor.heartbeat() # Reacting to events - for key, state in executor.get_event_buffer().items(): + for key, state in list(executor.get_event_buffer().items()): dag_id, task_id, execution_date = key if key not in tasks_to_run: continue From e7ee93ff8c4553d60c10d3f2d483c0c77cf78723 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:48:08 -0400 Subject: [PATCH 20/29] use list to allow indexing --- airflow/macros/hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/macros/hive.py b/airflow/macros/hive.py index aa7ae025be418..2f69d6604ddad 100644 --- a/airflow/macros/hive.py +++ b/airflow/macros/hive.py @@ -85,7 +85,7 @@ def closest_ds_partition( partitions = hh.get_partitions(schema=schema, table_name=table) if not partitions: return None - part_vals = [p.values()[0] for p in partitions] + part_vals = [list(p.values())[0] for p in partitions] if ds in part_vals: return ds else: From a5973c71c989c20c75c123896bef33b3eeefc327 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:48:55 -0400 Subject: [PATCH 21/29] use list to be safe --- airflow/operators/hive_stats_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index 49024a5938b7a..09f85e17af105 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -98,7 +98,7 @@ def execute(self, context=None): exprs = { ('', 'count'): 'COUNT(*)' } - for col, col_type in field_types.items(): + for col, col_type in list(field_types.items()): d = {} if self.assignment_func: d = self.assignment_func(col, col_type) From 3f4e206fcb5ba2319f64f6bb32e324177c680b60 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:49:55 -0400 Subject: [PATCH 22/29] use list to be safe --- airflow/plugins_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 0bbd16c391879..60ac4432d31be 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -46,7 +46,7 @@ def validate(cls): if file_ext != '.py': continue m = imp.load_source(mod_name, filepath) - for obj in m.__dict__.values(): + for obj in list(m.__dict__.values()): if ( inspect.isclass(obj) and issubclass(obj, AirflowPlugin) and From 8c9b5bbf9a5603a4f33b1196e42ea1cc3ab9344e Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:50:19 -0400 Subject: [PATCH 23/29] use list to be safe --- airflow/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/utils.py b/airflow/utils.py index 72bbfcde95503..9c02dc782c7bb 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -405,7 +405,7 @@ def import_module_attrs(parent_module_globals, module_attrs_dict): brings functional operators to those namespaces. ''' imported_attrs = [] - for mod, attrs in module_attrs_dict.items(): + for mod, attrs in list(module_attrs_dict.items()): try: folder = os.path.dirname(parent_module_globals['__file__']) f, filename, description = imp.find_module(mod, [folder]) From 3042959faf7734d4a42e2cc88552ca3d025b2958 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:52:53 -0400 Subject: [PATCH 24/29] preserve int division --- airflow/www/app.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/www/app.py b/airflow/www/app.py index 57ffc2c07044e..057aefa6f6a02 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -1,5 +1,7 @@ from __future__ import print_function +from __future__ import division from builtins import str +from past.utils import old_div import copy from datetime import datetime, timedelta import dateutil.parser @@ -1224,10 +1226,10 @@ def landing_times(self): for ti in task.get_task_instances(session, from_date): if ti.end_date: data.append([ - ti.execution_date.isoformat(), ( + ti.execution_date.isoformat(), old_div(( ti.end_date - ( ti.execution_date + task.schedule_interval) - ).total_seconds()/(60*60) + ).total_seconds(),(60*60)) ]) all_data.append({'data': data, 'name': task.task_id}) From 417a43a61742c05a3a00ef5214dea4a1eaa61f52 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:53:37 -0400 Subject: [PATCH 25/29] iteritems -> items --- airflow/www/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/app.py b/airflow/www/app.py index 057aefa6f6a02..13624ce4a2c78 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -516,7 +516,7 @@ def date_handler(obj): 'name': col, 'data': [ (i, v) - for i, v in df[col].iteritems() if not np.isnan(v)] + for i, v in df[col].items() if not np.isnan(v)] }) series = [serie for serie in sorted( series, key=lambda s: s['data'][0][1], reverse=True)] From ef3ba1761ce17752ec7e22d0c04db7183ef55e37 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:55:15 -0400 Subject: [PATCH 26/29] use list to be safe --- airflow/www/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/app.py b/airflow/www/app.py index 13624ce4a2c78..c3451b48cb45c 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -1634,7 +1634,7 @@ def on_form_prefill(self, form, id): except Exception as e: d = {} - for field in self.form_extra_fields.keys(): + for field in list(self.form_extra_fields.keys()): value = d.get(field, '') if value: field = getattr(form, field) From a641f6da0af65a3c8d4c3df4aa751cf5630f8c06 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:55:41 -0400 Subject: [PATCH 27/29] cStringIO -> io --- airflow/www/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 89cd0fbaebff0..ca579ca8c6013 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -3,7 +3,7 @@ from builtins import str from builtins import object from cgi import escape -from cStringIO import StringIO as IO +from io import StringIO as IO import gzip import functools From 3c47d4be663ed41f5d709af8a149d824124bad74 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Thu, 6 Aug 2015 23:55:52 -0400 Subject: [PATCH 28/29] StringIO -> BytesIO --- airflow/www/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index ca579ca8c6013..8b7357a593586 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -3,7 +3,7 @@ from builtins import str from builtins import object from cgi import escape -from io import StringIO as IO +from io import BytesIO as IO import gzip import functools From 4090b2967f70b4fe7923bcd186cf0159b34088bd Mon Sep 17 00:00:00 2001 From: Maxime Date: Sat, 8 Aug 2015 23:46:55 +0000 Subject: [PATCH 29/29] Debugging py3 stage2 --- airflow/utils.py | 2 +- airflow/www/app.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/utils.py b/airflow/utils.py index 9c02dc782c7bb..ee3d3d67f6551 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -203,7 +203,7 @@ def resetdb(): def validate_key(k, max_length=250): - if type(k) is not str: + if not isinstance(k, basestring): raise TypeError("The key has to be a string") elif len(k) > max_length: raise AirflowException("The key has to be less than {0} characters".format( diff --git a/airflow/www/app.py b/airflow/www/app.py index c3451b48cb45c..1ac5ce8ce71f9 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -515,8 +515,9 @@ def date_handler(obj): series.append({ 'name': col, 'data': [ - (i, v) - for i, v in df[col].items() if not np.isnan(v)] + (k, df[col][k]) + for k in df[col].keys() + if not np.isnan(df[col][k])] }) series = [serie for serie in sorted( series, key=lambda s: s['data'][0][1], reverse=True)]