diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index d931772a14971..0cc3ed7ad7f8f 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -112,3 +112,22 @@ def get_job(self, job_id: int) -> Any: json={"id": job_id}, headers={"accept": "application/json"}, ) + + def test_connection(self): + """Tests the Airbyte connection by hitting the health API""" + self.method = 'GET' + try: + res = self.run( + endpoint=f"api/{self.api_version}/health", + headers={"accept": "application/json"}, + extra_options={'check_response': False}, + ) + + if res.status_code == 200: + return True, 'Connection successfully tested' + else: + return False, res.text + except Exception as e: # noqa pylint: disable=broad-except + return False, str(e) + finally: + self.method = 'POST' diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py index f9709a519e4ae..e506553cc4af6 100644 --- a/tests/providers/airbyte/hooks/test_airbyte.py +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -38,6 +38,7 @@ class TestAirbyteHook(unittest.TestCase): job_id = 1 sync_connection_endpoint = 'http://test-airbyte:8001/api/v1/connections/sync' get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get' + health_endpoint = 'http://test-airbyte:8001/api/v1/health' _mock_sync_conn_success_response_body = {'job': {'id': 1}} _mock_job_status_success_response_body = {'job': {'status': 'succeeded'}} @@ -124,3 +125,19 @@ def test_wait_for_job_cancelled(self, mock_get_job): calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] assert mock_get_job.has_calls(calls) + + @requests_mock.mock() + def test_connection_success(self, m): + m.get(self.health_endpoint, status_code=200,) + + status, msg = self.hook.test_connection() + assert status is True + assert msg == 'Connection successfully tested' + + @requests_mock.mock() + def test_connection_failure(self, m): + m.get(self.health_endpoint, status_code=500, json={"message": "internal server error"}) + + status, msg = self.hook.test_connection() + assert status is False + assert msg == '{"message": "internal server error"}'