Skip to content

Commit

Permalink
Add test_connection to Azure Batch hook (#25235)
Browse files Browse the repository at this point in the history
* Add test_connection to Azure Batch hook

* Apply review suggestions
  • Loading branch information
phanikumv committed Jul 22, 2022
1 parent ceb1658 commit eab0167
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
13 changes: 13 additions & 0 deletions airflow/providers/microsoft/azure/hooks/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,16 @@ def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> None:
self.log.info("Waiting for %s to complete, currently on %s state", task.id, task.state)
time.sleep(15)
raise TimeoutError("Timed out waiting for tasks to complete")

def test_connection(self):
"""Test a configured Azure Batch connection."""
try:
# Attempt to list existing jobs under the configured Batch account and retrieve
# the first in the returned iterator. The Azure Batch API does allow for creation of a
# BatchServiceClient with incorrect values but then will fail properly once items are
# retrieved using the client. We need to _actually_ try to retrieve an object to properly
# test the connection.
next(self.get_conn().job.list(), None)
except Exception as e:
return False, str(e)
return True, "Successfully connected to Azure Batch."
17 changes: 17 additions & 0 deletions tests/providers/microsoft/azure/hooks/test_azure_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import unittest
from unittest import mock
from unittest.mock import PropertyMock

from azure.batch import BatchServiceClient, models as batch_models

Expand Down Expand Up @@ -165,3 +166,19 @@ def test_add_single_task_to_job(self, mock_batch):
def test_wait_for_all_task_to_complete(self, mock_batch):
# TODO: Add test
pass

@mock.patch('airflow.providers.microsoft.azure.hooks.batch.BatchServiceClient')
def test_connection_success(self, mock_batch):
hook = AzureBatchHook(azure_batch_conn_id=self.test_cloud_conn_id)
hook.get_conn().job.return_value = {}
status, msg = hook.test_connection()
assert status is True
assert msg == "Successfully connected to Azure Batch."

@mock.patch('airflow.providers.microsoft.azure.hooks.batch.BatchServiceClient')
def test_connection_failure(self, mock_batch):
hook = AzureBatchHook(azure_batch_conn_id=self.test_cloud_conn_id)
hook.get_conn().job.list = PropertyMock(side_effect=Exception("Authentication failed."))
status, msg = hook.test_connection()
assert status is False
assert msg == "Authentication failed."

0 comments on commit eab0167

Please sign in to comment.