1 change: 1 addition & 0 deletions airflow/__init__.py
@@ -4,6 +4,7 @@
 in their PYTHONPATH. airflow_login should be based off the
 `airflow.www.login`
 """
+from builtins import object
 __version__ = "1.3.0"
 
 import logging
3 changes: 2 additions & 1 deletion airflow/bin/cli.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 from __future__ import print_function
+from builtins import input
 import argparse
 import dateutil.parser
 from datetime import datetime
@@ -309,7 +310,7 @@ def initdb(args):
 
 def resetdb(args):
     print("DB: " + conf.get('core', 'SQL_ALCHEMY_CONN'))
-    if raw_input(
+    if input(
             "This will drop existing tables if they exist. "
            "Proceed? (y/n)").upper() == "Y":
        logging.basicConfig(level=settings.LOGGING_LEVEL,
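Note: `from builtins import input` is the python-future shim behind this change. On Python 2 it rebinds `input` to a `raw_input`-style reader, so the prompt above behaves the same on both interpreters. A minimal standalone sketch (not part of this diff):

```python
# Requires the python-future package on Python 2; effectively a no-op on Python 3.
from builtins import input

# `input` now always returns the typed text as a string and never evaluates
# it (Python 2's bare input() used to call eval on the reply).
answer = input("Proceed? (y/n) ")
if answer.upper() == "Y":
    print("proceeding")
```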
5 changes: 4 additions & 1 deletion airflow/configuration.py
@@ -1,4 +1,7 @@
-from ConfigParser import ConfigParser
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
+from configparser import ConfigParser
 import errno
 import logging
 import os
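The `standard_library.install_aliases()` call is what lets Python 3 module paths resolve on Python 2. A minimal sketch of the pattern, using a hypothetical config file name:

```python
# python-future registers import hooks so py3 names like `configparser`
# resolve on py2, where the module is actually called `ConfigParser`.
from future import standard_library
standard_library.install_aliases()

from configparser import ConfigParser

parser = ConfigParser()
parser.read("example.cfg")  # hypothetical file; same call on both interpreters
print(parser.sections())
```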
2 changes: 2 additions & 0 deletions airflow/example_dags/example_bash_operator.py
@@ -1,3 +1,5 @@
+from builtins import str
+from builtins import range
 from airflow.operators import BashOperator, DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
2 changes: 2 additions & 0 deletions airflow/example_dags/example_python_operator.py
@@ -1,4 +1,6 @@
 from __future__ import print_function
+from builtins import str
+from builtins import range
 from airflow.operators import PythonOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
2 changes: 2 additions & 0 deletions airflow/executors/base_executor.py
@@ -1,3 +1,5 @@
+from builtins import range
+from builtins import object
 import logging
 
 from airflow.utils import State
3 changes: 2 additions & 1 deletion airflow/executors/celery_executor.py
@@ -1,3 +1,4 @@
+from builtins import object
 import logging
 import subprocess
 import time
@@ -64,7 +65,7 @@ def execute_async(self, key, command, queue=DEFAULT_QUEUE):
     def sync(self):
         logging.debug(
             "Inquiring about {} celery task(s)".format(len(self.tasks)))
-        for key, async in self.tasks.items():
+        for key, async in list(self.tasks.items()):
             state = async.state
             if self.last_state[key] != state:
                 if state == celery_states.SUCCESS:
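Wrapping `.items()` in `list()` matters here because `sync()` removes finished tasks while looping: in Python 3, `dict.items()` returns a live view, and mutating the dict during iteration raises `RuntimeError`. (Separately, the loop variable `async` later became a reserved word in Python 3.7 and would need renaming there.) A minimal sketch of the pattern, with made-up task states:

```python
# Hypothetical standalone example, not part of this diff.
tasks = {"task_a": "SUCCESS", "task_b": "STARTED"}

# list() snapshots the items first, so deleting from `tasks` inside the loop
# is safe on Python 3 (a bare .items() view would raise
# "RuntimeError: dictionary changed size during iteration").
for key, state in list(tasks.items()):
    if state == "SUCCESS":
        del tasks[key]

print(tasks)  # {'task_b': 'STARTED'}
```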
4 changes: 3 additions & 1 deletion airflow/executors/local_executor.py
@@ -1,3 +1,5 @@
+from builtins import str
+from builtins import range
 import logging
 import multiprocessing
 import subprocess
@@ -50,7 +52,7 @@ def start(self):
         self.result_queue = multiprocessing.Queue()
         self.workers = [
             LocalWorker(self.queue, self.result_queue)
-            for i in xrange(self.parallelism)
+            for i in range(self.parallelism)
         ]
 
         for w in self.workers:
1 change: 1 addition & 0 deletions airflow/executors/sequential_executor.py
@@ -1,3 +1,4 @@
+from builtins import str
 import logging
 import subprocess
 
8 changes: 5 additions & 3 deletions airflow/hooks/S3_hook.py
@@ -1,9 +1,11 @@
+from future import standard_library
+standard_library.install_aliases()
 import logging
 import json
 import re
 import fnmatch
-import ConfigParser
-from urlparse import urlparse
+import configparser
+from urllib.parse import urlparse
 
 import boto
 from boto.s3.connection import S3Connection
@@ -28,7 +30,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None):
     :param profile: profile name in AWS type config file
     :type profile: str
     """
-    Config = ConfigParser.ConfigParser()
+    Config = configparser.ConfigParser()
     if Config.read(config_file_name):
         sections = Config.sections()
     else:
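Same aliasing pattern as in configuration.py: Python 2's `urlparse` and `ConfigParser` modules live at `urllib.parse` and `configparser` in Python 3. A minimal sketch with a made-up S3 URL:

```python
from future import standard_library
standard_library.install_aliases()

# Python 3 path; resolves on Python 2 via the alias hook installed above.
from urllib.parse import urlparse

parsed = urlparse("s3://example-bucket/path/to/key")
print(parsed.netloc)  # example-bucket
print(parsed.path)    # /path/to/key
```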
1 change: 1 addition & 0 deletions airflow/hooks/base_hook.py
@@ -1,3 +1,4 @@
+from builtins import object
 import logging
 import random
 
2 changes: 2 additions & 0 deletions airflow/hooks/dbapi_hook.py
@@ -1,3 +1,5 @@
+from builtins import str
+from past.builtins import basestring
 from datetime import datetime
 import numpy
 import logging
4 changes: 3 additions & 1 deletion airflow/hooks/hive_hooks.py
@@ -1,4 +1,6 @@
 from __future__ import print_function
+from builtins import zip
+from past.builtins import basestring
 import csv
 import logging
 import subprocess
@@ -299,7 +301,7 @@ def max_partition(self, schema, table_name, field=None, filter=None):
         if not parts:
             return None
         elif len(parts[0]) == 1:
-            field = parts[0].keys()[0]
+            field = list(parts[0].keys())[0]
         elif not field:
             raise AirflowException(
                 "Please specify the field you want the max "
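`parts[0].keys()[0]` relied on Python 2's `keys()` returning a list; in Python 3 it returns a view that cannot be indexed. A minimal sketch of two equivalent fixes, with a hypothetical partition spec mirroring the shape used above:

```python
part = {"ds": "2015-01-01"}

field = list(part.keys())[0]  # the futurize-style fix applied in this diff
field = next(iter(part))      # equivalent, without building a throwaway list
```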
1 change: 1 addition & 0 deletions airflow/hooks/http_hook.py
@@ -1,3 +1,4 @@
+from builtins import str
 import logging
 
 import requests
1 change: 1 addition & 0 deletions airflow/hooks/jdbc_hook.py
@@ -1,3 +1,4 @@
+from builtins import str
 __author__ = 'janomar'
 
 import logging
1 change: 1 addition & 0 deletions airflow/hooks/presto_hook.py
@@ -1,3 +1,4 @@
+from builtins import str
 from pyhive import presto
 from pyhive.exc import DatabaseError
 
8 changes: 5 additions & 3 deletions airflow/jobs.py
@@ -1,3 +1,5 @@
+from builtins import str
+from past.builtins import basestring
 from collections import defaultdict
 from datetime import datetime
 import getpass
@@ -420,7 +422,7 @@ def prioritize_queued(self, session, executor, dagbag):
             else:
                 d[ti.pool].append(ti)
 
-        for pool, tis in d.items():
+        for pool, tis in list(d.items()):
             open_slots = pools[pool].open_slots(session=session)
             if open_slots > 0:
                 tis = sorted(
@@ -588,7 +590,7 @@ def _execute(self):
 
         # Triggering what is ready to get triggered
         while tasks_to_run:
-            for key, ti in tasks_to_run.items():
+            for key, ti in list(tasks_to_run.items()):
                 ti.refresh_from_db()
                 if ti.state == State.SUCCESS and key in tasks_to_run:
                     succeeded.append(key)
@@ -607,7 +609,7 @@ def _execute(self):
             executor.heartbeat()
 
             # Reacting to events
-            for key, state in executor.get_event_buffer().items():
+            for key, state in list(executor.get_event_buffer().items()):
                 dag_id, task_id, execution_date = key
                 if key not in tasks_to_run:
                     continue
2 changes: 1 addition & 1 deletion airflow/macros/hive.py
@@ -85,7 +85,7 @@ def closest_ds_partition(
     partitions = hh.get_partitions(schema=schema, table_name=table)
     if not partitions:
         return None
-    part_vals = [p.values()[0] for p in partitions]
+    part_vals = [list(p.values())[0] for p in partitions]
     if ds in part_vals:
         return ds
     else:
15 changes: 9 additions & 6 deletions airflow/models.py
@@ -1,4 +1,7 @@
 from __future__ import print_function
+from builtins import str
+from past.builtins import basestring
+from builtins import object
 import copy
 from datetime import datetime, timedelta
 import getpass
@@ -164,7 +167,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
                 self.file_last_changed[filepath] = dttm
                 return
 
-        for dag in m.__dict__.values():
+        for dag in list(m.__dict__.values()):
             if isinstance(dag, DAG):
                 dag.full_filepath = filepath
                 dag.is_subdag = False
@@ -246,7 +249,7 @@ def collect_dags(
             pass
 
     def deactivate_inactive_dags(self):
-        active_dag_ids = [dag.dag_id for dag in self.dags.values()]
+        active_dag_ids = [dag.dag_id for dag in list(self.dags.values())]
         session = settings.Session()
         for dag in session.query(
                 DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
@@ -275,7 +278,7 @@ def __repr__(self):
         return self.username
 
     def get_id(self):
-        return unicode(self.id)
+        return str(self.id)
 
 
 class Connection(Base):
@@ -963,7 +966,7 @@ def render_templates(self):
             elif isinstance(content, dict):
                 result = {
                     k: rt(v, jinja_context)
-                    for k, v in content.items()}
+                    for k, v in list(content.items())}
             else:
                 raise AirflowException("Type not supported for templating")
             setattr(task, attr, result)
@@ -1257,7 +1260,7 @@ def __deepcopy__(self, memo):
 
         self._upstream_list = sorted(self._upstream_list, key=lambda x: x.task_id)
         self._downstream_list = sorted(self._downstream_list, key=lambda x: x.task_id)
-        for k, v in self.__dict__.items():
+        for k, v in list(self.__dict__.items()):
             if k not in ('user_defined_macros', 'params'):
                 setattr(result, k, copy.deepcopy(v, memo))
 
@@ -1763,7 +1766,7 @@ def __deepcopy__(self, memo):
         cls = self.__class__
         result = cls.__new__(cls)
         memo[id(self)] = result
-        for k, v in self.__dict__.items():
+        for k, v in list(self.__dict__.items()):
             if k not in ('user_defined_macros', 'params'):
                 setattr(result, k, copy.deepcopy(v, memo))
 
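`unicode` does not exist in Python 3, and `from builtins import str` (added at the top of this file) gives Python 2 a `str` type with Python 3 text semantics, so `unicode(self.id)` can become `str(self.id)`. A minimal sketch:

```python
# On py2 this imports python-future's unicode-compatible str; on py3 the
# stdlib builtin is imported unchanged.
from builtins import str

user_id = 42
label = str(user_id)  # replaces py2's unicode(user_id); a text type on both
print(label)
```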
2 changes: 2 additions & 0 deletions airflow/operators/check_operator.py
@@ -1,3 +1,5 @@
+from builtins import zip
+from builtins import str
 import logging
 
 from airflow.utils import AirflowException
4 changes: 3 additions & 1 deletion airflow/operators/hive_stats_operator.py
@@ -1,3 +1,5 @@
+from builtins import str
+from builtins import zip
 from collections import OrderedDict
 import json
 import logging
@@ -96,7 +98,7 @@ def execute(self, context=None):
         exprs = {
             ('', 'count'): 'COUNT(*)'
         }
-        for col, col_type in field_types.items():
+        for col, col_type in list(field_types.items()):
             d = {}
             if self.assignment_func:
                 d = self.assignment_func(col, col_type)
1 change: 1 addition & 0 deletions airflow/operators/mssql_to_hive.py
@@ -1,3 +1,4 @@
+from builtins import chr
 from collections import OrderedDict
 import unicodecsv as csv
 import logging
1 change: 1 addition & 0 deletions airflow/operators/mysql_to_hive.py
@@ -1,3 +1,4 @@
+from builtins import chr
 from collections import OrderedDict
 import csv
 import logging
1 change: 1 addition & 0 deletions airflow/operators/python_operator.py
@@ -1,3 +1,4 @@
+from builtins import str
 from datetime import datetime
 import logging
 
2 changes: 2 additions & 0 deletions airflow/operators/s3_to_hive_operator.py
@@ -1,3 +1,5 @@
+from builtins import next
+from builtins import zip
 import logging
 from tempfile import NamedTemporaryFile
 
5 changes: 4 additions & 1 deletion airflow/operators/sensors.py
@@ -1,7 +1,10 @@
 from __future__ import print_function
+from future import standard_library
+standard_library.install_aliases()
+from builtins import str
 from datetime import datetime
 import logging
-from urlparse import urlparse
+from urllib.parse import urlparse
 from time import sleep
 
 from airflow import hooks, settings
3 changes: 2 additions & 1 deletion airflow/plugins_manager.py
@@ -1,3 +1,4 @@
+from builtins import object
 import imp
 import inspect
 import logging
@@ -45,7 +46,7 @@ def validate(cls):
         if file_ext != '.py':
             continue
         m = imp.load_source(mod_name, filepath)
-        for obj in m.__dict__.values():
+        for obj in list(m.__dict__.values()):
             if (
                 inspect.isclass(obj) and
                 issubclass(obj, AirflowPlugin) and
13 changes: 8 additions & 5 deletions airflow/utils.py
@@ -1,4 +1,7 @@
 from __future__ import print_function
+from builtins import str
+from builtins import input
+from builtins import object
 from copy import copy
 from datetime import datetime, timedelta
 from email.mime.text import MIMEText
@@ -200,7 +203,7 @@ def resetdb():
 
 
 def validate_key(k, max_length=250):
-    if type(k) is not str:
+    if not isinstance(k, basestring):
         raise TypeError("The key has to be a string")
     elif len(k) > max_length:
         raise AirflowException("The key has to be less than {0} characters".format(
@@ -342,7 +345,7 @@ def ask_yesno(question):
     done = False
     print(question)
     while not done:
-        choice = raw_input().lower()
+        choice = input().lower()
         if choice in yes:
             return True
         elif choice in no:
@@ -354,7 +357,7 @@ def ask_yesno(question):
 def send_email(to, subject, html_content):
     SMTP_MAIL_FROM = conf.get('smtp', 'SMTP_MAIL_FROM')
 
-    if isinstance(to, unicode) or isinstance(to, str):
+    if isinstance(to, str) or isinstance(to, str):
         if ',' in to:
             to = to.split(',')
         elif ';' in to:
@@ -402,7 +405,7 @@ def import_module_attrs(parent_module_globals, module_attrs_dict):
     brings functional operators to those namespaces.
     '''
     imported_attrs = []
-    for mod, attrs in module_attrs_dict.items():
+    for mod, attrs in list(module_attrs_dict.items()):
         try:
             folder = os.path.dirname(parent_module_globals['__file__'])
             f, filename, description = imp.find_module(mod, [folder])
@@ -445,7 +448,7 @@ class AirflowTaskTimeout(Exception):
     pass
 
 
-class timeout:
+class timeout(object):
     """
     To be used in a ``with`` block and timeout its content.
     """
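`basestring` is gone in Python 3; `past.builtins.basestring` keeps the permissive Python 2 check (accepting both `str` and `unicode`) while still working under Python 3. A minimal standalone sketch of the `validate_key` check above, with `ValueError` standing in for `AirflowException`:

```python
from past.builtins import basestring

def validate_key(k, max_length=250):
    # Any text type passes on either interpreter.
    if not isinstance(k, basestring):
        raise TypeError("The key has to be a string")
    elif len(k) > max_length:
        raise ValueError(
            "The key has to be less than {0} characters".format(max_length))

validate_key(u"my_task_id")  # hypothetical key; passes silently
```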