Skip to content

Commit

Permalink
[AIRFLOW-1094] Run unit tests under contrib in Travis
Browse files Browse the repository at this point in the history
Rename all unit tests under tests/contrib to start
with test_* and fix
broken unit tests so that they run for the Python
2 and 3 builds.

Closes #2234 from hgrif/AIRFLOW-1094
  • Loading branch information
hgrif authored and bolkedebruin committed Apr 17, 2017
1 parent 74c1ce2 commit 219c506
Show file tree
Hide file tree
Showing 34 changed files with 176 additions and 119 deletions.
2 changes: 1 addition & 1 deletion airflow/contrib/operators/ecs_operator.py
Expand Up @@ -89,7 +89,7 @@ def execute(self, context):

def _wait_for_task_ended(self):
waiter = self.client.get_waiter('tasks_stopped')
waiter.config.max_attempts = sys.maxint # timeout is managed by airflow
waiter.config.max_attempts = sys.maxsize # timeout is managed by airflow
waiter.wait(
cluster=self.cluster,
tasks=[self.arn]
Expand Down
1 change: 1 addition & 0 deletions airflow/hooks/__init__.py
Expand Up @@ -48,6 +48,7 @@
'samba_hook': ['SambaHook'],
'sqlite_hook': ['SqliteHook'],
'S3_hook': ['S3Hook'],
'zendesk_hook': ['ZendeskHook'],
'http_hook': ['HttpHook'],
'druid_hook': ['DruidHook'],
'jdbc_hook': ['JdbcHook'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/zendesk_hook.py
Expand Up @@ -21,7 +21,7 @@
import logging
import time
from zdesk import Zendesk, RateLimitError, ZendeskError
from airflow.hooks import BaseHook
from airflow.hooks.base_hook import BaseHook


class ZendeskHook(BaseHook):
Expand Down
5 changes: 5 additions & 0 deletions scripts/ci/requirements.txt
Expand Up @@ -3,6 +3,7 @@ azure-storage>=0.34.0
bcrypt
bleach
boto
boto3
celery
cgroupspy
chartkick
Expand All @@ -11,6 +12,7 @@ coverage
coveralls
croniter
cryptography
datadog
dill
distributed
docker-py
Expand All @@ -25,6 +27,7 @@ Flask-WTF
flower
freezegun
future
google-api-python-client>=1.5.0,<1.6.0
gunicorn
hdfs
hive-thrift-py
Expand All @@ -37,6 +40,7 @@ ldap3
lxml
markdown
mock
moto
mysqlclient
nose
nose-exclude
Expand Down Expand Up @@ -69,3 +73,4 @@ statsd
thrift
thrift_sasl
unicodecsv
zdesk
File renamed without changes.
Expand Up @@ -110,7 +110,7 @@ def test_invalid_source_format(self):
hook.BigQueryBaseCursor("test", "test").run_load("test.test", "test_schema.json", ["test_data.json"], source_format="json")

# since we passed 'json' in, and it's not valid, make sure it's present in the error string.
self.assertIn("json", str(context.exception))
self.assertIn("JSON", str(context.exception))


class TestBigQueryBaseCursor(unittest.TestCase):
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Expand Up @@ -12,16 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import unittest
from io import StringIO

import mock

from airflow import configuration, models
from airflow.utils import db
from airflow.exceptions import AirflowException
from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook


class TestSparkSubmitHook(unittest.TestCase):

_spark_job_file = 'test_application.py'
_config = {
'conf': {
Expand All @@ -43,6 +46,11 @@ class TestSparkSubmitHook(unittest.TestCase):
}

def setUp(self):

if sys.version_info[0] == 3:
raise unittest.SkipTest('TestSparkSubmitHook won\'t work with '
'python3. No need to test anything here')

configuration.load_test_config()
db.merge_conn(
models.Connection(
Expand Down Expand Up @@ -97,13 +105,17 @@ def test_build_command(self):
if self._config['verbose']:
assert "--verbose" in cmd

def test_submit(self):
@mock.patch('airflow.contrib.hooks.spark_submit_hook.subprocess')
def test_submit(self, mock_process):
# We don't have spark-submit available, and this is hard to mock, so let's
# just use this simple mock.
mock_Popen = mock_process.Popen.return_value
mock_Popen.stdout = StringIO(u'stdout')
mock_Popen.stderr = StringIO(u'stderr')
mock_Popen.returncode = None
mock_Popen.communicate.return_value = ['extra stdout', 'extra stderr']
hook = SparkSubmitHook()

# We don't have spark-submit available, and this is hard to mock, so just accept
# an exception for now.
with self.assertRaises(AirflowException):
hook.submit(self._spark_job_file)
hook.submit(self._spark_job_file)

def test_resolve_connection(self):

Expand Down
Expand Up @@ -15,7 +15,6 @@

import json
import unittest
from exceptions import OSError

from airflow import configuration, models
from airflow.contrib.hooks.sqoop_hook import SqoopHook
Expand Down
89 changes: 89 additions & 0 deletions tests/contrib/hooks/test_zendesk_hook.py
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

import mock

from airflow.hooks.zendesk_hook import ZendeskHook
from zdesk import RateLimitError


class TestZendeskHook(unittest.TestCase):

@mock.patch("airflow.hooks.zendesk_hook.time")
def test_sleeps_for_correct_interval(self, mocked_time):
sleep_time = 10
# To break out of the otherwise infinite tries
mocked_time.sleep = mock.Mock(side_effect=ValueError, return_value=3)
conn_mock = mock.Mock()
mock_response = mock.Mock()
mock_response.headers.get.return_value = sleep_time
conn_mock.call = mock.Mock(
side_effect=RateLimitError(msg="some message", code="some code",
response=mock_response))

zendesk_hook = ZendeskHook("conn_id")
zendesk_hook.get_conn = mock.Mock(return_value=conn_mock)

with self.assertRaises(ValueError):
zendesk_hook.call("some_path", get_all_pages=False)
mocked_time.sleep.assert_called_with(sleep_time)

@mock.patch("airflow.hooks.zendesk_hook.Zendesk")
def test_returns_single_page_if_get_all_pages_false(self, _):
zendesk_hook = ZendeskHook("conn_id")
mock_connection = mock.Mock()
mock_connection.host = "some_host"
zendesk_hook.get_connection = mock.Mock(return_value=mock_connection)
zendesk_hook.get_conn()

mock_conn = mock.Mock()
mock_call = mock.Mock(
return_value={'next_page': 'https://some_host/something', 'path':
[]})
mock_conn.call = mock_call
zendesk_hook.get_conn = mock.Mock(return_value=mock_conn)
zendesk_hook.call("path", get_all_pages=False)
mock_call.assert_called_once_with("path", None)

@mock.patch("airflow.hooks.zendesk_hook.Zendesk")
def test_returns_multiple_pages_if_get_all_pages_true(self, _):
zendesk_hook = ZendeskHook("conn_id")
mock_connection = mock.Mock()
mock_connection.host = "some_host"
zendesk_hook.get_connection = mock.Mock(return_value=mock_connection)
zendesk_hook.get_conn()

mock_conn = mock.Mock()
mock_call = mock.Mock(
return_value={'next_page': 'https://some_host/something', 'path': []})
mock_conn.call = mock_call
zendesk_hook.get_conn = mock.Mock(return_value=mock_conn)
zendesk_hook.call("path", get_all_pages=True)
assert mock_call.call_count == 2

@mock.patch("airflow.hooks.zendesk_hook.Zendesk")
def test_zdesk_is_inited_correctly(self, mock_zendesk):
conn_mock = mock.Mock()
conn_mock.host = "conn_host"
conn_mock.login = "conn_login"
conn_mock.password = "conn_pass"

zendesk_hook = ZendeskHook("conn_id")
zendesk_hook.get_connection = mock.Mock(return_value=conn_mock)
zendesk_hook.get_conn()
mock_zendesk.assert_called_with('https://conn_host', 'conn_login',
'conn_pass', True)
90 changes: 0 additions & 90 deletions tests/contrib/hooks/zendesk_hook.py

This file was deleted.

3 changes: 0 additions & 3 deletions tests/contrib/operators/__init__.py
Expand Up @@ -13,6 +13,3 @@
# limitations under the License.
#

from __future__ import absolute_import
from .ssh_execute_operator import *
from .fs_operator import *
Expand Up @@ -60,23 +60,22 @@ def test_init(self):
ADDITIONAL_OPTIONS)

@mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
@mock.patch(GCS_HOOK_STRING.format('GoogleCloudStorageHook'))
@mock.patch(GCS_HOOK_STRING.format('GoogleCloudBucketHelper'))
def test_exec(self, gcs_hook, dataflow_mock):
"""Test DataFlowHook is created and the right args are passed to
start_python_workflow.
"""
start_python_hook = dataflow_mock.return_value.start_python_dataflow
gcs_download_hook = gcs_hook.return_value.download
gcs_download_hook = gcs_hook.return_value.google_cloud_to_local
self.dataflow.execute(None)
self.assertTrue(dataflow_mock.called)
expected_options = {
'project': 'test',
'staging_location': 'gs://test/staging',
'output': 'gs://test/output'
}
gcs_download_hook.assert_called_once_with(
'my-bucket', 'my-object.py', mock.ANY)
gcs_download_hook.assert_called_once_with(PY_FILE)
start_python_hook.assert_called_once_with(TASK_ID, expected_options,
mock.ANY, PY_OPTIONS)
self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))

0 comments on commit 219c506

Please sign in to comment.